diff --git a/README.md b/README.md index 1a74c7e..25d5905 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Or, we support the following environment variables and defaults: - `MQTT_HOME_ASSISTANT_PREFIX` (optional, default = 'homeassistant') - `HOME_ASSISTANT` (optional, default = false) +- `TZ` (required, timezone identifier, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List) - `STORAGE_POLL_INTERVAL` (optional, default = 3600) - how often to fetch storage data (in seconds) (set to 0 to disable functionality) It exposes events to the following topics: @@ -50,30 +51,7 @@ If you are using a different MQTT prefix to the default, you will need to set th ## Running the app -To run via env variables with Docker Compose - -```yaml -version: "3.4" -services: - amcrest2mqtt: - image: weirdtangent/amcrest2mqtt:latest - container_name: amcrest2mqtt - restart: unless-stopped - environment: - MQTT_HOST: 10.10.10.1 - MQTT_USERNAME: admin - MQTT_PASSWORD: password - MQTT_PREFIX: amcrest2mqtt - MQTT_HOMEASSISTANT_PREFIX: homeassistant - AMCREST_HOSTS: "10.10.10.20 camera2.local" - AMCREST_NAMES: "Front_Yard Patio" - AMCREST_USERNAME: viewer - AMCREST_PASSWORD: password - HOME_ASSISTANT: true - STORAGE_POLL_INTERVAL: 600 - DEBUG_MODE: false -``` - +To run via env variables with Docker Compose, see docker-compose.yaml or make sure you attach a volume with the config file and point to that directory, for example: ``` CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ] diff --git a/VERSION b/VERSION index 575bea4..a041fc3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.9 +0.99.10 diff --git a/amcrest2mqtt.py b/amcrest2mqtt.py index 4e22c21..55b234b 100644 --- a/amcrest2mqtt.py +++ b/amcrest2mqtt.py @@ -12,15 +12,16 @@ import sys from threading import Timer import time import yaml +from zoneinfo import ZoneInfo is_exiting = False mqtt_client = None -config = {} +config = { 'timezone': 'utc' } devices = {} # Helper functions and callbacks def log(msg, level='INFO'): - ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + ts = datetime.now(tz=ZoneInfo(config['timezone'])).strftime('%Y-%m-%d %H:%M:%S') if len(msg) > 20480: raise ValueError('Log message exceeds max length') if level != "DEBUG" or os.getenv('DEBUG'): @@ -38,20 +39,37 @@ def read_version(): return read_file("../VERSION") -def mqtt_publish(topic, payload, exit_on_error=True, json=False): - msg = mqtt_client.publish( - topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True - ) +def to_gb(total): + return str(round(float(total[0]) / 1024 / 1024 / 1024, 2)) - if msg.rc == mqtt.MQTT_ERR_SUCCESS: - msg.wait_for_publish(2) - return +def signal_handler(sig, frame): + # exit immediately upon receiving a second SIGINT + global is_exiting - log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR") + if is_exiting: + os._exit(1) - if exit_on_error: - exit_gracefully(msg.rc, skip_mqtt=True) + is_exiting = True + exit_gracefully(0) + +def exit_gracefully(rc, skip_mqtt=False): + log("Exiting app...") + if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False: + # set cameras offline + for host in config['amcrest']['hosts']: + mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False) + # set broker offline + mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline") + mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline") + + mqtt_client.disconnect() + + # Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they + # occur on a separate thread + os._exit(rc) + +# MQTT setup def mqtt_connect(): global mqtt_client @@ -59,7 +77,6 @@ def mqtt_connect(): log("Missing env vari: MQTT_USERNAME or mqtt.username in config", level="ERROR") sys.exit(1) - # Connect to MQTT mqtt_client = mqtt.Client( mqtt.CallbackAPIVersion.VERSION1, client_id=f'{config["mqtt"]["prefix"]}_broker', @@ -118,52 +135,21 @@ def on_mqtt_disconnect(mqtt_client, userdata, rc): log(f"MQTT connection closed successfully", level="INFO") exit_gracefully(rc, skip_mqtt=True) -def exit_gracefully(rc, skip_mqtt=False): - log("Exiting app...") - - if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False: - # set cameras offline - for host in config['amcrest']['hosts']: - mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False) - # set broker offline - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline") - mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline") - - mqtt_client.disconnect() - - # Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they - # occur on a separate thread - os._exit(rc) - -def refresh_storage_sensors(): - Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start() - log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)') - - for host in config['amcrest']['hosts']: - device = devices[host] - topics = device["topics"] - try: - storage = device["camera"].storage_all - - mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"])) - mqtt_publish(topics["storage_used"], to_gb(storage["used"])) - mqtt_publish(topics["storage_total"], to_gb(storage["total"])) - except AmcrestError as error: - log(f"Error fetching storage information for {host}: {error}", level="WARNING") - -def to_gb(total): - return str(round(float(total[0]) / 1024 / 1024 / 1024, 2)) +def mqtt_publish(topic, payload, exit_on_error=True, json=False): + msg = mqtt_client.publish( + topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True + ) -def signal_handler(sig, frame): - # exit immediately upon receiving a second SIGINT - global is_exiting + if msg.rc == mqtt.MQTT_ERR_SUCCESS: + msg.wait_for_publish(2) + return - if is_exiting: - os._exit(1) + log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR") - is_exiting = True - exit_gracefully(0) + if exit_on_error: + exit_gracefully(msg.rc, skip_mqtt=True) +# Amcrest Devices def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, device_name): log(f"Connecting to device and getting details for {amcrest_host}...") camera = AmcrestCamera( @@ -219,6 +205,7 @@ def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, d "topics": { "config": f'{config["mqtt"]["prefix"]}/{serial_number}/config', "status": f'{config["mqtt"]["prefix"]}/{serial_number}/status', + "telemetry": f'{config["mqtt"]["prefix"]}/{serial_number}/telemetry', "event": f'{config["mqtt"]["prefix"]}/{serial_number}/event', "motion": f'{config["mqtt"]["prefix"]}/{serial_number}/motion', "doorbell": f'{config["mqtt"]["prefix"]}/{serial_number}/doorbell', @@ -240,6 +227,7 @@ def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, d }, } +# MQTT messages def send_broker_discovery(): mqtt_publish(f'{config["mqtt"]["home_assistant_prefix"]}/sensor/{via_device}/broker/config', { "availability_topic": f'{config["mqtt"]["prefix"]}/{via_device}/availability', @@ -415,6 +403,8 @@ def send_device_discovery(device): ) def refresh_broker(): + Timer(60, refresh_broker).start() + log('Refreshing amcrest2mqtt broker, every 60 sec') mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', 'online') mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', 'online') mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/config', { @@ -427,7 +417,11 @@ def refresh_broker(): }, }, json=True) -def refresh_camera(device): +def refresh_devices(): + for host in config['amcrest']['hosts']: + refresh_device(devices[host]) + +def refresh_device(device): mqtt_publish(device['topics']['status'], 'online') mqtt_publish(device['topics']['config'], { 'device_type': device['config']['device_type'], @@ -435,13 +429,34 @@ def refresh_camera(device): 'sw_version': device['config']['amcrest_version'], 'hw_version': device['config']['hardware_version'], 'serial_number': device['config']['serial_number'], - 'host': device["config"]["amcrest_host"], + 'host': device['config']['amcrest_host'], + 'configuration_url': 'http://' + device['config']['amcrest_host'] + '/', 'origin': { 'name': 'amcrest2mqtt broker', 'sw_version': version, 'url': 'https://github.com/weirdtangent/amcrest2mqtt', }, }, json=True) + mqtt_publish(device['topics']['telemetry'], + device['telemetry'] if 'telemetry' in device else {}, + json=True) + +def refresh_storage_sensors(): + Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start() + log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)') + + for host in config['amcrest']['hosts']: + device = devices[host] + topics = device["topics"] + try: + storage = device["camera"].storage_all + + mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"])) + mqtt_publish(topics["storage_used"], to_gb(storage["used"])) + mqtt_publish(topics["storage_total"], to_gb(storage["total"])) + except AmcrestError as error: + log(f"Error fetching storage information for {host}: {error}", level="WARNING") + # cmd-line args argparser = argparse.ArgumentParser() @@ -490,9 +505,9 @@ else: }, 'home_assistant': os.getenv("HOME_ASSISTANT") == "true", 'debug': os.getenv("AMCREST_DEBUG") == "true", + 'timezone': os.getenv("TZ") or 'utc', } - # Exit if any of the required vars are not provided if config['amcrest']['hosts'] is None: log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR") @@ -507,6 +522,7 @@ config['amcrest']['name_count'] = len(config['amcrest']['names']) if config['amcrest']['host_count'] != config['amcrest']['name_count']: log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR") sys.exit(1) +log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor") if config['amcrest']['password'] is None: log("Please set the AMCREST_PASSWORD environment variable", level="ERROR") @@ -516,10 +532,10 @@ version = read_version() via_device = config["mqtt"]["prefix"] + '-broker' log(f"Starting: amcrest2mqtt v{version}") -# Handle interruptions +# handle interruptions signal.signal(signal.SIGINT, signal_handler) -# Connect to each camera, if not already +# connect to each camera amcrest_names = config['amcrest']['names'] for host in config['amcrest']['hosts']: name = amcrest_names.pop(0) @@ -530,17 +546,16 @@ log(f"Connecting to hosts done.", level="INFO") # connect to MQTT service mqtt_connect() -# Configure Home Assistant +# configure broker and devices in Home Assistant if config['home_assistant']: send_broker_discovery() for host in config['amcrest']['hosts']: send_device_discovery(devices[host]) -# Main loop refresh_broker() -for host in config['amcrest']['hosts']: - refresh_camera(devices[host]) +refresh_devices() +# kick off storage refresh timer if config['amcrest']['storage_poll_interval'] > 0: refresh_storage_sensors() @@ -553,20 +568,23 @@ async def main(): device_config = device["config"] device_topics = device["topics"] async for code, payload in device["camera"].async_event_actions("All"): - log(f"Event on {host}: {str(payload)}", level="DEBUG") - refresh_broker() + log(f"Event on {host}: {str(payload)}") if ((code == "ProfileAlarmTransmit" and device_config["is_ad110"]) or (code == "VideoMotion" and not device_config["is_ad110"])): motion_payload = "on" if payload["action"] == "Start" else "off" mqtt_publish(device_topics["motion"], motion_payload) + device[host]['telemetry']['last_motion_event'] =str(datetime.now(tz=ZoneInfo(config['timezone']))) elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": human_payload = "on" if payload["action"] == "Start" else "off" mqtt_publish(device_topics["human"], human_payload) + device[host]['telemetry']['last_human_event'] =str(datetime.now(tz=ZoneInfo(config['timezone']))) elif code == "_DoTalkAction_": doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" mqtt_publish(device_topics["doorbell"], doorbell_payload) + device[host]['telemetry']['last_doorbell_event'] =str(datetime.now(tz=ZoneInfo(config['timezone']))) mqtt_publish(device_topics["event"], payload, json=True) + refresh_device(device) except AmcrestError as error: log(f"Amcrest error while working on {host}: {AmcrestError}. Sleeping for 10 seconds.", level="ERROR")