refactor: simplify device discovery and state publication

pull/106/head
Jeff Culverhouse 3 months ago
parent 516e2b5638
commit 3d412f20f7

@ -101,15 +101,10 @@ class AmcrestServiceProtocol(Protocol):
def b_to_gb(self, total: int) -> float: ...
def b_to_mb(self, total: int) -> float: ...
def classify_device(self, device: dict) -> str: ...
def get_platform(self, device_id: str) -> str: ...
def get_component(self, device_id: str) -> dict[str, Any]: ...
def get_device_availability_topic(self, device_id: str) -> str: ...
def get_device_image_topic(self, device_id: str) -> str: ...
def get_device_name(self, device_id: str) -> str: ...
def get_device_name_slug(self, device_id: str) -> str: ...
def get_device_state_topic(self, device_id: str, mode_name: str = "") -> str: ...
def get_next_event(self) -> dict[str, Any] | None: ...
def get_snapshot(self, device_id: str) -> str | None: ...
def handle_signal(self, signum: int, _: FrameType | None) -> Any: ...
def heartbeat_ready(self) -> None: ...
def increase_api_calls(self) -> None: ...

@ -115,16 +115,52 @@ class AmcrestMixin:
"origin": {"name": self.service_name, "sw": self.config["version"], "support_url": "https://github.com/weirdTangent/amcrest2mqtt"},
"qos": self.qos,
"cmps": {
"video": {
"platform": "camera",
"name": "Video",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "video"),
"topic": self.mqtt_helper.stat_t(device_id, "camera", "video"),
"camera": {
"p": "camera",
"name": "Camera",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "camera"),
"topic": self.mqtt_helper.stat_t(device_id, "camera", "snapshot"),
"sup_str": True,
"str_src": rtc_url,
"image_encoding": "b64",
"icon": "mdi:video",
"web_url": rtc_url,
},
"snapshot": {
"p": "image",
"name": "Snapshot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "snapshot"),
"image_topic": self.mqtt_helper.stat_t(device_id, "camera", "snapshot"),
"image_encoding": "b64",
"icon": "mdi:camera",
},
"motion": {
"p": "binary_sensor",
"name": "Motion",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion"),
"stat_t": self.mqtt_helper.stat_t(device_id, "binary_sensor", "motion"),
"jsn_atr_t": self.mqtt_helper.stat_t(device_id, "attributes"),
"payload_on": True,
"payload_off": False,
"device_class": "motion",
"icon": "mdi:eye-outline",
},
"motion_region": {
"p": "sensor",
"name": "Motion region",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion_region"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "motion_region"),
"icon": "mdi:map-marker",
},
"motion_snapshot": {
"p": "image",
"name": "Motion snapshot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion_snapshot"),
"image_topic": self.mqtt_helper.stat_t(device_id, "image", "motion_snapshot"),
"image_encoding": "b64",
"icon": "mdi:camera",
},
"reboot": {
"platform": "button",
"p": "button",
"name": "Reboot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "reboot"),
"cmd_t": self.mqtt_helper.cmd_t(device_id, "button", "reboot"),
@ -132,25 +168,8 @@ class AmcrestMixin:
"icon": "mdi:restart",
"entity_category": "diagnostic",
},
"snapshot": {
"platform": "image",
"name": "Timed snapshot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "snapshot"),
"image_topic": self.mqtt_helper.stat_t(device_id, "image", "snapshot"),
"image_encoding": "b64",
"content_type": "image/jpeg",
"icon": "mdi:camera",
},
"recording": {
"platform": "sensor",
"name": "Last recording",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "recording"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "recording"),
"device_class": "timestamp",
"icon": "mdi:clock",
},
"privacy": {
"platform": "switch",
"p": "switch",
"name": "Privacy mode",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "privacy"),
"stat_t": self.mqtt_helper.stat_t(device_id, "switch", "privacy"),
@ -161,7 +180,7 @@ class AmcrestMixin:
"icon": "mdi:camera-outline",
},
"motion_detection": {
"platform": "switch",
"p": "switch",
"name": "Motion detection",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion_detection"),
"stat_t": self.mqtt_helper.stat_t(device_id, "switch", "motion_detection"),
@ -171,8 +190,15 @@ class AmcrestMixin:
"device_class": "switch",
"icon": "mdi:motion-sensor",
},
"event_text": {
"p": "sensor",
"name": "Last event",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "event_text"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "event_text"),
"icon": "mdi:note",
},
"save_recordings": {
"platform": "switch",
"p": "switch",
"name": "Save recordings",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "save_recordings"),
"stat_t": self.mqtt_helper.stat_t(device_id, "switch", "save_recordings"),
@ -182,34 +208,8 @@ class AmcrestMixin:
"device_class": "switch",
"icon": "mdi:content-save-outline",
},
"motion": {
"platform": "binary_sensor",
"name": "Motion sensor",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion"),
"stat_t": self.mqtt_helper.stat_t(device_id, "binary_sensor", "motion"),
"payload_on": True,
"payload_off": False,
"device_class": "motion",
"icon": "mdi:eye-outline",
},
"motion_region": {
"platform": "sensor",
"name": "Motion region",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion_region"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "motion_region"),
"icon": "mdi:map-marker",
},
"motion_snapshot": {
"platform": "image",
"name": "Motion snapshot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "motion_snapshot"),
"image_topic": self.mqtt_helper.stat_t(device_id, "image", "motion_snapshot"),
"image_encoding": "b64",
"content_type": "image/jpeg",
"icon": "mdi:camera",
},
"storage_used": {
"platform": "sensor",
"p": "sensor",
"name": "Storage used",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "storage_used"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "storage_used"),
@ -220,7 +220,7 @@ class AmcrestMixin:
"icon": "mdi:micro-sd",
},
"storage_used_pct": {
"platform": "sensor",
"p": "sensor",
"name": "Storage used %",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "storage_used_pct"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "storage_used_pct"),
@ -230,7 +230,7 @@ class AmcrestMixin:
"icon": "mdi:micro-sd",
},
"storage_total": {
"platform": "sensor",
"p": "sensor",
"name": "Storage total",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "storage_total"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "storage_total"),
@ -240,38 +240,21 @@ class AmcrestMixin:
"entity_category": "diagnostic",
"icon": "mdi:micro-sd",
},
"event_text": {
"platform": "sensor",
"name": "Last event",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "event_text"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "event_text"),
"icon": "mdi:note",
},
"event": {
"platform": "sensor",
"name": "Last event time",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "event"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "event"),
"device_class": "timestamp",
"icon": "mdi:clock",
},
},
}
if "media" in self.config and "media_source" in self.config["media"]:
device["cmps"]["recording_url"] = {
"platform": "sensor",
"p": "sensor",
"name": "Recording url",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "recording_url"),
"stat_t": self.mqtt_helper.stat_t(device_id, "sensor", "recording_url"),
"clip_url": f"{self.config["media"]["media_source"]}/{camera["device_name"]}-latest.mp4",
"icon": "mdi:web",
"enabled_by_default": False,
}
if camera.get("is_doorbell", None):
device["cmps"]["doorbell"] = {
"platform": "binary_sensor",
"p": "binary_sensor",
"name": "Doorbell" if camera["device_name"] == "Doorbell" else f"{camera["device_name"]} Doorbell",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "doorbell"),
"stat_t": self.mqtt_helper.stat_t(device_id, "binary_sensor", "doorbell"),
@ -282,7 +265,7 @@ class AmcrestMixin:
if camera.get("is_ad410", None):
device["cmps"]["human"] = {
"platform": "binary_sensor",
"p": "binary_sensor",
"name": "Human Sensor",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "human"),
"stat_t": self.mqtt_helper.stat_t(device_id, "binary_sensor", "human"),
@ -291,9 +274,19 @@ class AmcrestMixin:
"icon": "mdi:person",
}
self.upsert_state(device_id, internal={})
self.upsert_device(device_id, component=device, cmps={k: v for k, v in device["cmps"].items()})
await self.build_device_states(device_id)
self.upsert_device(device_id, component=device)
# initial states because many of these won't update until something happens
# or this is the only time we'll ever set them
self.upsert_state(
device_id,
internal={},
webrtc=rtc_url,
switch={"save_recordings": "ON" if "path" in self.config["media"] else "OFF"},
binary_sensor={"motion": False},
sensor={"motion_region": ""},
attributes={"recording_url": f"{self.config["media"]["media_source"]}/{camera["device_name"]}-latest.mp4"},
image={"motion_snapshot": ""},
)
if not self.is_discovered(device_id):
self.logger.info(f'added new camera: "{camera["device_name"]}" {camera["vendor"]} {camera["device_type"]}] ({device_id})')

@ -194,9 +194,9 @@ class AmcrestAPIMixin:
# return our last known state if something fails
current: dict[str, str | float] = (
{
"used_percent": states["sensor"]["storage_used_pct"],
"used": states["sensor"]["storage_used"],
"total": states["sensor"]["storage_total"],
"used_percent": states["sensor"].get("storage_used_pct", 0),
"used": states["sensor"].get("storage_used", 0),
"total": states["sensor"].get("storage_total", 0),
}
if "sensor" in states
else {}
@ -361,9 +361,9 @@ class AmcrestAPIMixin:
return None
device = self.amcrest_devices[device_id]
timeout = 20
timeout = 10
max_tries = 3
base_backoff = 15
base_backoff = 5
# Respect privacy mode (default False if missing)
if device.get("privacy_mode", False):
@ -392,6 +392,7 @@ class AmcrestAPIMixin:
encoded = encoded_b.decode("ascii")
self.upsert_state(
device_id,
camera={"snapshot": encoded},
image={"snapshot": encoded},
)
await self.publish_device_state(device_id)
@ -413,13 +414,6 @@ class AmcrestAPIMixin:
self.logger.info(f"getting snapshot failed after {max_tries} tries for {self.get_device_name(device_id)}")
return None
def get_snapshot(self: Amcrest2Mqtt, device_id: str) -> str | None:
if device_id not in self.amcrest_devices:
self.logger.warning(f"device not found for {device_id}")
return None
return self.amcrest_devices[device_id]["snapshot"] if "snapshot" in self.devices[device_id] else None
# Recorded file -------------------------------------------------------------------------------
async def get_recorded_file(self: Amcrest2Mqtt, device_id: str, file: str, encode: bool = True) -> str | None:

@ -2,7 +2,6 @@
# Copyright (c) 2025 Jeff Culverhouse
import asyncio
from typing import TYPE_CHECKING
from datetime import datetime, timezone
if TYPE_CHECKING:
from amcrest2mqtt.interface import AmcrestServiceProtocol as Amcrest2Mqtt
@ -23,44 +22,29 @@ class EventsMixin:
# if one of our known sensors
if event in ["motion", "human", "doorbell", "recording", "privacy_mode", "Reboot"]:
if event == "recording":
if event == "recording" and "file" in payload:
self.logger.debug(f'recording event for "{self.get_device_name(device_id)}": {payload["file"]}')
if payload["file"].endswith(".jpg"):
image = await self.get_recorded_file(device_id, payload["file"])
if image:
if self.upsert_state(
device_id,
camera={"eventshot": image},
sensor={"event_time": datetime.now(timezone.utc).isoformat()},
):
needs_publish.add(device_id)
event += ": snapshot"
needs_publish.add(device_id)
event += ": snapshot"
elif payload["file"].endswith(".mp4"):
if "path" in self.config["media"] and self.states[device_id]["switch"].get("save_recordings", "OFF") == "ON":
await self.store_recording_in_media(device_id, payload["file"])
event += ": video"
elif event == "motion":
region = payload["region"] if payload["state"] != "off" else "n/a"
if payload["file"].endswith(".jpg"):
image = await self.get_recorded_file(device_id, payload["file"])
if image:
if self.upsert_state(
device_id,
camera={"eventshot": image},
sensor={"event_time": datetime.now(timezone.utc).isoformat()},
):
needs_publish.add(device_id)
event += ": snapshot"
elif payload["file"].endswith(".mp4"):
if "path" in self.config["media"] and self.states[device_id]["switch"].get("save_recordings", "OFF") == "ON":
await self.store_recording_in_media(device_id, payload["file"])
event += ": video"
if self.upsert_state(
region = payload["region"] if payload["state"] != "off" else ""
motion = f": {region}" if region else f": {payload["state"]}"
self.upsert_state(
device_id,
binary_sensor={"motion": payload["state"]},
sensor={"motion_region": region, "event_time": datetime.now(timezone.utc).isoformat()},
):
needs_publish.add(device_id)
event += f": ({region}) - {payload["state"]}"
sensor={"motion_region": region},
)
needs_publish.add(device_id)
event += motion
else:
if isinstance(payload, str):
event += ": " + payload
@ -76,17 +60,13 @@ class EventsMixin:
needs_publish.add(device_id)
# record just these "events": text and time
if self.upsert_state(
device_id,
sensor={
"event_text": event,
"event_time": datetime.now(timezone.utc).isoformat(),
},
):
needs_publish.add(device_id)
self.upsert_state(device_id, sensor={"event_text": event})
needs_publish.add(device_id)
self.logger.debug(f'processed event for "{self.get_device_name(device_id)}": {event} with {payload}')
else:
self.logger.debug(f'ignored event for "{self.get_device_name(device_id)}": {event} with {payload}')
# we ignore these on purpose, but log if something unexpected comes through
if event not in ["NtpAdjustTime", "TimeChange", "RtspSessionDisconnect"]:
self.logger.debug(f'ignored unexpected event for "{self.get_device_name(device_id)}": {event} with {payload}')
tasks = [self.publish_device_state(device_id) for device_id in needs_publish]
if tasks:

@ -268,9 +268,6 @@ class HelpersMixin:
except PermissionError as err:
self.logger.error(f"permission error saving recording to {file_path}: {err!r}")
return None
except IOError as err:
self.logger.error(f"failed to save recording to {file_path}: {err!r}")
return None
except Exception as err:
self.logger.error(f"failed to save recording to {file_path}: {err!r}")
return None
@ -337,31 +334,9 @@ class HelpersMixin:
def get_component(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
return cast(dict[str, Any], self.devices[device_id]["component"])
def get_platform(self: Amcrest2Mqtt, device_id: str) -> str:
return cast(str, self.devices[device_id]["component"].get("platform", "unknown"))
def is_discovered(self: Amcrest2Mqtt, device_id: str) -> bool:
return cast(bool, self.states[device_id]["internal"].get("discovered", False))
def get_device_state_topic(self: Amcrest2Mqtt, device_id: str, mode_name: str = "") -> str:
component = self.get_component(device_id)["cmps"][f"{device_id}_{mode_name}"] if mode_name else self.get_component(device_id)
match component["platform"]:
case "camera":
return cast(str, component["topic"])
case "image":
return cast(str, component["image_topic"])
case _:
return cast(str, component.get("stat_t") or component.get("state_topic"))
def get_device_image_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component["topic"])
def get_device_availability_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component.get("avty_t") or component.get("availability_topic"))
# Upsert devices and states -------------------------------------------------------------------
def assert_no_tuples(self: Amcrest2Mqtt, data: Any, path: str = "root") -> None:

@ -34,7 +34,7 @@ class PublishMixin:
"qos": self.qos,
"cmps": {
"server": {
"platform": "binary_sensor",
"p": "binary_sensor",
"name": self.service_name,
"uniq_id": self.mqtt_helper.svc_unique_id("server"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "server"),
@ -45,7 +45,7 @@ class PublishMixin:
"icon": "mdi:server",
},
"api_calls": {
"platform": "sensor",
"p": "sensor",
"name": "API calls today",
"uniq_id": self.mqtt_helper.svc_unique_id("api_calls"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "api_calls"),
@ -55,7 +55,7 @@ class PublishMixin:
"icon": "mdi:api",
},
"rate_limited": {
"platform": "binary_sensor",
"p": "binary_sensor",
"name": "Rate limited",
"uniq_id": self.mqtt_helper.svc_unique_id("rate_limited"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "rate_limited"),
@ -66,7 +66,7 @@ class PublishMixin:
"icon": "mdi:speedometer-slow",
},
"last_call": {
"platform": "sensor",
"p": "sensor",
"name": "Last device check",
"uniq_id": self.mqtt_helper.svc_unique_id("last_call"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "last_call"),
@ -75,7 +75,7 @@ class PublishMixin:
"icon": "mdi:clock-outline",
},
"refresh_interval": {
"platform": "number",
"p": "number",
"name": "Refresh interval",
"uniq_id": self.mqtt_helper.svc_unique_id("refresh_interval"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "refresh_interval"),
@ -88,7 +88,7 @@ class PublishMixin:
"mode": "box",
},
"storage_interval": {
"platform": "number",
"p": "number",
"name": "Storage interval",
"uniq_id": self.mqtt_helper.svc_unique_id("storage_interval"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "storage_interval"),
@ -101,7 +101,7 @@ class PublishMixin:
"mode": "box",
},
"snapshot_interval": {
"platform": "number",
"p": "number",
"name": "Snapshot interval",
"uniq_id": self.mqtt_helper.svc_unique_id("snapshot_interval"),
"stat_t": self.mqtt_helper.stat_t(device_id, "service", "snapshot_interval"),
@ -117,7 +117,7 @@ class PublishMixin:
}
topic = self.mqtt_helper.disc_t("device", device_id)
payload = {k: v for k, v in device.items() if k != "platform"}
payload = {k: v for k, v in device.items() if k != "p"}
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, json.dumps(payload))
self.upsert_state(device_id, internal={"discovered": True})
@ -153,25 +153,19 @@ class PublishMixin:
# Devices -------------------------------------------------------------------------------------
async def publish_device_discovery(self: Amcrest2Mqtt, device_id: str) -> None:
if self.is_discovered(device_id):
return
topic = self.mqtt_helper.disc_t("device", device_id)
component = self.get_component(device_id)
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, json.dumps(component))
payload = json.dumps(self.devices[device_id]["component"])
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, payload)
self.upsert_state(device_id, internal={"discovered": True})
async def publish_device_availability(self: Amcrest2Mqtt, device_id: str, online: bool = True) -> None:
topic = self.mqtt_helper.avty_t(device_id)
payload = "online" if online else "offline"
avty_t = self.get_device_availability_topic(device_id)
await asyncio.to_thread(self.mqtt_helper.safe_publish, avty_t, payload)
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, payload)
async def publish_device_state(self: Amcrest2Mqtt, device_id: str, subject: str = "", sub: str = "") -> None:
if not self.is_discovered(device_id):
self.logger.debug(f"discovery not complete for {device_id} yet, holding off on sending state")
return
for state, value in self.states[device_id].items():
if subject and state != subject:
continue

Loading…
Cancel
Save