diff --git a/.gitignore b/.gitignore index f103928..006acbe 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ dist/ # Environments venv/ -# Config testing +# Config testing and notes +config +config/ config.yaml - +NOTES diff --git a/README.md b/README.md index 61a9fe2..36b7ca4 100644 --- a/README.md +++ b/README.md @@ -76,4 +76,6 @@ useful to myself and others, not for any financial gain - but any token of appre Buy Me A Coffee +### How Happy am I? + diff --git a/VERSION b/VERSION index a75a83e..0b539a7 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.19 +0.99.20 diff --git a/amcrest_api.py b/amcrest_api.py index e1c667e..bd4d46d 100644 --- a/amcrest_api.py +++ b/amcrest_api.py @@ -1,6 +1,7 @@ from amcrest import AmcrestCamera, AmcrestError import asyncio from asyncio import timeout +import base64 from datetime import datetime import httpx import logging @@ -14,6 +15,7 @@ class AmcrestAPI(object): # we don't want to get the .info HTTP Request logs from Amcrest logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("amcrest.http").setLevel(logging.ERROR) self.last_call_date = '' self.timezone = config['timezone'] @@ -97,6 +99,8 @@ class AmcrestAPI(object): }, } + # Storage stats ------------------------------------------------------------------------------- + def get_device_storage_stats(self, device_id): try: storage = self.devices[device_id]["camera"].storage_all @@ -104,19 +108,36 @@ class AmcrestAPI(object): self.logger.error(f'Problem connecting with camera to get storage stats: {err}') return {} - return { + return { 'last_update': str(datetime.now(ZoneInfo(self.timezone))), 'used_percent': str(storage['used_percent']), 'used': to_gb(storage['used']), 'total': to_gb(storage['total']), } - async def collect_all_device_events(self): + # Snapshots ----------------------------------------------------------------------------------- + + async def collect_all_device_snapshots(self): + tasks = [self.get_snapshot_from_device(device_id) for device_id in self.devices] + await asyncio.gather(*tasks) + + async def get_snapshot_from_device(self, device_id): try: - tasks = [self.get_events_from_device(device_id) for device_id in self.devices] - await asyncio.gather(*tasks) + image = await self.devices[device_id]["camera"].async_snapshot() + self.devices[device_id]['snapshot'] = base64.b64encode(image) + self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(self.devices[device_id]['snapshot'])} bytes base64') except Exception as err: - self.logger.error(f'collect_all_device_events: {err}') + self.logger.error(f'Failed to get snapshot from device ({device_id})') + pass + + def get_snapshot(self, device_id): + return self.devices[device_id]['snapshot'] if 'snapshot' in self.devices[device_id] else None + + # Events -------------------------------------------------------------------------------------- + + async def collect_all_device_events(self): + tasks = [self.get_events_from_device(device_id) for device_id in self.devices] + await asyncio.gather(*tasks) async def get_events_from_device(self, device_id): try: @@ -128,38 +149,34 @@ class AmcrestAPI(object): self.reset_connection(device_id) async def process_device_event(self, device_id, code, payload): - try: - config = self.devices[device_id]['config'] - - self.logger.debug(f'Event on {config["host"]} - {code}: {payload}') - - if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) - or (code == "VideoMotion" and not config["is_ad110"])): - motion_payload = "on" if payload["action"] == "Start" else "off" - self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload }) - elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": - human_payload = "on" if payload["action"] == "Start" else "off" - self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload }) - elif code == "_DoTalkAction_": - doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" - self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload }) - elif code == "NewFile": - file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] } - self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload }) - # lets ignore the event codes we don't care about (for now) - elif code == "VideoMotionInfo": - pass - elif code == "TimeChange": - pass - elif code == "NTPAdjustTime": - pass - elif code == "RtspSessionDisconnect": - pass - else: - self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] }) - except Exception as err: - self.logger.error(f'process_device_event: {err}', exc_info=True) - + config = self.devices[device_id]['config'] + + self.logger.debug(f'Event on {config["host"]} - {code}: {payload}') + + if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) + or (code == "VideoMotion" and not config["is_ad110"])): + motion_payload = "on" if payload["action"] == "Start" else "off" + self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload }) + elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": + human_payload = "on" if payload["action"] == "Start" else "off" + self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload }) + elif code == "_DoTalkAction_": + doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" + self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload }) + elif code == "NewFile": + file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] } + self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload }) + # lets ignore the event codes we don't care about (for now) + elif code == "VideoMotionInfo": + pass + elif code == "TimeChange": + pass + elif code == "NTPAdjustTime": + pass + elif code == "RtspSessionDisconnect": + pass + else: + self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] }) def get_next_event(self): return self.events.pop(0) if len(self.events) > 0 else None \ No newline at end of file diff --git a/amcrest_mqtt.py b/amcrest_mqtt.py index 8714e8c..91fcfcb 100644 --- a/amcrest_mqtt.py +++ b/amcrest_mqtt.py @@ -26,7 +26,8 @@ class AmcrestMqtt(object): self.timezone = config['timezone'] self.version = config['version'] - self.device_update_interval = config['amcrest'].get('device_update_interval', 600) + self.storage_update_interval = config['amcrest'].get('storage_update_interval', 900) + self.snapshot_update_interval = config['amcrest'].get('snapshot_update_interval', 300) self.client_id = self.get_new_client_id() self.service_name = self.mqtt_config['prefix'] + ' service' @@ -87,38 +88,54 @@ class AmcrestMqtt(object): self.logger.error('Failed to understand MQTT message, ignoring') return - self.logger.info(f'Got MQTT message for {topic} - {payload}') - # we might get: - # device/component/set - # device/component/set/attribute - # homeassistant/device/component/set - # homeassistant/device/component/set/attribute + # */service/set + # */service/set/attribute + # */device/component/set + # */device/component/set/attribute components = topic.split('/') # handle this message if it's for us, otherwise pass along to amcrest API - try: - if components[-2] == self.get_component_slug('service'): - self.handle_service_message(None, payload) - elif components[-3] == self.get_component_slug('service'): - self.handle_service_message(components[-1], payload) + if components[-2] == self.get_component_slug('service'): + self.handle_service_message(None, payload) + elif components[-3] == self.get_component_slug('service'): + self.handle_service_message(components[-1], payload) + else: + if components[-1] == 'set': + vendor, device_id = components[-2].split('-') + elif components[-2] == 'set': + vendor, device_id = components[-3].split('-') else: - if components[-1] == 'set': - vendor, device_id = components[-2].split('-') - elif components[-2] == 'set': - vendor, device_id = components[-3].split('-') - else: - self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}') - return - # of course, we only care about our 'amcrest-' messages - if vendor != 'amcrest': + self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}') + return + + # of course, we only care about our 'amcrest-' messages + if vendor != 'amcrest': + return + + # ok, it's for us, lets announce it + self.logger.debug(f'Incoming MQTT message for {topic} - {payload}') + + # if we only got back a scalar value, lets turn it into a dict with + # the attribute name after `/set/` in the command topic + if not isinstance(payload, dict) and attribute: + payload = { attribute: payload } + + # if we just started, we might get messages immediately, lets + # wait up to 3 min for devices to show up before we ignore the message + checks = 0 + while device_id not in self.configs: + checks += 1 + # we'll try for 3 min, and then give up + if checks > 36: + self.logger.warn(f"Got MQTT message for a device we don't know: {device_id}") return - # ok, lets format the device_id and send to amcrest - # for Amcrest devices, we use the string as-is (after the vendor name) - self.send_command(device_id, payload) - except Exception as err: - self.logger.error(f'Failed to understand MQTT message slug ({topic}): {err}, ignoring') - return + time.sleep(5) + + self.logger.info(f'Got MQTT message for: {self.configs[device_id]["device"]["name"]} - {payload}') + + # ok, lets format the device_id (not needed) and send to amcrest + self.send_command(device_id, payload) def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties): rc_list = map(lambda x: x.getName(), reason_code_list) @@ -204,24 +221,41 @@ class AmcrestMqtt(object): return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/{topic}" return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/{topic}" + def get_discovery_topic(self, device_id, topic): + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/{topic}" + return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/{topic}" + + def get_discovery_subtopic(self, device_id, topic, subtopic): + if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False: + return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/{topic}/{subtopic}" + return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/{topic}/{subtopic}" + # Service Device ------------------------------------------------------------------------------ def publish_service_state(self): - self.mqttc.publish( - self.get_discovery_topic('service','availability'), - 'online', - qos=self.mqtt_config['qos'], - retain=True - ) - self.mqttc.publish( - self.get_discovery_topic('service','state'), - json.dumps({ - 'status': 'online', - 'device_refresh': self.device_update_interval, - }), - qos=self.mqtt_config['qos'], - retain=True - ) + if 'service' not in self.configs: + self.configs['service'] = { + 'availability': 'online', + 'state': { 'state': 'ON' }, + 'intervals': {}, + } + + service_states = self.configs['service'] + + # update states + service_states['state'] = { + 'state': 'ON', + } + service_states['intervals'] = { + 'storage_refresh': self.storage_update_interval, + 'snapshot_refresh': self.snapshot_update_interval, + } + + for topic in ['state','availability','intervals']: + if topic in service_states: + payload = json.dumps(service_states[topic]) if isinstance(service_states[topic], dict) else service_states[topic] + self.mqttc.publish(self.get_discovery_topic('service', topic), payload, qos=self.mqtt_config['qos'], retain=True) def publish_service_device(self): state_topic = self.get_discovery_topic('service', 'state') @@ -251,26 +285,36 @@ class AmcrestMqtt(object): 'name': 'Service', 'platform': 'binary_sensor', 'schema': 'json', - 'payload_on': 'online', - 'payload_off': 'offline', + 'payload_on': 'ON', + 'payload_off': 'OFF', 'icon': 'mdi:language-python', 'state_topic': state_topic, - 'availability_topic': availability_topic, - 'value_template': '{{ value_json.status }}', + 'value_template': '{{ value_json.state }}', 'unique_id': 'amcrest_service_status', }, - self.service_slug + '_device_refresh': { - 'name': 'Device Refresh Interval', + self.service_slug + '_storage_refresh': { + 'name': 'Storage Refresh Interval', 'platform': 'number', 'schema': 'json', 'icon': 'mdi:numeric', 'min': 10, 'max': 3600, - 'state_topic': state_topic, - 'command_topic': self.get_command_topic('service', 'device_refresh'), - 'availability_topic': availability_topic, - 'value_template': '{{ value_json.device_refresh }}', - 'unique_id': 'amcrest_service_device_refresh', + 'state_topic': self.get_discovery_topic('service', 'intervals'), + 'command_topic': self.get_command_topic('service', 'storage_refresh'), + 'value_template': '{{ value_json.storage_refresh }}', + 'unique_id': 'amcrest_service_storage_refresh', + }, + self.service_slug + '_snapshot_refresh': { + 'name': 'Snapshot Refresh Interval', + 'platform': 'number', + 'schema': 'json', + 'icon': 'mdi:numeric', + 'min': 10, + 'max': 3600, + 'state_topic': self.get_discovery_topic('service', 'intervals'), + 'command_topic': self.get_command_topic('service', 'snapshot_refresh'), + 'value_template': '{{ value_json.snapshot_refresh }}', + 'unique_id': 'amcrest_service_snapshot_refresh', }, }, }), @@ -324,13 +368,13 @@ class AmcrestMqtt(object): 'support_url': 'https://github.com/weirdtangent/amcrest2mqtt', } self.add_components_to_device(device_id) - + if first: self.logger.info(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})') self.publish_device_discovery(device_id) else: self.logger.debug(f'Updated device: {self.devices[device_id]['device']['name']}') - + # device discovery sent, now it is save to add these to the # dict (so they aren't included in device discovery object itself) self.devices[device_id]['state'] = { @@ -339,7 +383,6 @@ class AmcrestMqtt(object): 'serial_number': config['serial_number'], 'sw_version': config['software_version'], } - self.devices[device_id]['motion'] = 'off' else: if first_time_through: self.logger.info(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})') @@ -388,6 +431,23 @@ class AmcrestMqtt(object): retain=True ) + components[self.get_slug(device_id, 'camera')] = { + 'name': 'Camera', + 'platform': 'camera', + 'topic': self.get_discovery_subtopic(device_id, 'camera','snapshot'), + 'image_encoding': 'b64', + 'state_topic': device['state_topic'], + 'value_template': '{{ value_json.state }}', + 'unique_id': self.get_slug(device_id, 'camera'), + } + if 'webrtc' in self.amcrest_config: + webrtc_config = self.amcrest_config['webrtc'] + rtc_host = webrtc_config['host'] + rtc_port = webrtc_config['port'] if 'port' in webrtc_config else 1984 + rtc_source = webrtc_config['sources'].pop(0) + rtc_url = f'http://{rtc_host}:{rtc_port}/api/frame.jpeg?src={rtc_source}' + components[self.get_slug(device_id, 'camera')]['entity_picture'] = rtc_url + components[self.get_slug(device_id, 'motion')] = { 'name': 'Motion', 'platform': 'binary_sensor', @@ -476,12 +536,18 @@ class AmcrestMqtt(object): def publish_device_state(self, device_id): device = self.devices[device_id] + config = self.configs[device_id] for topic in ['state','storage','motion','human','doorbell','event','recording']: if topic in device: - payload = json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic] + payload = json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic] self.mqttc.publish(self.get_discovery_topic(device_id, topic), payload, qos=self.mqtt_config['qos'], retain=True) + if 'snapshot' in device['camera'] and device['camera']['snapshot'] is not None: + self.logger.info(f'Refreshing snapshot for {config["device_name"]}') + payload = device['camera']['snapshot'] + result = self.mqttc.publish(self.get_discovery_subtopic(device_id, 'camera','snapshot'), payload, qos=self.mqtt_config['qos'], retain=True) + def publish_device_discovery(self, device_id): device = self.devices[device_id] self.mqttc.publish( @@ -491,36 +557,43 @@ class AmcrestMqtt(object): retain=True ) + # setup initial states device['state'] = { 'state': 'ON' } + device['motion'] = 'off' + device['camera'] = { 'snapshot': None } - self.publish_device_state(device_id) - - # refresh all devices ------------------------------------------------------------------------- + # refresh * all devices ----------------------------------------------------------------------- - def refresh_all_devices(self): - self.logger.info(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)') + def refresh_storage_all_devices(self): + self.logger.info(f'Refreshing storage info for all devices (every {self.storage_update_interval} sec)') - # refresh devices starting with the device updated the longest time ago - # sorted = sorted(self.devices.items(), key=lambda dt: (dt is None, dt)): for device_id in self.devices: - # break loop if we are ending - if not self.running: - break - self.refresh_device(device_id) + if not self.running: break - # other helpers ------------------------------------------------------------------------------- + device = self.devices[device_id] + config = self.configs[device_id] - def refresh_device(self, device_id): - device = self.devices[device_id] - config = self.configs[device_id] + # get the storage info, pull out last_update and save that to the device state + storage = self.amcrestc.get_device_storage_stats(device_id) + device['state']['last_update'] = storage.pop('last_update', None) + device['storage'] = storage - # get the storage info, pull out last_update and save that to the device state - storage = self.amcrestc.get_device_storage_stats(device_id) - device['state']['last_update'] = storage.pop('last_update', None) - device['storage'] = storage + self.publish_service_state() + self.publish_device_state(device_id) - self.publish_service_state() - self.publish_device_state(device_id) + def refresh_snapshot_all_devices(self): + self.logger.info(f'Collecting snapshots for all devices (every {self.snapshot_update_interval} sec)') + + for device_id in self.devices: + if not self.running: break + + device = self.devices[device_id] + config = self.configs[device_id] + + device['camera']['snapshot'] = self.amcrestc.get_snapshot(device_id) + + self.publish_service_state() + self.publish_device_state(device_id) # send command to Amcrest -------------------------------------------------------------------- @@ -537,9 +610,12 @@ class AmcrestMqtt(object): def handle_service_message(self, attribute, message): match attribute: - case 'device_refresh': - self.device_update_interval = message - self.logger.info(f'Updated UPDATE_INTERVAL to be {message}') + case 'storage_refresh': + self.storage_update_interval = message + self.logger.info(f'Updated STORAGE_REFRESH_INTERVAL to be {message}') + case 'snapshot_refresh': + self.snapshot_update_interval = message + self.logger.info(f'Updated SNAPSHOT_REFRESH_INTERVAL to be {message}') case _: self.logger.info(f'IGNORED UNRECOGNIZED amcrest-service MESSAGE for {attribute}: {message}') return @@ -563,10 +639,9 @@ class AmcrestMqtt(object): if event in ['motion','human','doorbell','recording']: self.logger.info(f'Got event for {config["device_name"]}: {event}') device[event] = payload - # otherwise, just store generically else: - self.logger.info(f'Got generic event for {config["device_name"]}: {event} {payload}') - device['event'] = f'{event}: {payload}' + self.logger.debug(f'Got "other" event for {config["device_name"]}: {event} {payload}') + device['event'] = event self.refresh_device(device_id) @@ -579,29 +654,32 @@ class AmcrestMqtt(object): for task in asyncio.all_tasks(): if not task.done(): task.cancel(f'{signame} received') - async def device_loop(self): + async def collect_storage_info(self): while self.running == True: - self.refresh_all_devices() - await asyncio.sleep(self.device_update_interval) + self.refresh_storage_all_devices() + await asyncio.sleep(self.storage_update_interval) async def collect_events(self): while self.running == True: await self.amcrestc.collect_all_device_events() - - async def process_events(self): - while self.running == True: self.check_for_events() await asyncio.sleep(1) + async def collect_snapshots(self): + while self.running == True: + await self.amcrestc.collect_all_device_snapshots() + self.refresh_snapshot_all_devices() + await asyncio.sleep(self.snapshot_update_interval) + # main loop async def main_loop(self): await self.setup_devices() loop = asyncio.get_running_loop() tasks = [ - asyncio.create_task(self.device_loop()), + asyncio.create_task(self.collect_storage_info()), asyncio.create_task(self.collect_events()), - asyncio.create_task(self.process_events()), + asyncio.create_task(self.collect_snapshots()), ] # setup signal handling for tasks diff --git a/app.py b/app.py index 648c608..e6ac231 100644 --- a/app.py +++ b/app.py @@ -54,7 +54,8 @@ except: 'port': int(os.getenv("AMCREST_PORT") or 80), 'username': os.getenv("AMCREST_USERNAME") or "admin", 'password': os.getenv("AMCREST_PASSWORD"), - 'device_update_interval': int(os.getenv("DEVICE_UPDATE_INTERVAL") or 600), + 'storage_update_interval': int(os.getenv("STORAGE_UPDATE_INTERVAL") or 900), + 'snapshot_update_interval': int(os.getenv("SNAPSHOT_UPDATE_INTERVAL") or 300), }, 'debug': True if os.getenv('DEBUG') else False, 'hide_ts': True if os.getenv('HIDE_TS') else False, @@ -63,6 +64,9 @@ except: } config['version'] = version config['configpath'] = os.path.dirname(configpath) +if 'username' not in config['mqtt']: config['mqtt']['username'] = '' +if 'password' not in config['mqtt']: config['mqtt']['password'] = '' +if 'qos' not in config['mqtt']: config['mqtt']['qos'] = 0 if 'timezone' not in config: config['timezone'] = 'UTC' if 'debug' not in config: config['debug'] = os.getenv('DEBUG') or False @@ -91,6 +95,19 @@ if config['amcrest']['host_count'] != config['amcrest']['name_count']: exit(1) logger.info(f'Found {config["amcrest"]["host_count"]} host(s) defined to monitor') +if 'webrtc' in config['amcrest']: + webrtc = config['amcrest']['webrtc'] + if 'host' not in webrtc: + logger.error('Missing HOST in webrtc config') + exit(1) + if 'sources' not in webrtc: + logger.error('Missing SOURCES in webrtc config') + exit(1) + config['amcrest']['webrtc_sources_count'] = len(config['amcrest']['webrtc']['sources']) + if config['amcrest']['host_count'] != config['amcrest']['webrtc_sources_count']: + logger.error('The AMCREST_HOSTS and AMCREST_WEBRTC_SOURCES must have the same number of space-delimited hosts/names') + exit(1) + if config['amcrest']['password'] is None: logger.error('Please set the AMCREST_PASSWORD environment variable') exit(1) diff --git a/config.yaml.sample b/config.yaml.sample index 556b4f1..9bf7e0e 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -22,7 +22,13 @@ amcrest: port: 80 username: admin password: password - storage_poll_interval: 60 + storage_update_interval: 900 + snapshot_update_interval: 60 + webrtc: + url: http://webrtc.local.com:1984/api/frame.jpeg?src= + sources: + - FrontYard + - Patio timezone: America/New_York hide_ts: False