code cleanup

pull/106/head
Jeff Culverhouse 11 months ago
parent 34444997e1
commit 7ffeb433e4

@ -1 +1 @@
0.99.13
0.99.14

@ -1,7 +1,7 @@
from amcrest import AmcrestCamera, AmcrestError
import asyncio
from asyncio import timeout
from datetime import date
from datetime import datetime
import httpx
import logging
import time
@ -32,10 +32,9 @@ class AmcrestAPI(object):
for host in self.amcrest_config['hosts']:
task = asyncio.create_task(self.get_device(host, device_names.pop(0)))
tasks.append(task)
await asyncio.gather(*tasks)
self.logger.info('Connecting to hosts done.')
await asyncio.gather(*tasks, return_exceptions=True)
# return just the config of each device, not the camera object
return {d: self.devices[d]['config'] for d in self.devices.keys()}
def get_camera(self, host):
@ -92,27 +91,15 @@ class AmcrestAPI(object):
"mac": mac_address,
}
},
"storage": {},
}
def get_device_storage_stats(self, device_id):
if 'error' in self.devices[device_id]:
try:
self.devices[device_id]['camera'] = self.get_camera(self.devices[device_id]['config']['host'])
del self.devices[device_id]['error']
except Exception as err:
err_msg = f'Problem re-connecting to camera: {err}'
self.logger.error(err_msg)
self.devices[device_id]["error"] = err_msg
raise Exception(err_msg)
try:
storage = self.devices[device_id]["camera"].storage_all
except Exception as err:
err_msg = f'Problem connecting with camera to get storage stats: {err}'
self.logger.error(err_msg)
self.devices[device_id]["error"] = err_msg
raise Exception(err_msg)
self.logger.error(f'Problem connecting with camera to get storage stats: {err}')
return {}
return {
'last_update': str(datetime.now(ZoneInfo(self.timezone))),
'used_percent': str(storage['used_percent']),
@ -125,14 +112,14 @@ class AmcrestAPI(object):
tasks = [self.get_events_from_device(device_id) for device_id in self.devices]
await asyncio.gather(*tasks)
except Exception as err:
self.logger.error(f'collect_all_device_events: {err}', exc_info=True)
self.logger.error(f'collect_all_device_events: {err}')
async def get_events_from_device(self, device_id):
try:
async for code, payload in self.devices[device_id]["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
except Exception as err:
self.logger.error(f'Failed to get events from device ({device_id}): {err}', exc_info=True)
self.logger.error(f'Failed to get events from device ({device_id}): {err}')
async def process_device_event(self, device_id, code, payload):
try:

@ -1,5 +1,5 @@
import asyncio
from datetime import date
from datetime import datetime
import amcrest_api
import json
import logging
@ -14,10 +14,8 @@ from zoneinfo import ZoneInfo
class AmcrestMqtt(object):
def __init__(self, config):
self.logger = logging.getLogger(__name__)
self.running = False
self.timezone = config['timezone']
self.logger = logging.getLogger(__name__)
self.mqttc = None
self.mqtt_connect_time = None
@ -25,29 +23,18 @@ class AmcrestMqtt(object):
self.config = config
self.mqtt_config = config['mqtt']
self.amcrest_config = config['amcrest']
self.client_id = self.get_new_client_id()
self.timezone = config['timezone']
self.version = config['version']
self.device_update_interval = config['amcrest'].get('device_update_interval', 600)
self.client_id = self.get_new_client_id()
self.service_name = self.mqtt_config['prefix'] + ' service'
self.service_slug = self.mqtt_config['prefix'] + '-service'
self.devices = {}
self.configs = {}
async def _handle_sigterm(self, loop, tasks):
self.running = False
self.logger.warn('SIGTERM received, waiting for tasks to cancel...')
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def __enter__(self):
self.mqttc_create()
self.amcrestc = amcrest_api.AmcrestAPI(self.config)
@ -168,7 +155,12 @@ class AmcrestMqtt(object):
self.mqttc.on_subscribe = self.mqtt_on_subscribe
self.mqttc.on_log = self.mqtt_on_log
self.mqttc.will_set(self.get_discovery_topic('service', 'availability'), payload="offline", qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(
self.get_discovery_topic('service', 'availability'),
payload="offline",
qos=self.mqtt_config['qos'],
retain=True
)
try:
self.mqttc.connect(
@ -269,31 +261,34 @@ class AmcrestMqtt(object):
},
},
}),
qos=self.mqtt_config['qos'],
retain=True
)
self.update_service_device()
def update_service_device(self):
self.mqttc.publish(self.get_discovery_topic('service','availability'), 'online', retain=True)
self.mqttc.publish(
self.get_discovery_topic('service','availability'),
'online',
qos=self.mqtt_config['qos'],
retain=True
)
self.mqttc.publish(
self.get_discovery_topic('service','state'),
json.dumps({
'status': 'online',
'device_refresh': self.device_update_interval,
}),
qos=self.mqtt_config['qos'],
retain=True
)
# amcrest Helpers
async def setup_devices(self):
self.logger.info(f'Setup devices')
try:
devices = await self.amcrestc.connect_to_devices()
except Exception as err:
self.logger.error(f'Failed to connect to 1 or more devices {err}')
exit(1)
self.logger.info(f'Connected to: {list(devices.keys())}')
self.publish_service_device()
for device_id in devices:
@ -305,13 +300,28 @@ class AmcrestMqtt(object):
first = True
self.devices[device_id] = {}
self.configs[device_id] = config
self.devices[device_id]['qos'] = self.mqtt_config['qos'],
self.devices[device_id]['qos'] = self.mqtt_config['qos']
self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state')
self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability')
self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set')
self.mqttc.will_set(self.get_discovery_topic(device_id,'state'), payload=json.dumps({'status': 'offline'}), qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(self.get_discovery_topic(device_id,'motion'), payload=None, qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(self.get_discovery_topic(device_id,'availability'), payload='offline', qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(
self.get_discovery_topic(device_id,'state'),
json.dumps({'status': 'offline'}),
qos=self.mqtt_config['qos'],
retain=True
)
self.mqttc.will_set(
self.get_discovery_topic(device_id,'motion'),
None,
qos=self.mqtt_config['qos'],
retain=True
)
self.mqttc.will_set(
self.get_discovery_topic(device_id,'availability'),
'offline',
qos=self.mqtt_config['qos'],
retain=True
)
self.devices[device_id]['device'] = {
'name': config['device_name'],
@ -343,7 +353,12 @@ class AmcrestMqtt(object):
# device discovery sent, now it is save to add these to the
# dict (so they aren't included in device discovery object itself)
self.devices[device_id]['state'] = {}
self.devices[device_id]['state'] = {
'status': 'online',
'host': config['host'],
'serial_number': config['serial_number'],
'sw_version': config['software_version'],
}
self.devices[device_id]['availability'] = 'online'
self.devices[device_id]['motion'] = 'off'
else:
@ -369,7 +384,12 @@ class AmcrestMqtt(object):
'value_template': '{{ value_json.doorbell }}',
'unique_id': self.get_slug(device_id, 'doorbell'),
}
self.mqttc.will_set(self.get_discovery_topic(device_id,'doorbell'), payload=None, qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(
self.get_discovery_topic(device_id,'doorbell'),
payload=None,
qos=self.mqtt_config['qos'],
retain=True
)
if config['is_ad410']:
components[self.get_slug(device_id, 'human')] = {
@ -382,7 +402,12 @@ class AmcrestMqtt(object):
'value_template': '{{ value_json.human }}',
'unique_id': self.get_slug(device_id, 'human'),
}
self.mqttc.will_set(self.get_discovery_topic(device_id,'human'), payload=None, qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(
self.get_discovery_topic(device_id,'human'),
payload=None,
qos=self.mqtt_config['qos'],
retain=True
)
components[self.get_slug(device_id, 'motion')] = {
'name': 'Motion',
@ -474,51 +499,50 @@ class AmcrestMqtt(object):
def send_device_discovery(self, device_id):
device = self.devices[device_id]
self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True)
self.mqttc.publish(
self.get_discovery_topic(device_id, 'config'),
json.dumps(device),
qos=self.mqtt_config['qos'],
retain=True
)
device['state'] = { 'state': 'ON' }
device['availability'] = 'online'
self.publish_device(device_id)
def refresh_all_devices(self):
self.logger.info(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)')
# refresh devices starting with the device updated the longest time ago
for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)):
# sorted = sorted(self.devices.items(), key=lambda dt: (dt is None, dt)):
for device_id in self.devices:
# break loop if we are ending
if not self.running:
break
device_id = each[0]
self.refresh_device(device_id)
def refresh_device(self, device_id):
# don't refresh the device until it has been published in device discovery
# and we can tell because it will be `online`
#if self.devices[device_id]['state']['status'] != 'online':
# return
device = self.devices[device_id]
config = self.configs[device_id]
result = self.amcrestc.get_device_storage_stats(device_id)
if result and 'last_update' in result:
self.devices[device_id]['storage'] = result
self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone))
self.devices[device_id]['state'] = {
'status': 'online',
'host': config['host'],
'serial_number': config['serial_number'],
'sw_version': config['software_version'],
'last_update': config['last_update'].isoformat(),
}
# get the storage info, pull out last_update and save that to the device state
storage = self.amcrestc.get_device_storage_stats(device_id)
device['state']['last_update'] = storage.pop('last_update', None)
device['storage'] = storage
self.update_service_device()
self.publish_device(device_id)
def publish_device(self, device_id):
device = self.devices[device_id]
for topic in ['state','availability','storage','motion','human','doorbell','event','recording']:
if topic in self.devices[device_id]:
if topic in device:
self.mqttc.publish(
self.get_discovery_topic(device_id,topic),
json.dumps(self.devices[device_id][topic]) if isinstance(self.devices[device_id][topic], dict) else self.devices[device_id][topic],
self.get_discovery_topic(device_id, topic),
json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic],
qos=self.mqtt_config['qos'],
retain=True
)
@ -563,55 +587,52 @@ class AmcrestMqtt(object):
self.refresh_device(device_id)
async def _handle_signals(self, signame, loop, tasks):
self.running = False
self.logger.warn(f'{signame} received, waiting for tasks to cancel...')
for t in tasks:
if not t.done():
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
async def device_loop(self):
while self.running == True:
self.refresh_all_devices()
await asyncio.sleep(self.device_update_interval)
async def collect_events(self):
while self.running == True:
await self.amcrestc.collect_all_device_events()
async def process_events(self):
while self.running == True:
self.check_for_events()
await asyncio.sleep(1)
# main loop
async def main_loop(self):
try:
await self.setup_devices()
except:
self.running = False
loop = asyncio.get_running_loop()
tasks = [
asyncio.create_task(self.device_loop()),
asyncio.create_task(self.collect_events()),
asyncio.create_task(self.process_events()),
]
# setup signal handling for tasks
for signame in {'SIGINT','SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
lambda: asyncio.create_task(self._handle_sigterm(loop, tasks))
lambda: asyncio.create_task(self._handle_signals(signame, loop, tasks))
)
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
except Exception as err:
self.logger.error(f'main_loop: {err}')
self.running = False
async def device_loop(self):
while self.running == True:
try:
self.refresh_all_devices()
await asyncio.sleep(self.device_update_interval)
except Exception as err:
self.logger.error('device_loop: {err}')
self.running = False
async def collect_events(self):
while self.running == True:
try:
await self.amcrestc.collect_all_device_events()
except Exception as err:
self.logger.error(f'collect_events: {err}')
self.running = False
async def process_events(self):
while self.running == True:
try:
self.check_for_events()
await asyncio.sleep(1)
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as err:
self.logger.error(f'process_events: {err}')
self.running = False
self.log.error(f'Caught exception: {err}')

@ -8,23 +8,10 @@ import time
from util import *
import yaml
# Helper functions and callbacks
def read_file(file_name):
with open(file_name, 'r') as file:
data = file.read().replace('\n', '')
return data
def read_version():
if os.path.isfile('./VERSION'):
return read_file('./VERSION')
return read_file('../VERSION')
# Let's go!
version = read_version()
# cmd-line args
# Cmd-line args
argparser = argparse.ArgumentParser()
argparser.add_argument(
'-c',
@ -34,7 +21,7 @@ argparser.add_argument(
)
args = argparser.parse_args()
# load config file
# Setup config from yaml file or env
configpath = args.config or '/config'
try:
if not configpath.endswith('.yaml'):
@ -43,8 +30,8 @@ try:
configfile = configpath + 'config.yaml'
with open(configfile) as file:
config = yaml.safe_load(file)
config['config_from'] = 'file'
config['config_path'] = configpath
config['config_from'] = 'file'
except:
config = {
'mqtt': {
@ -71,9 +58,13 @@ except:
},
'debug': True if os.getenv('DEBUG') else False,
'hide_ts': True if os.getenv('HIDE_TS') else False,
'config_from': 'env',
'timezone': os.getenv('TZ'),
'config_from': 'env',
}
config['version'] = version
config['configpath'] = os.path.dirname(configpath)
if 'timezone' not in config: config['timezone'] = 'UTC'
if 'debug' not in config: config['debug'] = False
logging.basicConfig(
format = '%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s' if config['hide_ts'] == False else '[%(levelname)s] %(name)s: %(message)s',
@ -84,37 +75,26 @@ logger = logging.getLogger(__name__)
logger.info(f'Starting: amcrest2mqtt v{version}')
logger.info(f'Config loaded from {config["config_from"]}')
config['version'] = version
config['configpath'] = os.path.dirname(configpath)
# Exit if any of the required vars are not provided
# Check for required config properties
if config['amcrest']['hosts'] is None:
logger.error('Missing env var: AMCREST_HOSTS or amcrest.hosts in config')
sys.exit(1)
exit(1)
config['amcrest']['host_count'] = len(config['amcrest']['hosts'])
if config['amcrest']['names'] is None:
logger.error('Missing env var: AMCREST_NAMES or amcrest.names in config')
sys.exit(1)
exit(1)
config['amcrest']['name_count'] = len(config['amcrest']['names'])
if config['amcrest']['host_count'] != config['amcrest']['name_count']:
logger.error('The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names')
sys.exit(1)
exit(1)
logger.info(f'Found {config["amcrest"]["host_count"]} host(s) defined to monitor')
if config['amcrest']['password'] is None:
logger.error('Please set the AMCREST_PASSWORD environment variable')
sys.exit(1)
if not 'timezone' in config:
logger.info('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone)
exit(1)
else:
logger.info(f'TIMEZONE set as {config["timezone"]}')
if config['debug']:
logger.setLevel(logging.DEBUG)
# Go!
with AmcrestMqtt(config) as mqtt:
asyncio.run(mqtt.main_loop())

@ -1,13 +1,17 @@
from datetime import datetime, timezone
import os
from zoneinfo import ZoneInfo
def app_log(msg, level='INFO', tz='UTC', hide_ts=False):
ts = datetime.now(ZoneInfo(tz)).strftime('%Y-%m-%d %H:%M:%S %Z')
if len(msg) > 102400:
raise ValueError('Log message exceeds max length')
if level != 'DEBUG' or os.getenv('DEBUG'):
print(f'{ts + " " if not hide_ts else ""}[{level}] {msg}')
# Helper functions and callbacks
def read_file(file_name):
with open(file_name, 'r') as file:
data = file.read().replace('\n', '')
return data
def read_version():
if os.path.isfile('./VERSION'):
return read_file('./VERSION')
return read_file('../VERSION')
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))

Loading…
Cancel
Save