feature: move things around, cleaner code
parent
072afbed68
commit
1d52185673
@ -0,0 +1,3 @@
|
||||
# libexpat is only pulled in via apt-get install git during build.
|
||||
# It is not used in the final runtime image or by blink2mqtt at all.
|
||||
CVE-2025-59375
|
||||
@ -1,43 +0,0 @@
|
||||
🚀 Version 3.0.0 — Major Refactor and Rearchitecture
|
||||
|
||||
This release represents a complete modernization of amcrest2mqtt, bringing cleaner structure, better MQTT handling, and richer event data.
|
||||
|
||||
Highlights
|
||||
|
||||
- Modularized codebase under src/amcrest2mqtt/
|
||||
|
||||
- Brand-new MqttMixin with resilient reconnect, structured logs, and HA rediscovery support
|
||||
|
||||
- HelpersMixin for device-state building and service-level control commands
|
||||
|
||||
- AmcrestApiMixin replaces direct device calls with consolidated error handling
|
||||
|
||||
- New sensor.event_time (timestamp) and sensor.event_text entities for human-readable event tracking
|
||||
|
||||
- Added doorbell and human detection binary sensors for supported models (AD110/AD410)
|
||||
|
||||
- Proper Home Assistant schema compliance: ISO 8601 timestamps, availability templates, and via-device linkage
|
||||
|
||||
- Clean shutdown on SIGTERM/SIGINT and improved signal management
|
||||
|
||||
- Full developer environment setup (black, ruff, pytest, coverage settings)
|
||||
|
||||
- Utility script tools/clear_mqtt.sh for clearing retained topics
|
||||
|
||||
- Docker image metadata updated with links, license, and version labels
|
||||
|
||||
Breaking Changes
|
||||
|
||||
- Moved all code to src/ package layout — update imports and mount paths if using bind mounts.
|
||||
|
||||
- MQTT topics slightly restructured for consistency across entities.
|
||||
|
||||
- Deprecated util.py; its helpers are now integrated into mixins.
|
||||
|
||||
1.0.1
|
||||
- lookup camera hostnames to get ip at setup time, so we aren't doing
|
||||
100k lookups every day (in my 4 camera setup, for example)
|
||||
|
||||
1.0.0
|
||||
- initial release
|
||||
|
||||
@ -0,0 +1,121 @@
|
||||
from amcrest import AmcrestCamera
|
||||
from argparse import Namespace
|
||||
from asyncio import AbstractEventLoop
|
||||
from datetime import datetime, timezone
|
||||
from logging import Logger
|
||||
from mqtt_helper import MqttHelper
|
||||
from paho.mqtt.client import Client, MQTTMessage, ConnectFlags, DisconnectFlags
|
||||
from paho.mqtt.reasoncodes import ReasonCode
|
||||
from paho.mqtt.properties import Properties
|
||||
from types import FrameType
|
||||
from typing import Protocol, Any
|
||||
|
||||
|
||||
class AmcrestServiceProtocol(Protocol):
|
||||
api_calls: int
|
||||
args: Namespace | None
|
||||
amcrest_config: dict[str, Any]
|
||||
amcrest_devices: dict[str, dict[str, Any]]
|
||||
client_id: str
|
||||
config: dict[str, Any]
|
||||
device_interval: int
|
||||
device_list_interval: int
|
||||
devices: dict[str, Any]
|
||||
discovery_complete: bool
|
||||
events: list
|
||||
last_call_date: str
|
||||
logger: Logger
|
||||
loop: AbstractEventLoop
|
||||
mqtt_config: dict[str, Any]
|
||||
mqtt_connect_time: datetime
|
||||
mqtt_helper: MqttHelper
|
||||
mqttc: Client
|
||||
qos: int
|
||||
rate_limited: bool
|
||||
running: bool
|
||||
service_name: str
|
||||
service: str
|
||||
storage_update_interval: int
|
||||
snapshot_update_interval: int
|
||||
states: dict[str, Any]
|
||||
timezone: timezone
|
||||
|
||||
async def build_camera(self, device: dict) -> str: ...
|
||||
async def build_component(self, device: dict) -> str: ...
|
||||
async def check_event_queue_loop(self) -> None: ...
|
||||
async def check_for_events(self) -> None: ...
|
||||
async def collect_all_device_events(self) -> None: ...
|
||||
async def collect_all_device_snapshots(self) -> None: ...
|
||||
async def collect_events_loop(self) -> None: ...
|
||||
async def collect_snapshots_loop(self) -> None: ...
|
||||
async def connect_to_devices(self) -> dict[str, Any]: ...
|
||||
async def device_loop(self) -> None: ...
|
||||
async def get_events_from_device(self, device_id: str) -> None: ...
|
||||
async def get_snapshot_from_device(self, device_id: str) -> str | None: ...
|
||||
async def main_loop(self) -> None: ...
|
||||
async def process_device_event(self, device_id: str, code: str, payload: Any) -> None: ...
|
||||
async def refresh_all_devices(self) -> None: ...
|
||||
async def setup_device_list(self) -> None: ...
|
||||
async def store_recording_in_media(self, device_id: str, amcrest_file: str) -> str | None: ...
|
||||
|
||||
def _csv(self, env_name: str) -> list[str] | None: ...
|
||||
def _assert_no_tuples(self, data: Any, path: str = "root") -> None: ...
|
||||
def _decode_payload(self, raw: bytes) -> Any: ...
|
||||
def _handle_device_topic(self, components: list[str], payload: str) -> None: ...
|
||||
def _handle_homeassistant_message(self, payload: str) -> None: ...
|
||||
def _handle_signal(self, signum: int, frame: FrameType | None) -> Any: ...
|
||||
def _parse_device_topic(self, components: list[str]) -> list[str | None] | None: ...
|
||||
|
||||
def b_to_gb(self, total: int) -> float: ...
|
||||
def b_to_mb(self, total: int) -> float: ...
|
||||
def build_device_states(self, device_id: str) -> None: ...
|
||||
def classify_device(self, device: dict) -> str: ...
|
||||
def get_api_calls(self) -> int: ...
|
||||
def get_camera(self, host: str) -> AmcrestCamera: ...
|
||||
def get_component_type(self, device_id: str) -> str: ...
|
||||
def get_component(self, device_id: str) -> dict[str, Any]: ...
|
||||
def get_device_availability_topic(self, device_id: str) -> str: ...
|
||||
def get_device_image_topic(self, device_id: str) -> str: ...
|
||||
def get_device_name(self, device_id: str) -> str: ...
|
||||
def get_device_name_slug(self, device_id: str) -> str: ...
|
||||
def get_device_state_topic(self, device_id: str, mode_name: str = "") -> str: ...
|
||||
def get_device(self, host: str, device_name: str, index: int) -> None: ...
|
||||
def get_ip_address(self, string: str) -> str: ...
|
||||
def get_last_call_date(self) -> str: ...
|
||||
def get_mode(self, device_id: str, mode_name: str) -> dict[str, Any]: ...
|
||||
def get_modes(self, device_id: str) -> dict[str, Any]: ...
|
||||
def get_motion_detection(self, device_id: str) -> bool: ...
|
||||
def get_next_event(self) -> dict[str, Any] | None: ...
|
||||
def get_privacy_mode(self, device_id: str) -> bool: ...
|
||||
def get_recorded_file(self, device_id: str, file: str, encode: bool = True) -> str | None: ...
|
||||
def get_snapshot(self, device_id: str) -> str | None: ...
|
||||
def get_storage_stats(self, device_id: str) -> dict[str, str | float]: ...
|
||||
def handle_device_command(self, device_id: str, handler: str, message: str) -> None: ...
|
||||
def handle_service_command(self, handler: str, message: str) -> None: ...
|
||||
def is_discovered(self, device_id: str) -> bool: ...
|
||||
def is_ipv4(self, string: str) -> bool: ...
|
||||
def is_rate_limited(self) -> bool: ...
|
||||
def load_config(self, config_arg: Any | None) -> dict[str, Any]: ...
|
||||
def mb_to_b(self, total: int) -> int: ...
|
||||
def mqtt_on_connect(
|
||||
self, client: Client, userdata: dict[str, Any], flags: ConnectFlags, reason_code: ReasonCode, properties: Properties | None
|
||||
) -> None: ...
|
||||
def mqtt_on_disconnect(self, client: Client, userdata: Any, flags: DisconnectFlags, reason_code: ReasonCode, properties: Properties | None) -> None: ...
|
||||
def mqtt_on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None: ...
|
||||
def mqtt_on_subscribe(self, client: Client, userdata: Any, mid: int, reason_code_list: list[ReasonCode], properties: Properties) -> None: ...
|
||||
def mqtt_on_log(self, client: Client, userdata: Any, paho_log_level: int, msg: str) -> None: ...
|
||||
def mqtt_safe_publish(self, topic: str, payload: str | bool | int | dict, **kwargs: Any) -> None: ...
|
||||
def mqttc_create(self) -> None: ...
|
||||
def publish_device_availability(self, device_id: str, online: bool = True) -> None: ...
|
||||
def publish_device_discovery(self, device_id: str) -> None: ...
|
||||
def publish_device_state(self, device_id: str) -> None: ...
|
||||
def publish_service_availability(self, avail: str = "online") -> None: ...
|
||||
def publish_service_discovery(self) -> None: ...
|
||||
def publish_service_state(self) -> None: ...
|
||||
def read_file(self, file_name: str) -> str: ...
|
||||
def rediscover_all(self) -> None: ...
|
||||
def safe_split_device(self, topic: str, segment: str) -> list[str]: ...
|
||||
def set_motion_detection(self, device_id: str, switch: bool) -> str: ...
|
||||
def set_privacy_mode(self, device_id: str, switch: bool) -> str: ...
|
||||
def upsert_device(self, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> None: ...
|
||||
def upsert_state(self, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> None: ...
|
||||
@ -0,0 +1,255 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from amcrest2mqtt.interface import AmcrestServiceProtocol as Amcrest2Mqtt
|
||||
|
||||
|
||||
class PublishMixin:
|
||||
|
||||
# Service -------------------------------------------------------------------------------------
|
||||
|
||||
def publish_service_discovery(self: Amcrest2Mqtt) -> None:
|
||||
device_block = self.mqtt_helper.device_block(
|
||||
self.service_name,
|
||||
self.mqtt_helper.service_slug,
|
||||
"weirdTangent",
|
||||
self.config["version"],
|
||||
)
|
||||
|
||||
self.logger.info("Publishing service entity")
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("binary_sensor", "service"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": self.service_name,
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("service"),
|
||||
"stat_t": self.mqtt_helper.svc_t("service"),
|
||||
"device_class": "connectivity",
|
||||
"icon": "mdi:server",
|
||||
"device": device_block,
|
||||
"origin": {
|
||||
"name": self.service_name,
|
||||
"sw_version": self.config["version"],
|
||||
"support_url": "https://github.com/weirdtangent/amcrest2mqtt",
|
||||
},
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("sensor", "api_calls"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} API Calls Today",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("api_calls"),
|
||||
"stat_t": self.mqtt_helper.stat_t("service", "service", "api_calls"),
|
||||
"json_attr_t": self.mqtt_helper.attr_t("service", "service", "api_calls", "attributes"),
|
||||
"unit_of_measurement": "calls",
|
||||
"icon": "mdi:api",
|
||||
"state_class": "total_increasing",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("binary_sensor", "rate_limited"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Rate Limited by Amcrest",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("rate_limited"),
|
||||
"stat_t": self.mqtt_helper.stat_t("service", "service", "rate_limited"),
|
||||
"json_attr_t": self.mqtt_helper.attr_t("service", "service", "rate_limited", "attributes"),
|
||||
"payload_on": "YES",
|
||||
"payload_off": "NO",
|
||||
"device_class": "problem",
|
||||
"icon": "mdi:speedometer-slow",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("number", "storage_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Device Refresh Interval",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("storage_refresh"),
|
||||
"stat_t": self.mqtt_helper.stat_t("service", "service", "storage_refresh"),
|
||||
"json_attr_t": self.mqtt_helper.attr_t("service", "service", "storage_refresh", "attributes"),
|
||||
"cmd_t": self.mqtt_helper.cmd_t("service", "storage_refresh"),
|
||||
"unit_of_measurement": "s",
|
||||
"min": 1,
|
||||
"max": 3600,
|
||||
"step": 1,
|
||||
"icon": "mdi:timer-refresh",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("number", "device_list_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Device List Refresh Interval",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("device_list_refresh"),
|
||||
"stat_t": self.mqtt_helper.stat_t("service", "service", "device_list_refresh"),
|
||||
"json_attr_t": self.mqtt_helper.attr_t("service", "service", "device_list_refresh", "attributes"),
|
||||
"cmd_t": self.mqtt_helper.cmd_t("service", "device_list_refresh"),
|
||||
"unit_of_measurement": "s",
|
||||
"min": 1,
|
||||
"max": 3600,
|
||||
"step": 1,
|
||||
"icon": "mdi:format-list-bulleted",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("number", "snapshot_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Snapshot Refresh Interval",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("snapshot_refresh"),
|
||||
"stat_t": self.mqtt_helper.stat_t("service", "service", "snapshot_refresh"),
|
||||
"json_attr_t": self.mqtt_helper.attr_t("service", "service", "snapshot_refresh", "attributes"),
|
||||
"cmd_t": self.mqtt_helper.cmd_t("service", "snapshot_refresh"),
|
||||
"unit_of_measurement": "m",
|
||||
"min": 1,
|
||||
"max": 60,
|
||||
"step": 1,
|
||||
"icon": "mdi:lightning-bolt",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.mqtt_helper.disc_t("button", "refresh_device_list"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Refresh Device List",
|
||||
"uniq_id": self.mqtt_helper.svc_unique_id("refresh_device_list"),
|
||||
"cmd_t": self.mqtt_helper.cmd_t("service", "refresh_device_list", "command"),
|
||||
"payload_press": "refresh",
|
||||
"icon": "mdi:refresh",
|
||||
"device": device_block,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.logger.debug(f"[HA] Discovery published for {self.service} ({self.mqtt_helper.service_slug})")
|
||||
|
||||
def publish_service_availability(self: Amcrest2Mqtt, avail: str = "online") -> None:
|
||||
self.mqtt_safe_publish(self.mqtt_helper.svc_t("status"), avail, qos=self.qos, retain=True)
|
||||
|
||||
def publish_service_state(self: Amcrest2Mqtt) -> None:
|
||||
service = {
|
||||
"state": "online",
|
||||
"api_calls": {
|
||||
"api_calls": self.get_api_calls(),
|
||||
"last_api_call": self.get_last_call_date(),
|
||||
},
|
||||
"rate_limited": "YES" if self.is_rate_limited() else "NO",
|
||||
"storage_refresh": self.device_interval,
|
||||
"device_list_refresh": self.device_list_interval,
|
||||
"snapshot_refresh": self.snapshot_update_interval,
|
||||
}
|
||||
|
||||
payload: Any
|
||||
for key, value in service.items():
|
||||
if isinstance(value, dict):
|
||||
payload = value.get(key)
|
||||
if isinstance(payload, datetime):
|
||||
payload = payload.isoformat()
|
||||
payload = json.dumps(payload)
|
||||
else:
|
||||
payload = str(value)
|
||||
|
||||
self.mqtt_safe_publish(
|
||||
self.mqtt_helper.stat_t("service", "service", key),
|
||||
payload,
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
|
||||
# Devices -------------------------------------------------------------------------------------
|
||||
|
||||
def publish_device_discovery(self: Amcrest2Mqtt, device_id: str) -> None:
|
||||
def _publish_one(dev_id: str, defn: dict, suffix: str = "") -> None:
|
||||
# Compute a per-mode device_id for topic namespacing
|
||||
eff_device_id = dev_id if not suffix else f"{dev_id}_{suffix}"
|
||||
|
||||
# Grab this component's discovery topic
|
||||
topic = self.mqtt_helper.disc_t(defn["component_type"], f"{dev_id}_{suffix}" if suffix else dev_id)
|
||||
|
||||
# Shallow copy to avoid mutating source
|
||||
payload = {k: v for k, v in defn.items() if k != "component_type"}
|
||||
|
||||
# Publish discovery
|
||||
self.mqtt_safe_publish(topic, json.dumps(payload), retain=True)
|
||||
|
||||
# Mark discovered in state (per published entity)
|
||||
self.states.setdefault(eff_device_id, {}).setdefault("internal", {})["discovered"] = 1
|
||||
|
||||
component = self.get_component(device_id)
|
||||
_publish_one(device_id, component)
|
||||
|
||||
# Publish any modes (0..n)
|
||||
modes = self.get_modes(device_id)
|
||||
for slug, mode in modes.items():
|
||||
_publish_one(device_id, mode, suffix=slug)
|
||||
|
||||
def publish_device_availability(self: Amcrest2Mqtt, device_id: str, online: bool = True) -> None:
|
||||
payload = "online" if online else "offline"
|
||||
|
||||
avty_t = self.get_device_availability_topic(device_id)
|
||||
self.mqtt_safe_publish(avty_t, payload, retain=True)
|
||||
|
||||
def publish_device_state(self: Amcrest2Mqtt, device_id: str) -> None:
|
||||
def _publish_one(dev_id: str, defn: str | dict[str, Any], suffix: str = "") -> None:
|
||||
# Grab this component's state topic
|
||||
topic = self.get_device_state_topic(dev_id, suffix)
|
||||
|
||||
# Shallow copy to avoid mutating source
|
||||
if isinstance(defn, dict):
|
||||
flat: dict[str, Any] = {k: v for k, v in defn.items() if k != "component_type"}
|
||||
|
||||
# Add metadata
|
||||
meta = self.states[dev_id].get("meta")
|
||||
if isinstance(meta, dict) and "last_update" in meta:
|
||||
flat["last_update"] = meta["last_update"]
|
||||
self.mqtt_safe_publish(topic, json.dumps(flat), retain=True)
|
||||
else:
|
||||
self.mqtt_safe_publish(topic, defn, retain=True)
|
||||
|
||||
if not self.is_discovered(device_id):
|
||||
self.logger.debug(f"[device state] Discovery not complete for {device_id} yet, holding off on sending state")
|
||||
return
|
||||
|
||||
states = self.states[device_id]
|
||||
_publish_one(device_id, states[self.get_component_type(device_id)])
|
||||
|
||||
# Publish any modes (0..n)
|
||||
modes = self.get_modes(device_id)
|
||||
for name, mode in modes.items():
|
||||
component_type = mode["component_type"]
|
||||
|
||||
# if no state yet, skip it
|
||||
if component_type not in states or (isinstance(states[component_type], dict) and name not in states[component_type]):
|
||||
continue
|
||||
|
||||
type_states = states[component_type][name] if isinstance(states[component_type], dict) else states[component_type]
|
||||
_publish_one(device_id, type_states, name)
|
||||
@ -1,185 +0,0 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Jeff Culverhouse
|
||||
from datetime import datetime
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from amcrest2mqtt.core import Amcrest2Mqtt
|
||||
from amcrest2mqtt.interface import AmcrestServiceProtocol
|
||||
|
||||
|
||||
class ServiceMixin:
|
||||
if TYPE_CHECKING:
|
||||
self: "AmcrestServiceProtocol"
|
||||
|
||||
def publish_service_discovery(self: Amcrest2Mqtt):
|
||||
app = self.get_device_block(self.service_slug, self.service_name)
|
||||
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("binary_sensor", self.service_slug),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": self.service_name,
|
||||
"uniq_id": self.service_slug,
|
||||
"stat_t": self.get_service_topic("status"),
|
||||
"payload_on": "online",
|
||||
"payload_off": "offline",
|
||||
"device_class": "connectivity",
|
||||
"icon": "mdi:server",
|
||||
"device": app,
|
||||
"origin": {
|
||||
"name": self.service_name,
|
||||
"sw_version": self.config["version"],
|
||||
"support_url": "https://github.com/weirdtangent/amcrest2mqtt",
|
||||
},
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("sensor", f"{self.service_slug}_api_calls"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} API Calls Today",
|
||||
"uniq_id": f"{self.service_slug}_api_calls",
|
||||
"stat_t": self.get_state_topic("service", "service", "api_calls"),
|
||||
"json_attr_t": self.get_attribute_topic("service", "service", "api_calls", "attributes"),
|
||||
"unit_of_measurement": "calls",
|
||||
"icon": "mdi:api",
|
||||
"state_class": "total_increasing",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("binary_sensor", f"{self.service_slug}_rate_limited"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Rate Limited by Amcrest",
|
||||
"uniq_id": f"{self.service_slug}_rate_limited",
|
||||
"stat_t": self.get_state_topic("service", "service", "rate_limited"),
|
||||
"json_attr_t": self.get_attribute_topic("service", "service", "rate_limited", "attributes"),
|
||||
"payload_on": "yes",
|
||||
"payload_off": "no",
|
||||
"device_class": "problem",
|
||||
"icon": "mdi:speedometer-slow",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("number", f"{self.service_slug}_storage_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Device Refresh Interval",
|
||||
"uniq_id": f"{self.service_slug}_storage_refresh",
|
||||
"stat_t": self.get_state_topic("service", "service", "storage_refresh"),
|
||||
"json_attr_t": self.get_attribute_topic("service", "service", "storage_refresh", "attributes"),
|
||||
"cmd_t": self.get_command_topic("service", "storage_refresh"),
|
||||
"unit_of_measurement": "s",
|
||||
"min": 1,
|
||||
"max": 3600,
|
||||
"step": 1,
|
||||
"icon": "mdi:timer-refresh",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("number", f"{self.service_slug}_device_list_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Device List Refresh Interval",
|
||||
"uniq_id": f"{self.service_slug}_device_list_refresh",
|
||||
"stat_t": self.get_state_topic("service", "service", "device_list_refresh"),
|
||||
"json_attr_t": self.get_attribute_topic("service", "service", "device_list_refresh", "attributes"),
|
||||
"cmd_t": self.get_command_topic("service", "device_list_refresh"),
|
||||
"unit_of_measurement": "s",
|
||||
"min": 1,
|
||||
"max": 3600,
|
||||
"step": 1,
|
||||
"icon": "mdi:format-list-bulleted",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("number", f"{self.service_slug}_snapshot_refresh"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Device Boost Refresh Interval",
|
||||
"uniq_id": f"{self.service_slug}_snapshot_refresh",
|
||||
"stat_t": self.get_state_topic("service", "service", "snapshot_refresh"),
|
||||
"json_attr_t": self.get_attribute_topic("service", "service", "snapshot_refresh", "attributes"),
|
||||
"cmd_t": self.get_command_topic("service", "snapshot_refresh"),
|
||||
"unit_of_measurement": "s",
|
||||
"min": 1,
|
||||
"max": 30,
|
||||
"step": 1,
|
||||
"icon": "mdi:lightning-bolt",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.mqtt_safe_publish(
|
||||
topic=self.get_discovery_topic("button", f"{self.service_slug}_refresh_device_list"),
|
||||
payload=json.dumps(
|
||||
{
|
||||
"name": f"{self.service_name} Refresh Device List",
|
||||
"uniq_id": f"{self.service_slug}_refresh_device_list",
|
||||
"cmd_t": self.get_command_topic("service", "refresh_device_list", "command"),
|
||||
"payload_press": "refresh",
|
||||
"icon": "mdi:refresh",
|
||||
"device": app,
|
||||
}
|
||||
),
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
self.logger.debug(f"[HA] Discovery published for {self.service} ({self.service_slug})")
|
||||
|
||||
def publish_service_availability(self: Amcrest2Mqtt):
|
||||
self.mqtt_safe_publish(self.get_service_topic("status"), "online", qos=self.qos, retain=True)
|
||||
|
||||
def publish_service_state(self: Amcrest2Mqtt):
|
||||
service = {
|
||||
"state": "online",
|
||||
"api_calls": {
|
||||
"api_calls": self.get_api_calls(),
|
||||
"last_api_call": self.get_last_call_date(),
|
||||
},
|
||||
"rate_limited": "yes" if self.is_rate_limited() else "no",
|
||||
"storage_refresh": self.device_interval,
|
||||
"device_list_refresh": self.device_list_interval,
|
||||
"snapshot_refresh": self.device_boost_interval,
|
||||
}
|
||||
|
||||
for key, value in service.items():
|
||||
# Scalars like "state" -> just publish as is (but as a string)
|
||||
if not isinstance(value, dict):
|
||||
payload = str(value)
|
||||
else:
|
||||
payload = value.get(key)
|
||||
if isinstance(payload, datetime):
|
||||
payload = payload.isoformat()
|
||||
payload = json.dumps(payload)
|
||||
|
||||
self.mqtt_safe_publish(
|
||||
self.get_state_topic("service", "service", key),
|
||||
payload,
|
||||
qos=self.mqtt_config["qos"],
|
||||
retain=True,
|
||||
)
|
||||
@ -1,128 +1,52 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Jeff Culverhouse
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING, cast, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from amcrest2mqtt.core import Amcrest2Mqtt
|
||||
from amcrest2mqtt.interface import AmcrestServiceProtocol
|
||||
from amcrest2mqtt.interface import AmcrestServiceProtocol as Amcrest2Mqtt
|
||||
|
||||
|
||||
class TopicsMixin:
|
||||
if TYPE_CHECKING:
|
||||
self: "AmcrestServiceProtocol"
|
||||
|
||||
def get_new_client_id(self: Amcrest2Mqtt):
|
||||
return self.mqtt_config["prefix"] + "-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
|
||||
|
||||
# Slug strings --------------------------------------------------------------------------------
|
||||
# Device properties --------------------------------------------------------------------------
|
||||
|
||||
def get_device_slug(self: Amcrest2Mqtt, device_id: str, type: Optional[str] = None) -> str:
|
||||
return "_".join(filter(None, [self.service_slug, device_id.replace(":", ""), type]))
|
||||
def get_device_name(self: Amcrest2Mqtt, device_id: str) -> str:
|
||||
return cast(str, self.devices[device_id]["component"]["device"]["name"])
|
||||
|
||||
def get_device_name_slug(self: Amcrest2Mqtt, device_id: str) -> str:
|
||||
return re.sub(r"[^a-zA-Z0-9]+", "_", self.get_device_name(device_id))
|
||||
|
||||
def get_vendor_device_slug(self: Amcrest2Mqtt, device_id):
|
||||
return f"{self.service_slug}-{device_id.replace(':', '')}"
|
||||
|
||||
# Topic strings -------------------------------------------------------------------------------
|
||||
|
||||
def get_service_device(self: Amcrest2Mqtt):
|
||||
return self.service
|
||||
|
||||
def get_service_topic(self: Amcrest2Mqtt, topic):
|
||||
return f"{self.service_slug}/status/{topic}"
|
||||
|
||||
def get_device_topic(self: Amcrest2Mqtt, component_type, device_id, *parts) -> str:
|
||||
if device_id == "service":
|
||||
return "/".join([self.service_slug, *map(str, parts)])
|
||||
|
||||
device_slug = self.get_device_slug(device_id)
|
||||
return "/".join([self.service_slug, component_type, device_slug, *map(str, parts)])
|
||||
|
||||
def get_discovery_topic(self: Amcrest2Mqtt, component, item) -> str:
|
||||
return f"{self.mqtt_config['discovery_prefix']}/{component}/{item}/config"
|
||||
|
||||
def get_state_topic(self: Amcrest2Mqtt, device_id, category, item=None) -> str:
|
||||
topic = f"{self.service_slug}/{category}" if device_id == "service" else f"{self.service_slug}/devices/{self.get_device_slug(device_id)}/{category}"
|
||||
return f"{topic}/{item}" if item else topic
|
||||
|
||||
def get_availability_topic(self: Amcrest2Mqtt, device_id, category="availability", item=None) -> str:
|
||||
topic = f"{self.service_slug}/{category}" if device_id == "service" else f"{self.service_slug}/devices/{self.get_device_slug(device_id)}/{category}"
|
||||
return f"{topic}/{item}" if item else topic
|
||||
|
||||
def get_attribute_topic(self: Amcrest2Mqtt, device_id, category, item, attribute) -> str:
|
||||
if device_id == "service":
|
||||
return f"{self.service_slug}/{category}/{item}/{attribute}"
|
||||
return re.sub(r"[^a-zA-Z0-9]+", "_", self.get_device_name(device_id).lower())
|
||||
|
||||
device_entry = self.devices.get(device_id, {})
|
||||
component = device_entry.get("component") or device_entry.get("component_type") or category
|
||||
return f"{self.mqtt_config['discovery_prefix']}/{component}/{self.get_device_slug(device_id)}/{item}/{attribute}"
|
||||
def get_component(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
|
||||
return cast(dict[str, Any], self.devices[device_id]["component"])
|
||||
|
||||
def get_command_topic(self: Amcrest2Mqtt, device_id, category, item=None, command="set") -> str:
|
||||
if device_id == "service":
|
||||
return f"{self.service_slug}/service/{category}/{item}"
|
||||
def get_component_type(self: Amcrest2Mqtt, device_id: str) -> str:
|
||||
return cast(str, self.devices[device_id]["component"].get("component_type", "unknown"))
|
||||
|
||||
# if category is not passed in, device must exist already
|
||||
if not category:
|
||||
category = self.devices[device_id]["component"]["component_type"]
|
||||
def get_modes(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
|
||||
return cast(dict[str, Any], self.devices[device_id]["modes"])
|
||||
|
||||
return f"{self.service_slug}/{category}/{self.get_device_slug(device_id)}/{item}/{command}"
|
||||
def get_mode(self: Amcrest2Mqtt, device_id: str, mode_name: str) -> dict[str, Any]:
|
||||
return cast(dict[str, Any], self.devices[device_id]["modes"][mode_name])
|
||||
|
||||
# Device propertiesi --------------------------------------------------------------------------
|
||||
def is_discovered(self: Amcrest2Mqtt, device_id: str) -> bool:
|
||||
return cast(bool, self.states[device_id]["internal"].get("discovered", False))
|
||||
|
||||
def get_device_name(self: Amcrest2Mqtt, device_id):
|
||||
return self.devices[device_id]["component"]["name"]
|
||||
|
||||
def get_component(self: Amcrest2Mqtt, device_id):
|
||||
return self.devices[device_id]["component"]
|
||||
|
||||
def get_component_type(self: Amcrest2Mqtt, device_id):
|
||||
return self.devices[device_id]["component"]["component_type"]
|
||||
|
||||
def get_modes(self: "Amcrest2Mqtt", device_id):
|
||||
return self.devices[device_id].get("modes", {})
|
||||
|
||||
def get_mode(self: "Amcrest2Mqtt", device_id, mode_name):
|
||||
modes = self.devices[device_id].get("modes", {})
|
||||
return modes.get(mode_name, {})
|
||||
|
||||
def get_last_update(self: "Amcrest2Mqtt", device_id: str) -> str:
|
||||
return self.states[device_id]["internal"].get("last_update", None)
|
||||
|
||||
def is_discovered(self: "Amcrest2Mqtt", device_id: str) -> bool:
|
||||
return self.states[device_id]["internal"].get("discovered", False)
|
||||
|
||||
def get_device_state_topic(self: "Amcrest2Mqtt", device_id, mode_name=None):
|
||||
def get_device_state_topic(self: Amcrest2Mqtt, device_id: str, mode_name: str = "") -> str:
|
||||
component = self.get_mode(device_id, mode_name) if mode_name else self.get_component(device_id)
|
||||
component_type = component["component_type"]
|
||||
|
||||
if component_type in ["camera", "image"]:
|
||||
return component.get("topic", None)
|
||||
else:
|
||||
return component.get("stat_t", component.get("state_topic", None))
|
||||
match component["component_type"]:
|
||||
case "camera":
|
||||
return cast(str, component["topic"])
|
||||
case "image":
|
||||
return cast(str, component["image_topic"])
|
||||
case _:
|
||||
return cast(str, component.get("stat_t") or component.get("state_topic"))
|
||||
|
||||
def get_device_availability_topic(self: Amcrest2Mqtt, device_id):
|
||||
def get_device_image_topic(self: Amcrest2Mqtt, device_id: str) -> str:
|
||||
component = self.get_component(device_id)
|
||||
return component.get("avty_t", component.get("availability_topic", None))
|
||||
|
||||
# Misc helpers --------------------------------------------------------------------------------
|
||||
|
||||
def get_device_block(self: Amcrest2Mqtt, id, name, vendor="Amcrest", sku=None):
|
||||
device = {"name": name, "identifiers": [id], "manufacturer": vendor}
|
||||
return cast(str, component["topic"])
|
||||
|
||||
if sku:
|
||||
device["model"] = sku
|
||||
|
||||
if name == self.service_name:
|
||||
device.update(
|
||||
{
|
||||
"suggested_area": "House",
|
||||
"manufacturer": "weirdTangent",
|
||||
"sw_version": self.config["version"],
|
||||
}
|
||||
)
|
||||
return device
|
||||
def get_device_availability_topic(self: Amcrest2Mqtt, device_id: str) -> str:
|
||||
component = self.get_component(device_id)
|
||||
return cast(str, component.get("avty_t") or component.get("availability_topic"))
|
||||
|
||||
@ -1,89 +0,0 @@
|
||||
import argparse
|
||||
from amcrest import AmcrestCamera
|
||||
from typing import Protocol, Optional, Any
|
||||
from amcrest2mqtt.core import Amcrest2Mqtt
|
||||
|
||||
# grep -ERh --exclude interface.py 'def\s+[^_]' src/ | sed -E "s/^[[:space:]]+//g" | awk '{ print " ", $0, "..." }' | sort
|
||||
|
||||
|
||||
class AmcrestServiceProtocol(Protocol):
|
||||
"""Common interface so mixins can type-hint against the full service."""
|
||||
|
||||
async def build_camera(self: Amcrest2Mqtt, device: str) -> str: ...
|
||||
async def build_component(self: Amcrest2Mqtt, device: dict) -> str: ...
|
||||
async def check_event_queue_loop(self: Amcrest2Mqtt): ...
|
||||
async def check_for_events(self: Amcrest2Mqtt) -> None: ...
|
||||
async def collect_all_device_events(self: Amcrest2Mqtt) -> None: ...
|
||||
async def collect_all_device_snapshots(self: Amcrest2Mqtt) -> None: ...
|
||||
async def collect_events_loop(self: Amcrest2Mqtt): ...
|
||||
async def collect_snapshots_loop(self: Amcrest2Mqtt): ...
|
||||
async def connect_to_devices(self: Amcrest2Mqtt) -> dict[str, Any]: ...
|
||||
async def device_loop(self: Amcrest2Mqtt): ...
|
||||
async def get_events_from_device(self, device_id: str) -> None: ...
|
||||
async def get_snapshot_from_device(self, device_id: str) -> str | None: ...
|
||||
async def main_loop(self: Amcrest2Mqtt): ...
|
||||
async def process_device_event(self, device_id: str, code: str, payload: Any): ...
|
||||
async def refresh_all_devices(self: Amcrest2Mqtt): ...
|
||||
async def setup_device_list(self: Amcrest2Mqtt) -> None: ...
|
||||
def build_device_states(self: Amcrest2Mqtt, device_id: str) -> None: ...
|
||||
def build_parser() -> argparse.ArgumentParser: ...
|
||||
def classify_device(self: Amcrest2Mqtt, device: str) -> str: ...
|
||||
def get_api_calls(self: Amcrest2Mqtt): ...
|
||||
def get_attribute_topic(self: Amcrest2Mqtt, device_id, category, item, attribute) -> str: ...
|
||||
def get_availability_topic(self: Amcrest2Mqtt, device_id, category="availability", item=None) -> str: ...
|
||||
def get_camera(self, host: str) -> AmcrestCamera: ...
|
||||
def get_command_topic(self: Amcrest2Mqtt, device_id, category, item=None, command="set") -> str: ...
|
||||
def get_component_type(self: Amcrest2Mqtt, device_id): ...
|
||||
def get_component(self: Amcrest2Mqtt, device_id): ...
|
||||
def get_device_availability_topic(self: Amcrest2Mqtt, device_id): ...
|
||||
def get_device_block(self: Amcrest2Mqtt, id, name, vendor="Amcrest", sku=None): ...
|
||||
def get_device_name(self: Amcrest2Mqtt, device_id): ...
|
||||
def get_device_slug(self: Amcrest2Mqtt, device_id: str, type: Optional[str] = None) -> str: ...
|
||||
def get_device_state_topic(self: "Amcrest2Mqtt", device_id, mode_name=None): ...
|
||||
def get_device_topic(self: Amcrest2Mqtt, component_type, device_id, *parts) -> str: ...
|
||||
def get_device(self, host: str, device_name: str) -> None: ...
|
||||
def get_discovery_topic(self: Amcrest2Mqtt, component, item) -> str: ...
|
||||
def get_ip_address(self: Amcrest2Mqtt, string: str) -> str: ...
|
||||
def get_last_call_date(self: Amcrest2Mqtt): ...
|
||||
def get_last_update(self: "Amcrest2Mqtt", device_id: str) -> str: ...
|
||||
def get_mode(self: "Amcrest2Mqtt", device_id, mode_name): ...
|
||||
def get_modes(self: "Amcrest2Mqtt", device_id): ...
|
||||
def get_motion_detection(self, device_id: str) -> bool: ...
|
||||
def get_new_client_id(self: Amcrest2Mqtt): ...
|
||||
def get_next_event(self: Amcrest2Mqtt) -> str | None: ...
|
||||
def get_privacy_mode(self, device_id: str) -> bool: ...
|
||||
def get_recorded_file(self, device_id: str, file: str) -> str | None: ...
|
||||
def get_service_device(self: Amcrest2Mqtt): ...
|
||||
def get_service_topic(self: Amcrest2Mqtt, topic): ...
|
||||
def get_snapshot(self, device_id: str) -> str | None: ...
|
||||
def get_state_topic(self: Amcrest2Mqtt, device_id, category, item=None) -> str: ...
|
||||
def get_storage_stats(self, device_id: str) -> dict[str, str]: ...
|
||||
def get_vendor_device_slug(self: Amcrest2Mqtt, device_id): ...
|
||||
def handle_device_command(self: Amcrest2Mqtt, device_id: str, handler: str, message: str) -> None: ...
|
||||
def handle_service_command(self: Amcrest2Mqtt, handler: str, message: str) -> None: ...
|
||||
def is_discovered(self: "Amcrest2Mqtt", device_id: str) -> bool: ...
|
||||
def is_ipv4(self: Amcrest2Mqtt, string: str) -> bool: ...
|
||||
def is_rate_limited(self: Amcrest2Mqtt): ...
|
||||
def load_config(self: Amcrest2Mqtt, config_arg: str = None, media_arg: str = None) -> list[str, Any]: ...
|
||||
def main(argv=None): ...
|
||||
def mqtt_on_connect(self: Amcrest2Mqtt, client, userdata, flags, reason_code, properties): ...
|
||||
def mqtt_on_disconnect(self: Amcrest2Mqtt, client, userdata, flags, reason_code, properties): ...
|
||||
def mqtt_on_log(self: Amcrest2Mqtt, client, userdata, paho_log_level, msg): ...
|
||||
def mqtt_on_message(self: Amcrest2Mqtt, client, userdata, msg): ...
|
||||
def mqtt_on_subscribe(self: Amcrest2Mqtt, client, userdata, mid, reason_code_list, properties): ...
|
||||
def mqtt_safe_publish(self: Amcrest2Mqtt, topic, payload, **kwargs): ...
|
||||
def mqttc_create(self: Amcrest2Mqtt): ...
|
||||
def publish_device_availability(self: Amcrest2Mqtt, device_id, online: bool = True): ...
|
||||
def publish_device_discovery(self: Amcrest2Mqtt, device_id: str) -> None: ...
|
||||
def publish_device_state(self: Amcrest2Mqtt, device_id: str) -> None: ...
|
||||
def publish_service_availability(self: Amcrest2Mqtt): ...
|
||||
def publish_service_discovery(self: Amcrest2Mqtt): ...
|
||||
def publish_service_state(self: Amcrest2Mqtt): ...
|
||||
def read_file(self: Amcrest2Mqtt, file_name: str) -> str: ...
|
||||
def rediscover_all(self: Amcrest2Mqtt): ...
|
||||
def safe_split_device(self: Amcrest2Mqtt, topic, segment): ...
|
||||
def set_motion_detection(self, device_id: str, switch: bool) -> str: ...
|
||||
def set_privacy_mode(self, device_id: str, switch: bool) -> str: ...
|
||||
def to_gb(self: Amcrest2Mqtt, total: [int]) -> str: ...
|
||||
def upsert_device(self: Amcrest2Mqtt, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> None: ...
|
||||
def upsert_state(self: Amcrest2Mqtt, device_id, **kwargs: dict[str, Any] | str | int | bool) -> None: ...
|
||||
Loading…
Reference in New Issue