From 9eb890bad8c4ded311e65654f2dfb09f23f33257 Mon Sep 17 00:00:00 2001 From: Jeff Culverhouse Date: Sun, 26 Oct 2025 16:12:51 -0600 Subject: [PATCH] feature: added media option to store event recordings on filesystem --- .gitignore | 3 +- README.md | 10 +- config.yaml.sample | 4 + src/amcrest2mqtt/app.py | 9 +- src/amcrest2mqtt/core.py | 6 +- src/amcrest2mqtt/mixins/amcrest.py | 88 +++++++++-- src/amcrest2mqtt/mixins/amcrest_api.py | 20 ++- src/amcrest2mqtt/mixins/events.py | 5 +- src/amcrest2mqtt/mixins/helpers.py | 202 ++++++++++++++++++++++++- src/amcrest2mqtt/mixins/mqtt.py | 35 +++-- src/amcrest2mqtt/mixins/topics.py | 10 +- src/amcrest2mqtt/mixins/util.py | 149 ------------------ src/interface.py | 8 +- 13 files changed, 338 insertions(+), 211 deletions(-) delete mode 100644 src/amcrest2mqtt/mixins/util.py diff --git a/.gitignore b/.gitignore index 429b32a..f8ee84b 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,8 @@ venv/ # Local testing and notes config config/ -config.yaml +media +media/ npm-debug.log NOTES coverage/ diff --git a/README.md b/README.md index 27233a2..5888d26 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,10 @@ Uses the [`python-amcrest`](https://github.com/tchellomello/python-amcrest) libr Forked from [dchesterton/amcrest2mqtt](https://github.com/dchesterton/amcrest2mqtt) -A few notes: -* "Rediscover" button added to service - when pressed, device discovery is re-run so HA will rediscover deleted devices +UPDATES: +* 10/2025 Added a "media" config where mp4 recordings of motions events can be stored +* 10/2025 "Rediscover" button added to service - when pressed, device discovery is re-run so HA will rediscover deleted devices + ## Docker For `docker-compose`, use the [configuration included](https://github.com/weirdtangent/amcrest2mqtt/blob/master/docker-compose.yaml) in this repository. @@ -37,9 +39,11 @@ It supports the following environment variables: - `MQTT_TLS_CERT` (required if using TLS) - path to the private cert - `MQTT_TLS_KEY` (required if using TLS) - path to the private key - `MQTT_PREFIX` (optional, default = amgrest2mqtt) -- `MQTT_HOMEASSISTANT` (optional, default = true) - `MQTT_DISCOVERY_PREFIX` (optional, default = 'homeassistant') +- `MEDIA_PATH` (optional) - path to store motion recordings (mp4) files +- `MEDIA_SOURCE` (optional) - HomeAssistant url for accessing those recordings (see config.yaml.sample) + - `TZ` (required, timezone identifier, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List) - `STORAGE_UPDATE_INTERVAL` (optional, default = 900) - how often to fetch storage stats (in seconds) - `SNAPSHOT_UPDATE_INTERVAL` (optional, default = 60) - how often to fetch camera snapshot (in seconds) diff --git a/config.yaml.sample b/config.yaml.sample index 590dedd..9908af7 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -31,5 +31,9 @@ amcrest: - FrontYard - Patio +media: + path: /media + media_source: media-source://media_source/local/videos + timezone: America/New_York hide_ts: False diff --git a/src/amcrest2mqtt/app.py b/src/amcrest2mqtt/app.py index 223ea9f..fb3902b 100644 --- a/src/amcrest2mqtt/app.py +++ b/src/amcrest2mqtt/app.py @@ -12,10 +12,11 @@ from .core import Amcrest2Mqtt def build_parser() -> argparse.ArgumentParser: - p = argparse.ArgumentParser(prog="govee2mqtt", exit_on_error=True) + p = argparse.ArgumentParser(prog="amcrest2mqtt", exit_on_error=True) p.add_argument( "-c", "--config", + default="/config", help="Directory or file path for config.yaml (defaults to /config/config.yaml)", ) return p @@ -39,15 +40,11 @@ def main(argv=None): loop.run_until_complete(amcrest2mqtt.main_loop()) else: raise - except TypeError as e: - logger.error(f"TypeError: {e}", exc_info=True) - except ValueError as e: - logger.error(f"ValueError: {e}", exc_info=True) except KeyboardInterrupt: logger.warning("Shutdown requested (Ctrl+C). Exiting gracefully...") except asyncio.CancelledError: logger.warning("Main loop cancelled.") except Exception as e: - logger.exception(f"Unhandled exception in main loop: {e}", exc_info=True) + logger.error(f"unhandled exception: {e}", exc_info=True) finally: logger.info("amcrest2mqtt stopped.") diff --git a/src/amcrest2mqtt/core.py b/src/amcrest2mqtt/core.py index 6649c72..c97845e 100644 --- a/src/amcrest2mqtt/core.py +++ b/src/amcrest2mqtt/core.py @@ -1,4 +1,4 @@ -from .mixins.util import UtilMixin +from .mixins.helpers import HelpersMixin from .mixins.mqtt import MqttMixin from .mixins.topics import TopicsMixin from .mixins.events import EventsMixin @@ -6,20 +6,18 @@ from .mixins.service import ServiceMixin from .mixins.amcrest import AmcrestMixin from .mixins.amcrest_api import AmcrestAPIMixin from .mixins.refresh import RefreshMixin -from .mixins.helpers import HelpersMixin from .mixins.loops import LoopsMixin from .base import Base class Amcrest2Mqtt( - UtilMixin, + HelpersMixin, EventsMixin, TopicsMixin, ServiceMixin, AmcrestMixin, AmcrestAPIMixin, RefreshMixin, - HelpersMixin, LoopsMixin, MqttMixin, Base, diff --git a/src/amcrest2mqtt/mixins/amcrest.py b/src/amcrest2mqtt/mixins/amcrest.py index b82d89b..24a3d61 100644 --- a/src/amcrest2mqtt/mixins/amcrest.py +++ b/src/amcrest2mqtt/mixins/amcrest.py @@ -60,7 +60,7 @@ class AmcrestMixin: "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", "json_attributes_topic": self.get_state_topic(device_id, "attributes"), - "icon": "mdi:camera", + "icon": "mdi:video", "via_device": self.get_service_device(), "device": { "name": device["device_name"], @@ -95,10 +95,9 @@ class AmcrestMixin: device["device_type"], ) - modes["snapshot"] = { "component_type": "image", - "name": "Snapshot", + "name": "Timed snapshot", "uniq_id": f"{self.service_slug}_{self.get_device_slug(device_id, 'snapshot')}", "topic": self.get_state_topic(device_id, "snapshot"), "image_encoding": "b64", @@ -110,7 +109,34 @@ class AmcrestMixin: "via_device": self.get_service_device(), "device": device_block, } - self.upsert_state(device_id, image={"snapshot": None}) + + modes["recording_time"] = { + "component_type": "sensor", + "name": "Recording time", + "uniq_id": f"{self.service_slug}_{self.get_device_slug(device_id, 'recording_time')}", + "stat_t": self.get_state_topic(device_id, "recording_time"), + "avty_t": self.get_state_topic(device_id, "attributes"), + "avty_tpl": "{{ value_json.camera }}", + "json_attributes_topic": self.get_state_topic(device_id, "attributes"), + "device_class": "timestamp", + "icon": "mdi:clock", + "via_device": self.get_service_device(), + "device": device_block, + } + + modes["recording_url"] = { + "component_type": "sensor", + "name": "Recording url", + "uniq_id": f"{self.service_slug}_{self.get_device_slug(device_id, 'recording_url')}", + "stat_t": self.get_state_topic(device_id, "recording_url"), + "avty_t": self.get_state_topic(device_id, "attributes"), + "avty_tpl": "{{ value_json.camera }}", + "clip_url": f"media-source://media_source/local/Videos/amcrest/{device["device_name"]}-latest.mp4", + "json_attributes_topic": self.get_state_topic(device_id, "attributes"), + "icon": "mdi:web", + "via_device": self.get_service_device(), + "device": device_block, + } modes["privacy"] = { "component_type": "switch", @@ -119,11 +145,11 @@ class AmcrestMixin: "stat_t": self.get_state_topic(device_id, "switch", "privacy"), "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", - "cmd_t": self.get_command_topic(device_id, "switch"), + "cmd_t": self.get_command_topic(device_id, "switch", "privacy"), "payload_on": "ON", "payload_off": "OFF", "device_class": "switch", - "icon": "mdi:camera-off", + "icon": "mdi:camera-outline", "via_device": self.get_service_device(), "device": device_block, } @@ -135,7 +161,7 @@ class AmcrestMixin: "stat_t": self.get_state_topic(device_id, "switch", "motion_detection"), "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", - "cmd_t": self.get_command_topic(device_id, "switch"), + "cmd_t": self.get_command_topic(device_id, "switch", "motion_detection"), "payload_on": "ON", "payload_off": "OFF", "device_class": "switch", @@ -144,6 +170,22 @@ class AmcrestMixin: "device": device_block, } + modes["save_recordings"] = { + "component_type": "switch", + "name": "Save recordings", + "uniq_id": f"{self.service_slug}_{self.get_device_slug(device_id, 'save_recordings')}", + "stat_t": self.get_state_topic(device_id, "switch", "save_recordings"), + "avty_t": self.get_state_topic(device_id, "internal"), + "avty_tpl": "{{ value_json.media_path }}", + "cmd_t": self.get_command_topic(device_id, "switch", "save_recordings"), + "payload_on": "ON", + "payload_off": "OFF", + "device_class": "switch", + "icon": "mdi:content-save-outline", + "via_device": self.get_service_device(), + "device": device_block, + } + modes["motion"] = { "component_type": "binary_sensor", "name": "Motion sensor", @@ -154,7 +196,7 @@ class AmcrestMixin: "payload_on": True, "payload_off": False, "device_class": "motion", - "icon": "mdi:motion-sensor-alarm", + "icon": "mdi:eye-outline", "via_device": self.get_service_device(), "device": device_block, } @@ -166,7 +208,22 @@ class AmcrestMixin: "stat_t": self.get_state_topic(device_id, "sensor", "motion_region"), "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", - "icon": "mdi:location", + "icon": "mdi:map-marker", + "via_device": self.get_service_device(), + "device": device_block, + } + + modes["motion_snapshot"] = { + "component_type": "image", + "name": "Motion snapshot", + "uniq_id": f"{self.service_slug}_{self.get_device_slug(device_id, 'motion_snapshot')}", + "topic": self.get_state_topic(device_id, "motion_snapshot"), + "image_encoding": "b64", + "content_type": "image/jpeg", + "avty_t": self.get_state_topic(device_id, "attributes"), + "avty_tpl": "{{ value_json.camera }}", + "json_attributes_topic": self.get_state_topic(device_id, "attributes"), + "icon": "mdi:camera", "via_device": self.get_service_device(), "device": device_block, } @@ -225,7 +282,6 @@ class AmcrestMixin: "stat_t": self.get_state_topic(device_id, "sensor", "event_text"), "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", - "entity_category": "diagnostic", "icon": "mdi:note", "via_device": self.get_service_device(), "device": device_block, @@ -238,8 +294,7 @@ class AmcrestMixin: "avty_t": self.get_state_topic(device_id, "attributes"), "avty_tpl": "{{ value_json.camera }}", "device_class": "timestamp", - "entity_category": "diagnostic", - "icon": "mdi:calendar", + "icon": "mdi:clock", "via_device": self.get_service_device(), "device": device_block, } @@ -277,18 +332,21 @@ class AmcrestMixin: # defaults - which build_device_states doesn't update (events do) self.upsert_state( device_id, - internal={"discovered": False}, + internal={"discovered": False, "media_path": True if "path" in self.config["media"] else False}, camera={"video": None}, - image={"snapshot": None}, + image={"snapshot": None, "motion_snapshot": None}, + switch={"save_recordings": "ON" if "path" in self.config["media"] else "OFF"}, binary_sensor={ "motion": False, "doorbell": False, "human": False, }, sensor={ - "motion_region": None, + "motion_region": "n/a", "event_text": None, "event_time": None, + "recording_time": None, + "recording_url": None, }, ) self.upsert_device(device_id, component=component, modes=modes) diff --git a/src/amcrest2mqtt/mixins/amcrest_api.py b/src/amcrest2mqtt/mixins/amcrest_api.py index d485724..68a4bfe 100644 --- a/src/amcrest2mqtt/mixins/amcrest_api.py +++ b/src/amcrest2mqtt/mixins/amcrest_api.py @@ -123,11 +123,13 @@ class AmcrestAPIMixin(object): self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) for storage stats") except LoginError: self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) for storage stats") + if not storage: + return return { - "used_percent": str(storage["used_percent"]), - "used": self.to_gb(storage["used"]), - "total": self.to_gb(storage["total"]), + "used_percent": storage.get("used_percent", "unknown"), + "used": self.b_to_gb(storage["used"][0]), + "total": self.b_to_gb(storage["total"][0]), } # Privacy config ------------------------------------------------------------------------------ @@ -252,7 +254,7 @@ class AmcrestAPIMixin(object): # Recorded file ------------------------------------------------------------------------------- - def get_recorded_file(self, device_id: str, file: str) -> str | None: + def get_recorded_file(self, device_id: str, file: str, encode: bool = True) -> str | None: device = self.amcrest_devices[device_id] tries = 0 @@ -260,14 +262,20 @@ class AmcrestAPIMixin(object): try: data_raw = device["camera"].download_file(file) if data_raw: + if not encode: + if len(data_raw) < self.mb_to_b(100): + return data_raw + else: + self.logger.error(f"Raw recording is too large: {self.b_to_mb(len(data_raw))} MB") + return data_base64 = base64.b64encode(data_raw) self.logger.info( f"Processed recording from ({self.get_device_name(device_id)}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64" ) - if len(data_base64) < 100 * 1024 * 1024 * 1024: + if len(data_base64) < self.mb_to_b(100): return data_base64 else: - self.logger.error("Processed recording is too large") + self.logger.error(f"Encoded recording is too large: {self.b_to_mb(len(data_base64))} MB") return except CommError: tries += 1 diff --git a/src/amcrest2mqtt/mixins/events.py b/src/amcrest2mqtt/mixins/events.py index 1309435..d722504 100644 --- a/src/amcrest2mqtt/mixins/events.py +++ b/src/amcrest2mqtt/mixins/events.py @@ -43,6 +43,9 @@ class EventsMixin: camera={"eventshot": image}, sensor={"event_time": datetime.now(timezone.utc).isoformat()}, ) + elif payload["file"].endswith(".mp4"): + if "path" in self.config["media"] and self.states[device_id]["switch"]["save_recordings"] == "ON": + await self.store_recording_in_media(device_id, payload["file"]) else: self.logger.info(f"Got event for {self.get_device_name(device_id)}: {event} - {payload}") if event == "motion": @@ -50,7 +53,7 @@ class EventsMixin: device_id, binary_sensor={"motion": payload["state"]}, sensor={ - "motion_region": payload["region"] if payload["state"] != "off" else "", + "motion_region": payload["region"] if payload["state"] != "off" else "n/a", "event_time": datetime.now(timezone.utc).isoformat(), }, ) diff --git a/src/amcrest2mqtt/mixins/helpers.py b/src/amcrest2mqtt/mixins/helpers.py index 04cef50..68402e1 100644 --- a/src/amcrest2mqtt/mixins/helpers.py +++ b/src/amcrest2mqtt/mixins/helpers.py @@ -1,15 +1,23 @@ # SPDX-License-Identifier: MIT # Copyright (c) 2025 Jeff Culverhouse from deepmerge import Merger +import ipaddress +import logging import os import signal +import socket import threading +import yaml +from pathlib import Path +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from amcrest2mqtt.core import Amcrest2Mqtt from amcrest2mqtt.interface import AmcrestServiceProtocol +READY_FILE = os.getenv("READY_FILE", "/tmp/amcrest2mqtt.ready") + class HelpersMixin: if TYPE_CHECKING: @@ -36,10 +44,16 @@ class HelpersMixin: # send command to Amcrest ----------------------------------------------------------------------- - def send_command(self: Amcrest2Mqtt, device_id, response): - return + def handle_device_command(self: Amcrest2Mqtt, device_id: str, handler: str, message: str) -> None: + match handler: + case "save_recordings": + if message == "ON" and "path" not in self.config["media"]: + self.logger.error("User tried to turn on save_recordings, but there is no media path set") + return + self.upsert_state(device_id, switch={"save_recordings": message}) + self.publish_device_state(device_id) - def handle_service_message(self: Amcrest2Mqtt, handler, message): + def handle_service_command(self: Amcrest2Mqtt, handler: str, message: str) -> None: match handler: case "storage_refresh": self.device_interval = message @@ -69,6 +83,188 @@ class HelpersMixin: # Utility functions --------------------------------------------------------------------------- + def read_file(self: Amcrest2Mqtt, file_name: str) -> str: + with open(file_name, "r") as file: + data = file.read().replace("\n", "") + + return data + + def mb_to_b(self: Amcrest2Mqtt, total: int) -> int: + return total * 1024 * 1024 + + def b_to_mb(self: Amcrest2Mqtt, total: int) -> float: + return round(float(total) / 1024 / 1024, 2) + + def b_to_gb(self: Amcrest2Mqtt, total: int) -> float: + return round(float(total) / 1024 / 1024 / 1024, 2) + + def is_ipv4(self: Amcrest2Mqtt, string: str) -> bool: + try: + ipaddress.IPv4Network(string) + return True + except ValueError: + return False + + def get_ip_address(self: Amcrest2Mqtt, string: str) -> str: + if self.is_ipv4(string): + return string + try: + for i in socket.getaddrinfo(string, None): + if i[0] == socket.AddressFamily.AF_INET: + return i[4][0] + except socket.gaierror as e: + raise Exception(f"Failed to resolve {string}: {e}") + raise Exception(f"Failed to find IP address for {string}") + + def _csv(self: Amcrest2Mqtt, env_name): + v = os.getenv(env_name) + if not v: + return None + return [s.strip() for s in v.split(",") if s.strip()] + + def load_config(self: Amcrest2Mqtt, config_arg: str = None) -> list[str, Any]: + version = os.getenv("BLINK2MQTT_VERSION", self.read_file("VERSION")) + config_from = "env" + config = {} + + # Determine config file path + config_path = config_arg or "/config" + config_path = os.path.expanduser(config_path) + config_path = os.path.abspath(config_path) + + if os.path.isdir(config_path): + config_file = os.path.join(config_path, "config.yaml") + elif os.path.isfile(config_path): + config_file = config_path + config_path = os.path.dirname(config_file) + else: + # If it's not a valid path but looks like a filename, handle gracefully + if config_path.endswith(".yaml"): + config_file = config_path + else: + config_file = os.path.join(config_path, "config.yaml") + + # Try to load from YAML + if os.path.exists(config_file): + try: + with open(config_file, "r") as f: + config = yaml.safe_load(f) or {} + config_from = "file" + except Exception as e: + logging.warning(f"Failed to load config from {config_file}: {e}") + else: + logging.warning(f"Config file not found at {config_file}, falling back to environment vars") + + # Merge with environment vars (env vars override nothing if file exists) + mqtt = config.get("mqtt", {}) + amcrest = config.get("amcrest", {}) + webrtc = amcrest.get("webrtc", {}) + media = config.get("media", {}) + + # Determine media path (optional) + media_path = media.get("path", None) + if media_path: + media_path = os.path.expanduser(media_path) + media_path = os.path.abspath(media_path) + + if os.path.exists(media_path) and os.access(media_path, os.W_OK): + media["path"] = media_path + self.logger.info(f"Will be storing recordings in {media_path}, watch that it doesn't fill up your file system") + else: + self.logger.info("media_path not configured, not found, or is not writable. Will not be saving recordings") + + # fmt: off + mqtt = { + "host": mqtt.get("host") or os.getenv("MQTT_HOST", "localhost"), + "port": int(mqtt.get("port") or os.getenv("MQTT_PORT", 1883)), + "qos": int(mqtt.get("qos") or os.getenv("MQTT_QOS", 0)), + "username": mqtt.get("username") or os.getenv("MQTT_USERNAME", ""), + "password": mqtt.get("password") or os.getenv("MQTT_PASSWORD", ""), + "tls_enabled": mqtt.get("tls_enabled") or (os.getenv("MQTT_TLS_ENABLED", "false").lower() == "true"), + "tls_ca_cert": mqtt.get("tls_ca_cert") or os.getenv("MQTT_TLS_CA_CERT"), + "tls_cert": mqtt.get("tls_cert") or os.getenv("MQTT_TLS_CERT"), + "tls_key": mqtt.get("tls_key") or os.getenv("MQTT_TLS_KEY"), + "prefix": mqtt.get("prefix") or os.getenv("MQTT_PREFIX", "amcrest2mqtt"), + "discovery_prefix": mqtt.get("discovery_prefix") or os.getenv("MQTT_DISCOVERY_PREFIX", "homeassistant"), + } + + hosts = amcrest.get("hosts") or self._csv("AMCREST_HOSTS") or [] + names = amcrest.get("names") or self._csv("AMCREST_NAMES") or [] + sources = webrtc.get("sources") or self._csv("AMCREST_SOURCES") or [] + + amcrest = { + "hosts": hosts, + "names": names, + "port": int(amcrest.get("port") or os.getenv("AMCREST_PORT", 80)), + "username": amcrest.get("username") or os.getenv("AMCREST_USERNAME", ""), + "password": amcrest.get("password") or os.getenv("AMCREST_PASSWORD", ""), + "storage_update_interval": int(amcrest.get("storage_update_interval") or os.getenv("AMCREST_STORAGE_UPDATE_INTERVAL", 900)), + "snapshot_update_interval": int(amcrest.get("snapshot_update_interval") or os.getenv("AMCREST_SNAPSHOT_UPDATE_INTERVAL", 60)), + "webrtc": { + "host": webrtc.get("host") or os.getenv("AMCREST_WEBRTC_HOST", ""), + "port": int(webrtc.get("port") or os.getenv("AMCREST_WEBRTC_PORT", 1984)), + "link": webrtc.get("link") or os.getenv("AMCREST_WEBRTC_LINK", "webrtc"), + "sources": sources, + }, + } + + config = { + "mqtt": mqtt, + "amcrest": amcrest, + "debug": config.get("debug", os.getenv("DEBUG", "").lower() == "true"), + "hide_ts": config.get("hide_ts", os.getenv("HIDE_TS", "").lower() == "true"), + "timezone": config.get("timezone", os.getenv("TZ", "UTC")), + "media": media, + "config_from": config_from, + "config_path": config_path, + "version": version, + } + # fmt: on + + # Validate required fields + if not config["amcrest"].get("username") or not config["amcrest"].get("password"): + raise ValueError("`amcrest.username` and `amcrest.password` are required in config file or AMCREST_USERNAME and AMCREST_PASSWORD env vars") + + # Ensure list lengths match (sources is optional) + if len(hosts) != len(names): + raise ValueError("`amcrest.hosts` and `amcrest.names` must be the same length") + if sources and len(sources) != len(hosts): + raise ValueError("`amcrest.webrtc.sources` must match the length of `amcrest.hosts`/`amcrest.names` if provided") + + return config + + async def store_recording_in_media(self: Amcrest2Mqtt, device_id: str, amcrest_file: str) -> str | None: + recording = self.get_recorded_file(device_id, amcrest_file, encode=False) + if recording: + name = self.get_device_name_slug(device_id) + time = datetime.now().strftime("%Y%m%d-%H%M%S") + path = self.config["media"]["path"] + file_name = f"{name}-{time}.mp4" + file_path = Path(f"{path}/{file_name}") + try: + file_path.write_bytes(recording) + + self.upsert_state( + device_id, + media={"recording": file_path}, + sensor={"recording_time": datetime.now(timezone.utc).isoformat()}, + ) + local_file = Path(f"./{file_name}") + latest_link = Path(f"{path}/{name}-latest.mp4") + if latest_link.is_symlink(): + latest_link.unlink() + latest_link.symlink_to(local_file) + + if "media_source" in self.config["media"]: + url = f"{self.config["media"]["media_source"]}/{file_name}" + self.upsert_state(device_id, sensor={"recording_url": url}) + return url + except IOError as e: + self.logger.error(f"Failed to save recordingt to {path}: {e}") + return + + self.logger.error(f"Failed to download recording from device {self.get_device_name(device_id)}") + def _handle_signal(self: Amcrest2Mqtt, signum, frame=None): """Handle SIGTERM/SIGINT and exit cleanly or forcefully.""" sig_name = signal.Signals(signum).name diff --git a/src/amcrest2mqtt/mixins/mqtt.py b/src/amcrest2mqtt/mixins/mqtt.py index b8bcfcc..7885ab8 100644 --- a/src/amcrest2mqtt/mixins/mqtt.py +++ b/src/amcrest2mqtt/mixins/mqtt.py @@ -91,9 +91,15 @@ class MqttMixin: self.logger.info("Closed MQTT connection") if self.running and (self.mqtt_connect_time is None or time.time() > self.mqtt_connect_time + 10): - # lets use a new client_id for a reconnect attempt - self.client_id = self.get_new_client_id() - self.mqttc_create() + # clear connect_time and try to restart + self.mqtt_connect_time = None + while not self.mqtt_connect_time: + try: + self.client_id = self.get_new_client_id() + self.mqttc_create() + except Exception as e: + self.logger.error(f"Trouble reconnecting to MQTT (retry in 10 s): {e}") + time.sleep(10) else: self.logger.info("MQTT disconnect — stopping service loop") self.running = False @@ -114,7 +120,7 @@ class MqttMixin: return self._handle_homeassistant_message(payload) if components[0] == self.service_slug and components[1] == "service": - return self.handle_service_message(components[2], payload) + return self.handle_service_command(components[2], payload) if components[0] == self.service_slug: return self._handle_device_topic(components, payload) @@ -122,7 +128,6 @@ class MqttMixin: # self.logger.debug(f"Ignoring unrelated MQTT topic: {topic}") def _decode_payload(self: Amcrest2Mqtt, raw): - """Try to decode MQTT payload as JSON, fallback to UTF-8 string, else None.""" try: return json.loads(raw) except (json.JSONDecodeError, UnicodeDecodeError, TypeError, ValueError): @@ -147,28 +152,26 @@ class MqttMixin: self.logger.warning(f"Got MQTT message for unknown device: {device_id}") return - self.logger.debug(f"Got message for {self.get_device_name(device_id)}: {payload}") - self.send_command(device_id, payload) + self.logger.debug(f"Got message for {self.get_device_name(device_id)}: {attribute} => {payload}") + self.handle_device_command(device_id, attribute, payload) def _parse_device_topic(self: Amcrest2Mqtt, components): - """Extract (vendor, device_id, attribute) from an MQTT topic components list (underscore-delimited).""" try: if components[-1] != "set": return (None, None, None) # Example topics: # amcrest2mqtt/light/amcrest2mqtt_2BEFD0C907BB6BF2/set - # amcrest2mqtt/light/amcrest2mqtt_2BEFD0C907BB6BF2/brightness/set + # amcrest2mqtt/light/amcrest2mqtt_2BEFD0C907BB6BF2/save_recordings/set - # Case 1: ...//set - if len(components) >= 4 and "_" in components[-2]: - vendor, device_id = components[-2].split("_", 1) - attribute = None - - # Case 2: ...///set - elif len(components) >= 5 and "_" in components[-3]: + # Case 1: ...//set + if len(components) >= 5 and "_" in components[-3]: vendor, device_id = components[-3].split("_", 1) attribute = components[-2] + # Case 2: ...///set + elif len(components) >= 4 and "_" in components[-2]: + vendor, device_id = components[-2].split("_", 1) + attribute = None else: raise ValueError(f"Malformed topic (expected underscore): {'/'.join(components)}") diff --git a/src/amcrest2mqtt/mixins/topics.py b/src/amcrest2mqtt/mixins/topics.py index 07e017c..902cd02 100644 --- a/src/amcrest2mqtt/mixins/topics.py +++ b/src/amcrest2mqtt/mixins/topics.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: MIT # Copyright (c) 2025 Jeff Culverhouse import random +import re import string from typing import TYPE_CHECKING, Optional @@ -21,6 +22,9 @@ class TopicsMixin: def get_device_slug(self: Amcrest2Mqtt, device_id: str, type: Optional[str] = None) -> str: return "_".join(filter(None, [self.service_slug, device_id.replace(":", ""), type])) + def get_device_name_slug(self: Amcrest2Mqtt, device_id: str) -> str: + return re.sub(r"[^a-zA-Z0-9]+", "_", self.get_device_name(device_id)) + def get_vendor_device_slug(self: Amcrest2Mqtt, device_id): return f"{self.service_slug}-{device_id.replace(':', '')}" @@ -58,15 +62,15 @@ class TopicsMixin: component = device_entry.get("component") or device_entry.get("component_type") or category return f"{self.mqtt_config['discovery_prefix']}/{component}/{self.get_device_slug(device_id)}/{item}/{attribute}" - def get_command_topic(self: Amcrest2Mqtt, device_id, category, command="set") -> str: + def get_command_topic(self: Amcrest2Mqtt, device_id, category, item=None, command="set") -> str: if device_id == "service": - return f"{self.service_slug}/service/{category}/{command}" + return f"{self.service_slug}/service/{category}/{item}" # if category is not passed in, device must exist already if not category: category = self.devices[device_id]["component"]["component_type"] - return f"{self.service_slug}/{category}/{self.get_device_slug(device_id)}/{command}" + return f"{self.service_slug}/{category}/{self.get_device_slug(device_id)}/{item}/{command}" # Device propertiesi -------------------------------------------------------------------------- diff --git a/src/amcrest2mqtt/mixins/util.py b/src/amcrest2mqtt/mixins/util.py deleted file mode 100644 index 5035660..0000000 --- a/src/amcrest2mqtt/mixins/util.py +++ /dev/null @@ -1,149 +0,0 @@ -# SPDX-License-Identifier: MIT -# Copyright (c) 2025 Jeff Culverhouse -import ipaddress -import logging -import os -import socket -import yaml -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: - from amcrest2mqtt.core import Amcrest2Mqtt - from amcrest2mqtt.interface import AmcrestServiceProtocol - -READY_FILE = os.getenv("READY_FILE", "/tmp/amcrest2mqtt.ready") - - -class UtilMixin: - if TYPE_CHECKING: - self: "AmcrestServiceProtocol" - - def read_file(self: Amcrest2Mqtt, file_name: str) -> str: - with open(file_name, "r") as file: - data = file.read().replace("\n", "") - - return data - - def to_gb(self: Amcrest2Mqtt, total: [int]) -> str: - return str(round(float(total[0]) / 1024 / 1024 / 1024, 2)) - - def is_ipv4(self: Amcrest2Mqtt, string: str) -> bool: - try: - ipaddress.IPv4Network(string) - return True - except ValueError: - return False - - def get_ip_address(self: Amcrest2Mqtt, string: str) -> str: - if self.is_ipv4(string): - return string - try: - for i in socket.getaddrinfo(string, None): - if i[0] == socket.AddressFamily.AF_INET: - return i[4][0] - except socket.gaierror as e: - raise Exception(f"Failed to resolve {string}: {e}") - raise Exception(f"Failed to find IP address for {string}") - - def _csv(self: Amcrest2Mqtt, env_name): - v = os.getenv(env_name) - if not v: - return None - return [s.strip() for s in v.split(",") if s.strip()] - - def load_config(self: Amcrest2Mqtt, config_arg=None) -> list[str, Any]: - version = os.getenv("BLINK2MQTT_VERSION", self.read_file("VERSION")) - config_from = "env" - config = {} - - # Determine config file path - config_path = config_arg or "/config" - config_path = os.path.expanduser(config_path) - config_path = os.path.abspath(config_path) - - if os.path.isdir(config_path): - config_file = os.path.join(config_path, "config.yaml") - elif os.path.isfile(config_path): - config_file = config_path - config_path = os.path.dirname(config_file) - else: - # If it's not a valid path but looks like a filename, handle gracefully - if config_path.endswith(".yaml"): - config_file = config_path - else: - config_file = os.path.join(config_path, "config.yaml") - - # Try to load from YAML - if os.path.exists(config_file): - try: - with open(config_file, "r") as f: - config = yaml.safe_load(f) or {} - config_from = "file" - except Exception as e: - logging.warning(f"Failed to load config from {config_file}: {e}") - else: - logging.warning(f"Config file not found at {config_file}, falling back to environment vars") - - # Merge with environment vars (env vars override nothing if file exists) - mqtt = config.get("mqtt", {}) - amcrest = config.get("amcrest", {}) - webrtc = amcrest.get("webrtc", {}) - - # fmt: off - mqtt = { - "host": mqtt.get("host") or os.getenv("MQTT_HOST", "localhost"), - "port": int(mqtt.get("port") or os.getenv("MQTT_PORT", 1883)), - "qos": int(mqtt.get("qos") or os.getenv("MQTT_QOS", 0)), - "username": mqtt.get("username") or os.getenv("MQTT_USERNAME", ""), - "password": mqtt.get("password") or os.getenv("MQTT_PASSWORD", ""), - "tls_enabled": mqtt.get("tls_enabled") or (os.getenv("MQTT_TLS_ENABLED", "false").lower() == "true"), - "tls_ca_cert": mqtt.get("tls_ca_cert") or os.getenv("MQTT_TLS_CA_CERT"), - "tls_cert": mqtt.get("tls_cert") or os.getenv("MQTT_TLS_CERT"), - "tls_key": mqtt.get("tls_key") or os.getenv("MQTT_TLS_KEY"), - "prefix": mqtt.get("prefix") or os.getenv("MQTT_PREFIX", "amcrest2mqtt"), - "discovery_prefix": mqtt.get("discovery_prefix") or os.getenv("MQTT_DISCOVERY_PREFIX", "homeassistant"), - } - - hosts = amcrest.get("hosts") or self._csv("AMCREST_HOSTS") or [] - names = amcrest.get("names") or self._csv("AMCREST_NAMES") or [] - sources = webrtc.get("sources") or self._csv("AMCREST_SOURCES") or [] - - amcrest = { - "hosts": hosts, - "names": names, - "port": int(amcrest.get("port") or os.getenv("AMCREST_PORT", 80)), - "username": amcrest.get("username") or os.getenv("AMCREST_USERNAME", ""), - "password": amcrest.get("password") or os.getenv("AMCREST_PASSWORD", ""), - "storage_update_interval": int(amcrest.get("storage_update_interval") or os.getenv("AMCREST_STORAGE_UPDATE_INTERVAL", 900)), - "snapshot_update_interval": int(amcrest.get("snapshot_update_interval") or os.getenv("AMCREST_SNAPSHOT_UPDATE_INTERVAL", 60)), - "webrtc": { - "host": webrtc.get("host") or os.getenv("AMCREST_WEBRTC_HOST", ""), - "port": int(webrtc.get("port") or os.getenv("AMCREST_WEBRTC_PORT", 1984)), - "link": webrtc.get("link") or os.getenv("AMCREST_WEBRTC_LINK", "webrtc"), - "sources": sources, - }, - } - - config = { - "mqtt": mqtt, - "amcrest": amcrest, - "debug": config.get("debug", os.getenv("DEBUG", "").lower() == "true"), - "hide_ts": config.get("hide_ts", os.getenv("HIDE_TS", "").lower() == "true"), - "timezone": config.get("timezone", os.getenv("TZ", "UTC")), - "config_from": config_from, - "config_path": config_path, - "version": version, - } - # fmt: on - - # Validate required fields - if not config["amcrest"].get("username") or not config["amcrest"].get("password"): - raise ValueError("`amcrest.username` and `amcrest.password` are required in config file or AMCREST_USERNAME and AMCREST_PASSWORD env vars") - - # Ensure list lengths match (sources is optional) - if len(hosts) != len(names): - raise ValueError("`amcrest.hosts` and `amcrest.names` must be the same length") - if sources and len(sources) != len(hosts): - raise ValueError("`amcrest.webrtc.sources` must match the length of `amcrest.hosts`/`amcrest.names` if provided") - - return config diff --git a/src/interface.py b/src/interface.py index ed6bda8..6986dc2 100644 --- a/src/interface.py +++ b/src/interface.py @@ -32,7 +32,7 @@ class AmcrestServiceProtocol(Protocol): def get_attribute_topic(self: Amcrest2Mqtt, device_id, category, item, attribute) -> str: ... def get_availability_topic(self: Amcrest2Mqtt, device_id, category="availability", item=None) -> str: ... def get_camera(self, host: str) -> AmcrestCamera: ... - def get_command_topic(self: Amcrest2Mqtt, device_id, category, command="set") -> str: ... + def get_command_topic(self: Amcrest2Mqtt, device_id, category, item=None, command="set") -> str: ... def get_component_type(self: Amcrest2Mqtt, device_id): ... def get_component(self: Amcrest2Mqtt, device_id): ... def get_device_availability_topic(self: Amcrest2Mqtt, device_id): ... @@ -59,11 +59,12 @@ class AmcrestServiceProtocol(Protocol): def get_state_topic(self: Amcrest2Mqtt, device_id, category, item=None) -> str: ... def get_storage_stats(self, device_id: str) -> dict[str, str]: ... def get_vendor_device_slug(self: Amcrest2Mqtt, device_id): ... - def handle_service_message(self: Amcrest2Mqtt, handler, message): ... + def handle_device_command(self: Amcrest2Mqtt, device_id: str, handler: str, message: str) -> None: ... + def handle_service_command(self: Amcrest2Mqtt, handler: str, message: str) -> None: ... def is_discovered(self: "Amcrest2Mqtt", device_id: str) -> bool: ... def is_ipv4(self: Amcrest2Mqtt, string: str) -> bool: ... def is_rate_limited(self: Amcrest2Mqtt): ... - def load_config(self: Amcrest2Mqtt, config_arg=None) -> list[str, Any]: ... + def load_config(self: Amcrest2Mqtt, config_arg: str = None, media_arg: str = None) -> list[str, Any]: ... def main(argv=None): ... def mqtt_on_connect(self: Amcrest2Mqtt, client, userdata, flags, reason_code, properties): ... def mqtt_on_disconnect(self: Amcrest2Mqtt, client, userdata, flags, reason_code, properties): ... @@ -81,7 +82,6 @@ class AmcrestServiceProtocol(Protocol): def read_file(self: Amcrest2Mqtt, file_name: str) -> str: ... def rediscover_all(self: Amcrest2Mqtt): ... def safe_split_device(self: Amcrest2Mqtt, topic, segment): ... - def send_command(self: Amcrest2Mqtt, device_id, response): ... def set_motion_detection(self, device_id: str, switch: bool) -> str: ... def set_privacy_mode(self, device_id: str, switch: bool) -> str: ... def to_gb(self: Amcrest2Mqtt, total: [int]) -> str: ...