feature: add api_call counter, save/restore state to /config dir; simplify getting class vars

pull/106/head
Jeff Culverhouse 3 months ago
parent f244b55898
commit 114073e465

@ -4,8 +4,11 @@ import argparse
from datetime import datetime
import logging
from mqtt_helper import MqttHelper
import json
from json_logging import get_logger
import os
from paho.mqtt.client import Client
from pathlib import Path
from types import TracebackType
from typing import Any, cast, Self
@ -60,7 +63,7 @@ class Base:
self.device_list_interval = self.config["amcrest"].get("device_list_interval", 300)
self.api_calls = 0
self.last_call_date = ""
self.last_call_date = datetime.now()
self.rate_limited = False
def __enter__(self: Self) -> Amcrest2Mqtt:
@ -69,6 +72,7 @@ class Base:
super_enter()
cast(Any, self).mqttc_create()
cast(Any, self).restore_state()
self.running = True
return cast(Amcrest2Mqtt, self)
@ -79,6 +83,7 @@ class Base:
super_exit(exc_type, exc_val, exc_tb)
self.running = False
cast(Any, self).save_state()
if cast(Any, self).mqttc is not None:
try:
@ -95,3 +100,22 @@ class Base:
self.logger.warning(f"error during MQTT disconnect: {err}")
self.logger.info("exiting gracefully")
def save_state(self: Amcrest2Mqtt) -> None:
data_file = Path(self.config["config_path"]) / "amcrest2mqtt.dat"
state = {
"api_calls": self.api_calls,
"last_call_date": str(self.last_call_date),
}
with open(data_file, "w", encoding="utf-8") as file:
json.dump(state, file, indent=4)
self.logger.info(f"Saved state to {data_file}")
def restore_state(self: Amcrest2Mqtt) -> None:
data_file = Path(self.config["config_path"]) / "amcrest2mqtt.dat"
if os.path.exists(data_file):
with open(data_file, "r") as file:
state = json.loads(file.read())
self.api_calls = state["api_calls"]
self.last_call_date = datetime.strptime(state["last_call_date"], "%Y-%m-%d %H:%M:%S.%f")
self.logger.info(f"Restored state from {data_file}: {self.api_calls} / {str(self.last_call_date)}")

@ -23,7 +23,7 @@ class AmcrestServiceProtocol(Protocol):
devices: dict[str, Any]
discovery_complete: bool
events: list
last_call_date: str
last_call_date: datetime
logger: Logger
loop: AbstractEventLoop
mqtt_config: dict[str, Any]
@ -69,7 +69,6 @@ class AmcrestServiceProtocol(Protocol):
def b_to_mb(self, total: int) -> float: ...
def build_device_states(self, device_id: str) -> bool: ...
def classify_device(self, device: dict) -> str: ...
def get_api_calls(self) -> int: ...
def get_camera(self, host: str) -> AmcrestCamera: ...
def get_component_type(self, device_id: str) -> str: ...
def get_component(self, device_id: str) -> dict[str, Any]: ...
@ -80,7 +79,6 @@ class AmcrestServiceProtocol(Protocol):
def get_device_state_topic(self, device_id: str, mode_name: str = "") -> str: ...
def get_device(self, host: str, device_name: str, index: int) -> None: ...
def get_ip_address(self, string: str) -> str: ...
def get_last_call_date(self) -> str: ...
def get_mode(self, device_id: str, mode_name: str) -> dict[str, Any]: ...
def get_modes(self, device_id: str) -> dict[str, Any]: ...
def get_motion_detection(self, device_id: str) -> bool: ...
@ -93,9 +91,9 @@ class AmcrestServiceProtocol(Protocol):
def handle_service_command(self, handler: str, message: str) -> None: ...
def handle_signal(self, signum: int, _: FrameType | None) -> Any: ...
def heartbeat_ready(self) -> None: ...
def increase_api_calls(self) -> None: ...
def is_discovered(self, device_id: str) -> bool: ...
def is_ipv4(self, string: str) -> bool: ...
def is_rate_limited(self) -> bool: ...
def list_from_env(self, env_name: str) -> list[str]: ...
def load_config(self, config_arg: Any | None) -> dict[str, Any]: ...
def mark_ready(self) -> None: ...
@ -116,8 +114,10 @@ class AmcrestServiceProtocol(Protocol):
def publish_service_state(self) -> None: ...
def read_file(self, file_name: str) -> str: ...
def rediscover_all(self) -> None: ...
def restore_state(self) -> None: ...
def restore_state_values(self, api_calls: int, last_call_date: str) -> None: ...
def safe_split_device(self, topic: str, segment: str) -> list[str]: ...
def set_last_call_date(self) -> None: ...
def save_state(self) -> None: ...
def set_motion_detection(self, device_id: str, switch: bool) -> str: ...
def set_privacy_mode(self, device_id: str, switch: bool) -> str: ...
def upsert_device(self, device_id: str, **kwargs: dict[str, Any] | str | int | bool) -> bool: ...

@ -17,19 +17,11 @@ SNAPSHOT_BASE_BACKOFF_S = 5
class AmcrestAPIMixin:
def get_api_calls(self: Amcrest2Mqtt) -> int:
return self.api_calls
def get_last_call_date(self: Amcrest2Mqtt) -> str:
return self.last_call_date
def set_last_call_date(self: Amcrest2Mqtt) -> None:
self.last_call_date = datetime.now(timezone.utc).isoformat()
def is_rate_limited(self: Amcrest2Mqtt) -> bool:
return self.rate_limited
# ----------------------------------------------------------------------------------------------
def increase_api_calls(self: Amcrest2Mqtt) -> None:
if not self.last_call_date or self.last_call_date.date() != datetime.now().date():
self.api_calls = 0
self.last_call_date = datetime.now()
self.api_calls += 1
async def connect_to_devices(self: Amcrest2Mqtt) -> dict[str, Any]:
semaphore = asyncio.Semaphore(5)
@ -52,6 +44,7 @@ class AmcrestAPIMixin:
def get_camera(self: Amcrest2Mqtt, host: str) -> AmcrestCamera:
config = self.amcrest_config
self.increase_api_calls()
return AmcrestCamera(host, config["port"], config["username"], config["password"], verbose=False)
def get_device(self: Amcrest2Mqtt, host: str, device_name: str, index: int) -> None:
@ -61,7 +54,7 @@ class AmcrestAPIMixin:
host_ip = self.get_ip_address(host)
device = self.get_camera(host_ip)
camera = device.camera
self.set_last_call_date()
self.increase_api_calls()
except LoginError:
self.logger.error(f'invalid username/password to connect to device "{host}", fix in config.yaml')
return
@ -134,7 +127,7 @@ class AmcrestAPIMixin:
try:
storage = device["camera"].storage_all
self.set_last_call_date()
self.increase_api_calls()
except CommError as err:
self.logger.error(f"failed to get storage stats from ({self.get_device_name(device_id)}): {err}")
return current
@ -165,7 +158,7 @@ class AmcrestAPIMixin:
privacy = device["camera"].privacy_config().split()
privacy_mode = True if privacy[0].split("=")[1] == "true" else False
device["privacy_mode"] = privacy_mode
self.set_last_call_date()
self.increase_api_calls()
except CommError as err:
self.logger.error(f"failed to get privacy mode from ({self.get_device_name(device_id)}): {err}")
return current
@ -183,7 +176,7 @@ class AmcrestAPIMixin:
try:
response = cast(str, device["camera"].set_privacy(switch).strip())
self.set_last_call_date()
self.increase_api_calls()
except CommError as err:
self.logger.error(f"failed to set privacy mode on ({self.get_device_name(device_id)}): {err}")
return ""
@ -208,7 +201,7 @@ class AmcrestAPIMixin:
try:
motion_detection = bool(device["camera"].is_motion_detector_on())
self.set_last_call_date()
self.increase_api_calls()
except CommError as err:
self.logger.error(f"failed to get motion detection switch on ({self.get_device_name(device_id)}): {err}")
return current
@ -227,7 +220,7 @@ class AmcrestAPIMixin:
try:
response = str(device["camera"].set_motion_detection(switch))
self.set_last_call_date()
self.increase_api_calls()
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to set motion detections")
return ""
@ -259,7 +252,7 @@ class AmcrestAPIMixin:
for attempt in range(1, SNAPSHOT_MAX_TRIES + 1):
try:
image_bytes = await asyncio.wait_for(camera.async_snapshot(), timeout=SNAPSHOT_TIMEOUT_S)
self.set_last_call_date()
self.increase_api_calls()
if not image_bytes:
self.logger.warning(f"Snapshot: empty image from {self.get_device_name(device_id)}")
return None
@ -314,7 +307,7 @@ class AmcrestAPIMixin:
while tries < 3:
try:
data_raw = cast(bytes, device["camera"].download_file(file))
self.set_last_call_date()
self.increase_api_calls()
if data_raw:
if not encode:
if len(data_raw) < self.mb_to_b(100):
@ -356,7 +349,7 @@ class AmcrestAPIMixin:
try:
async for code, payload in device["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
self.set_last_call_date()
self.increase_api_calls()
return
except CommError:
tries += 1

@ -61,16 +61,17 @@ class EventsMixin:
if event in ["motion", "human", "doorbell"] and states["switch"]["privacy"] != "OFF":
self.upsert_state(device_id, switch={"privacy_mode": "OFF"})
# send everything to the device's event_text/time
self.logger.debug(f'got event {{{event}: {payload}}} for "{self.get_device_name(device_id)}"')
self.upsert_state(
device_id,
sensor={
"event_text": event,
"event_time": datetime.now(timezone.utc).isoformat(),
},
)
needs_publish.add(device_id)
# send most everything to the device's event_text/time
if event not in ["NTPAdjustTime"]:
self.logger.debug(f'got event {{{event}: {payload}}} for "{self.get_device_name(device_id)}"')
self.upsert_state(
device_id,
sensor={
"event_text": event,
"event_time": datetime.now(timezone.utc).isoformat(),
},
)
needs_publish.add(device_id)
for id in needs_publish:
self.publish_device_state(id)

@ -1,4 +1,5 @@
import json
from datetime import timezone
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
@ -111,6 +112,7 @@ class PublishMixin:
"max": 3600,
"step": 1,
"icon": "mdi:timer-refresh",
"mode": "box",
"device": device_block,
}
),
@ -131,6 +133,7 @@ class PublishMixin:
"max": 3600,
"step": 1,
"icon": "mdi:format-list-bulleted",
"mode": "box",
"device": device_block,
}
),
@ -151,6 +154,7 @@ class PublishMixin:
"max": 60,
"step": 1,
"icon": "mdi:lightning-bolt",
"mode": "box",
"device": device_block,
}
),
@ -178,10 +182,17 @@ class PublishMixin:
self.mqtt_helper.safe_publish(self.mqtt_helper.avty_t("service"), status, qos=self.qos, retain=True)
def publish_service_state(self: Amcrest2Mqtt) -> None:
# we keep last_call_date in localtime so it rolls-over the api call counter
# at the right time (midnight, local) but we want to send last_call_date
# to HomeAssistant as UTC
last_call_date = self.last_call_date
local_tz = last_call_date.astimezone().tzinfo
utc_dt = last_call_date.replace(tzinfo=local_tz).astimezone(timezone.utc)
service = {
"api_calls": self.get_api_calls(),
"last_call_date": self.get_last_call_date(),
"rate_limited": "YES" if self.is_rate_limited() else "NO",
"api_calls": self.api_calls,
"last_call_date": utc_dt.isoformat(),
"rate_limited": "YES" if self.rate_limited else "NO",
"storage_refresh": self.device_interval,
"device_list_refresh": self.device_list_interval,
"snapshot_refresh": self.snapshot_update_interval,
@ -190,7 +201,7 @@ class PublishMixin:
for key, value in service.items():
self.mqtt_helper.safe_publish(
self.mqtt_helper.stat_t("service", "service", key),
json.dumps(value) if isinstance(value, dict) else str(value),
json.dumps(value) if isinstance(value, dict) else value,
qos=self.mqtt_config["qos"],
retain=True,
)
@ -247,3 +258,5 @@ class PublishMixin:
type_states = states[component_type][name] if isinstance(states[component_type], dict) else states[component_type]
_publish_one(device_id, type_states, name)
self.publish_service_state()

Loading…
Cancel
Save