From 7ffeb433e402d356cb27d6e38c43d40d55a021f1 Mon Sep 17 00:00:00 2001 From: Jeff Culverhouse Date: Wed, 5 Mar 2025 17:07:48 -0500 Subject: [PATCH] code cleanup --- VERSION | 2 +- amcrest_api.py | 29 ++----- amcrest_mqtt.py | 201 ++++++++++++++++++++++++++---------------------- app.py | 46 ++++------- util.py | 22 +++--- 5 files changed, 146 insertions(+), 154 deletions(-) diff --git a/VERSION b/VERSION index a982559..65813b1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.13 +0.99.14 diff --git a/amcrest_api.py b/amcrest_api.py index b627541..2836ee7 100644 --- a/amcrest_api.py +++ b/amcrest_api.py @@ -1,7 +1,7 @@ from amcrest import AmcrestCamera, AmcrestError import asyncio from asyncio import timeout -from datetime import date +from datetime import datetime import httpx import logging import time @@ -32,10 +32,9 @@ class AmcrestAPI(object): for host in self.amcrest_config['hosts']: task = asyncio.create_task(self.get_device(host, device_names.pop(0))) tasks.append(task) - await asyncio.gather(*tasks) - - self.logger.info('Connecting to hosts done.') + await asyncio.gather(*tasks, return_exceptions=True) + # return just the config of each device, not the camera object return {d: self.devices[d]['config'] for d in self.devices.keys()} def get_camera(self, host): @@ -92,27 +91,15 @@ class AmcrestAPI(object): "mac": mac_address, } }, - "storage": {}, } def get_device_storage_stats(self, device_id): - if 'error' in self.devices[device_id]: - try: - self.devices[device_id]['camera'] = self.get_camera(self.devices[device_id]['config']['host']) - del self.devices[device_id]['error'] - except Exception as err: - err_msg = f'Problem re-connecting to camera: {err}' - self.logger.error(err_msg) - self.devices[device_id]["error"] = err_msg - raise Exception(err_msg) - try: storage = self.devices[device_id]["camera"].storage_all except Exception as err: - err_msg = f'Problem connecting with camera to get storage stats: {err}' - self.logger.error(err_msg) - self.devices[device_id]["error"] = err_msg - raise Exception(err_msg) + self.logger.error(f'Problem connecting with camera to get storage stats: {err}') + return {} + return { 'last_update': str(datetime.now(ZoneInfo(self.timezone))), 'used_percent': str(storage['used_percent']), @@ -125,14 +112,14 @@ class AmcrestAPI(object): tasks = [self.get_events_from_device(device_id) for device_id in self.devices] await asyncio.gather(*tasks) except Exception as err: - self.logger.error(f'collect_all_device_events: {err}', exc_info=True) + self.logger.error(f'collect_all_device_events: {err}') async def get_events_from_device(self, device_id): try: async for code, payload in self.devices[device_id]["camera"].async_event_actions("All"): await self.process_device_event(device_id, code, payload) except Exception as err: - self.logger.error(f'Failed to get events from device ({device_id}): {err}', exc_info=True) + self.logger.error(f'Failed to get events from device ({device_id}): {err}') async def process_device_event(self, device_id, code, payload): try: diff --git a/amcrest_mqtt.py b/amcrest_mqtt.py index 2d8631c..6b1f729 100644 --- a/amcrest_mqtt.py +++ b/amcrest_mqtt.py @@ -1,5 +1,5 @@ import asyncio -from datetime import date +from datetime import datetime import amcrest_api import json import logging @@ -14,10 +14,8 @@ from zoneinfo import ZoneInfo class AmcrestMqtt(object): def __init__(self, config): - self.logger = logging.getLogger(__name__) self.running = False - - self.timezone = config['timezone'] + self.logger = logging.getLogger(__name__) self.mqttc = None self.mqtt_connect_time = None @@ -25,29 +23,18 @@ class AmcrestMqtt(object): self.config = config self.mqtt_config = config['mqtt'] self.amcrest_config = config['amcrest'] - - self.client_id = self.get_new_client_id() - + self.timezone = config['timezone'] self.version = config['version'] self.device_update_interval = config['amcrest'].get('device_update_interval', 600) + self.client_id = self.get_new_client_id() self.service_name = self.mqtt_config['prefix'] + ' service' self.service_slug = self.mqtt_config['prefix'] + '-service' self.devices = {} self.configs = {} - async def _handle_sigterm(self, loop, tasks): - self.running = False - self.logger.warn('SIGTERM received, waiting for tasks to cancel...') - - for t in tasks: - t.cancel() - - await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() - def __enter__(self): self.mqttc_create() self.amcrestc = amcrest_api.AmcrestAPI(self.config) @@ -168,7 +155,12 @@ class AmcrestMqtt(object): self.mqttc.on_subscribe = self.mqtt_on_subscribe self.mqttc.on_log = self.mqtt_on_log - self.mqttc.will_set(self.get_discovery_topic('service', 'availability'), payload="offline", qos=self.mqtt_config['qos'], retain=True) + self.mqttc.will_set( + self.get_discovery_topic('service', 'availability'), + payload="offline", + qos=self.mqtt_config['qos'], + retain=True + ) try: self.mqttc.connect( @@ -269,31 +261,34 @@ class AmcrestMqtt(object): }, }, }), + qos=self.mqtt_config['qos'], retain=True ) self.update_service_device() def update_service_device(self): - self.mqttc.publish(self.get_discovery_topic('service','availability'), 'online', retain=True) + self.mqttc.publish( + self.get_discovery_topic('service','availability'), + 'online', + qos=self.mqtt_config['qos'], + retain=True + ) self.mqttc.publish( self.get_discovery_topic('service','state'), json.dumps({ 'status': 'online', 'device_refresh': self.device_update_interval, }), + qos=self.mqtt_config['qos'], retain=True ) - # amcrest Helpers async def setup_devices(self): self.logger.info(f'Setup devices') - try: - devices = await self.amcrestc.connect_to_devices() - except Exception as err: - self.logger.error(f'Failed to connect to 1 or more devices {err}') - exit(1) + devices = await self.amcrestc.connect_to_devices() + self.logger.info(f'Connected to: {list(devices.keys())}') self.publish_service_device() for device_id in devices: @@ -305,13 +300,28 @@ class AmcrestMqtt(object): first = True self.devices[device_id] = {} self.configs[device_id] = config - self.devices[device_id]['qos'] = self.mqtt_config['qos'], + self.devices[device_id]['qos'] = self.mqtt_config['qos'] self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state') self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability') self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set') - self.mqttc.will_set(self.get_discovery_topic(device_id,'state'), payload=json.dumps({'status': 'offline'}), qos=self.mqtt_config['qos'], retain=True) - self.mqttc.will_set(self.get_discovery_topic(device_id,'motion'), payload=None, qos=self.mqtt_config['qos'], retain=True) - self.mqttc.will_set(self.get_discovery_topic(device_id,'availability'), payload='offline', qos=self.mqtt_config['qos'], retain=True) + self.mqttc.will_set( + self.get_discovery_topic(device_id,'state'), + json.dumps({'status': 'offline'}), + qos=self.mqtt_config['qos'], + retain=True + ) + self.mqttc.will_set( + self.get_discovery_topic(device_id,'motion'), + None, + qos=self.mqtt_config['qos'], + retain=True + ) + self.mqttc.will_set( + self.get_discovery_topic(device_id,'availability'), + 'offline', + qos=self.mqtt_config['qos'], + retain=True + ) self.devices[device_id]['device'] = { 'name': config['device_name'], @@ -343,7 +353,12 @@ class AmcrestMqtt(object): # device discovery sent, now it is save to add these to the # dict (so they aren't included in device discovery object itself) - self.devices[device_id]['state'] = {} + self.devices[device_id]['state'] = { + 'status': 'online', + 'host': config['host'], + 'serial_number': config['serial_number'], + 'sw_version': config['software_version'], + } self.devices[device_id]['availability'] = 'online' self.devices[device_id]['motion'] = 'off' else: @@ -369,7 +384,12 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.doorbell }}', 'unique_id': self.get_slug(device_id, 'doorbell'), } - self.mqttc.will_set(self.get_discovery_topic(device_id,'doorbell'), payload=None, qos=self.mqtt_config['qos'], retain=True) + self.mqttc.will_set( + self.get_discovery_topic(device_id,'doorbell'), + payload=None, + qos=self.mqtt_config['qos'], + retain=True + ) if config['is_ad410']: components[self.get_slug(device_id, 'human')] = { @@ -382,7 +402,12 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.human }}', 'unique_id': self.get_slug(device_id, 'human'), } - self.mqttc.will_set(self.get_discovery_topic(device_id,'human'), payload=None, qos=self.mqtt_config['qos'], retain=True) + self.mqttc.will_set( + self.get_discovery_topic(device_id,'human'), + payload=None, + qos=self.mqtt_config['qos'], + retain=True + ) components[self.get_slug(device_id, 'motion')] = { 'name': 'Motion', @@ -474,51 +499,50 @@ class AmcrestMqtt(object): def send_device_discovery(self, device_id): device = self.devices[device_id] - self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True) + self.mqttc.publish( + self.get_discovery_topic(device_id, 'config'), + json.dumps(device), + qos=self.mqtt_config['qos'], + retain=True + ) + + device['state'] = { 'state': 'ON' } + device['availability'] = 'online' + + self.publish_device(device_id) def refresh_all_devices(self): self.logger.info(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)') # refresh devices starting with the device updated the longest time ago - for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)): + # sorted = sorted(self.devices.items(), key=lambda dt: (dt is None, dt)): + for device_id in self.devices: # break loop if we are ending if not self.running: break - device_id = each[0] - self.refresh_device(device_id) def refresh_device(self, device_id): - # don't refresh the device until it has been published in device discovery - # and we can tell because it will be `online` - - #if self.devices[device_id]['state']['status'] != 'online': - # return - + device = self.devices[device_id] config = self.configs[device_id] - result = self.amcrestc.get_device_storage_stats(device_id) - if result and 'last_update' in result: - self.devices[device_id]['storage'] = result - - self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone)) - self.devices[device_id]['state'] = { - 'status': 'online', - 'host': config['host'], - 'serial_number': config['serial_number'], - 'sw_version': config['software_version'], - 'last_update': config['last_update'].isoformat(), - } + # get the storage info, pull out last_update and save that to the device state + storage = self.amcrestc.get_device_storage_stats(device_id) + device['state']['last_update'] = storage.pop('last_update', None) + device['storage'] = storage self.update_service_device() self.publish_device(device_id) def publish_device(self, device_id): + device = self.devices[device_id] + for topic in ['state','availability','storage','motion','human','doorbell','event','recording']: - if topic in self.devices[device_id]: + if topic in device: self.mqttc.publish( - self.get_discovery_topic(device_id,topic), - json.dumps(self.devices[device_id][topic]) if isinstance(self.devices[device_id][topic], dict) else self.devices[device_id][topic], + self.get_discovery_topic(device_id, topic), + json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic], + qos=self.mqtt_config['qos'], retain=True ) @@ -563,55 +587,52 @@ class AmcrestMqtt(object): self.refresh_device(device_id) + async def _handle_signals(self, signame, loop, tasks): + self.running = False + self.logger.warn(f'{signame} received, waiting for tasks to cancel...') + + for t in tasks: + if not t.done(): + t.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + + async def device_loop(self): + while self.running == True: + self.refresh_all_devices() + await asyncio.sleep(self.device_update_interval) + + async def collect_events(self): + while self.running == True: + await self.amcrestc.collect_all_device_events() + + async def process_events(self): + while self.running == True: + self.check_for_events() + await asyncio.sleep(1) # main loop async def main_loop(self): - try: - await self.setup_devices() - except: - self.running = False + await self.setup_devices() loop = asyncio.get_running_loop() + tasks = [ asyncio.create_task(self.device_loop()), asyncio.create_task(self.collect_events()), asyncio.create_task(self.process_events()), ] + # setup signal handling for tasks for signame in {'SIGINT','SIGTERM'}: loop.add_signal_handler( getattr(signal, signame), - lambda: asyncio.create_task(self._handle_sigterm(loop, tasks)) + lambda: asyncio.create_task(self._handle_signals(signame, loop, tasks)) ) try: - results = await asyncio.gather(*tasks, return_exceptions=True) + await asyncio.gather(*tasks, return_exceptions=True) except Exception as err: - self.logger.error(f'main_loop: {err}') self.running = False - - async def device_loop(self): - while self.running == True: - try: - self.refresh_all_devices() - await asyncio.sleep(self.device_update_interval) - except Exception as err: - self.logger.error('device_loop: {err}') - self.running = False - - async def collect_events(self): - while self.running == True: - try: - await self.amcrestc.collect_all_device_events() - except Exception as err: - self.logger.error(f'collect_events: {err}') - self.running = False - - async def process_events(self): - while self.running == True: - try: - self.check_for_events() - await asyncio.sleep(1) - except Exception as err: - self.logger.error(f'process_events: {err}') - self.running = False + self.log.error(f'Caught exception: {err}') \ No newline at end of file diff --git a/app.py b/app.py index 951575d..9aff11e 100644 --- a/app.py +++ b/app.py @@ -8,23 +8,10 @@ import time from util import * import yaml -# Helper functions and callbacks -def read_file(file_name): - with open(file_name, 'r') as file: - data = file.read().replace('\n', '') - - return data - -def read_version(): - if os.path.isfile('./VERSION'): - return read_file('./VERSION') - - return read_file('../VERSION') - # Let's go! version = read_version() -# cmd-line args +# Cmd-line args argparser = argparse.ArgumentParser() argparser.add_argument( '-c', @@ -34,7 +21,7 @@ argparser.add_argument( ) args = argparser.parse_args() -# load config file +# Setup config from yaml file or env configpath = args.config or '/config' try: if not configpath.endswith('.yaml'): @@ -43,8 +30,8 @@ try: configfile = configpath + 'config.yaml' with open(configfile) as file: config = yaml.safe_load(file) - config['config_from'] = 'file' config['config_path'] = configpath + config['config_from'] = 'file' except: config = { 'mqtt': { @@ -71,9 +58,13 @@ except: }, 'debug': True if os.getenv('DEBUG') else False, 'hide_ts': True if os.getenv('HIDE_TS') else False, - 'config_from': 'env', 'timezone': os.getenv('TZ'), + 'config_from': 'env', } +config['version'] = version +config['configpath'] = os.path.dirname(configpath) +if 'timezone' not in config: config['timezone'] = 'UTC' +if 'debug' not in config: config['debug'] = False logging.basicConfig( format = '%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s' if config['hide_ts'] == False else '[%(levelname)s] %(name)s: %(message)s', @@ -84,37 +75,26 @@ logger = logging.getLogger(__name__) logger.info(f'Starting: amcrest2mqtt v{version}') logger.info(f'Config loaded from {config["config_from"]}') -config['version'] = version -config['configpath'] = os.path.dirname(configpath) - -# Exit if any of the required vars are not provided +# Check for required config properties if config['amcrest']['hosts'] is None: logger.error('Missing env var: AMCREST_HOSTS or amcrest.hosts in config') - sys.exit(1) + exit(1) config['amcrest']['host_count'] = len(config['amcrest']['hosts']) if config['amcrest']['names'] is None: logger.error('Missing env var: AMCREST_NAMES or amcrest.names in config') - sys.exit(1) + exit(1) config['amcrest']['name_count'] = len(config['amcrest']['names']) if config['amcrest']['host_count'] != config['amcrest']['name_count']: logger.error('The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names') - sys.exit(1) + exit(1) logger.info(f'Found {config["amcrest"]["host_count"]} host(s) defined to monitor') if config['amcrest']['password'] is None: logger.error('Please set the AMCREST_PASSWORD environment variable') - sys.exit(1) - -if not 'timezone' in config: - logger.info('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone) exit(1) -else: - logger.info(f'TIMEZONE set as {config["timezone"]}') - -if config['debug']: - logger.setLevel(logging.DEBUG) +# Go! with AmcrestMqtt(config) as mqtt: asyncio.run(mqtt.main_loop()) \ No newline at end of file diff --git a/util.py b/util.py index 656b8e9..18bd476 100644 --- a/util.py +++ b/util.py @@ -1,13 +1,17 @@ -from datetime import datetime, timezone import os -from zoneinfo import ZoneInfo - -def app_log(msg, level='INFO', tz='UTC', hide_ts=False): - ts = datetime.now(ZoneInfo(tz)).strftime('%Y-%m-%d %H:%M:%S %Z') - if len(msg) > 102400: - raise ValueError('Log message exceeds max length') - if level != 'DEBUG' or os.getenv('DEBUG'): - print(f'{ts + " " if not hide_ts else ""}[{level}] {msg}') + +# Helper functions and callbacks +def read_file(file_name): + with open(file_name, 'r') as file: + data = file.read().replace('\n', '') + + return data + +def read_version(): + if os.path.isfile('./VERSION'): + return read_file('./VERSION') + + return read_file('../VERSION') def to_gb(total): return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))