fix: code cleanup; fix service sensors; reduce logging

pull/106/head
Jeff Culverhouse 3 months ago
parent 50ea60f311
commit a414715f26

@ -35,18 +35,18 @@ def main() -> int:
with Amcrest2Mqtt(args=args) as amcrest2mqtt:
try:
asyncio.run(amcrest2mqtt.main_loop())
except RuntimeError as e:
if "asyncio.run() cannot be called from a running event loop" in str(e):
except RuntimeError as err:
if "asyncio.run() cannot be called from a running event loop" in str(err):
# Nested event loop (common in tests or Jupyter) — fall back gracefully
loop = asyncio.get_event_loop()
loop.run_until_complete(amcrest2mqtt.main_loop())
else:
raise
except ConfigError as e:
logger.error(f"Fatal config error was found: {e}")
except ConfigError as err:
logger.error(f"Fatal config error was found: {err}")
return 1
except MqttError as e:
logger.error(f"MQTT service problems: {e}")
except MqttError as err:
logger.error(f"MQTT service problems: {err}")
return 1
except KeyboardInterrupt:
logger.warning("Shutdown requested (Ctrl+C). Exiting gracefully...")
@ -54,8 +54,8 @@ def main() -> int:
except asyncio.CancelledError:
logger.warning("Main loop cancelled.")
return 1
except Exception as e:
logger.error(f"unhandled exception: {e}", exc_info=True)
except Exception as err:
logger.error(f"unhandled exception: {err}", exc_info=True)
return 1
finally:
logger.info("amcrest2mqtt stopped.")

@ -87,14 +87,14 @@ class Base:
try:
cast(Any, self).publish_service_availability("offline")
cast(Any, self).mqttc.loop_stop()
except Exception as e:
self.logger.debug(f"MQTT loop_stop failed: {e}")
except Exception as err:
self.logger.debug(f"Mqtt loop_stop failed: {err}")
if cast(Any, self).mqttc.is_connected():
try:
cast(Any, self).mqttc.disconnect()
self.logger.info("Disconnected from MQTT broker")
except Exception as e:
self.logger.warning(f"Error during MQTT disconnect: {e}")
self.logger.info("disconnected from MQTT broker")
except Exception as err:
self.logger.warning(f"error during MQTT disconnect: {err}")
self.logger.info("Exiting gracefully")
self.logger.info("exiting gracefully")

@ -59,14 +59,12 @@ class AmcrestServiceProtocol(Protocol):
async def setup_device_list(self) -> None: ...
async def store_recording_in_media(self, device_id: str, amcrest_file: str) -> str | None: ...
def _csv(self, env_name: str) -> list[str] | None: ...
def _assert_no_tuples(self, data: Any, path: str = "root") -> None: ...
def _decode_payload(self, raw: bytes) -> Any: ...
def _handle_device_topic(self, components: list[str], payload: str) -> None: ...
def _handle_homeassistant_message(self, payload: str) -> None: ...
def _handle_signal(self, signum: int, frame: FrameType | None) -> Any: ...
def _parse_device_topic(self, components: list[str]) -> list[str | None] | None: ...
def assert_no_tuples(self, data: Any, path: str = "root") -> None: ...
def b_to_gb(self, total: int) -> float: ...
def b_to_mb(self, total: int) -> float: ...
def build_device_states(self, device_id: str) -> None: ...
@ -93,10 +91,12 @@ class AmcrestServiceProtocol(Protocol):
def get_storage_stats(self, device_id: str) -> dict[str, str | float]: ...
def handle_device_command(self, device_id: str, handler: str, message: str) -> None: ...
def handle_service_command(self, handler: str, message: str) -> None: ...
def handle_signal(self, signum: int, _: FrameType | None) -> Any: ...
def heartbeat_ready(self) -> None: ...
def is_discovered(self, device_id: str) -> bool: ...
def is_ipv4(self, string: str) -> bool: ...
def is_rate_limited(self) -> bool: ...
def list_from_env(self, env_name: str) -> list[str]: ...
def load_config(self, config_arg: Any | None) -> dict[str, Any]: ...
def mark_ready(self) -> None: ...
def mb_to_b(self, total: int) -> int: ...

@ -9,14 +9,14 @@ if TYPE_CHECKING:
class AmcrestMixin:
async def setup_device_list(self: Amcrest2Mqtt) -> None:
self.logger.info("Setting up device list from config")
self.logger.debug("setting up device list from config")
devices = await self.connect_to_devices()
amcrest_devices = await self.connect_to_devices()
self.publish_service_state()
seen_devices = set()
for device in devices.values():
for device in amcrest_devices.values():
created = await self.build_component(device)
if created:
seen_devices.add(created)
@ -25,12 +25,12 @@ class AmcrestMixin:
missing_devices = set(self.devices.keys()) - seen_devices
for device_id in missing_devices:
self.publish_device_availability(device_id, online=False)
self.logger.warning(f"Device {device_id} not seen in Amcrest API list — marked offline")
self.logger.warning(f"device {device_id} not seen in Amcrest API list — marked offline")
# Handle first discovery completion
if not self.discovery_complete:
await asyncio.sleep(1)
self.logger.info("First-time device setup and discovery is done")
self.logger.info("device setup and discovery is done")
self.discovery_complete = True
# convert Amcrest device capabilities into MQTT components
@ -62,7 +62,7 @@ class AmcrestMixin:
]:
return "camera"
else:
self.logger.error(f"Device you specified is not a supported model: {device["device_type"]}")
self.logger.error(f"device you specified is not a supported model: {device["device_type"]}")
return ""
async def build_camera(self: Amcrest2Mqtt, device: dict) -> str:
@ -352,7 +352,7 @@ class AmcrestMixin:
self.build_device_states(device_id)
if not self.states[device_id]["internal"].get("discovered", None):
self.logger.info(f'Added new camera: "{device["device_name"]}" {device["vendor"]} {device["device_type"]}] ({device_id})')
self.logger.info(f'added new camera: "{device["device_name"]}" {device["vendor"]} {device["device_type"]}] ({device_id})')
self.publish_device_discovery(device_id)
self.publish_device_availability(device_id, online=True)

@ -35,16 +35,16 @@ class AmcrestAPIMixin:
async with semaphore:
await asyncio.to_thread(self.get_device, host, name, index)
self.logger.info(f'Connecting to: {self.amcrest_config["hosts"]}')
self.logger.debug(f'connecting to: {self.amcrest_config["hosts"]}')
tasks = []
index = 0
for host, name in zip(cast(str, self.amcrest_config["hosts"]), cast(str, self.amcrest_config["names"])):
for host, name in zip(self.amcrest_config["hosts"], self.amcrest_config["names"]):
tasks.append(_connect_device(host, name, index))
index += 1
await asyncio.gather(*tasks)
self.logger.info("Connecting to hosts done.")
self.logger.info("connecting to hosts done")
return {d: self.amcrest_devices[d]["config"] for d in self.amcrest_devices.keys()}
def get_camera(self: Amcrest2Mqtt, host: str) -> AmcrestCamera:
@ -53,23 +53,28 @@ class AmcrestAPIMixin:
def get_device(self: Amcrest2Mqtt, host: str, device_name: str, index: int) -> None:
camera = None
try:
# resolve host and setup camera by ip so we aren't making 100k DNS lookups per day
try:
host_ip = self.get_ip_address(host)
device = self.get_camera(host_ip)
camera = device.camera
except LoginError:
self.logger.error(f'invalid username/password to connect to device "{host}", fix in config.yaml')
return
except AmcrestError as err:
self.logger.error(f'unexpected error connecting to device "{host}", check config.yaml: {err}')
return
except Exception as err:
self.logger.error(f"Error with {host}: {err}")
self.logger.error(f"error connecting to {host}: {err}")
return
serial_number = camera.serial_number
device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = device_type == "AD110"
is_ad410 = device_type == "AD410"
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
version = camera.software_information[0].replace("version=", "").strip()
build = camera.software_information[1].strip()
sw_version = f"{version} ({build})"
@ -80,7 +85,7 @@ class AmcrestAPIMixin:
mac_address = network_config[f"table.Network.{interface}.PhysicalAddress"].upper()
if camera.serial_number not in self.amcrest_devices:
self.logger.info(f"Connected to {host} with serial number {camera.serial_number}")
self.logger.info(f"connected to {host} with serial number {camera.serial_number}")
self.amcrest_devices[serial_number] = {
"camera": camera,
@ -107,21 +112,21 @@ class AmcrestAPIMixin:
}
self.get_privacy_mode(serial_number)
except LoginError:
self.logger.error(f'Invalid username/password to connect to device "{host}", fix in config.yaml')
except AmcrestError as err:
self.logger.error(f'Failed to connect to device "{host}", check config.yaml and restart to try again: {err}')
# Storage stats -------------------------------------------------------------------------------
def get_storage_stats(self: Amcrest2Mqtt, device_id: str) -> dict[str, str | float]:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return {}
try:
storage = self.amcrest_devices[device_id]["camera"].storage_all
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) for storage stats")
storage = device["camera"].storage_all
except CommError as err:
self.logger.error(f"failed to get storage stats from ({self.get_device_name(device_id)}): {err}")
return {}
except LoginError:
self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) for storage stats")
except LoginError as err:
self.logger.error(f"failed to auth to ({self.get_device_name(device_id)}): {err}")
return {}
return {
@ -134,27 +139,38 @@ class AmcrestAPIMixin:
def get_privacy_mode(self: Amcrest2Mqtt, device_id: str) -> bool:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return False
try:
privacy = device["camera"].privacy_config().split()
privacy_mode = True if privacy[0].split("=")[1] == "true" else False
device["privacy_mode"] = privacy_mode
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to get privacy mode")
except LoginError:
self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) to get privacy mode")
except CommError as err:
self.logger.error(f"failed to get privacy mode from ({self.get_device_name(device_id)}): {err}")
return False
except LoginError as err:
self.logger.error(f"failed to auth to device ({self.get_device_name(device_id)}): {err}")
return False
return privacy_mode
def set_privacy_mode(self: Amcrest2Mqtt, device_id: str, switch: bool) -> str:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return ""
try:
response = cast(str, device["camera"].set_privacy(switch).strip())
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to set privacy mode")
except LoginError:
self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) to set privacy mode")
except CommError as err:
self.logger.error(f"failed to set privacy mode on ({self.get_device_name(device_id)}): {err}")
return ""
except LoginError as err:
self.logger.error(f"failed to auth to device ({self.get_device_name(device_id)}): {err}")
return ""
return response
# Motion detection config ---------------------------------------------------------------------
@ -162,16 +178,16 @@ class AmcrestAPIMixin:
def get_motion_detection(self: Amcrest2Mqtt, device_id: str) -> bool:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"Cannot get motion_detection, no camera found for {self.get_device_name(device_id)}")
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return False
try:
motion_detection: bool = device["camera"].is_motion_detector_on()
except CommError:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to get motion detection")
motion_detection = bool(device["camera"].is_motion_detector_on())
except CommError as err:
self.logger.error(f"failed to get motion detection switch on ({self.get_device_name(device_id)}): {err}")
return False
except LoginError:
self.logger.error(f"Failed to authenticate with device ({self.get_device_name(device_id)}) to get motion detection")
except LoginError as err:
self.logger.error(f"failed to auth to device ({self.get_device_name(device_id)}): {err}")
return False
return motion_detection
@ -179,7 +195,7 @@ class AmcrestAPIMixin:
def set_motion_detection(self: Amcrest2Mqtt, device_id: str, switch: bool) -> str:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"Cannot set motion_detection, no camera found for {self.get_device_name(device_id)}")
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return ""
try:
@ -204,13 +220,13 @@ class AmcrestAPIMixin:
# Respect privacy mode (default False if missing)
if device.get("privacy_mode", False):
self.logger.info(f"Snapshot: skip {self.get_device_name(device_id)} (privacy mode ON)")
self.logger.info(f"skipping snapshot for {self.get_device_name(device_id)} (privacy mode ON)")
return None
camera = device.get("camera")
if camera is None:
self.logger.error(f"Snapshot: device {self.get_device_name(device_id)} has no 'camera' object")
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return None
camera = device["camera"]
for attempt in range(1, SNAPSHOT_MAX_TRIES + 1):
try:
@ -231,7 +247,7 @@ class AmcrestAPIMixin:
)
self.publish_device_state(device_id)
self.logger.debug(f"Snapshot: {self.get_device_name(device_id)} {len(image_bytes)} raw bytes -> {len(encoded)} b64 chars")
self.logger.debug(f"got snapshot from {self.get_device_name(device_id)} {len(image_bytes)} raw bytes -> {len(encoded)} b64 chars")
return encoded
except asyncio.CancelledError:
@ -245,16 +261,16 @@ class AmcrestAPIMixin:
delay = SNAPSHOT_BASE_BACKOFF_S * (2 ** (attempt - 1))
delay += random.uniform(0, 0.25)
self.logger.debug(
f"Snapshot: attempt {attempt}/{SNAPSHOT_MAX_TRIES} failed for {self.get_device_name(device_id)}: {err!r}; retrying in {delay:.2f}s"
f"snapshot attempt {attempt}/{SNAPSHOT_MAX_TRIES} failed for {self.get_device_name(device_id)}: {err!r}; retrying in {delay:.2f}s"
)
await asyncio.sleep(delay)
# Any other unexpected exception: log and stop
except Exception as err: # noqa: BLE001 (log-and-drop is intentional here)
self.logger.exception(f"Snapshot: unexpected error for {self.get_device_name(device_id)}: {err!r}")
self.logger.exception(f"snapshot: unexpected error for {self.get_device_name(device_id)}: {err!r}")
return None
self.logger.info(f"Snapshot: failed after {SNAPSHOT_MAX_TRIES} tries for {self.get_device_name(device_id)}")
self.logger.info(f"getting snapshot failed after {SNAPSHOT_MAX_TRIES} tries for {self.get_device_name(device_id)}")
return None
def get_snapshot(self: Amcrest2Mqtt, device_id: str) -> str | None:
@ -274,24 +290,23 @@ class AmcrestAPIMixin:
if len(data_raw) < self.mb_to_b(100):
return data_raw.decode("latin-1")
else:
self.logger.error(f"Raw recording is too large: {self.b_to_mb(len(data_raw))} MB")
self.logger.error(f"skipping raw recording, too large: {self.b_to_mb(len(data_raw))} MB")
return None
data_base64 = base64.b64encode(data_raw)
self.logger.info(
f"Processed recording from ({self.get_device_name(device_id)}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64"
self.logger.debug(
f"processed recording from ({self.get_device_name(device_id)}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64"
)
if len(data_base64) < self.mb_to_b(100):
return data_raw.decode("latin-1")
else:
self.logger.error(f"Encoded recording is too large: {self.b_to_mb(len(data_base64))} MB")
self.logger.error(f"skipping recording, too large: {self.b_to_mb(len(data_base64))} MB")
return None
except CommError:
tries += 1
except LoginError:
tries += 1
if tries == 3:
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to get recorded file")
self.logger.error(f"failed to get recording from ({self.get_device_name(device_id)})")
return None
# Events --------------------------------------------------------------------------------------
@ -302,28 +317,28 @@ class AmcrestAPIMixin:
async def get_events_from_device(self: Amcrest2Mqtt, device_id: str) -> None:
device = self.amcrest_devices[device_id]
if not device["camera"]:
self.logger.warning(f"camera not found for {self.get_device_name(device_id)}")
return None
tries = 0
while tries < 3:
try:
async for code, payload in device["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
return
except CommError:
tries += 1
except LoginError:
tries += 1
if tries == 3:
self.logger.error(f"Failed to communicate for events for device ({self.get_device_name(device_id)})")
self.logger.error(f"failed to check for events on ({self.get_device_name(device_id)})")
async def process_device_event(self: Amcrest2Mqtt, device_id: str, code: str, payload: Any) -> None:
try:
device = self.amcrest_devices[device_id]
config = device["config"]
# if code != 'NewFile' and code != 'InterVideoAccess':
# self.logger.info(f'Event on {self.get_device_name(device_id)} - {code}: {payload}')
if (code == "ProfileAlarmTransmit" and config["is_ad110"]) or (code == "VideoMotion" and not config["is_ad110"]):
motion_payload = {"state": "on" if payload["action"] == "Start" else "off", "region": ", ".join(payload["data"]["RegionName"])}
self.events.append({"device_id": device_id, "event": "motion", "payload": motion_payload})
@ -347,6 +362,7 @@ class AmcrestAPIMixin:
elif code == "LensMaskClose":
device["privacy_mode"] = False
self.events.append({"device_id": device_id, "event": "privacy_mode", "payload": "off"})
# lets send these but not bother logging them here
elif code == "TimeChange":
self.events.append({"device_id": device_id, "event": code, "payload": payload["action"]})
@ -354,17 +370,19 @@ class AmcrestAPIMixin:
self.events.append({"device_id": device_id, "event": code, "payload": payload["action"]})
elif code == "RtspSessionDisconnect":
self.events.append({"device_id": device_id, "event": code, "payload": payload["action"]})
# lets just ignore these
elif code == "InterVideoAccess": # I think this is US, accessing the API of the camera, lets not inception!
pass
elif code == "VideoMotionInfo":
pass
# save everything else as a 'generic' event
else:
self.logger.info(f"Event on {self.get_device_name(device_id)} - {code}: {payload}")
self.logger.info(f"logged event on {self.get_device_name(device_id)} - {code}: {payload}")
self.events.append({"device_id": device_id, "event": code, "payload": payload})
except Exception as err:
self.logger.error(f"Failed to process event from {self.get_device_name(device_id)}: {err}", exc_info=True)
self.logger.error(f"failed to process event from {self.get_device_name(device_id)}: {err}", exc_info=True)
def get_next_event(self: Amcrest2Mqtt) -> dict[str, Any] | None:
return self.events.pop(0) if len(self.events) > 0 else None

@ -1,8 +1,7 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Jeff Culverhouse
import asyncio
import json
from typing import TYPE_CHECKING, cast, Any
from typing import TYPE_CHECKING
from datetime import datetime, timezone
if TYPE_CHECKING:
@ -15,17 +14,17 @@ class EventsMixin:
await asyncio.gather(*tasks)
async def check_for_events(self: Amcrest2Mqtt) -> None:
try:
needs_publish = set()
while device_event := self.get_next_event():
if "device_id" not in device_event:
self.logger.error(f"Got event, but missing device_id: {json.dumps(device_event)}")
continue
device_id = str(device_event["device_id"])
event = cast(str, device_event["event"])
payload = cast(dict[str, Any], device_event["payload"])
event = str(device_event["event"])
payload = device_event["payload"]
device_states = self.states[device_id]
states = self.states[device_id]
# if one of our known sensors
if event in ["motion", "human", "doorbell", "recording", "privacy_mode"]:
@ -41,9 +40,7 @@ class EventsMixin:
elif payload["file"].endswith(".mp4"):
if "path" in self.config["media"] and self.states[device_id]["switch"]["save_recordings"] == "ON":
await self.store_recording_in_media(device_id, payload["file"])
else:
self.logger.info(f"Got event for {self.get_device_name(device_id)}: {event} - {payload}")
if event == "motion":
elif event == "motion":
self.upsert_state(
device_id,
binary_sensor={"motion": payload["state"]},
@ -55,12 +52,12 @@ class EventsMixin:
else:
self.upsert_state(device_id, sensor={event: payload})
# other ways to infer "privacy mode" is off and needs updating
if event in ["motion", "human", "doorbell"] and device_states["switch"]["privacy"] != "OFF":
# other ways to infer "privacy mode" has been turned off and we need to update
if event in ["motion", "human", "doorbell"] and states["switch"]["privacy"] != "OFF":
self.upsert_state(device_id, switch={"privacy_mode": "OFF"})
# send everything to the device's event_text/time
self.logger.debug(f'Got {{{event}: {payload}}} for "{self.get_device_name(device_id)}"')
self.logger.debug(f'got event {{{event}: {payload}}} for "{self.get_device_name(device_id)}"')
self.upsert_state(
device_id,
sensor={
@ -68,7 +65,7 @@ class EventsMixin:
"event_time": datetime.now(timezone.utc).isoformat(),
},
)
needs_publish.add(device_id)
self.publish_device_state(device_id)
except Exception as err:
self.logger.error(err, exc_info=True)
for id in needs_publish:
self.publish_device_state(id)

@ -2,7 +2,6 @@
# Copyright (c) 2025 Jeff Culverhouse
from deepmerge.merger import Merger
import ipaddress
import logging
import os
import pathlib
import signal
@ -51,7 +50,7 @@ class HelpersMixin:
match handler:
case "save_recordings":
if message == "ON" and "path" not in self.config["media"]:
self.logger.error("User tried to turn on save_recordings, but there is no media path set")
self.logger.error("user tried to turn on save_recordings, but there is no media path set")
return
self.upsert_state(device_id, switch={"save_recordings": message})
self.publish_device_state(device_id)
@ -67,22 +66,17 @@ class HelpersMixin:
case "refresh_device_list":
if message == "refresh":
self.rediscover_all()
else:
self.logger.error("[handler] unknown [message]")
return
case _:
self.logger.error(f"Unrecognized message to {self.mqtt_helper.service_slug}: {handler} -> {message}")
self.logger.error(f"unrecognized message to {self.mqtt_helper.service_slug}: {handler} -> {message}")
return
self.publish_service_state()
def rediscover_all(self: Amcrest2Mqtt) -> None:
self.publish_service_state()
self.publish_service_discovery()
self.publish_service_state()
for device_id in self.devices:
if device_id == "service":
continue
self.publish_device_state(device_id)
self.publish_device_discovery(device_id)
self.publish_device_state(device_id)
# Utility functions ---------------------------------------------------------------------------
@ -95,7 +89,6 @@ class HelpersMixin:
def read_file(self: Amcrest2Mqtt, file_name: str) -> str:
with open(file_name, "r") as file:
data = file.read().replace("\n", "")
return data
def mb_to_b(self: Amcrest2Mqtt, total: int) -> int:
@ -121,18 +114,20 @@ class HelpersMixin:
for i in socket.getaddrinfo(string, None):
if i[0] == socket.AddressFamily.AF_INET:
return str(i[4][0])
except socket.gaierror as e:
raise Exception(f"Failed to resolve {string}: {e}")
raise Exception(f"Failed to find IP address for {string}")
except socket.gaierror as err:
raise Exception(f"failed to resolve {string}: {err}")
raise Exception(f"failed to find IP address for {string}")
def _csv(self: Amcrest2Mqtt, env_name: str) -> list[str] | None:
def list_from_env(self: Amcrest2Mqtt, env_name: str) -> list[str]:
v = os.getenv(env_name)
if not v:
return None
return [s.strip() for s in v.split(",") if s.strip()]
return [] if not v else [s.strip() for s in v.split(",") if s.strip()]
def load_config(self: Amcrest2Mqtt, config_arg: Any | None) -> dict[str, Any]:
version = os.getenv("BLINK2MQTT_VERSION", self.read_file("VERSION"))
version = os.getenv("AMCREST2MQTT_VERSION", self.read_file("VERSION"))
tier = os.getenv("AMCREST2MQTT_TIER", "prod")
if tier == "dev":
version += ":DEV"
config_from = "env"
config: dict[str, str | bool | int | dict] = {}
@ -146,12 +141,6 @@ class HelpersMixin:
elif os.path.isfile(config_path):
config_file = config_path
config_path = os.path.dirname(config_file)
else:
# If it's not a valid path but looks like a filename, handle gracefully
if config_path.endswith(".yaml"):
config_file = config_path
else:
config_file = os.path.join(config_path, "config.yaml")
# Try to load from YAML
if os.path.exists(config_file):
@ -159,10 +148,10 @@ class HelpersMixin:
with open(config_file, "r") as f:
config = yaml.safe_load(f) or {}
config_from = "file"
except Exception as e:
logging.warning(f"Failed to load config from {config_file}: {e}")
except Exception as err:
raise ConfigError(f"found {config_file} but failed to load: {err}")
else:
logging.warning(f"Config file not found at {config_file}, falling back to environment vars")
self.logger.info(f"config file not found at {config_file}, falling back to environment vars")
# Merge with environment vars (env vars override nothing if file exists)
mqtt = cast(dict[str, Any], config.get("mqtt", {}))
@ -178,41 +167,41 @@ class HelpersMixin:
if os.path.exists(media_path) and os.access(media_path, os.W_OK):
media["path"] = media_path
self.logger.info(f"Will be storing recordings in {media_path}, watch that it doesn't fill up your file system")
self.logger.info(f"storing recordings in {media_path}, watch that it doesn't fill up the file system")
else:
self.logger.info("media_path not configured, not found, or is not writable. Will not be saving recordings")
# fmt: off
mqtt = {
"host": cast(str, mqtt.get("host") or os.getenv("MQTT_HOST", "localhost")),
"port": int(cast(str, mqtt.get("port") or os.getenv("MQTT_PORT", 1883))),
"qos": int(cast(str, mqtt.get("qos") or os.getenv("MQTT_QOS", 0))),
"username": mqtt.get("username") or os.getenv("MQTT_USERNAME", ""),
"password": mqtt.get("password") or os.getenv("MQTT_PASSWORD", ""),
"tls_enabled": mqtt.get("tls_enabled") or (os.getenv("MQTT_TLS_ENABLED", "false").lower() == "true"),
"tls_ca_cert": mqtt.get("tls_ca_cert") or os.getenv("MQTT_TLS_CA_CERT"),
"tls_cert": mqtt.get("tls_cert") or os.getenv("MQTT_TLS_CERT"),
"tls_key": mqtt.get("tls_key") or os.getenv("MQTT_TLS_KEY"),
"prefix": mqtt.get("prefix") or os.getenv("MQTT_PREFIX", "amcrest2mqtt"),
"discovery_prefix": mqtt.get("discovery_prefix") or os.getenv("MQTT_DISCOVERY_PREFIX", "homeassistant"),
"host": str(mqtt.get("host") or os.getenv("MQTT_HOST", "localhost")),
"port": int(str(mqtt.get("port") or os.getenv("MQTT_PORT", 1883))),
"qos": int(str(mqtt.get("qos") or os.getenv("MQTT_QOS", 0))),
"username": str(mqtt.get("username") or os.getenv("MQTT_USERNAME", "")),
"password": str(mqtt.get("password") or os.getenv("MQTT_PASSWORD", "")),
"tls_enabled": bool(mqtt.get("tls_enabled") or (os.getenv("MQTT_TLS_ENABLED", "false").lower() == "true")),
"tls_ca_cert": str(mqtt.get("tls_ca_cert") or os.getenv("MQTT_TLS_CA_CERT")),
"tls_cert": str(mqtt.get("tls_cert") or os.getenv("MQTT_TLS_CERT")),
"tls_key": str(mqtt.get("tls_key") or os.getenv("MQTT_TLS_KEY")),
"prefix": str(mqtt.get("prefix") or os.getenv("MQTT_PREFIX", "amcrest2mqtt")),
"discovery_prefix": str(mqtt.get("discovery_prefix") or os.getenv("MQTT_DISCOVERY_PREFIX", "homeassistant")),
}
hosts = amcrest.get("hosts") or self._csv("AMCREST_HOSTS") or []
names = amcrest.get("names") or self._csv("AMCREST_NAMES") or []
sources = webrtc.get("sources") or self._csv("AMCREST_SOURCES") or []
hosts = list[str](amcrest.get("hosts") or self.list_from_env("AMCREST_HOSTS"))
names = list[str](amcrest.get("names") or self.list_from_env("AMCREST_NAMES"))
sources = list[str](webrtc.get("sources") or self.list_from_env("AMCREST_SOURCES"))
amcrest = {
"hosts": hosts,
"names": names,
"port": int(cast(str, amcrest.get("port") or os.getenv("AMCREST_PORT", 80))),
"username": amcrest.get("username") or os.getenv("AMCREST_USERNAME", ""),
"password": amcrest.get("password") or os.getenv("AMCREST_PASSWORD", ""),
"storage_update_interval": int(cast(str, amcrest.get("storage_update_interval") or os.getenv("AMCREST_STORAGE_UPDATE_INTERVAL", 900))),
"snapshot_update_interval": int(cast(str, amcrest.get("snapshot_update_interval") or os.getenv("AMCREST_SNAPSHOT_UPDATE_INTERVAL", 60))),
"port": int(str(amcrest.get("port") or os.getenv("AMCREST_PORT", 80))),
"username": str(amcrest.get("username") or os.getenv("AMCREST_USERNAME", "")),
"password": str(amcrest.get("password") or os.getenv("AMCREST_PASSWORD", "")),
"storage_update_interval": int(str(amcrest.get("storage_update_interval") or os.getenv("AMCREST_STORAGE_UPDATE_INTERVAL", 900))),
"snapshot_update_interval": int(str(amcrest.get("snapshot_update_interval") or os.getenv("AMCREST_SNAPSHOT_UPDATE_INTERVAL", 60))),
"webrtc": {
"host": webrtc.get("host") or os.getenv("AMCREST_WEBRTC_HOST", ""),
"port": int(cast(str, webrtc.get("port") or os.getenv("AMCREST_WEBRTC_PORT", 1984))),
"link": webrtc.get("link") or os.getenv("AMCREST_WEBRTC_LINK", "webrtc"),
"host": str(webrtc.get("host") or os.getenv("AMCREST_WEBRTC_HOST", "")),
"port": int(str(webrtc.get("port") or os.getenv("AMCREST_WEBRTC_PORT", 1984))),
"link": str(webrtc.get("link") or os.getenv("AMCREST_WEBRTC_LINK", "webrtc")),
"sources": sources,
},
}
@ -220,9 +209,9 @@ class HelpersMixin:
config = {
"mqtt": mqtt,
"amcrest": amcrest,
"debug": config.get("debug", os.getenv("DEBUG", "").lower() == "true"),
"hide_ts": config.get("hide_ts", os.getenv("HIDE_TS", "").lower() == "true"),
"timezone": config.get("timezone", os.getenv("TZ", "UTC")),
"debug": bool(config.get("debug", os.getenv("DEBUG", "").lower() == "true")),
"hide_ts": bool(config.get("hide_ts", os.getenv("HIDE_TS", "").lower() == "true")),
"timezone": str(config.get("timezone", os.getenv("TZ", "UTC"))),
"media": media,
"config_from": config_from,
"config_path": config_path,
@ -250,14 +239,20 @@ class HelpersMixin:
path = self.config["media"]["path"]
file_name = f"{name}-{time}.mp4"
file_path = Path(f"{path}/{file_name}")
try:
file_path.write_bytes(recording.encode("latin-1"))
except IOError as err:
self.logger.error(f"failed to save recordingt to {path}: {err}")
return None
self.upsert_state(
device_id,
media={"recording": file_path},
sensor={"recording_time": datetime.now(timezone.utc).isoformat()},
)
# update symlink to "lastest" recording
local_file = Path(f"./{file_name}")
latest_link = Path(f"{path}/{name}-latest.mp4")
if latest_link.is_symlink():
@ -268,27 +263,22 @@ class HelpersMixin:
url = f"{self.config["media"]["media_source"]}/{file_name}"
self.upsert_state(device_id, sensor={"recording_url": url})
return url
except IOError as e:
self.logger.error(f"Failed to save recordingt to {path}: {e}")
return None
self.logger.error(f"Failed to download recording from device {self.get_device_name(device_id)}")
return None
def _handle_signal(self: Amcrest2Mqtt, signum: int, frame: FrameType | None) -> Any:
def handle_signal(self: Amcrest2Mqtt, signum: int, _: FrameType | None) -> Any:
sig_name = signal.Signals(signum).name
self.logger.warning(f"{sig_name} received - stopping service loop")
self.running = False
def _force_exit() -> None:
self.logger.warning("Force-exiting process after signal")
self.logger.warning("force-exiting process after signal")
os._exit(0)
threading.Timer(5.0, _force_exit).start()
# Upsert devices and states -------------------------------------------------------------------
def _assert_no_tuples(self: Amcrest2Mqtt, data: Any, path: str = "root") -> None:
def assert_no_tuples(self: Amcrest2Mqtt, data: Any, path: str = "root") -> None:
if isinstance(data, tuple):
raise TypeError(f"⚠️ Found tuple at {path}: {data!r}")
@ -296,33 +286,33 @@ class HelpersMixin:
for key, value in data.items():
if isinstance(key, tuple):
raise TypeError(f"⚠️ Found tuple key at {path}: {key!r}")
self._assert_no_tuples(value, f"{path}.{key}")
self.assert_no_tuples(value, f"{path}.{key}")
elif isinstance(data, list):
for idx, value in enumerate(data):
self._assert_no_tuples(value, f"{path}[{idx}]")
self.assert_no_tuples(value, f"{path}[{idx}]")
def upsert_device(self: Amcrest2Mqtt, device_id: str, **kwargs: dict[str, Any] | str | int | bool | None) -> None:
MERGER = Merger(
[(dict, "merge"), (list, "append_unique"), (set, "union")],
["override"], # type conflicts: new wins
["override"], # fallback
["override"],
["override"],
)
for section, data in kwargs.items():
# Pre-merge check
self._assert_no_tuples(data, f"device[{device_id}].{section}")
self.assert_no_tuples(data, f"device[{device_id}].{section}")
merged = MERGER.merge(self.devices.get(device_id, {}), {section: data})
# Post-merge check
self._assert_no_tuples(merged, f"device[{device_id}].{section} (post-merge)")
self.assert_no_tuples(merged, f"device[{device_id}].{section} (post-merge)")
self.devices[device_id] = merged
def upsert_state(self: Amcrest2Mqtt, device_id: str, **kwargs: dict[str, Any] | str | int | bool | None) -> None:
MERGER = Merger(
[(dict, "merge"), (list, "append_unique"), (set, "union")],
["override"], # type conflicts: new wins
["override"], # fallback
["override"],
["override"],
)
for section, data in kwargs.items():
self._assert_no_tuples(data, f"state[{device_id}].{section}")
self.assert_no_tuples(data, f"state[{device_id}].{section}")
merged = MERGER.merge(self.states.get(device_id, {}), {section: data})
self._assert_no_tuples(merged, f"state[{device_id}].{section} (post-merge)")
self.assert_no_tuples(merged, f"state[{device_id}].{section} (post-merge)")
self.states[device_id] = merged

@ -61,7 +61,7 @@ class LoopsMixin:
self.loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
try:
signal.signal(sig, self._handle_signal)
signal.signal(sig, self.handle_signal)
except Exception:
self.logger.debug(f"Cannot install handler for {sig}")

@ -57,22 +57,22 @@ class MqttMixin:
try:
host = self.mqtt_config["host"]
port = self.mqtt_config["port"]
self.logger.info(f"Connecting to MQTT broker at {host}:{port} as {self.client_id}")
self.logger.info(f"connecting to MQTT broker at {host}:{port} as {self.client_id}")
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = 0
self.mqttc.connect(host=host, port=port, keepalive=60, properties=props)
self.logger.info(f"Successful connection to {host} MQTT broker")
self.logger.info(f"connected to {host} MQTT broker")
self.mqtt_connect_time = datetime.now()
self.mqttc.loop_start()
except ConnectionError as error:
self.logger.error(f"Failed to connect to MQTT host {host}: {error}")
except ConnectionError as err:
self.logger.error(f"failed to connect to MQTT host {host}: {err}")
self.running = False
raise SystemExit(1)
except Exception as error:
self.logger.error(f"Network problem trying to connect to MQTT host {host}: {error}")
except Exception as err:
self.logger.error(f"network problem trying to connect to MQTT host {host}: {err}")
self.running = False
raise SystemExit(1)
@ -89,7 +89,7 @@ class MqttMixin:
self.publish_service_availability()
self.publish_service_state()
self.logger.debug("Subscribing to topics on MQTT")
self.logger.debug("subscribing to topics on MQTT")
client.subscribe("homeassistant/status")
client.subscribe(f"{self.mqtt_helper.service_slug}/service/+/set")
client.subscribe(f"{self.mqtt_helper.service_slug}/service/+/command")
@ -102,23 +102,23 @@ class MqttMixin:
self.mqtt_helper.clear_client()
if reason_code.value != 0:
self.logger.error(f"MQTT lost connection ({reason_code.getName()})")
self.logger.error(f"Mqtt lost connection ({reason_code.getName()})")
else:
self.logger.info("Closed MQTT connection")
self.logger.info("closed Mqtt connection")
if self.running and (self.mqtt_connect_time is None or datetime.now() > self.mqtt_connect_time + timedelta(seconds=10)):
# lets use a new client_id for a reconnect attempt
self.client_id = self.mqtt_helper.client_id()
self.mqttc_create()
else:
self.logger.info("MQTT disconnect — stopping service loop")
self.logger.info("Mqtt disconnect — stopping service loop")
self.running = False
def mqtt_on_log(self: Amcrest2Mqtt, client: Client, userdata: Any, paho_log_level: int, msg: str) -> None:
if paho_log_level == LogLevel.MQTT_LOG_ERR:
self.logger.error(f"MQTT logged: {msg}")
self.logger.error(f"Mqtt logged: {msg}")
if paho_log_level == LogLevel.MQTT_LOG_WARNING:
self.logger.warning(f"MQTT logged: {msg}")
self.logger.warning(f"Mqtt logged: {msg}")
def mqtt_on_message(self: Amcrest2Mqtt, client: Client, userdata: Any, msg: MQTTMessage) -> None:
topic = msg.topic
@ -134,7 +134,7 @@ class MqttMixin:
if components[0] == self.mqtt_helper.service_slug:
return self._handle_device_topic(components, payload)
self.logger.debug(f"Ignoring unrelated MQTT topic: {topic}")
self.logger.debug(f"ignoring unrelated MQTT topic: {topic}")
def _decode_payload(self: Amcrest2Mqtt, raw: bytes) -> Any:
try:
@ -143,13 +143,13 @@ class MqttMixin:
try:
return raw.decode("utf-8")
except Exception:
self.logger.warning("Failed to decode MQTT payload")
self.logger.warning("failed to decode MQTT payload: {err}")
return None
def _handle_homeassistant_message(self: Amcrest2Mqtt, payload: str) -> None:
if payload == "online":
self.rediscover_all()
self.logger.info("Home Assistant came online — rediscovering devices")
self.logger.info("home Assistant came (back?) online — resending device discovery")
def _handle_device_topic(self: Amcrest2Mqtt, components: list[str], payload: str) -> None:
parsed = self._parse_device_topic(components)
@ -158,16 +158,16 @@ class MqttMixin:
(vendor, device_id, attribute) = parsed
if not vendor or not vendor.startswith(self.mqtt_helper.service_slug):
self.logger.info(f"Ignoring non-Amcrest device command, got vendor {vendor}")
self.logger.info(f"ignoring non-Amcrest device command, got vendor {vendor}")
return
if not device_id or not attribute:
self.logger.error(f"Failed to parse device_id and/or payload from mqtt topic components: {components}")
self.logger.error(f"failed to parse device_id and/or payload from mqtt topic components: {components}")
return
if not self.devices.get(device_id, None):
self.logger.warning(f"Got MQTT message for unknown device: {device_id}")
self.logger.warning(f"got Mqtt message for unknown device: {device_id}")
return
self.logger.info(f"Got message for {self.get_device_name(device_id)}: set {components[-2]} to {payload}")
self.logger.info(f"got message for {self.get_device_name(device_id)}: set {components[-2]} to {payload}")
self.handle_device_command(device_id, attribute, payload)
def _parse_device_topic(self: Amcrest2Mqtt, components: list[str]) -> list[str | None] | None:
@ -193,18 +193,18 @@ class MqttMixin:
return [vendor, device_id, attribute]
except Exception as e:
self.logger.warning(f"Malformed device topic: {components} ({e})")
except Exception as err:
self.logger.warning(f"malformed device topic with {components}: {err}")
return []
def safe_split_device(self: Amcrest2Mqtt, topic: str, segment: str) -> list[str]:
try:
return segment.split("-", 1)
except ValueError:
self.logger.warning(f"Ignoring malformed topic: {topic}")
except ValueError as err:
self.logger.warning(f"Ignoring malformed topic {topic}: {err}")
return []
def mqtt_on_subscribe(self: Amcrest2Mqtt, client: Client, userdata: Any, mid: int, reason_code_list: list[ReasonCode], properties: Properties) -> None:
reason_names = [rc.getName() for rc in reason_code_list]
joined = "; ".join(reason_names) if reason_names else "none"
self.logger.debug(f"MQTT subscribed (mid={mid}): {joined}")
self.logger.debug(f"Mqtt subscribed (mid={mid}): {joined}")

@ -17,7 +17,7 @@ class PublishMixin:
self.config["version"],
)
self.logger.info("Publishing service entity")
self.logger.debug("publishing service entity")
self.mqtt_helper.safe_publish(
topic=self.mqtt_helper.disc_t("binary_sensor", "service"),
payload=json.dumps(
@ -46,8 +46,7 @@ class PublishMixin:
{
"name": f"{self.service_name} API Calls Today",
"uniq_id": self.mqtt_helper.svc_unique_id("api_calls"),
"stat_t": self.mqtt_helper.stat_t("service", "service"),
"value_template": "{{ value_json.api_calls }}",
"stat_t": self.mqtt_helper.stat_t("service", "service", "api_calls"),
"avty_t": self.mqtt_helper.avty_t("service"),
"unit_of_measurement": "calls",
"icon": "mdi:api",
@ -64,8 +63,7 @@ class PublishMixin:
{
"name": f"{self.service_name} Rate Limited by Amcrest",
"uniq_id": self.mqtt_helper.svc_unique_id("rate_limited"),
"stat_t": self.mqtt_helper.stat_t("service", "service"),
"value_template": "{{ value_json.rate_limited }}",
"stat_t": self.mqtt_helper.stat_t("service", "service", "rate_limited"),
"avty_t": self.mqtt_helper.avty_t("service"),
"payload_on": "YES",
"payload_off": "NO",
@ -83,8 +81,7 @@ class PublishMixin:
{
"name": f"{self.service_name} Device Refresh Interval",
"uniq_id": self.mqtt_helper.svc_unique_id("storage_refresh"),
"stat_t": self.mqtt_helper.stat_t("service", "service"),
"value_template": "{{ value_json.storage_refresh }}",
"stat_t": self.mqtt_helper.stat_t("service", "service", "storage_refresh"),
"avty_t": self.mqtt_helper.avty_t("service"),
"cmd_t": self.mqtt_helper.cmd_t("service", "storage_refresh"),
"unit_of_measurement": "s",
@ -104,8 +101,7 @@ class PublishMixin:
{
"name": f"{self.service_name} Device List Refresh Interval",
"uniq_id": self.mqtt_helper.svc_unique_id("device_list_refresh"),
"stat_t": self.mqtt_helper.stat_t("service", "service"),
"value_template": "{{ value_json.device_list_refresh }}",
"stat_t": self.mqtt_helper.stat_t("service", "service", "device_list_refresh"),
"avty_t": self.mqtt_helper.avty_t("service"),
"cmd_t": self.mqtt_helper.cmd_t("service", "device_list_refresh"),
"unit_of_measurement": "s",
@ -125,8 +121,7 @@ class PublishMixin:
{
"name": f"{self.service_name} Snapshot Refresh Interval",
"uniq_id": self.mqtt_helper.svc_unique_id("snapshot_refresh"),
"stat_t": self.mqtt_helper.stat_t("service", "service"),
"value_template": "{{ value_json.snapshot_refresh }}",
"stat_t": self.mqtt_helper.stat_t("service", "service", "snapshot_refresh"),
"avty_t": self.mqtt_helper.avty_t("service"),
"cmd_t": self.mqtt_helper.cmd_t("service", "snapshot_refresh"),
"unit_of_measurement": "m",
@ -155,7 +150,7 @@ class PublishMixin:
qos=self.mqtt_config["qos"],
retain=True,
)
self.logger.debug(f"[HA] Discovery published for {self.service} ({self.mqtt_helper.service_slug})")
self.logger.debug(f"discovery published for {self.service} ({self.mqtt_helper.service_slug})")
def publish_service_availability(self: Amcrest2Mqtt, status: str = "online") -> None:
self.mqtt_helper.safe_publish(self.mqtt_helper.avty_t("service"), status, qos=self.qos, retain=True)
@ -170,16 +165,10 @@ class PublishMixin:
"snapshot_refresh": self.snapshot_update_interval,
}
payload: Any
for key, value in service.items():
if not isinstance(value, dict):
payload = str(value)
else:
payload = json.dumps(value)
self.mqtt_helper.safe_publish(
self.mqtt_helper.stat_t("service", "service", key),
payload,
json.dumps(value) if isinstance(value, dict) else str(value),
qos=self.mqtt_config["qos"],
retain=True,
)
@ -188,23 +177,13 @@ class PublishMixin:
def publish_device_discovery(self: Amcrest2Mqtt, device_id: str) -> None:
def _publish_one(dev_id: str, defn: dict, suffix: str = "") -> None:
# Compute a per-mode device_id for topic namespacing
eff_device_id = dev_id if not suffix else f"{dev_id}_{suffix}"
# Grab this component's discovery topic
topic = self.mqtt_helper.disc_t(defn["component_type"], f"{dev_id}_{suffix}" if suffix else dev_id)
# Shallow copy to avoid mutating source
payload = {k: v for k, v in defn.items() if k != "component_type"}
# Publish discovery
self.mqtt_helper.safe_publish(topic, json.dumps(payload), retain=True)
self.upsert_state(eff_device_id, internal={"discovered": True})
# Mark discovered in state (per published entity)
self.states.setdefault(eff_device_id, {}).setdefault("internal", {})["discovered"] = 1
component = self.get_component(device_id)
_publish_one(device_id, component)
_publish_one(device_id, self.get_component(device_id))
# Publish any modes (0..n)
modes = self.get_modes(device_id)
@ -219,14 +198,9 @@ class PublishMixin:
def publish_device_state(self: Amcrest2Mqtt, device_id: str) -> None:
def _publish_one(dev_id: str, defn: str | dict[str, Any], suffix: str = "") -> None:
# Grab this component's state topic
topic = self.get_device_state_topic(dev_id, suffix)
# Shallow copy to avoid mutating source
if isinstance(defn, dict):
flat: dict[str, Any] = {k: v for k, v in defn.items() if k != "component_type"}
# Add metadata
meta = self.states[dev_id].get("meta")
if isinstance(meta, dict) and "last_update" in meta:
flat["last_update"] = meta["last_update"]
@ -235,13 +209,12 @@ class PublishMixin:
self.mqtt_helper.safe_publish(topic, defn, retain=True)
if not self.is_discovered(device_id):
self.logger.debug(f"[device state] Discovery not complete for {device_id} yet, holding off on sending state")
self.logger.debug(f"discovery not complete for {device_id} yet, holding off on sending state")
return
states = self.states[device_id]
_publish_one(device_id, states[self.get_component_type(device_id)])
# Publish any modes (0..n)
modes = self.get_modes(device_id)
for name, mode in modes.items():
component_type = mode["component_type"]

@ -568,28 +568,28 @@ wheels = [
[[package]]
name = "ruff"
version = "0.14.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ee/34/8218a19b2055b80601e8fd201ec723c74c7fe1ca06d525a43ed07b6d8e85/ruff-0.14.2.tar.gz", hash = "sha256:98da787668f239313d9c902ca7c523fe11b8ec3f39345553a51b25abc4629c96", size = 5539663, upload-time = "2025-10-23T19:37:00.956Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/16/dd/23eb2db5ad9acae7c845700493b72d3ae214dce0b226f27df89216110f2b/ruff-0.14.2-py3-none-linux_armv6l.whl", hash = "sha256:7cbe4e593505bdec5884c2d0a4d791a90301bc23e49a6b1eb642dd85ef9c64f1", size = 12533390, upload-time = "2025-10-23T19:36:18.044Z" },
{ url = "https://files.pythonhosted.org/packages/5a/8c/5f9acff43ddcf3f85130d0146d0477e28ccecc495f9f684f8f7119b74c0d/ruff-0.14.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:8d54b561729cee92f8d89c316ad7a3f9705533f5903b042399b6ae0ddfc62e11", size = 12887187, upload-time = "2025-10-23T19:36:22.664Z" },
{ url = "https://files.pythonhosted.org/packages/99/fa/047646491479074029665022e9f3dc6f0515797f40a4b6014ea8474c539d/ruff-0.14.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:5c8753dfa44ebb2cde10ce5b4d2ef55a41fb9d9b16732a2c5df64620dbda44a3", size = 11925177, upload-time = "2025-10-23T19:36:24.778Z" },
{ url = "https://files.pythonhosted.org/packages/15/8b/c44cf7fe6e59ab24a9d939493a11030b503bdc2a16622cede8b7b1df0114/ruff-0.14.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d0bbeffb8d9f4fccf7b5198d566d0bad99a9cb622f1fc3467af96cb8773c9e3", size = 12358285, upload-time = "2025-10-23T19:36:26.979Z" },
{ url = "https://files.pythonhosted.org/packages/45/01/47701b26254267ef40369aea3acb62a7b23e921c27372d127e0f3af48092/ruff-0.14.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7047f0c5a713a401e43a88d36843d9c83a19c584e63d664474675620aaa634a8", size = 12303832, upload-time = "2025-10-23T19:36:29.192Z" },
{ url = "https://files.pythonhosted.org/packages/2d/5c/ae7244ca4fbdf2bee9d6405dcd5bc6ae51ee1df66eb7a9884b77b8af856d/ruff-0.14.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3bf8d2f9aa1602599217d82e8e0af7fd33e5878c4d98f37906b7c93f46f9a839", size = 13036995, upload-time = "2025-10-23T19:36:31.861Z" },
{ url = "https://files.pythonhosted.org/packages/27/4c/0860a79ce6fd4c709ac01173f76f929d53f59748d0dcdd662519835dae43/ruff-0.14.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:1c505b389e19c57a317cf4b42db824e2fca96ffb3d86766c1c9f8b96d32048a7", size = 14512649, upload-time = "2025-10-23T19:36:33.915Z" },
{ url = "https://files.pythonhosted.org/packages/7f/7f/d365de998069720a3abfc250ddd876fc4b81a403a766c74ff9bde15b5378/ruff-0.14.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a307fc45ebd887b3f26b36d9326bb70bf69b01561950cdcc6c0bdf7bb8e0f7cc", size = 14088182, upload-time = "2025-10-23T19:36:36.983Z" },
{ url = "https://files.pythonhosted.org/packages/6c/ea/d8e3e6b209162000a7be1faa41b0a0c16a133010311edc3329753cc6596a/ruff-0.14.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:61ae91a32c853172f832c2f40bd05fd69f491db7289fb85a9b941ebdd549781a", size = 13599516, upload-time = "2025-10-23T19:36:39.208Z" },
{ url = "https://files.pythonhosted.org/packages/fa/ea/c7810322086db68989fb20a8d5221dd3b79e49e396b01badca07b433ab45/ruff-0.14.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1967e40286f63ee23c615e8e7e98098dedc7301568bd88991f6e544d8ae096", size = 13272690, upload-time = "2025-10-23T19:36:41.453Z" },
{ url = "https://files.pythonhosted.org/packages/a9/39/10b05acf8c45786ef501d454e00937e1b97964f846bf28883d1f9619928a/ruff-0.14.2-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:2877f02119cdebf52a632d743a2e302dea422bfae152ebe2f193d3285a3a65df", size = 13496497, upload-time = "2025-10-23T19:36:43.61Z" },
{ url = "https://files.pythonhosted.org/packages/59/a1/1f25f8301e13751c30895092485fada29076e5e14264bdacc37202e85d24/ruff-0.14.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:e681c5bc777de5af898decdcb6ba3321d0d466f4cb43c3e7cc2c3b4e7b843a05", size = 12266116, upload-time = "2025-10-23T19:36:45.625Z" },
{ url = "https://files.pythonhosted.org/packages/5c/fa/0029bfc9ce16ae78164e6923ef392e5f173b793b26cc39aa1d8b366cf9dc/ruff-0.14.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e21be42d72e224736f0c992cdb9959a2fa53c7e943b97ef5d081e13170e3ffc5", size = 12281345, upload-time = "2025-10-23T19:36:47.618Z" },
{ url = "https://files.pythonhosted.org/packages/a5/ab/ece7baa3c0f29b7683be868c024f0838770c16607bea6852e46b202f1ff6/ruff-0.14.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:b8264016f6f209fac16262882dbebf3f8be1629777cf0f37e7aff071b3e9b92e", size = 12629296, upload-time = "2025-10-23T19:36:49.789Z" },
{ url = "https://files.pythonhosted.org/packages/a4/7f/638f54b43f3d4e48c6a68062794e5b367ddac778051806b9e235dfb7aa81/ruff-0.14.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:5ca36b4cb4db3067a3b24444463ceea5565ea78b95fe9a07ca7cb7fd16948770", size = 13371610, upload-time = "2025-10-23T19:36:51.882Z" },
{ url = "https://files.pythonhosted.org/packages/8d/35/3654a973ebe5b32e1fd4a08ed2d46755af7267da7ac710d97420d7b8657d/ruff-0.14.2-py3-none-win32.whl", hash = "sha256:41775927d287685e08f48d8eb3f765625ab0b7042cc9377e20e64f4eb0056ee9", size = 12415318, upload-time = "2025-10-23T19:36:53.961Z" },
{ url = "https://files.pythonhosted.org/packages/71/30/3758bcf9e0b6a4193a6f51abf84254aba00887dfa8c20aba18aa366c5f57/ruff-0.14.2-py3-none-win_amd64.whl", hash = "sha256:0df3424aa5c3c08b34ed8ce099df1021e3adaca6e90229273496b839e5a7e1af", size = 13565279, upload-time = "2025-10-23T19:36:56.578Z" },
{ url = "https://files.pythonhosted.org/packages/2e/5d/aa883766f8ef9ffbe6aa24f7192fb71632f31a30e77eb39aa2b0dc4290ac/ruff-0.14.2-py3-none-win_arm64.whl", hash = "sha256:ea9d635e83ba21569fbacda7e78afbfeb94911c9434aff06192d9bc23fd5495a", size = 12554956, upload-time = "2025-10-23T19:36:58.714Z" },
version = "0.14.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/75/62/50b7727004dfe361104dfbf898c45a9a2fdfad8c72c04ae62900224d6ecf/ruff-0.14.3.tar.gz", hash = "sha256:4ff876d2ab2b161b6de0aa1f5bd714e8e9b4033dc122ee006925fbacc4f62153", size = 5558687, upload-time = "2025-10-31T00:26:26.878Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/8e/0c10ff1ea5d4360ab8bfca4cb2c9d979101a391f3e79d2616c9bf348cd26/ruff-0.14.3-py3-none-linux_armv6l.whl", hash = "sha256:876b21e6c824f519446715c1342b8e60f97f93264012de9d8d10314f8a79c371", size = 12535613, upload-time = "2025-10-31T00:25:44.302Z" },
{ url = "https://files.pythonhosted.org/packages/d3/c8/6724f4634c1daf52409fbf13fefda64aa9c8f81e44727a378b7b73dc590b/ruff-0.14.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b6fd8c79b457bedd2abf2702b9b472147cd860ed7855c73a5247fa55c9117654", size = 12855812, upload-time = "2025-10-31T00:25:47.793Z" },
{ url = "https://files.pythonhosted.org/packages/de/03/db1bce591d55fd5f8a08bb02517fa0b5097b2ccabd4ea1ee29aa72b67d96/ruff-0.14.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:71ff6edca490c308f083156938c0c1a66907151263c4abdcb588602c6e696a14", size = 11944026, upload-time = "2025-10-31T00:25:49.657Z" },
{ url = "https://files.pythonhosted.org/packages/0b/75/4f8dbd48e03272715d12c87dc4fcaaf21b913f0affa5f12a4e9c6f8a0582/ruff-0.14.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:786ee3ce6139772ff9272aaf43296d975c0217ee1b97538a98171bf0d21f87ed", size = 12356818, upload-time = "2025-10-31T00:25:51.949Z" },
{ url = "https://files.pythonhosted.org/packages/ec/9b/506ec5b140c11d44a9a4f284ea7c14ebf6f8b01e6e8917734a3325bff787/ruff-0.14.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:cd6291d0061811c52b8e392f946889916757610d45d004e41140d81fb6cd5ddc", size = 12336745, upload-time = "2025-10-31T00:25:54.248Z" },
{ url = "https://files.pythonhosted.org/packages/c7/e1/c560d254048c147f35e7f8131d30bc1f63a008ac61595cf3078a3e93533d/ruff-0.14.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a497ec0c3d2c88561b6d90f9c29f5ae68221ac00d471f306fa21fa4264ce5fcd", size = 13101684, upload-time = "2025-10-31T00:25:56.253Z" },
{ url = "https://files.pythonhosted.org/packages/a5/32/e310133f8af5cd11f8cc30f52522a3ebccc5ea5bff4b492f94faceaca7a8/ruff-0.14.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e231e1be58fc568950a04fbe6887c8e4b85310e7889727e2b81db205c45059eb", size = 14535000, upload-time = "2025-10-31T00:25:58.397Z" },
{ url = "https://files.pythonhosted.org/packages/a2/a1/7b0470a22158c6d8501eabc5e9b6043c99bede40fa1994cadf6b5c2a61c7/ruff-0.14.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:469e35872a09c0e45fecf48dd960bfbce056b5db2d5e6b50eca329b4f853ae20", size = 14156450, upload-time = "2025-10-31T00:26:00.889Z" },
{ url = "https://files.pythonhosted.org/packages/0a/96/24bfd9d1a7f532b560dcee1a87096332e461354d3882124219bcaff65c09/ruff-0.14.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3d6bc90307c469cb9d28b7cfad90aaa600b10d67c6e22026869f585e1e8a2db0", size = 13568414, upload-time = "2025-10-31T00:26:03.291Z" },
{ url = "https://files.pythonhosted.org/packages/a7/e7/138b883f0dfe4ad5b76b58bf4ae675f4d2176ac2b24bdd81b4d966b28c61/ruff-0.14.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e2f8a0bbcffcfd895df39c9a4ecd59bb80dca03dc43f7fb63e647ed176b741e", size = 13315293, upload-time = "2025-10-31T00:26:05.708Z" },
{ url = "https://files.pythonhosted.org/packages/33/f4/c09bb898be97b2eb18476b7c950df8815ef14cf956074177e9fbd40b7719/ruff-0.14.3-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:678fdd7c7d2d94851597c23ee6336d25f9930b460b55f8598e011b57c74fd8c5", size = 13539444, upload-time = "2025-10-31T00:26:08.09Z" },
{ url = "https://files.pythonhosted.org/packages/9c/aa/b30a1db25fc6128b1dd6ff0741fa4abf969ded161599d07ca7edd0739cc0/ruff-0.14.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1ec1ac071e7e37e0221d2f2dbaf90897a988c531a8592a6a5959f0603a1ecf5e", size = 12252581, upload-time = "2025-10-31T00:26:10.297Z" },
{ url = "https://files.pythonhosted.org/packages/da/13/21096308f384d796ffe3f2960b17054110a9c3828d223ca540c2b7cc670b/ruff-0.14.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afcdc4b5335ef440d19e7df9e8ae2ad9f749352190e96d481dc501b753f0733e", size = 12307503, upload-time = "2025-10-31T00:26:12.646Z" },
{ url = "https://files.pythonhosted.org/packages/cb/cc/a350bac23f03b7dbcde3c81b154706e80c6f16b06ff1ce28ed07dc7b07b0/ruff-0.14.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:7bfc42f81862749a7136267a343990f865e71fe2f99cf8d2958f684d23ce3dfa", size = 12675457, upload-time = "2025-10-31T00:26:15.044Z" },
{ url = "https://files.pythonhosted.org/packages/cb/76/46346029fa2f2078826bc88ef7167e8c198e58fe3126636e52f77488cbba/ruff-0.14.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a65e448cfd7e9c59fae8cf37f9221585d3354febaad9a07f29158af1528e165f", size = 13403980, upload-time = "2025-10-31T00:26:17.81Z" },
{ url = "https://files.pythonhosted.org/packages/9f/a4/35f1ef68c4e7b236d4a5204e3669efdeefaef21f0ff6a456792b3d8be438/ruff-0.14.3-py3-none-win32.whl", hash = "sha256:f3d91857d023ba93e14ed2d462ab62c3428f9bbf2b4fbac50a03ca66d31991f7", size = 12500045, upload-time = "2025-10-31T00:26:20.503Z" },
{ url = "https://files.pythonhosted.org/packages/03/15/51960ae340823c9859fb60c63301d977308735403e2134e17d1d2858c7fb/ruff-0.14.3-py3-none-win_amd64.whl", hash = "sha256:d7b7006ac0756306db212fd37116cce2bd307e1e109375e1c6c106002df0ae5f", size = 13594005, upload-time = "2025-10-31T00:26:22.533Z" },
{ url = "https://files.pythonhosted.org/packages/b7/73/4de6579bac8e979fca0a77e54dec1f1e011a0d268165eb8a9bc0982a6564/ruff-0.14.3-py3-none-win_arm64.whl", hash = "sha256:26eb477ede6d399d898791d01961e16b86f02bc2486d0d1a7a9bb2379d055dc1", size = 12590017, upload-time = "2025-10-31T00:26:24.52Z" },
]
[[package]]

Loading…
Cancel
Save