diff --git a/Dockerfile b/Dockerfile index ccad353..cf7dc8f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,9 @@ FROM python:3.14.0a5-alpine3.21 +RUN python -m venv /usr/src/app +# Enable venv +ENV PATH="/usr/src/app/venv/bin:$PATH" + RUN python3 -m ensurepip # Upgrade pip and setuptools @@ -10,8 +14,6 @@ WORKDIR /usr/src/app COPY requirements.txt ./ RUN pip3 install --no-cache-dir --upgrade -r requirements.txt -RUN pip3 check - COPY . . ARG USER_ID=1000 @@ -19,7 +21,10 @@ ARG GROUP_ID=1000 RUN addgroup -g $GROUP_ID appuser && \ adduser -u $USER_ID -G appuser --disabled-password --gecos "" appuser +RUN chown appuser:appuser /config/* +RUN chmod 0664 /config/* USER appuser -CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ] +ENTRYPOINT [ "python", "-u", "./app.py" ] +CMD [ "-c", "/config" ] diff --git a/README.md b/README.md index 39101fc..61a9fe2 100644 --- a/README.md +++ b/README.md @@ -31,16 +31,19 @@ Or, we support the following environment variables and defaults: - `MQTT_DISCOVERY_PREFIX` (optional, default = 'homeassistant') - `TZ` (required, timezone identifier, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List) -- `STORAGE_POLL_INTERVAL` (optional, default = 3600) - how often to fetch storage data (in seconds) (set to 0 to disable functionality) +- `DEVICE_UPDATE_INTERVAL` (optional, default = 3600) - how often to fetch storage stats (in seconds) -It exposes events to the following topics: +It exposes through device discovery a `service` and a `device` with components for each camera: -- `amcrest2mqtt/broker` - broker config -- `amcrest2mqtt/[SERIAL_NUMBER]/event` - all events -- `amcrest2mqtt/[SERIAL_NUMBER]/doorbell` - doorbell status (if AD110 or AD410) -- `amcrest2mqtt/[SERIAL_NUMBER]/human` - human detection (if AD410) -- `amcrest2mqtt/[SERIAL_NUMBER]/motion` - motion events (if supported) -- `amcrest2mqtt/[SERIAL_NUMBER]/config` - device configuration information +- `homeassistant/device/amcrest-service` - service config + +- `homeassistant/device/amcrest-[SERIAL_NUMBER]` per camera, with components: +- `event` - all events +- `doorbell` - doorbell status (if AD110 or AD410) +- `human` - human detection (if AD410) +- `motion` - motion events (if supported) +- `config` - device configuration information +- `storage` - storage stats ## Device Support @@ -48,15 +51,15 @@ The app supports events for any Amcrest device supported by [`python-amcrest`](h ## Home Assistant -The app has built-in support for Home Assistant discovery. Set the `HOME_ASSISTANT` environment variable to `true` to enable support. -If you are using a different MQTT prefix to the default, you will need to set the `HOME_ASSISTANT_PREFIX` environment variable. +The app has built-in support for Home Assistant discovery. Set the `MQTT_HOMEASSISTANT` environment variable to `true` to enable support. +If you are using a different MQTT prefix to the default, you will need to set the `MQTT_DISCOVERY_PREFIX` environment variable. ## Running the app To run via env variables with Docker Compose, see docker-compose.yaml or make sure you attach a volume with the config file and point to that directory, for example: ``` -CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ] +CMD [ "python", "-u", "./app.py", "-c", "/config" ] ``` ## Out of Scope diff --git a/VERSION b/VERSION index a041fc3..d268474 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.10 +0.99.11 diff --git a/amcrest2mqtt.py b/amcrest2mqtt.py deleted file mode 100644 index 644c5cf..0000000 --- a/amcrest2mqtt.py +++ /dev/null @@ -1,595 +0,0 @@ -from amcrest import AmcrestCamera, AmcrestError -import argparse -import asyncio -from datetime import datetime, timezone -from json import dumps -import paho.mqtt.client as mqtt -import os -import signal -from slugify import slugify -import ssl -import sys -from threading import Timer -import time -import yaml -from zoneinfo import ZoneInfo - -is_exiting = False -mqtt_client = None -config = { 'timezone': 'America/New_York' } -devices = {} - -# Helper functions and callbacks -def log(msg, level='INFO'): - ts = datetime.now(tz=ZoneInfo(config['timezone'])).strftime('%Y-%m-%d %H:%M:%S') - if len(msg) > 20480: - raise ValueError('Log message exceeds max length') - if level != "DEBUG" or os.getenv('DEBUG'): - print(f'{ts} [{level}] {msg}') - -def read_file(file_name): - with open(file_name, 'r') as file: - data = file.read().replace('\n', '') - - return data - -def read_version(): - if os.path.isfile("./VERSION"): - return read_file("./VERSION") - - return read_file("../VERSION") - -def to_gb(total): - return str(round(float(total[0]) / 1024 / 1024 / 1024, 2)) - -def signal_handler(sig, frame): - # exit immediately upon receiving a second SIGINT - global is_exiting - - if is_exiting: - os._exit(1) - - is_exiting = True - exit_gracefully(0) - -def exit_gracefully(rc, skip_mqtt=False): - log("Exiting app...") - - if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False: - # set cameras offline - for host in config['amcrest']['hosts']: - mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False) - # set broker offline - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline") - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline") - - mqtt_client.disconnect() - - # Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they - # occur on a separate thread - os._exit(rc) - -# MQTT setup -def mqtt_connect(): - global mqtt_client - - if config['mqtt']['username'] is None: - log("Missing env vari: MQTT_USERNAME or mqtt.username in config", level="ERROR") - sys.exit(1) - - mqtt_client = mqtt.Client( - mqtt.CallbackAPIVersion.VERSION1, - client_id=f'{config["mqtt"]["prefix"]}_broker', - clean_session=False - ) - mqtt_client.on_connect = on_mqtt_connect - mqtt_client.on_disconnect = on_mqtt_disconnect - - # send "will_set" for the broker and each connected camera - mqtt_client.will_set(f'{config["mqtt"]["prefix"]}/{via_device}', payload="offline", qos=config['mqtt']['qos'], retain=True) - for host in config['amcrest']['hosts']: - mqtt_client.will_set(devices[host]["topics"]["status"], payload="offline", qos=config['mqtt']['qos'], retain=True) - - if config['mqtt']['tls_enabled']: - log(f"Setting up MQTT for TLS") - if config['mqtt']['tls_ca_cert'] is None: - log("Missing env var: MQTT_TLS_CA_CERT or mqtt.tls_ca_cert in config", level="ERROR") - sys.exit(1) - if config['mqtt']['tls_cert'] is None: - log("Missing env var: MQTT_TLS_CERT or mqtt.tls_cert in config", level="ERROR") - sys.exit(1) - if config['mqtt']['tls_cert'] is None: - log("Missing env var: MQTT_TLS_KEY or mqtt.tls_key in config", level="ERROR") - sys.exit(1) - mqtt_client.tls_set( - ca_certs=config['mqtt']['tls_ca_cert'], - certfile=config['mqtt']['tls_cert'], - keyfile=config['mqtt']['tls_key'], - cert_reqs=ssl.CERT_REQUIRED, - tls_version=ssl.PROTOCOL_TLS, - ) - else: - mqtt_client.username_pw_set(config['mqtt']['username'], password=config['mqtt']['password']) - - try: - mqtt_client.connect( - config['mqtt']['host'], - port=config['mqtt']['port'], - keepalive=60 - ) - mqtt_client.loop_start() - except ConnectionError as error: - log(f"Could not connect to MQTT server: {error}", level="ERROR") - sys.exit(1) - -def on_mqtt_connect(mqtt_client, userdata, flags, rc): - if rc != 0: - log(f"MQTT Connection Issue: {rc}", level="ERROR") - exit_gracefully(rc, skip_mqtt=True) - log(f"MQTT Connected", level="INFO") - -def on_mqtt_disconnect(mqtt_client, userdata, rc): - if rc != 0: - log(f"MQTT connection failed: {rc}", level="ERROR") - else: - log(f"MQTT connection closed successfully", level="INFO") - exit_gracefully(rc, skip_mqtt=True) - -def mqtt_publish(topic, payload, exit_on_error=True, json=False): - msg = mqtt_client.publish( - topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True - ) - - if msg.rc == mqtt.MQTT_ERR_SUCCESS: - msg.wait_for_publish(2) - return - - log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR") - - if exit_on_error: - exit_gracefully(msg.rc, skip_mqtt=True) - -# Amcrest Devices -def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, device_name): - log(f"Connecting to device and getting details for {amcrest_host}...") - camera = AmcrestCamera( - amcrest_host, amcrest_port, amcrest_username, amcrest_password - ).camera - - try: - device_type = camera.device_type.replace("type=", "").strip() - is_ad110 = device_type == "AD110" - is_ad410 = device_type == "AD410" - is_doorbell = is_ad110 or is_ad410 - serial_number = camera.serial_number - - if not isinstance(serial_number, str): - log(f"Error fetching serial number for {amcrest_host}", level="ERROR") - exit_gracefully(1) - - sw_version = camera.software_information[0].replace("version=", "").strip() - build_version = camera.software_information[1].strip() - amcrest_version = f"{sw_version} ({build_version})" - device_slug = slugify(device_name, separator="_") - vendor = camera.vendor_information - hardware_version = camera.hardware_version - except AmcrestError as error: - log(f"Error fetching camera details for {amcrest_host}", level="ERROR") - exit_gracefully(1) - - log(f" Vendor: {camera.vendor_information}") - log(f" Device name: {device_name}") - log(f" Device type: {device_type}") - log(f" Serial number: {serial_number}") - log(f" Software version: {amcrest_version}") - log(f" Hardware version: {camera.hardware_version}") - - home_assistant_prefix = config['mqtt']['home_assistant_prefix'] - - return { - "camera": camera, - "config": { - "amcrest_host": amcrest_host, - "device_name": device_name, - "device_type": device_type, - "device_slug": device_slug, - "device_class": camera.device_class, - "is_ad110": is_ad110, - "is_ad410": is_ad410, - "is_doorbell": is_doorbell, - "serial_number": serial_number, - "amcrest_version": amcrest_version, - "hardware_version": hardware_version, - "vendor": vendor, - }, - "telemetry": { - }, - "topics": { - "config": f'{config["mqtt"]["prefix"]}/{serial_number}/config', - "status": f'{config["mqtt"]["prefix"]}/{serial_number}/status', - "telemetry": f'{config["mqtt"]["prefix"]}/{serial_number}/telemetry', - "event": f'{config["mqtt"]["prefix"]}/{serial_number}/event', - "motion": f'{config["mqtt"]["prefix"]}/{serial_number}/motion', - "doorbell": f'{config["mqtt"]["prefix"]}/{serial_number}/doorbell', - "human": f'{config["mqtt"]["prefix"]}/{serial_number}/human', - "storage_used": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/used', - "storage_used_percent": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/used_percent', - "storage_total": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/total', - "home_assistant": { - "doorbell": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/doorbell/config", - "human": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/human/config", - "motion": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/motion/config", - "storage_used": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_used/config", - "storage_used_percent": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_used_percent/config", - "storage_total": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_total/config", - "version": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/version/config", - "host": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/host/config", - "serial_number": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/serial_number/config", - }, - }, - } - -# MQTT messages -def send_broker_discovery(): - mqtt_publish(f'{config["mqtt"]["home_assistant_prefix"]}/sensor/{via_device}/broker/config', { - "availability_topic": f'{config["mqtt"]["prefix"]}/{via_device}/availability', - "state_topic": f'{config["mqtt"]["prefix"]}/{via_device}/status', - "qos": config['mqtt']['qos'], - "device": { - "name": 'amcrest2mqtt broker', - "identifiers": via_device, - }, - "icon": 'mdi:language-python', - "unique_id": via_device, - "name": "amcrest2mqtt broker", - }, - json=True, - ) - -def send_device_discovery(device): - vendor = device["config"]["vendor"] - device_name = device["config"]["device_name"] - device_type = device["config"]["device_type"] - device_slug = device["config"]["device_slug"] - serial_number = device["config"]["serial_number"] - amcrest_version = device["config"]["amcrest_version"] - hw_version = device["config"]["hardware_version"] - - base_config = { - "availability_topic": device["topics"]["status"], - "qos": config['mqtt']['qos'], - "device": { - "name": device_name, - "manufacturer": vendor, - "model": device_type, - "identifiers": serial_number, - "sw_version": amcrest_version, - "hw_version": hw_version, - "via_device": via_device, - }, - } - - if device["config"]["is_doorbell"]: - doorbell_name = "Doorbell" if device_name == "Doorbell" else f"{device_name} Doorbell" - - mqtt_publish( - device["topics"]["home_assistant"]["doorbell"], - base_config - | { - "state_topic": device["topics"]["doorbell"], - "payload_on": "on", - "payload_off": "off", - "icon": "mdi:doorbell", - "name": doorbell_name, - "unique_id": f"{serial_number}.doorbell", - }, - json=True, - ) - - if device["config"]["is_ad410"]: - mqtt_publish( - device["topics"]["home_assistant"]["human"], - base_config - | { - "state_topic": device["topics"]["human"], - "payload_on": "on", - "payload_off": "off", - "device_class": "motion", - "name": "Human", - "unique_id": f"{serial_number}.human", - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["motion"], - base_config - | { - "state_topic": device["topics"]["motion"], - "payload_on": "on", - "payload_off": "off", - "device_class": "motion", - "name": "Motion", - "unique_id": f"{serial_number}.motion", - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["version"], - base_config - | { - "state_topic": device["topics"]["config"], - "value_template": "{{ value_json.sw_version }}", - "icon": "mdi:package-up", - "name": "Version", - "unique_id": f"{serial_number}.version", - "entity_category": "diagnostic", - "enabled_by_default": False - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["serial_number"], - base_config - | { - "state_topic": device["topics"]["config"], - "value_template": "{{ value_json.serial_number }}", - "icon": "mdi:alphabetical-variant", - "name": "Serial Number", - "unique_id": f"{serial_number}.serial_number", - "entity_category": "diagnostic", - "enabled_by_default": False - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["host"], - base_config - | { - "state_topic": device["topics"]["config"], - "value_template": "{{ value_json.host }}", - "icon": "mdi:ip-network", - "name": "Host", - "unique_id": f"{serial_number}.host", - "entity_category": "diagnostic", - "enabled_by_default": False - }, - json=True, - ) - - if config['amcrest']['storage_poll_interval'] > 0: - mqtt_publish( - device["topics"]["home_assistant"]["storage_used_percent"], - base_config - | { - "state_topic": device["topics"]["storage_used_percent"], - "unit_of_measurement": "%", - "icon": "mdi:micro-sd", - "name": f"Storage Used %", - "object_id": f"{device_slug}_storage_used_percent", - "unique_id": f"{serial_number}.storage_used_percent", - "entity_category": "diagnostic", - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["storage_used"], - base_config - | { - "state_topic": device["topics"]["storage_used"], - "unit_of_measurement": "GB", - "icon": "mdi:micro-sd", - "name": "Storage Used", - "unique_id": f"{serial_number}.storage_used", - "entity_category": "diagnostic", - }, - json=True, - ) - - mqtt_publish( - device["topics"]["home_assistant"]["storage_total"], - base_config - | { - "state_topic": device["topics"]["storage_total"], - "unit_of_measurement": "GB", - "icon": "mdi:micro-sd", - "name": "Storage Total", - "unique_id": f"{serial_number}.storage_total", - "entity_category": "diagnostic", - }, - json=True, - ) - -def refresh_broker(): - Timer(60, refresh_broker).start() - log('Refreshing amcrest2mqtt broker, every 60 sec') - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', 'online') - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', 'online') - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/config', { - 'device_name': 'amcrest2mqtt broker', - 'sw_version': version, - 'origin': { - 'name': 'amcrest2mqtt broker', - 'sw_version': version, - 'url': 'https://github.com/weirdtangent/amcrest2mqtt', - }, - }, json=True) - -def refresh_devices(): - for host in config['amcrest']['hosts']: - refresh_device(devices[host]) - -def refresh_device(device): - mqtt_publish(device['topics']['status'], 'online') - mqtt_publish(device['topics']['config'], { - 'device_type': device['config']['device_type'], - 'device_name': device['config']['device_name'], - 'sw_version': device['config']['amcrest_version'], - 'hw_version': device['config']['hardware_version'], - 'serial_number': device['config']['serial_number'], - 'host': device['config']['amcrest_host'], - 'configuration_url': 'http://' + device['config']['amcrest_host'] + '/', - 'origin': { - 'name': 'amcrest2mqtt broker', - 'sw_version': version, - 'url': 'https://github.com/weirdtangent/amcrest2mqtt', - }, - }, json=True) - mqtt_publish(device['topics']['telemetry'], - dumps(device['telemetry']), - json=True) - -def refresh_storage_sensors(): - Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start() - log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)') - - for host in config['amcrest']['hosts']: - device = devices[host] - topics = device["topics"] - try: - storage = device["camera"].storage_all - - mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"])) - mqtt_publish(topics["storage_used"], to_gb(storage["used"])) - mqtt_publish(topics["storage_total"], to_gb(storage["total"])) - except AmcrestError as error: - log(f"Error fetching storage information for {host}: {error}", level="WARNING") - - -# cmd-line args -argparser = argparse.ArgumentParser() -argparser.add_argument( - "-c", - "--config", - required=False, - help="Directory holding config.yaml or full path to config file", -) -args = argparser.parse_args() - -# load config file -configpath = args.config -if configpath: - if not configpath.endswith(".yaml"): - if not configpath.endswith("/"): - configpath += "/" - configpath += "config.yaml" - log(f"Trying to load config file {configpath}") - with open(configpath) as file: - config = yaml.safe_load(file) -# or check env vars -else: - log(f"INFO:root:No config file specified, checking ENV") - config = { - 'mqtt': { - 'host': os.getenv("MQTT_HOST") or 'localhost', - 'port': int(os.getenv("MQTT_PORT") or 1883), - 'username': os.getenv("MQTT_USERNAME"), - 'password': os.getenv("MQTT_PASSWORD"), # can be None - 'qos': int(os.getenv("MQTT_QOS") or 0), - 'prefix': os.getenv("MQTT_PREFIX") or 'amcrest2mqtt', - 'home_assistant_prefix': os.getenv("MQTT_HOME_ASSISTANT_PREFIX") or "homeassistant", - 'tls_enabled': os.getenv("MQTT_TLS_ENABLED") == "true", - 'tls_ca_cert': os.getenv("MQTT_TLS_CA_CERT"), - 'tls_cert': os.getenv("MQTT_TLS_CERT"), - 'tls_key': os.getenv("MQTT_TLS_KEY"), - }, - 'amcrest': { - 'hosts': os.getenv("AMCREST_HOSTS"), - 'names': os.getenv("AMCREST_NAMES"), - 'port': int(os.getenv("AMCREST_PORT") or 80), - 'username': os.getenv("AMCREST_USERNAME") or "admin", - 'password': os.getenv("AMCREST_PASSWORD"), - 'storage_poll_interval': int(os.getenv("STORAGE_POLL_INTERVAL") or 3600), - }, - 'home_assistant': os.getenv("HOME_ASSISTANT") == "true", - 'debug': os.getenv("AMCREST_DEBUG") == "true", - 'timezone': os.getenv("TZ") or 'utc', - } - -# Exit if any of the required vars are not provided -if config['amcrest']['hosts'] is None: - log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR") - sys.exit(1) -config['amcrest']['host_count'] = len(config['amcrest']['hosts']) - -if config['amcrest']['names'] is None: - log("Missing env var: AMCREST_NAMES or amcrest.names in config", level="ERROR") - sys.exit(1) -config['amcrest']['name_count'] = len(config['amcrest']['names']) - -if config['amcrest']['host_count'] != config['amcrest']['name_count']: - log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR") - sys.exit(1) -log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor") - -if config['amcrest']['password'] is None: - log("Please set the AMCREST_PASSWORD environment variable", level="ERROR") - sys.exit(1) - -version = read_version() -via_device = config["mqtt"]["prefix"] + '-broker' -log(f"Starting: amcrest2mqtt v{version}") - -# handle interruptions -signal.signal(signal.SIGINT, signal_handler) - -# connect to each camera -amcrest_names = config['amcrest']['names'] -for host in config['amcrest']['hosts']: - name = amcrest_names.pop(0) - log(f"Connecting host: {host} as {name}", level="INFO") - devices[host] = get_device(host, config['amcrest']['port'], config['amcrest']['username'], config['amcrest']['password'], name) -log(f"Connecting to hosts done.", level="INFO") - -# connect to MQTT service -mqtt_connect() - -# configure broker and devices in Home Assistant -if config['home_assistant']: - send_broker_discovery() - for host in config['amcrest']['hosts']: - send_device_discovery(devices[host]) - -refresh_broker() -refresh_devices() - -# kick off storage refresh timer -if config['amcrest']['storage_poll_interval'] > 0: - refresh_storage_sensors() - -log(f"Listening for events on {config['amcrest']['host_count']} host(s)", level="DEBUG") - -async def main(): - try: - for host in config['amcrest']['hosts']: - device = devices[host] - device_config = device["config"] - device_topics = device["topics"] - async for code, payload in device["camera"].async_event_actions("All"): - log(f"Event on {host} - {code}: {payload['action']}") - if ((code == "ProfileAlarmTransmit" and device_config["is_ad110"]) - or (code == "VideoMotion" and not device_config["is_ad110"])): - motion_payload = "on" if payload["action"] == "Start" else "off" - mqtt_publish(device_topics["motion"], motion_payload) - device[host]['telemetry']['last_motion_event'] = str(datetime.now(tz=ZoneInfo(config['timezone']))) - elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": - human_payload = "on" if payload["action"] == "Start" else "off" - mqtt_publish(device_topics["human"], human_payload) - device[host]['telemetry']['last_human_event'] = str(datetime.now(tz=ZoneInfo(config['timezone']))) - elif code == "_DoTalkAction_": - doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" - mqtt_publish(device_topics["doorbell"], doorbell_payload) - device[host]['telemetry']['last_doorbell_event'] = str(datetime.now(tz=ZoneInfo(config['timezone']))) - - mqtt_publish(device_topics["event"], payload, json=True) - refresh_device(device) - - except AmcrestError as error: - log(f"Amcrest error while working on {host}: {AmcrestError}. Sleeping for 10 seconds.", level="ERROR") - time.sleep(10) - -asyncio.run(main()) diff --git a/amcrest_api.py b/amcrest_api.py new file mode 100644 index 0000000..e5d8668 --- /dev/null +++ b/amcrest_api.py @@ -0,0 +1,124 @@ +from amcrest import AmcrestCamera, AmcrestError +import asyncio +from datetime import date +import time +from util import * +from zoneinfo import ZoneInfo + +class AmcrestAPI(object): + def __init__(self, config): + self.last_call_date = '' + self.timezone = config['timezone'] + self.hide_ts = config['hide_ts'] or False + + self.amcrest_config = config['amcrest'] + + self.count = len(self.amcrest_config['hosts']) + self.devices = {} + + def log(self, msg, level='INFO'): + app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts) + + async def connect_to_devices(self): + self.log(f'Connecting to: {self.amcrest_config["hosts"]}') + tasks = [] + + device_names = self.amcrest_config['names'] + for host in self.amcrest_config['hosts']: + task = asyncio.create_task(self.get_device(host, device_names.pop(0))) + tasks.append(task) + await asyncio.gather(*tasks) + + self.log(f"Connecting to hosts done.", level="INFO") + + return {d: self.devices[d]['config'] for d in self.devices.keys()} + + def get_camera(self, host): + return AmcrestCamera( + host, self.amcrest_config['port'], self.amcrest_config['username'], self.amcrest_config['password'] + ).camera + + async def get_device(self, host, device_name): + camera = self.get_camera(host) + + try: + device_type = camera.device_type.replace("type=", "").strip() + is_ad110 = device_type == "AD110" + is_ad410 = device_type == "AD410" + is_doorbell = is_ad110 or is_ad410 + serial_number = camera.serial_number + + if not isinstance(serial_number, str): + raise Exception(f'Error fetching serial number for {host}: {error}') + + sw_version = camera.software_information[0].replace("version=", "").strip() + build_version = camera.software_information[1].strip() + amcrest_version = f"{sw_version} ({build_version})" + + vendor = camera.vendor_information + hardware_version = camera.hardware_version + except AmcrestError as error: + raise Exception(f'Error fetching camera details for {host}: {error}') + + self.devices[serial_number] = { + "camera": camera, + "config": { + "host": host, + "device_name": device_name, + "device_type": device_type, + "device_class": camera.device_class, + "is_ad110": is_ad110, + "is_ad410": is_ad410, + "is_doorbell": is_doorbell, + "serial_number": serial_number, + "software_version": amcrest_version, + "hardware_version": hardware_version, + "vendor": vendor, + }, + "storage": {}, + } + + def get_device_storage_stats(self, device_id): + if 'error' in self.devices[device_id]: + try: + self.devices[device_id]['camera'] = self.get_camera(self.devices[device_id]['config']['host']) + del self.devices[device_id]['error'] + except Exception as err: + err_msg = f'Problem re-connecting to camera: {err}' + self.log(err_msg, level='ERROR') + self.devices[device_id]["error"] = err_msg + raise Exception(err_msg) + + try: + storage = self.devices[device_id]["camera"].storage_all + except Exception as err: + err_msg = f'Problem connecting with camera to get storage stats: {err}' + self.log(err_msg, level='ERROR') + self.devices[device_id]["error"] = err_msg + raise Exception(err_msg) + return { + 'last_update': str(datetime.now(ZoneInfo(self.timezone))), + 'used_percent': str(storage['used_percent']), + 'used': to_gb(storage['used']), + 'total': to_gb(storage['total']), + } + + async def get_device_event_actions(self, device_id): + events = [] + device = self.devices[device_id] + config = device['config'] + async for code, payload in device["camera"].async_event_actions("All"): + self.log(f"Event on {config['host']} - {code}: {payload['action']}") + if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) + or (code == "VideoMotion" and not config["is_ad110"])): + motion_payload = "on" if payload["action"] == "Start" else "off" + events.append({ 'event': 'motion', 'payload': motion_payload }) + elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": + human_payload = "on" if payload["action"] == "Start" else "off" + events.append({ 'event': 'human', 'payload': human_payload }) + elif code == "_DoTalkAction_": + doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" + events.append({ 'event': 'doorbell', 'payload': doorbell_payload }) + + events.append({ 'event': 'event', 'payload': payload }) + return events \ No newline at end of file diff --git a/amcrest_mqtt.py b/amcrest_mqtt.py new file mode 100644 index 0000000..01df7a6 --- /dev/null +++ b/amcrest_mqtt.py @@ -0,0 +1,631 @@ +import asyncio +from datetime import date +import amcrest_api +import json +import paho.mqtt.client as mqtt +import random +import signal +import ssl +import string +import time +from util import * +from zoneinfo import ZoneInfo + +class AmcrestMqtt(object): + def __init__(self, config): + self.running = False + + self.timezone = config['timezone'] + + self.mqttc = None + self.mqtt_connect_time = None + + self.config = config + self.mqtt_config = config['mqtt'] + self.amcrest_config = config['amcrest'] + + self.client_id = self.get_new_client_id() + + self.version = config['version'] + self.hide_ts = config['hide_ts'] or False + + self.device_update_interval = config['amcrest'].get('device_update_interval', 600) + + self.service_name = self.mqtt_config['prefix'] + ' service' + self.service_slug = self.mqtt_config['prefix'] + '-service' + + self.devices = {} + self.configs = {} + + def log(self, msg, level='INFO'): + app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts) + + async def _handle_sigterm(self, loop, tasks): + self.running = False + self.log('SIGTERM received, waiting for tasks to cancel...', level='WARN') + + for t in tasks: + t.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + + def __enter__(self): + self.mqttc_create() + self.amcrestc = amcrest_api.AmcrestAPI(self.config) + self.running = True + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.running = False + self.log('Exiting gracefully') + + if self.mqttc is not None and self.mqttc.is_connected(): + for device_id in self.devices: + self.devices[device_id]['availability'] = 'offline' + if 'state' not in self.devices[device_id]: + self.devices[device_id]['state'] = {} + self.publish_device(device_id) + + self.mqttc.disconnect() + else: + self.log('Lost connection to MQTT') + + # MQTT Functions + def mqtt_on_connect(self, client, userdata, flags, rc, properties): + if rc != 0: + self.log(f'MQTT CONNECTION ISSUE ({rc})', level='ERROR') + exit() + self.log(f'MQTT connected as {self.client_id}') + client.subscribe(self.get_device_sub_topic()) + client.subscribe(self.get_attribute_sub_topic()) + + def mqtt_on_disconnect(self, client, userdata, flags, rc, properties): + self.log('MQTT connection closed') + + # if reconnect, lets use a new client_id + self.client_id = self.get_new_client_id() + + if time.time() > self.mqtt_connect_time + 10: + self.mqttc_create() + else: + exit() + + def mqtt_on_log(self, client, userdata, paho_log_level, msg): + level = None + if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR: + level = 'ERROR' + if paho_log_level == mqtt.LogLevel.MQTT_LOG_WARNING: + level = 'WARN' + if level: + self.log(f'MQTT LOG: {msg}', level=level) + + def mqtt_on_message(self, client, userdata, msg): + if not msg or not msg.payload: + return + topic = msg.topic + payload = json.loads(msg.payload) + + self.log(f'Got MQTT message for {topic} - {payload}') + + # we might get: + # device/component/set + # device/component/set/attribute + # homeassistant/device/component/set + # homeassistant/device/component/set/attribute + components = topic.split('/') + + # handle this message if it's for us, otherwise pass along to amcrest API + if components[-2] == self.get_component_slug('service'): + self.handle_service_message(None, payload) + elif components[-3] == self.get_component_slug('service'): + self.handle_service_message(components[-1], payload) + else: + if components[-1] == 'set': + mac = components[-2][-16:] + elif components[-2] == 'set': + mac = components[-3][-16:] + else: + self.log(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}', level='ERROR') + return + # ok, lets format the device_id and send to amcrest + device_id = ':'.join([mac[i:i+2] for i in range (0, len(mac), 2)]) + self.send_command(device_id, payload) + + def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties): + rc_list = map(lambda x: x.getName(), reason_code_list) + self.log(f'MQTT SUBSCRIBED: reason_codes - {'; '.join(rc_list)}', level='DEBUG') + + # MQTT Helpers + def mqttc_create(self): + self.mqttc = mqtt.Client( + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, + client_id=self.client_id, + clean_session=False, + ) + + if self.mqtt_config.get('tls_enabled'): + self.mqttcnt.tls_set( + ca_certs=self.mqtt_config.get('tls_ca_cert'), + certfile=self.mqtt_config.get('tls_cert'), + keyfile=self.mqtt_config.get('tls_key'), + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLS, + ) + else: + self.mqttc.username_pw_set( + username=self.mqtt_config.get('username'), + password=self.mqtt_config.get('password'), + ) + + self.mqttc.on_connect = self.mqtt_on_connect + self.mqttc.on_disconnect = self.mqtt_on_disconnect + self.mqttc.on_message = self.mqtt_on_message + self.mqttc.on_subscribe = self.mqtt_on_subscribe + self.mqttc.on_log = self.mqtt_on_log + + # self.mqttc.will_set(self.get_state_topic(self.service_slug) + '/availability', payload="offline", qos=0, retain=True) + + try: + self.mqttc.connect( + self.mqtt_config.get('host'), + port=self.mqtt_config.get('port'), + keepalive=60, + ) + self.mqtt_connect_time = time.time() + self.mqttc.loop_start() + except ConnectionError as error: + self.log(f'COULD NOT CONNECT TO MQTT {self.mqtt_config.get("host")}: {error}', level='ERROR') + exit(1) + + # MQTT Topics + def get_new_client_id(self): + return self.mqtt_config['prefix'] + '-' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + + def get_slug(self, device_id, type): + return f"amcrest_{device_id.replace(':','')}_{type}" + + def get_device_sub_topic(self): + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/+/set" + return f"{self.mqtt_config['discovery_prefix']}/device/+/set" + + def get_attribute_sub_topic(self): + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/+/set" + return f"{self.mqtt_config['discovery_prefix']}/device/+/set/+" + + def get_component_slug(self, device_id): + return f"amcrest-{device_id.replace(':','')}" + + def get_command_topic(self, device_id, attribute_name): + if attribute_name: + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/set/{attribute_name}" + return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/set/{attribute_name}" + else: + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/set" + return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/set" + + def get_discovery_topic(self, device_id, topic): + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/{topic}" + return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/{topic}" + + # Service Device + def publish_service_device(self): + state_topic = self.get_discovery_topic('service', 'state') + command_topic = self.get_discovery_topic('service', 'set') + availability_topic = self.get_discovery_topic('service', 'availability') + + self.mqttc.publish( + self.get_discovery_topic('service','config'), + json.dumps({ + 'qos': 0, + 'state_topic': state_topic, + 'availability_topic': availability_topic, + 'device': { + 'name': self.service_name, + 'ids': self.service_slug, + 'suggested_area': 'House', + 'manufacturer': 'weirdTangent', + 'model': self.version, + }, + 'origin': { + 'name': self.service_name, + 'sw_version': self.version, + 'support_url': 'https://github.com/weirdtangent/amcrest2mqtt', + }, + 'components': { + self.service_slug + '_status': { + 'name': 'Service', + 'platform': 'binary_sensor', + 'schema': 'json', + 'payload_on': 'online', + 'payload_off': 'offline', + 'icon': 'mdi:language-python', + 'state_topic': state_topic, + 'availability_topic': availability_topic, + 'value_template': '{{ value_json.status }}', + 'unique_id': 'amcrest_service_status', + }, + self.service_slug + '_device_refresh': { + 'name': 'Device Refresh Interval', + 'platform': 'number', + 'schema': 'json', + 'icon': 'mdi:numeric', + 'min': 10, + 'max': 3600, + 'state_topic': state_topic, + 'command_topic': self.get_command_topic('service', 'device_refresh'), + 'availability_topic': availability_topic, + 'value_template': '{{ value_json.device_refresh }}', + 'unique_id': 'amcrest_service_device_refresh', + }, + }, + }), + retain=True + ) + self.update_service_device() + + def update_service_device(self): + self.mqttc.publish(self.get_discovery_topic('service','availability'), 'online', retain=True) + self.mqttc.publish( + self.get_discovery_topic('service','state'), + json.dumps({ + 'status': 'online', + 'device_refresh': self.device_update_interval, + }), + retain=True + ) + + + # amcrest Helpers + async def setup_devices(self): + self.log(f'Setup devices') + + try: + devices = await self.amcrestc.connect_to_devices() + except Exception as err: + self.log(f'Failed to connect to 1 or more devices {err}', level='ERROR') + exit(1) + + self.publish_service_device() + for device_id in devices: + config = devices[device_id] + + if 'device_type' in config: + first = False + if device_id not in self.devices: + first = True + self.devices[device_id] = {} + self.configs[device_id] = config + self.devices[device_id]['qos'] = 0 + self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state') + self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability') + self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set') + # self.mqttc.will_set(self.get_state_topic(device_id)+'/availability', payload="offline", qos=0, retain=True) + + self.devices[device_id]['device'] = { + 'name': config['device_name'], + 'manufacturer': config['vendor'], + 'model': config['device_type'], + 'ids': device_id, + 'sw_version': config['software_version'], + 'hw_version': config['hardware_version'], + 'via_device': self.service_slug, + } + self.devices[device_id]['origin'] = { + 'name': self.service_name, + 'sw_version': self.version, + 'support_url': 'https://github.com/weirdtangent/amcrest2mqtt', + } + self.add_components_to_device(device_id) + + if first: + self.log(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})') + self.send_device_discovery(device_id) + else: + self.log(f'Updated device: {self.devices[device_id]['device']['name']}', level='DEBUG') + + # device discovery sent, now it is save to add these to the dict + self.devices[device_id]['state'] = {} + self.devices[device_id]['availability'] = 'online' + self.devices[device_id]['storage'] = {} + self.devices[device_id]['motion'] = {} + self.devices[device_id]['event'] = {} + else: + if first_time_through: + self.log(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})') + + # add amcrest components to devices + def add_components_to_device(self, device_id): + device = self.devices[device_id] + config = self.configs[device_id] + components = {} + + if config['is_doorbell']: + doorbell_name = 'Doorbell' if config['device_name'] == 'Doorbell' else f'{config["device_name"]} Doorbell' + components[self.get_slug(device_id, 'doorbell')] = { + 'name': doorbell_name, + 'platform': 'binary_sensor', + 'payload_on': 'on', + 'payload_off': 'off', + 'device_class': '', + 'icon': 'mdi:doorbell', + 'state_topic': self.get_discovery_topic(device_id, 'doorbell'), + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.doorbell }}', + 'unique_id': self.get_slug(device_id, 'doorbell'), + } + + if config['is_ad410']: + components[self.get_slug(device_id, 'human')] = { + 'name': 'Human', + 'platform': 'binary_sensor', + 'payload_on': 'on', + 'payload_off': 'off', + 'device_class': 'motion', + 'state_topic': self.get_discovery_topic(device_id, 'human'), + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.human }}', + 'unique_id': self.get_slug(device_id, 'human'), + } + + components[self.get_slug(device_id, 'motion')] = { + 'name': 'Motion', + 'platform': 'binary_sensor', + 'payload_on': 'on', + 'payload_off': 'off', + 'device_class': 'motion', + 'state_topic': self.get_discovery_topic(device_id, 'motion'), + 'availability_topic': device['availability_topic'], + 'unique_id': self.get_slug(device_id, 'motion'), + } + + components[self.get_slug(device_id, 'version')] = { + 'name': 'Version', + 'platform': 'sensor', + 'icon': 'mdi:package-up', + 'state_topic': device['state_topic'], + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.sw_version }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'sw_version'), + } + + components[self.get_slug(device_id, 'serial_number')] = { + 'name': 'Serial Number', + 'platform': 'sensor', + 'icon': 'mdi:alphabetical-variant-up', + 'state_topic': device['state_topic'], + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.serial_number }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'serial_number'), + } + + components[self.get_slug(device_id, 'host')] = { + 'name': 'Host', + 'platform': 'sensor', + 'icon': 'mdi:ip-network', + 'state_topic': device['state_topic'], + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.host }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'host'), + } + + components[self.get_slug(device_id, 'event')] = { + 'name': 'Event', + 'platform': 'sensor', + 'state_topic': self.get_discovery_topic(device_id, 'event'), + 'availability_topic': device['availability_topic'], + 'unique_id': self.get_slug(device_id, 'event'), + } + + components[self.get_slug(device_id, 'storage_used_percent')] = { + 'name': 'Storage Used %', + 'platform': 'sensor', + 'icon': 'mdi:micro-sd', + 'unit_of_measurement': '%', + 'state_topic': self.get_discovery_topic(device_id, 'storage'), + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.used_percent }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'storage_used_percent'), + } + components[self.get_slug(device_id, 'storage_total')] = { + 'name': 'Storage Total', + 'platform': 'sensor', + 'icon': 'mdi:micro-sd', + 'unit_of_measurement': 'GB', + 'state_topic': self.get_discovery_topic(device_id, 'storage'), + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.total }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'storage_total'), + } + components[self.get_slug(device_id, 'storage_used')] = { + 'name': 'Storage Used', + 'platform': 'sensor', + 'icon': 'mdi:micro-sd', + 'unit_of_measurement': 'GB', + 'state_topic': self.get_discovery_topic(device_id, 'storage'), + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.used }}', + 'entity_category': 'diagnostic', + 'unique_id': self.get_slug(device_id, 'storage_used'), + } + components[self.get_slug(device_id, 'last_update')] = { + 'name': 'Last Update', + 'platform': 'sensor', + 'device_class': 'timestamp', + 'state_topic': device['state_topic'], + 'availability_topic': device['availability_topic'], + 'value_template': '{{ value_json.last_update }}', + 'unique_id': self.get_slug(device_id, 'last_update'), + } + + + # since we always add at least `motion`, this should always be true + if len(components) > 0: + device['components'] = components + + def send_device_discovery(self, device_id): + device = self.devices[device_id] + self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True) + + def refresh_all_devices(self): + self.log(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)') + + # refresh devices starting with the device updated the longest time ago + for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)): + # break loop if we are ending + if not self.running: + break + device_id = each[0] + + # all just to format the log record + last_updated = self.devices[device_id]['state']['last_update'][:19].replace('T',' ') if 'last_update' in self.devices[device_id]['state'] else 'server started' + + self.log(f'Refreshing device "{self.devices[device_id]['device']['name']} ({device_id})", not updated since: {last_updated}') + self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone)) + self.refresh_device(device_id) + + def refresh_device(self, device_id): + # don't refresh the device until it has been published in device discovery + # and we can tell because it will be `online` + + #if self.devices[device_id]['state']['status'] != 'online': + # return + + config = self.configs[device_id] + + result = self.amcrestc.get_device_storage_stats(device_id) + if result and 'last_update' in result: + self.devices[device_id]['storage'] = result + self.devices[device_id]['state'] = { + 'status': 'online', + 'host': config['host'], + 'serial_number': config['serial_number'], + 'sw_version': config['software_version'], + 'last_update': config['last_update'].isoformat(), + } + + + self.update_service_device() + self.publish_device(device_id) + + def publish_device(self, device_id): + self.mqttc.publish( + self.get_discovery_topic(device_id,'state'), + json.dumps(self.devices[device_id]['state']), + retain=True + ) + self.mqttc.publish( + self.get_discovery_topic(device_id,'availability'), + self.devices[device_id]['availability'], + retain=True + ) + self.mqttc.publish( + self.get_discovery_topic(device_id,'storage'), + json.dumps(self.devices[device_id]['storage']), + retain=True + ) + self.mqttc.publish( + self.get_discovery_topic(device_id,'motion'), + json.dumps(self.devices[device_id]['motion']), + retain=True + ) + self.mqttc.publish( + self.get_discovery_topic(device_id,'event'), + json.dumps(self.devices[device_id]['event']), + retain=True + ) + + def handle_service_message(self, attribute, message): + match attribute: + case 'device_refresh': + self.device_update_interval = message + self.log(f'Updated UPDATE_INTERVAL to be {message}') + case _: + self.log(f'IGNORED UNRECOGNIZED amcrest-service MESSAGE for {attribute}: {message}') + return + + self.update_service_device() + + def send_command(self, device_id, data): + caps = self.convert_attributes_to_capabilities(data) + sku = self.devices[device_id]['device']['model'] + + self.log(f'COMMAND {device_id} = {caps}', level='DEBUG') + + first = True + for key in caps: + if not first: + time.sleep(1) + self.log(f'CMD DEVICE {self.devices[device_id]['device']['name']} ({device_id}) {key} = {caps[key]}', level='DEBUG') + self.amcrestc.send_command(device_id, sku, caps[key]['type'], caps[key]['instance'], caps[key]['value']) + self.update_service_device() + first = False + + if device_id not in self.boosted: + self.boosted.append(device_id) + + async def check_devices_for_events(self): + try: + for device_id in self.devices: + events = await self.amcrestc.get_device_event_actions(device_id) + log(f'Got events for {device_id}: {events.join(';')}') + for event in events: + self.devices[device_id][event] = events[event] + self.mqttc.publish( + self.get_discovery_topic(device_id,event), + json.dumps(self.devices[device_id][event]), + retain=True + ) + self.refresh_device(device_id) + except Exception as err: + self.log(f'CAUGHT IN check_devices_for_events: {err}', level='ERROR') + + + # main loop + async def main_loop(self): + await self.setup_devices() + + loop = asyncio.get_running_loop() + tasks = [ + asyncio.create_task(self.device_loop()), + asyncio.create_task(self.device_actions()), + ] + + for signame in {'SIGINT','SIGTERM'}: + loop.add_signal_handler( + getattr(signal, signame), + lambda: asyncio.create_task(self._handle_sigterm(loop, tasks)) + ) + + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + except Exception as err: + self.log(f'CAUGHT IN main_loop {err}', level='ERROR') + self.running = False + + async def device_loop(self): + while self.running == True: + try: + self.refresh_all_devices() + await asyncio.sleep(self.device_update_interval) + except Exception as err: + self.log(f'CAUGHT IN device_loop {err}', level='ERROR') + self.running = False + + async def device_actions(self): + while self.running == True: + try: + await self.check_devices_for_events() + except Exception as err: + self.log(f'CAUGHT IN device_actions {err}', level='ERROR') \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..049bb0c --- /dev/null +++ b/app.py @@ -0,0 +1,115 @@ +import asyncio +import argparse +from amcrest_mqtt import AmcrestMqtt +import os +import sys +import time +from util import * +import yaml + +# Helper functions and callbacks +def read_file(file_name): + with open(file_name, 'r') as file: + data = file.read().replace('\n', '') + + return data + +def read_version(): + if os.path.isfile('./VERSION'): + return read_file('./VERSION') + + return read_file('../VERSION') + +# Let's go! +version = read_version() +app_log(f'Starting: amcrest2mqtt v{version}') + +# cmd-line args +argparser = argparse.ArgumentParser() +argparser.add_argument( + '-c', + '--config', + required=False, + help='Directory holding config.yaml or full path to config file', +) +args = argparser.parse_args() + +# load config file +configpath = args.config or '/config' +try: + if not configpath.endswith('.yaml'): + if not configpath.endswith('/'): + configpath += '/' + configfile = configpath + 'config.yaml' + with open(configfile) as file: + config = yaml.safe_load(file) + app_log(f'Reading config file {configpath}') + config['config_from'] = 'file' + config['config_path'] = configpath +except: + app_log(f'config.yaml not found, checking ENV') + config = { + 'mqtt': { + 'host': os.getenv('MQTT_HOST') or 'localhost', + 'qos': int(os.getenv('MQTT_QOS') or 0), + 'port': int(os.getenv('MQTT_PORT') or 1883), + 'username': os.getenv('MQTT_USERNAME'), + 'password': os.getenv('MQTT_PASSWORD'), # can be None + 'tls_enabled': os.getenv('MQTT_TLS_ENABLED') == 'true', + 'tls_ca_cert': os.getenv('MQTT_TLS_CA_CERT'), + 'tls_cert': os.getenv('MQTT_TLS_CERT'), + 'tls_key': os.getenv('MQTT_TLS_KEY'), + 'prefix': os.getenv('MQTT_PREFIX') or 'amcrest2mqtt', + 'homeassistant': os.getenv('MQTT_HOMEASSISTANT') == True, + 'discovery_prefix': os.getenv('MQTT_DISCOVERY_PREFIX') or 'homeassistant', + }, + 'amcrest': { + 'hosts': os.getenv("AMCREST_HOSTS"), + 'names': os.getenv("AMCREST_NAMES"), + 'port': int(os.getenv("AMCREST_PORT") or 80), + 'username': os.getenv("AMCREST_USERNAME") or "admin", + 'password': os.getenv("AMCREST_PASSWORD"), + 'device_update_interval': int(os.getenv("DEVICE_UPDATE_INTERVAL") or 600), + }, + 'debug': True if os.getenv('DEBUG') else False, + 'hide_ts': True if os.getenv('HIDE_TS') else False, + 'config_from': 'env', + 'timezone': os.getenv('TZ'), + } + +config['version'] = version +config['configpath'] = os.path.dirname(configpath) +if not 'hide_ts' in config: + config['hide_ts'] = False + +# Exit if any of the required vars are not provided +if config['amcrest']['hosts'] is None: + app_log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR") + sys.exit(1) +config['amcrest']['host_count'] = len(config['amcrest']['hosts']) + +if config['amcrest']['names'] is None: + app_log("Missing env var: AMCREST_NAMES or amcrest.names in config", level="ERROR") + sys.exit(1) +config['amcrest']['name_count'] = len(config['amcrest']['names']) + +if config['amcrest']['host_count'] != config['amcrest']['name_count']: + app_log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR") + sys.exit(1) +app_log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor") + +if config['amcrest']['password'] is None: + app_log("Please set the AMCREST_PASSWORD environment variable", level="ERROR") + sys.exit(1) + +if not 'timezone' in config: + app_log('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone) + exit(1) +else: + app_log(f'TIMEZONE set as {config["timezone"]}', tz=config["timezone"]) + +try: + with AmcrestMqtt(config) as mqtt: + asyncio.run(mqtt.main_loop()) +except Exception as err: + app_log(f'CAUGHT IN app: {err}', level='ERROR') \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 896ddd6..52325f8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -16,6 +16,5 @@ services: AMCREST_USERNAME: viewer AMCREST_PASSWORD: password HOME_ASSISTANT: true - STORAGE_POLL_INTERVAL: 600 + DEVICE_UPDATE_INTERVAL: 600 DEBUG_MODE: false - command: python3 -u amcrest2mqtt.py -c /config diff --git a/util.py b/util.py new file mode 100644 index 0000000..656b8e9 --- /dev/null +++ b/util.py @@ -0,0 +1,13 @@ +from datetime import datetime, timezone +import os +from zoneinfo import ZoneInfo + +def app_log(msg, level='INFO', tz='UTC', hide_ts=False): + ts = datetime.now(ZoneInfo(tz)).strftime('%Y-%m-%d %H:%M:%S %Z') + if len(msg) > 102400: + raise ValueError('Log message exceeds max length') + if level != 'DEBUG' or os.getenv('DEBUG'): + print(f'{ts + " " if not hide_ts else ""}[{level}] {msg}') + +def to_gb(total): + return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))