refactor: simplify device setup; just use cmps instead of modes

pull/106/head
Jeff Culverhouse 3 months ago
parent 07b81abecf
commit 366ef61544

@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.7-labs
FROM python:3-slim
FROM python:3.14-slim
# ===== Project Variables =====
ARG APP_NAME=amcrest2mqtt

@ -1,6 +1,5 @@
from .mixins.helpers import HelpersMixin
from .mixins.mqtt import MqttMixin
from .mixins.topics import TopicsMixin
from .mixins.events import EventsMixin
from .mixins.publish import PublishMixin
from .mixins.amcrest import AmcrestMixin
@ -13,7 +12,6 @@ from .base import Base
class Amcrest2Mqtt(
HelpersMixin,
EventsMixin,
TopicsMixin,
PublishMixin,
AmcrestMixin,
AmcrestAPIMixin,

@ -41,7 +41,7 @@ class AmcrestServiceProtocol(Protocol):
snapshot_update_interval: int
states: dict[str, Any]
async def build_camera(self, device: dict) -> str: ...
async def build_camera(self, camera: dict) -> str: ...
async def build_component(self, device: dict) -> str: ...
async def build_device_states(self, device_id: str) -> bool: ...
async def check_event_queue_loop(self) -> None: ...
@ -107,8 +107,6 @@ class AmcrestServiceProtocol(Protocol):
def get_device_name(self, device_id: str) -> str: ...
def get_device_name_slug(self, device_id: str) -> str: ...
def get_device_state_topic(self, device_id: str, mode_name: str = "") -> str: ...
def get_mode(self, device_id: str, mode_name: str) -> dict[str, Any]: ...
def get_modes(self, device_id: str) -> dict[str, Any]: ...
def get_next_event(self) -> dict[str, Any] | None: ...
def get_recorded_file(self, device_id: str, file: str, encode: bool = True) -> str | None: ...
def get_snapshot(self, device_id: str) -> str | None: ...

@ -88,7 +88,6 @@ class AmcrestMixin:
rtc_url = f"http://{rtc_host}:{rtc_port}/{rtc_link}?src={rtc_source}"
device = {
"platform": "mqtt",
"stat_t": self.mqtt_helper.stat_t(device_id, "state"),
"avty_t": self.mqtt_helper.avty_t(device_id),
"device": {
@ -310,7 +309,7 @@ class AmcrestMixin:
"motion_region": "n/a",
},
)
self.upsert_device(device_id, component=device, modes={k: v for k, v in device["cmps"].items()})
self.upsert_device(device_id, component=device, cmps={k: v for k, v in device["cmps"].items()})
await self.build_device_states(device_id)
if not self.is_discovered(device_id):

@ -5,6 +5,7 @@ from deepmerge.merger import Merger
import ipaddress
import os
import pathlib
import re
import signal
import socket
import threading
@ -319,6 +320,42 @@ class HelpersMixin:
threading.Timer(5.0, _force_exit).start()
# Device properties --------------------------------------------------------------------------
def get_device_name(self: Amcrest2Mqtt, device_id: str) -> str:
return cast(str, self.devices[device_id]["component"]["device"]["name"])
def get_device_name_slug(self: Amcrest2Mqtt, device_id: str) -> str:
return re.sub(r"[^a-zA-Z0-9]+", "_", self.get_device_name(device_id).lower())
def get_component(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
return cast(dict[str, Any], self.devices[device_id]["component"])
def get_platform(self: Amcrest2Mqtt, device_id: str) -> str:
return cast(str, self.devices[device_id]["component"].get("platform", "unknown"))
def is_discovered(self: Amcrest2Mqtt, device_id: str) -> bool:
return cast(bool, self.states[device_id]["internal"].get("discovered", False))
def get_device_state_topic(self: Amcrest2Mqtt, device_id: str, mode_name: str = "") -> str:
component = self.get_component(device_id)["cmps"][f"{device_id}_{mode_name}"] if mode_name else self.get_component(device_id)
match component["platform"]:
case "camera":
return cast(str, component["topic"])
case "image":
return cast(str, component["image_topic"])
case _:
return cast(str, component.get("stat_t") or component.get("state_topic"))
def get_device_image_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component["topic"])
def get_device_availability_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component.get("avty_t") or component.get("availability_topic"))
# Upsert devices and states -------------------------------------------------------------------
def assert_no_tuples(self: Amcrest2Mqtt, data: Any, path: str = "root") -> None:

@ -15,7 +15,6 @@ class PublishMixin:
device_id = "service"
device = {
"platform": "mqtt",
"stat_t": self.mqtt_helper.stat_t(device_id, "service"),
"cmd_t": self.mqtt_helper.cmd_t(device_id),
"avty_t": self.mqtt_helper.avty_t(device_id),
@ -156,13 +155,12 @@ class PublishMixin:
# Devices -------------------------------------------------------------------------------------
async def publish_device_discovery(self: Amcrest2Mqtt, device_id: str) -> None:
component = self.get_component(device_id)
for slug, mode in self.get_modes(device_id).items():
component["cmps"][f"{device_id}_{slug}"] = mode
if self.is_discovered(device_id):
return
topic = self.mqtt_helper.disc_t("device", device_id)
payload = {k: v for k, v in component.items() if k != "platform"}
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, json.dumps(payload), retain=True)
component = self.get_component(device_id)
await asyncio.to_thread(self.mqtt_helper.safe_publish, topic, json.dumps(component), retain=True)
self.upsert_state(device_id, internal={"discovered": True})
async def publish_device_availability(self: Amcrest2Mqtt, device_id: str, online: bool = True) -> None:

@ -1,52 +0,0 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Jeff Culverhouse
import re
from typing import TYPE_CHECKING, cast, Any
if TYPE_CHECKING:
from amcrest2mqtt.interface import AmcrestServiceProtocol as Amcrest2Mqtt
class TopicsMixin:
# Device properties --------------------------------------------------------------------------
def get_device_name(self: Amcrest2Mqtt, device_id: str) -> str:
return cast(str, self.devices[device_id]["component"]["device"]["name"])
def get_device_name_slug(self: Amcrest2Mqtt, device_id: str) -> str:
return re.sub(r"[^a-zA-Z0-9]+", "_", self.get_device_name(device_id).lower())
def get_component(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
return cast(dict[str, Any], self.devices[device_id]["component"])
def get_platform(self: Amcrest2Mqtt, device_id: str) -> str:
return cast(str, self.devices[device_id]["component"].get("platform", "unknown"))
def get_modes(self: Amcrest2Mqtt, device_id: str) -> dict[str, Any]:
return cast(dict[str, Any], self.devices[device_id]["modes"])
def get_mode(self: Amcrest2Mqtt, device_id: str, mode_name: str) -> dict[str, Any]:
return cast(dict[str, Any], self.devices[device_id]["modes"][mode_name])
def is_discovered(self: Amcrest2Mqtt, device_id: str) -> bool:
return cast(bool, self.states[device_id]["internal"].get("discovered", False))
def get_device_state_topic(self: Amcrest2Mqtt, device_id: str, mode_name: str = "") -> str:
component = self.get_mode(device_id, mode_name) if mode_name else self.get_component(device_id)
match component["platform"]:
case "camera":
return cast(str, component["topic"])
case "image":
return cast(str, component["image_topic"])
case _:
return cast(str, component.get("stat_t") or component.get("state_topic"))
def get_device_image_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component["topic"])
def get_device_availability_topic(self: Amcrest2Mqtt, device_id: str) -> str:
component = self.get_component(device_id)
return cast(str, component.get("avty_t") or component.get("availability_topic"))
Loading…
Cancel
Save