change to logging package; fix device event processing

pull/106/head
Jeff Culverhouse 11 months ago
parent af9960f0b3
commit 437cfba337

@ -1 +1 @@
0.99.11 0.99.12

@ -1,26 +1,31 @@
from amcrest import AmcrestCamera, AmcrestError from amcrest import AmcrestCamera, AmcrestError
import asyncio import asyncio
from asyncio import timeout
from datetime import date from datetime import date
import httpx
import logging
import time import time
from util import * from util import *
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
class AmcrestAPI(object): class AmcrestAPI(object):
def __init__(self, config): def __init__(self, config):
self.logger = logging.getLogger(__name__)
# we don't want to get the .info HTTP Request logs from Amcrest
logging.getLogger("httpx").setLevel(logging.WARNING)
self.last_call_date = '' self.last_call_date = ''
self.timezone = config['timezone'] self.timezone = config['timezone']
self.hide_ts = config['hide_ts'] or False
self.amcrest_config = config['amcrest'] self.amcrest_config = config['amcrest']
self.count = len(self.amcrest_config['hosts']) self.count = len(self.amcrest_config['hosts'])
self.devices = {} self.devices = {}
self.events = []
def log(self, msg, level='INFO'):
app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts)
async def connect_to_devices(self): async def connect_to_devices(self):
self.log(f'Connecting to: {self.amcrest_config["hosts"]}') self.logger.info(f'Connecting to: {self.amcrest_config["hosts"]}')
tasks = [] tasks = []
device_names = self.amcrest_config['names'] device_names = self.amcrest_config['names']
@ -29,13 +34,17 @@ class AmcrestAPI(object):
tasks.append(task) tasks.append(task)
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
self.log(f"Connecting to hosts done.", level="INFO") self.logger.info('Connecting to hosts done.')
return {d: self.devices[d]['config'] for d in self.devices.keys()} return {d: self.devices[d]['config'] for d in self.devices.keys()}
def get_camera(self, host): def get_camera(self, host):
return AmcrestCamera( return AmcrestCamera(
host, self.amcrest_config['port'], self.amcrest_config['username'], self.amcrest_config['password'] host,
self.amcrest_config['port'],
self.amcrest_config['username'],
self.amcrest_config['password'],
verbose=False,
).camera ).camera
async def get_device(self, host, device_name): async def get_device(self, host, device_name):
@ -53,10 +62,13 @@ class AmcrestAPI(object):
sw_version = camera.software_information[0].replace("version=", "").strip() sw_version = camera.software_information[0].replace("version=", "").strip()
build_version = camera.software_information[1].strip() build_version = camera.software_information[1].strip()
amcrest_version = f"{sw_version} ({build_version})" sw_version = f"{sw_version} ({build_version})"
network_config = dict(item.split('=') for item in camera.network_config.splitlines())
interface = network_config['table.Network.DefaultInterface']
ip_address = network_config[f'table.Network.{interface}.IPAddress']
mac_address = network_config[f'table.Network.{interface}.PhysicalAddress'].upper()
vendor = camera.vendor_information
hardware_version = camera.hardware_version
except AmcrestError as error: except AmcrestError as error:
raise Exception(f'Error fetching camera details for {host}: {error}') raise Exception(f'Error fetching camera details for {host}: {error}')
@ -71,9 +83,14 @@ class AmcrestAPI(object):
"is_ad410": is_ad410, "is_ad410": is_ad410,
"is_doorbell": is_doorbell, "is_doorbell": is_doorbell,
"serial_number": serial_number, "serial_number": serial_number,
"software_version": amcrest_version, "software_version": sw_version,
"hardware_version": hardware_version, "hardware_version": camera.hardware_version,
"vendor": vendor, "vendor": camera.vendor_information,
"network": {
"interface": interface,
"ip_address": ip_address,
"mac": mac_address,
}
}, },
"storage": {}, "storage": {},
} }
@ -85,7 +102,7 @@ class AmcrestAPI(object):
del self.devices[device_id]['error'] del self.devices[device_id]['error']
except Exception as err: except Exception as err:
err_msg = f'Problem re-connecting to camera: {err}' err_msg = f'Problem re-connecting to camera: {err}'
self.log(err_msg, level='ERROR') self.logger.error(err_msg)
self.devices[device_id]["error"] = err_msg self.devices[device_id]["error"] = err_msg
raise Exception(err_msg) raise Exception(err_msg)
@ -93,7 +110,7 @@ class AmcrestAPI(object):
storage = self.devices[device_id]["camera"].storage_all storage = self.devices[device_id]["camera"].storage_all
except Exception as err: except Exception as err:
err_msg = f'Problem connecting with camera to get storage stats: {err}' err_msg = f'Problem connecting with camera to get storage stats: {err}'
self.log(err_msg, level='ERROR') self.logger.error(err_msg)
self.devices[device_id]["error"] = err_msg self.devices[device_id]["error"] = err_msg
raise Exception(err_msg) raise Exception(err_msg)
return { return {
@ -103,22 +120,48 @@ class AmcrestAPI(object):
'total': to_gb(storage['total']), 'total': to_gb(storage['total']),
} }
async def get_device_event_actions(self, device_id): async def collect_all_device_events(self):
events = [] try:
device = self.devices[device_id] tasks = [self.get_events_from_device(device_id) for device_id in self.devices]
config = device['config'] await asyncio.gather(*tasks)
async for code, payload in device["camera"].async_event_actions("All"): self.logger.info(f'Checked all devices for events')
self.log(f"Event on {config['host']} - {code}: {payload['action']}") except Exception as err:
self.logger.error(f'collect_all_device_events: {err}')
async def get_events_from_device(self, device_id):
try:
async for code, payload in self.devices[device_id]["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
except Exception as err:
self.logger.error(f'get_events_from_device: {err}')
self.logger.info(f'Checked {device_id} for events')
async def process_device_event(self, device_id, code, payload):
try:
config = self.devices[device_id]['config']
self.logger.info(f'Event on {config["host"]} - {code}: {payload}')
if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) if ((code == "ProfileAlarmTransmit" and config["is_ad110"])
or (code == "VideoMotion" and not config["is_ad110"])): or (code == "VideoMotion" and not config["is_ad110"])):
motion_payload = "on" if payload["action"] == "Start" else "off" motion_payload = "on" if payload["action"] == "Start" else "off"
events.append({ 'event': 'motion', 'payload': motion_payload }) self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload })
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off" human_payload = "on" if payload["action"] == "Start" else "off"
events.append({ 'event': 'human', 'payload': human_payload }) self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload })
elif code == "_DoTalkAction_": elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
events.append({ 'event': 'doorbell', 'payload': doorbell_payload }) self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload })
self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] })
self.logger.info(f'Event(s) appended to queue, queue length now: {len(self.events)}')
except Exception as err:
self.logger.error(f'process_device_event: {err}')
def get_next_event(self):
if len(self.events) > 0:
self.logger.info('Found event on queue')
return self.events.pop(0)
events.append({ 'event': 'event', 'payload': payload }) return None
return events

@ -2,6 +2,7 @@ import asyncio
from datetime import date from datetime import date
import amcrest_api import amcrest_api
import json import json
import logging
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import random import random
import signal import signal
@ -13,6 +14,7 @@ from zoneinfo import ZoneInfo
class AmcrestMqtt(object): class AmcrestMqtt(object):
def __init__(self, config): def __init__(self, config):
self.logger = logging.getLogger(__name__)
self.running = False self.running = False
self.timezone = config['timezone'] self.timezone = config['timezone']
@ -27,7 +29,6 @@ class AmcrestMqtt(object):
self.client_id = self.get_new_client_id() self.client_id = self.get_new_client_id()
self.version = config['version'] self.version = config['version']
self.hide_ts = config['hide_ts'] or False
self.device_update_interval = config['amcrest'].get('device_update_interval', 600) self.device_update_interval = config['amcrest'].get('device_update_interval', 600)
@ -37,12 +38,9 @@ class AmcrestMqtt(object):
self.devices = {} self.devices = {}
self.configs = {} self.configs = {}
def log(self, msg, level='INFO'):
app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts)
async def _handle_sigterm(self, loop, tasks): async def _handle_sigterm(self, loop, tasks):
self.running = False self.running = False
self.log('SIGTERM received, waiting for tasks to cancel...', level='WARN') self.logger.warn('SIGTERM received, waiting for tasks to cancel...')
for t in tasks: for t in tasks:
t.cancel() t.cancel()
@ -59,7 +57,7 @@ class AmcrestMqtt(object):
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
self.running = False self.running = False
self.log('Exiting gracefully') self.logger.info('Exiting gracefully')
if self.mqttc is not None and self.mqttc.is_connected(): if self.mqttc is not None and self.mqttc.is_connected():
for device_id in self.devices: for device_id in self.devices:
@ -70,19 +68,19 @@ class AmcrestMqtt(object):
self.mqttc.disconnect() self.mqttc.disconnect()
else: else:
self.log('Lost connection to MQTT') self.logger.info('Lost connection to MQTT')
# MQTT Functions # MQTT Functions
def mqtt_on_connect(self, client, userdata, flags, rc, properties): def mqtt_on_connect(self, client, userdata, flags, rc, properties):
if rc != 0: if rc != 0:
self.log(f'MQTT CONNECTION ISSUE ({rc})', level='ERROR') self.logger.error(f'MQTT CONNECTION ISSUE ({rc})')
exit() exit()
self.log(f'MQTT connected as {self.client_id}') self.logger.info(f'MQTT connected as {self.client_id}')
client.subscribe(self.get_device_sub_topic()) client.subscribe(self.get_device_sub_topic())
client.subscribe(self.get_attribute_sub_topic()) client.subscribe(self.get_attribute_sub_topic())
def mqtt_on_disconnect(self, client, userdata, flags, rc, properties): def mqtt_on_disconnect(self, client, userdata, flags, rc, properties):
self.log('MQTT connection closed') self.logger.info('MQTT connection closed')
# if reconnect, lets use a new client_id # if reconnect, lets use a new client_id
self.client_id = self.get_new_client_id() self.client_id = self.get_new_client_id()
@ -93,13 +91,10 @@ class AmcrestMqtt(object):
exit() exit()
def mqtt_on_log(self, client, userdata, paho_log_level, msg): def mqtt_on_log(self, client, userdata, paho_log_level, msg):
level = None
if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR: if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR:
level = 'ERROR' self.logger.error(f'MQTT LOG: {msg}')
if paho_log_level == mqtt.LogLevel.MQTT_LOG_WARNING: elif paho_log_level == mqtt.LogLevel.MQTT_LOG_WARNING:
level = 'WARN' self.logger.warn(f'MQTT LOG: {msg}')
if level:
self.log(f'MQTT LOG: {msg}', level=level)
def mqtt_on_message(self, client, userdata, msg): def mqtt_on_message(self, client, userdata, msg):
if not msg or not msg.payload: if not msg or not msg.payload:
@ -107,7 +102,7 @@ class AmcrestMqtt(object):
topic = msg.topic topic = msg.topic
payload = json.loads(msg.payload) payload = json.loads(msg.payload)
self.log(f'Got MQTT message for {topic} - {payload}') self.logger.info(f'Got MQTT message for {topic} - {payload}')
# we might get: # we might get:
# device/component/set # device/component/set
@ -127,7 +122,7 @@ class AmcrestMqtt(object):
elif components[-2] == 'set': elif components[-2] == 'set':
mac = components[-3][-16:] mac = components[-3][-16:]
else: else:
self.log(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}', level='ERROR') self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}')
return return
# ok, lets format the device_id and send to amcrest # ok, lets format the device_id and send to amcrest
device_id = ':'.join([mac[i:i+2] for i in range (0, len(mac), 2)]) device_id = ':'.join([mac[i:i+2] for i in range (0, len(mac), 2)])
@ -135,7 +130,7 @@ class AmcrestMqtt(object):
def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties): def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties):
rc_list = map(lambda x: x.getName(), reason_code_list) rc_list = map(lambda x: x.getName(), reason_code_list)
self.log(f'MQTT SUBSCRIBED: reason_codes - {'; '.join(rc_list)}', level='DEBUG') self.logger.debug(f'MQTT SUBSCRIBED: reason_codes - {'; '.join(rc_list)}')
# MQTT Helpers # MQTT Helpers
def mqttc_create(self): def mqttc_create(self):
@ -176,7 +171,7 @@ class AmcrestMqtt(object):
self.mqtt_connect_time = time.time() self.mqtt_connect_time = time.time()
self.mqttc.loop_start() self.mqttc.loop_start()
except ConnectionError as error: except ConnectionError as error:
self.log(f'COULD NOT CONNECT TO MQTT {self.mqtt_config.get("host")}: {error}', level='ERROR') self.logger.error(f'COULD NOT CONNECT TO MQTT {self.mqtt_config.get("host")}: {error}')
exit(1) exit(1)
# MQTT Topics # MQTT Topics
@ -284,12 +279,12 @@ class AmcrestMqtt(object):
# amcrest Helpers # amcrest Helpers
async def setup_devices(self): async def setup_devices(self):
self.log(f'Setup devices') self.logger.info(f'Setup devices')
try: try:
devices = await self.amcrestc.connect_to_devices() devices = await self.amcrestc.connect_to_devices()
except Exception as err: except Exception as err:
self.log(f'Failed to connect to 1 or more devices {err}', level='ERROR') self.logger.error(f'Failed to connect to 1 or more devices {err}')
exit(1) exit(1)
self.publish_service_device() self.publish_service_device()
@ -315,6 +310,12 @@ class AmcrestMqtt(object):
'ids': device_id, 'ids': device_id,
'sw_version': config['software_version'], 'sw_version': config['software_version'],
'hw_version': config['hardware_version'], 'hw_version': config['hardware_version'],
'connections': [
['host', config['host']],
['mac', config['network']['mac']],
['ip address', config['network']['ip_address']],
],
'configuration_url': 'http://' + config['host'] + '/',
'via_device': self.service_slug, 'via_device': self.service_slug,
} }
self.devices[device_id]['origin'] = { self.devices[device_id]['origin'] = {
@ -325,20 +326,17 @@ class AmcrestMqtt(object):
self.add_components_to_device(device_id) self.add_components_to_device(device_id)
if first: if first:
self.log(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})') self.logger.info(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})')
self.send_device_discovery(device_id) self.send_device_discovery(device_id)
else: else:
self.log(f'Updated device: {self.devices[device_id]['device']['name']}', level='DEBUG') self.logger.debug(f'Updated device: {self.devices[device_id]['device']['name']}')
# device discovery sent, now it is save to add these to the dict # device discovery sent, now it is save to add these to the dict
self.devices[device_id]['state'] = {} self.devices[device_id]['state'] = {}
self.devices[device_id]['availability'] = 'online' self.devices[device_id]['availability'] = 'online'
self.devices[device_id]['storage'] = {}
self.devices[device_id]['motion'] = {}
self.devices[device_id]['event'] = {}
else: else:
if first_time_through: if first_time_through:
self.log(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})') self.logger.info(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})')
# add amcrest components to devices # add amcrest components to devices
def add_components_to_device(self, device_id): def add_components_to_device(self, device_id):
@ -395,7 +393,7 @@ class AmcrestMqtt(object):
components[self.get_slug(device_id, 'serial_number')] = { components[self.get_slug(device_id, 'serial_number')] = {
'name': 'Serial Number', 'name': 'Serial Number',
'platform': 'sensor', 'platform': 'sensor',
'icon': 'mdi:alphabetical-variant-up', 'icon': 'mdi:identifier',
'state_topic': device['state_topic'], 'state_topic': device['state_topic'],
'value_template': '{{ value_json.serial_number }}', 'value_template': '{{ value_json.serial_number }}',
'entity_category': 'diagnostic', 'entity_category': 'diagnostic',
@ -426,7 +424,6 @@ class AmcrestMqtt(object):
'unit_of_measurement': '%', 'unit_of_measurement': '%',
'state_topic': self.get_discovery_topic(device_id, 'storage'), 'state_topic': self.get_discovery_topic(device_id, 'storage'),
'value_template': '{{ value_json.used_percent }}', 'value_template': '{{ value_json.used_percent }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_used_percent'), 'unique_id': self.get_slug(device_id, 'storage_used_percent'),
} }
components[self.get_slug(device_id, 'storage_total')] = { components[self.get_slug(device_id, 'storage_total')] = {
@ -436,7 +433,6 @@ class AmcrestMqtt(object):
'unit_of_measurement': 'GB', 'unit_of_measurement': 'GB',
'state_topic': self.get_discovery_topic(device_id, 'storage'), 'state_topic': self.get_discovery_topic(device_id, 'storage'),
'value_template': '{{ value_json.total }}', 'value_template': '{{ value_json.total }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_total'), 'unique_id': self.get_slug(device_id, 'storage_total'),
} }
components[self.get_slug(device_id, 'storage_used')] = { components[self.get_slug(device_id, 'storage_used')] = {
@ -446,13 +442,13 @@ class AmcrestMqtt(object):
'unit_of_measurement': 'GB', 'unit_of_measurement': 'GB',
'state_topic': self.get_discovery_topic(device_id, 'storage'), 'state_topic': self.get_discovery_topic(device_id, 'storage'),
'value_template': '{{ value_json.used }}', 'value_template': '{{ value_json.used }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_used'), 'unique_id': self.get_slug(device_id, 'storage_used'),
} }
components[self.get_slug(device_id, 'last_update')] = { components[self.get_slug(device_id, 'last_update')] = {
'name': 'Last Update', 'name': 'Last Update',
'platform': 'sensor', 'platform': 'sensor',
'device_class': 'timestamp', 'device_class': 'timestamp',
'entity_category': 'diagnostic',
'state_topic': device['state_topic'], 'state_topic': device['state_topic'],
'value_template': '{{ value_json.last_update }}', 'value_template': '{{ value_json.last_update }}',
'unique_id': self.get_slug(device_id, 'last_update'), 'unique_id': self.get_slug(device_id, 'last_update'),
@ -468,7 +464,7 @@ class AmcrestMqtt(object):
self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True) self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True)
def refresh_all_devices(self): def refresh_all_devices(self):
self.log(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)') self.logger.info(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)')
# refresh devices starting with the device updated the longest time ago # refresh devices starting with the device updated the longest time ago
for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)): for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)):
@ -477,11 +473,6 @@ class AmcrestMqtt(object):
break break
device_id = each[0] device_id = each[0]
# all just to format the log record
last_updated = self.devices[device_id]['state']['last_update'][:19].replace('T',' ') if 'last_update' in self.devices[device_id]['state'] else 'server started'
self.log(f'Refreshing device "{self.devices[device_id]['device']['name']} ({device_id})", not updated since: {last_updated}')
self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone))
self.refresh_device(device_id) self.refresh_device(device_id)
def refresh_device(self, device_id): def refresh_device(self, device_id):
@ -496,6 +487,8 @@ class AmcrestMqtt(object):
result = self.amcrestc.get_device_storage_stats(device_id) result = self.amcrestc.get_device_storage_stats(device_id)
if result and 'last_update' in result: if result and 'last_update' in result:
self.devices[device_id]['storage'] = result self.devices[device_id]['storage'] = result
self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone))
self.devices[device_id]['state'] = { self.devices[device_id]['state'] = {
'status': 'online', 'status': 'online',
'host': config['host'], 'host': config['host'],
@ -504,44 +497,25 @@ class AmcrestMqtt(object):
'last_update': config['last_update'].isoformat(), 'last_update': config['last_update'].isoformat(),
} }
self.update_service_device() self.update_service_device()
self.publish_device(device_id) self.publish_device(device_id)
def publish_device(self, device_id): def publish_device(self, device_id):
self.mqttc.publish( for topic in ['state','availability','storage','motion','human','doorbell','event']:
self.get_discovery_topic(device_id,'state'), if topic in self.devices[device_id]:
json.dumps(self.devices[device_id]['state']), self.mqttc.publish(
retain=True self.get_discovery_topic(device_id,topic),
) json.dumps(self.devices[device_id][topic]) if isinstance(self.devices[device_id][topic], dict) else self.devices[device_id][topic],
self.mqttc.publish( retain=True
self.get_discovery_topic(device_id,'availability'), )
self.devices[device_id]['availability'],
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'storage'),
json.dumps(self.devices[device_id]['storage']),
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'motion'),
json.dumps(self.devices[device_id]['motion']),
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'event'),
json.dumps(self.devices[device_id]['event']),
retain=True
)
def handle_service_message(self, attribute, message): def handle_service_message(self, attribute, message):
match attribute: match attribute:
case 'device_refresh': case 'device_refresh':
self.device_update_interval = message self.device_update_interval = message
self.log(f'Updated UPDATE_INTERVAL to be {message}') self.logger.info(f'Updated UPDATE_INTERVAL to be {message}')
case _: case _:
self.log(f'IGNORED UNRECOGNIZED amcrest-service MESSAGE for {attribute}: {message}') self.logger.info(f'IGNORED UNRECOGNIZED amcrest-service MESSAGE for {attribute}: {message}')
return return
self.update_service_device() self.update_service_device()
@ -550,13 +524,13 @@ class AmcrestMqtt(object):
caps = self.convert_attributes_to_capabilities(data) caps = self.convert_attributes_to_capabilities(data)
sku = self.devices[device_id]['device']['model'] sku = self.devices[device_id]['device']['model']
self.log(f'COMMAND {device_id} = {caps}', level='DEBUG') self.logger.debug(f'COMMAND {device_id} = {caps}')
first = True first = True
for key in caps: for key in caps:
if not first: if not first:
time.sleep(1) time.sleep(1)
self.log(f'CMD DEVICE {self.devices[device_id]['device']['name']} ({device_id}) {key} = {caps[key]}', level='DEBUG') self.logger.debug(f'CMD DEVICE {self.devices[device_id]['device']['name']} ({device_id}) {key} = {caps[key]}')
self.amcrestc.send_command(device_id, sku, caps[key]['type'], caps[key]['instance'], caps[key]['value']) self.amcrestc.send_command(device_id, sku, caps[key]['type'], caps[key]['instance'], caps[key]['value'])
self.update_service_device() self.update_service_device()
first = False first = False
@ -564,31 +538,39 @@ class AmcrestMqtt(object):
if device_id not in self.boosted: if device_id not in self.boosted:
self.boosted.append(device_id) self.boosted.append(device_id)
async def check_devices_for_events(self): def check_for_events(self):
try: while device_event := self.amcrestc.get_next_event():
for device_id in self.devices: if 'device_id' not in device_event:
events = await self.amcrestc.get_device_event_actions(device_id) self.logger(f'Got event, but missing device_id: {device_event}')
log(f'Got events for {device_id}: {events.join(';')}') continue
for event in events: device_id = device_event['device_id']
self.devices[device_id][event] = events[event] event = device_event['event']
self.mqttc.publish( payload = device_event['payload']
self.get_discovery_topic(device_id,event), device = self.devices[device_id]
json.dumps(self.devices[device_id][event]),
retain=True self.logger.info(f'Got event for {device_id}: {event} {payload}')
) # if one of our known sensors
self.refresh_device(device_id) if event in ['motion','human','doorbell']:
except Exception as err: device[event] = payload
self.log(f'CAUGHT IN check_devices_for_events: {err}', level='ERROR') # otherwise, just store generically
else:
device['event'] = f'{event}: {payload}'
self.refresh_device(device_id)
# main loop # main loop
async def main_loop(self): async def main_loop(self):
await self.setup_devices() try:
await self.setup_devices()
except:
self.running = False
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
tasks = [ tasks = [
asyncio.create_task(self.device_loop()), asyncio.create_task(self.device_loop()),
asyncio.create_task(self.device_actions()), asyncio.create_task(self.collect_events()),
asyncio.create_task(self.process_events()),
] ]
for signame in {'SIGINT','SIGTERM'}: for signame in {'SIGINT','SIGTERM'}:
@ -600,7 +582,7 @@ class AmcrestMqtt(object):
try: try:
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
except Exception as err: except Exception as err:
self.log(f'CAUGHT IN main_loop {err}', level='ERROR') self.logger.error(f'main_loop: {err}')
self.running = False self.running = False
async def device_loop(self): async def device_loop(self):
@ -609,12 +591,22 @@ class AmcrestMqtt(object):
self.refresh_all_devices() self.refresh_all_devices()
await asyncio.sleep(self.device_update_interval) await asyncio.sleep(self.device_update_interval)
except Exception as err: except Exception as err:
self.log(f'CAUGHT IN device_loop {err}', level='ERROR') self.logger.error('device_loop: {err}')
self.running = False self.running = False
async def device_actions(self): async def collect_events(self):
while self.running == True: while self.running == True:
try: try:
await self.check_devices_for_events() await self.amcrestc.collect_all_device_events()
except Exception as err: except Exception as err:
self.log(f'CAUGHT IN device_actions {err}', level='ERROR') self.logger.error(f'collect_events: {err}')
self.running = False
async def process_events(self):
while self.running == True:
try:
self.check_for_events()
await asyncio.sleep(1)
except Exception as err:
self.logger.error(f'process_events: {err}')
self.running = False

@ -1,6 +1,7 @@
import asyncio import asyncio
import argparse import argparse
from amcrest_mqtt import AmcrestMqtt from amcrest_mqtt import AmcrestMqtt
import logging
import os import os
import sys import sys
import time import time
@ -21,8 +22,15 @@ def read_version():
return read_file('../VERSION') return read_file('../VERSION')
# Let's go! # Let's go!
logging.basicConfig(
format = '%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO
)
logger = logging.getLogger(__name__)
version = read_version() version = read_version()
app_log(f'Starting: amcrest2mqtt v{version}') logger.info(f'Starting: amcrest2mqtt v{version}')
# cmd-line args # cmd-line args
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
@ -43,11 +51,11 @@ try:
configfile = configpath + 'config.yaml' configfile = configpath + 'config.yaml'
with open(configfile) as file: with open(configfile) as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
app_log(f'Reading config file {configpath}') logger.info(f'Reading config file {configpath}')
config['config_from'] = 'file' config['config_from'] = 'file'
config['config_path'] = configpath config['config_path'] = configpath
except: except:
app_log(f'config.yaml not found, checking ENV') logger.info(f'config.yaml not found, checking ENV')
config = { config = {
'mqtt': { 'mqtt': {
'host': os.getenv('MQTT_HOST') or 'localhost', 'host': os.getenv('MQTT_HOST') or 'localhost',
@ -72,44 +80,43 @@ except:
'device_update_interval': int(os.getenv("DEVICE_UPDATE_INTERVAL") or 600), 'device_update_interval': int(os.getenv("DEVICE_UPDATE_INTERVAL") or 600),
}, },
'debug': True if os.getenv('DEBUG') else False, 'debug': True if os.getenv('DEBUG') else False,
'hide_ts': True if os.getenv('HIDE_TS') else False,
'config_from': 'env', 'config_from': 'env',
'timezone': os.getenv('TZ'), 'timezone': os.getenv('TZ'),
} }
config['version'] = version config['version'] = version
config['configpath'] = os.path.dirname(configpath) config['configpath'] = os.path.dirname(configpath)
if not 'hide_ts' in config:
config['hide_ts'] = False
# Exit if any of the required vars are not provided # Exit if any of the required vars are not provided
if config['amcrest']['hosts'] is None: if config['amcrest']['hosts'] is None:
app_log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR") logger.error('Missing env var: AMCREST_HOSTS or amcrest.hosts in config')
sys.exit(1) sys.exit(1)
config['amcrest']['host_count'] = len(config['amcrest']['hosts']) config['amcrest']['host_count'] = len(config['amcrest']['hosts'])
if config['amcrest']['names'] is None: if config['amcrest']['names'] is None:
app_log("Missing env var: AMCREST_NAMES or amcrest.names in config", level="ERROR") logger.error('Missing env var: AMCREST_NAMES or amcrest.names in config')
sys.exit(1) sys.exit(1)
config['amcrest']['name_count'] = len(config['amcrest']['names']) config['amcrest']['name_count'] = len(config['amcrest']['names'])
if config['amcrest']['host_count'] != config['amcrest']['name_count']: if config['amcrest']['host_count'] != config['amcrest']['name_count']:
app_log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR") logger.error('The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names')
sys.exit(1) sys.exit(1)
app_log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor") logger.info(f'Found {config["amcrest"]["host_count"]} host(s) defined to monitor')
if config['amcrest']['password'] is None: if config['amcrest']['password'] is None:
app_log("Please set the AMCREST_PASSWORD environment variable", level="ERROR") logger.error('Please set the AMCREST_PASSWORD environment variable')
sys.exit(1) sys.exit(1)
if not 'timezone' in config: if not 'timezone' in config:
app_log('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone) logger.info('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone)
exit(1) exit(1)
else: else:
app_log(f'TIMEZONE set as {config["timezone"]}', tz=config["timezone"]) logger.info(f'TIMEZONE set as {config["timezone"]}')
try: try:
with AmcrestMqtt(config) as mqtt: with AmcrestMqtt(config) as mqtt:
asyncio.run(mqtt.main_loop()) asyncio.run(mqtt.main_loop())
except KeyboardInterrupt:
pass
except Exception as err: except Exception as err:
app_log(f'CAUGHT IN app: {err}', level='ERROR') logging.exception("Exception caught", exc_info=True)
Loading…
Cancel
Save