feature: add reboot button, fix other switches

pull/106/head
Jeff Culverhouse 3 months ago
parent 1d8c622603
commit a6aee2159d

@ -7,7 +7,7 @@ Uses the [`python-amcrest`](https://github.com/tchellomello/python-amcrest) libr
Forked from [dchesterton/amcrest2mqtt](https://github.com/dchesterton/amcrest2mqtt)
UPDATES:
* 10/2025 Added a "media" config where mp4 recordings of motions events can be stored
* 10/2025 Added a "media" config where mp4 recordings of motions events can be stored (11/2025 now with "max_size" option)
* 10/2025 "Rediscover" button added to service - when pressed, device discovery is re-run so HA will rediscover deleted devices

@ -155,7 +155,7 @@ dev = [
]
[tool.mypy]
python_version = "3.12"
python_version = "3.14"
ignore_missing_imports = true
check_untyped_defs = true
disallow_incomplete_defs = true
@ -167,6 +167,8 @@ warn_unreachable = true
strict_equality = true
allow_untyped_globals = false
allow_redefinition = false
no_implicit_optional = true
warn_unused_configs = true
# Temporarily quiet noisy files or folders
[[tool.mypy.overrides]]

@ -94,6 +94,7 @@ class AmcrestServiceProtocol(Protocol):
def increase_api_calls(self) -> None: ...
def is_discovered(self, device_id: str) -> bool: ...
def is_ipv4(self, string: str) -> bool: ...
def is_rebooting(self, device_id: str) -> bool: ...
def list_from_env(self, env_name: str) -> list[str]: ...
def load_config(self, config_arg: Any | None) -> dict[str, Any]: ...
def mark_ready(self) -> None: ...
@ -113,12 +114,13 @@ class AmcrestServiceProtocol(Protocol):
def publish_service_discovery(self) -> None: ...
def publish_service_state(self) -> None: ...
def read_file(self, file_name: str) -> str: ...
def reboot_device(self, device_id: str) -> None: ...
def rediscover_all(self) -> None: ...
def restore_state(self) -> None: ...
def restore_state_values(self, api_calls: int, last_call_date: str) -> None: ...
def safe_split_device(self, topic: str, segment: str) -> list[str]: ...
def save_state(self) -> None: ...
def set_motion_detection(self, device_id: str, switch: bool) -> str: ...
def set_privacy_mode(self, device_id: str, switch: bool) -> str: ...
def set_motion_detection(self, device_id: str, switch: bool) -> None: ...
def set_privacy_mode(self, device_id: str, switch: bool) -> None: ...
def upsert_device(self, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> bool: ...
def upsert_state(self, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> bool: ...

@ -114,6 +114,18 @@ class AmcrestMixin:
device["software_version"],
)
modes["reboot"] = {
"component_type": "button",
"name": "Reboot",
"uniq_id": self.mqtt_helper.dev_unique_id(device_id, "reboot"),
"cmd_t": self.mqtt_helper.cmd_t(device_id, "button", "reboot"),
"payload_press": "PRESS",
"icon": "mdi:restart",
"entity_category": "diagnostic",
"via_device": self.mqtt_helper.service_slug,
"device": device_block,
}
modes["snapshot"] = {
"component_type": "image",
"name": "Timed snapshot",

@ -4,7 +4,7 @@ from amcrest import AmcrestCamera
from amcrest.exceptions import LoginError, AmcrestError, CommError
import asyncio
import base64
from datetime import datetime
from datetime import datetime, timedelta
import random
from typing import TYPE_CHECKING, Any, cast
@ -108,6 +108,28 @@ class AmcrestAPIMixin:
},
}
def reboot_device(self: Amcrest2Mqtt, device_id: str) -> None:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return None
response = device["camera"].reboot().strip()
self.logger.info(f"Sent REBOOT signal to {self.get_device_name(device_id)}, {response}")
if response == "OK":
self.upsert_state(device_id, internal={"reboot": datetime.now()})
def is_rebooting(self: Amcrest2Mqtt, device_id: str) -> bool:
states = self.states[device_id]
if "reboot" not in states["internal"]:
return False
reboot_time = states["internal"]["reboot"]
if reboot_time + timedelta(minutes=2) > datetime.now():
return True
states["internal"].pop("reboot")
if states["sensor"].get("event_text", "").startswith("Reboot"):
self.upsert_state(device_id, sensor={"event_text": ""})
return False
# Storage stats -------------------------------------------------------------------------------
def get_storage_stats(self: Amcrest2Mqtt, device_id: str) -> dict[str, str | float]:
@ -168,23 +190,25 @@ class AmcrestAPIMixin:
return privacy_mode
def set_privacy_mode(self: Amcrest2Mqtt, device_id: str, switch: bool) -> str:
def set_privacy_mode(self: Amcrest2Mqtt, device_id: str, switch: bool) -> None:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return ""
return None
try:
response = cast(str, device["camera"].set_privacy(switch).strip())
response = str(device["camera"].set_privacy(switch)).strip()
self.increase_api_calls()
self.logger.debug(f"Set privacy_mode on {self.get_device_name(device_id)} to {switch}, got back: {response}")
if response == "OK":
self.upsert_state(device_id, switch={"privacy": "ON" if switch else "OFF"})
self.publish_device_state(device_id)
except CommError as err:
self.logger.error(f"failed to set privacy mode on ({self.get_device_name(device_id)}): {err}")
return ""
except LoginError as err:
self.logger.error(f"failed to auth to device ({self.get_device_name(device_id)}): {err}")
return ""
return response
return None
# Motion detection config ---------------------------------------------------------------------
@ -211,30 +235,39 @@ class AmcrestAPIMixin:
return motion_detection
def set_motion_detection(self: Amcrest2Mqtt, device_id: str, switch: bool) -> str:
def set_motion_detection(self: Amcrest2Mqtt, device_id: str, switch: bool) -> None:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return ""
return None
try:
response = str(device["camera"].set_motion_detection(switch))
response = bool(device["camera"].set_motion_detection(switch))
self.increase_api_calls()
self.logger.debug(f"Set motion_detection on {self.get_device_name(device_id)} to {switch}, got back: {response}")
if response:
self.upsert_state(device_id, switch={"motion_detection": "ON" if switch else "OFF"})
self.publish_device_state(device_id)
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to set motion detections")
return ""
except LoginError:
self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) to set motion detections")
return ""
return response
return None
# Snapshots -----------------------------------------------------------------------------------
async def collect_all_device_snapshots(self: Amcrest2Mqtt) -> None:
tasks = [self.get_snapshot_from_device(device_id) for device_id in self.amcrest_devices]
await asyncio.gather(*tasks)
tasks = []
for device_id in self.amcrest_devices:
if self.is_rebooting(device_id):
self.logger.debug(f"skipping snapshot for {self.get_device_name(device_id)}, still rebooting")
continue
tasks.append(self.get_snapshot_from_device(device_id))
if tasks:
await asyncio.gather(*tasks)
async def get_snapshot_from_device(self: Amcrest2Mqtt, device_id: str) -> str | None:
device = self.amcrest_devices[device_id]
@ -251,6 +284,8 @@ class AmcrestAPIMixin:
for attempt in range(1, SNAPSHOT_MAX_TRIES + 1):
try:
if self.is_rebooting(device_id):
return None
image_bytes = await asyncio.wait_for(camera.async_snapshot(), timeout=SNAPSHOT_TIMEOUT_S)
self.increase_api_calls()
if not image_bytes:
@ -302,6 +337,8 @@ class AmcrestAPIMixin:
tries = 0
while tries < 3:
try:
if self.is_rebooting(device_id):
return None
data_raw = cast(bytes, device["camera"].download_file(file))
self.increase_api_calls()
if data_raw:
@ -331,8 +368,15 @@ class AmcrestAPIMixin:
# Events --------------------------------------------------------------------------------------
async def collect_all_device_events(self: Amcrest2Mqtt) -> None:
tasks = [self.get_events_from_device(device_id) for device_id in self.amcrest_devices]
await asyncio.gather(*tasks)
tasks = []
for device_id in self.amcrest_devices:
if self.is_rebooting(device_id):
self.logger.debug(f"skipping collecting events for {self.get_device_name(device_id)}, still rebooting")
continue
tasks.append(self.get_events_from_device(device_id))
if tasks:
await asyncio.gather(*tasks)
async def get_events_from_device(self: Amcrest2Mqtt, device_id: str) -> None:
device = self.amcrest_devices[device_id]
@ -343,6 +387,8 @@ class AmcrestAPIMixin:
tries = 0
while tries < 3:
try:
if self.is_rebooting(device_id):
return None
async for code, payload in device["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
self.increase_api_calls()

@ -1,6 +1,5 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Jeff Culverhouse
import asyncio
from typing import TYPE_CHECKING
from datetime import datetime, timezone
@ -9,10 +8,6 @@ if TYPE_CHECKING:
class EventsMixin:
async def collect_all_device_events(self: Amcrest2Mqtt) -> None:
tasks = [self.get_events_from_device(device_id) for device_id in self.amcrest_devices]
await asyncio.gather(*tasks)
async def check_for_events(self: Amcrest2Mqtt) -> None:
needs_publish = set()
@ -27,7 +22,7 @@ class EventsMixin:
states = self.states[device_id]
# if one of our known sensors
if event in ["motion", "human", "doorbell", "recording", "privacy_mode"]:
if event in ["motion", "human", "doorbell", "recording", "privacy_mode", "Reboot"]:
if event == "recording":
if payload["file"].endswith(".jpg"):
image = self.get_recorded_file(device_id, payload["file"])
@ -54,8 +49,13 @@ class EventsMixin:
)
event += f": ({region}) - {payload["state"]}"
else:
self.upsert_state(device_id, sensor={event: payload})
event += ": " + payload["state"]
if isinstance(payload, str):
event += ": " + payload
elif isinstance(payload, dict):
if "state" in payload:
event += ": " + payload["state"]
if "action" in payload:
event += ": " + payload["action"]
# other ways to infer "privacy mode" has been turned off and we need to update
if event in ["motion", "human", "doorbell"] and states["switch"]["privacy"] != "OFF":
@ -69,9 +69,10 @@ class EventsMixin:
"event_time": datetime.now(timezone.utc).isoformat(),
},
)
self.logger.debug(f'processed event for "{self.get_device_name(device_id)}": {event} with {payload}')
needs_publish.add(device_id)
else:
self.logger.info(f'Ignored event for "{self.get_device_name(device_id)}": {event} with {payload}')
self.logger.debug(f'ignored event for "{self.get_device_name(device_id)}": {event} with {payload}')
for id in needs_publish:
self.publish_device_state(id)

@ -27,6 +27,10 @@ class ConfigError(ValueError):
class HelpersMixin:
def build_device_states(self: Amcrest2Mqtt, device_id: str) -> bool:
if self.is_rebooting(device_id):
self.logger.debug(f"skipping device states for {self.get_device_name(device_id)}, still rebooting")
return False
storage = self.get_storage_stats(device_id)
privacy = self.get_privacy_mode(device_id)
motion_detection = self.get_motion_detection(device_id)
@ -55,6 +59,12 @@ class HelpersMixin:
return
self.upsert_state(device_id, switch={"save_recordings": message})
self.publish_device_state(device_id)
case "motion_detection":
self.set_motion_detection(device_id, message == "ON")
case "privacy":
self.set_privacy_mode(device_id, message == "ON")
case "reboot":
self.reboot_device(device_id)
def handle_service_command(self: Amcrest2Mqtt, handler: str, message: str) -> None:
match handler:

@ -93,7 +93,8 @@ class MqttMixin:
client.subscribe("homeassistant/status")
client.subscribe(f"{self.mqtt_helper.service_slug}/service/+/set")
client.subscribe(f"{self.mqtt_helper.service_slug}/service/+/command")
client.subscribe(f"{self.mqtt_helper.service_slug}/switch/#")
client.subscribe(f"{self.mqtt_helper.service_slug}/+/switch/+/set")
client.subscribe(f"{self.mqtt_helper.service_slug}/+/button/+/set")
def mqtt_on_disconnect(
self: Amcrest2Mqtt, client: Client, userdata: Any, flags: DisconnectFlags, reason_code: ReasonCode, properties: Properties | None
@ -176,20 +177,11 @@ class MqttMixin:
return None
# Example topics:
# amcrest2mqtt/light/amcrest2mqtt_2BEFD0C907BB6BF2/set
# amcrest2mqtt/light/amcrest2mqtt_2BEFD0C907BB6BF2/save_recordings/set
# Case 1: .../<device>/<attribute>set
if len(components) >= 5 and "_" in components[-3]:
vendor, device_id = components[-3].split("_", 1)
attribute = components[-2]
# Case 2: .../<device>/<attribute>/set
elif len(components) >= 4 and "_" in components[-2]:
vendor, device_id = components[-2].split("_", 1)
attribute = None
else:
raise ValueError(f"Malformed topic (expected underscore): {'/'.join(components)}")
# amcrest2mqtt/amcrest2mqtt_2BEFD0C907BB6BF2/switch/save_recordings/set
# amcrest2mqtt/amcrest2mqtt_2BEFD0C907BB6BF2/button/reboot/set
vendor, device_id = components[1].split("_", 1)
attribute = components[-2]
return [vendor, device_id, attribute]

@ -21,6 +21,9 @@ class RefreshMixin:
tasks = []
for device_id in self.devices:
if self.is_rebooting(device_id):
self.logger.debug(f"skipping refresh for {self.get_device_name(device_id)}, still rebooting")
continue
tasks.append(_refresh(device_id))
if tasks:
await asyncio.gather(*tasks)

Loading…
Cancel
Save