From 64a010b0d3482119f2625380054ea75ceb0fea50 Mon Sep 17 00:00:00 2001 From: Jeff Culverhouse Date: Sat, 8 Mar 2025 23:31:34 -0500 Subject: [PATCH] fix events; don't record event for snapshots --- README.md | 5 +- VERSION | 2 +- amcrest_api.py | 77 ++++++++++++-------- amcrest_mqtt.py | 190 +++++++++++++++++++++++++----------------------- app.py | 4 +- 5 files changed, 153 insertions(+), 125 deletions(-) diff --git a/README.md b/README.md index 9831fd5..11bb25e 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,15 @@ Forked from [dchesterton/amcrest2mqtt](https://github.com/dchesterton/amcrest2mq You can define config in config.yaml and pass `-c path/to/config.yaml`. See the `config.yaml.sample` file for an example. -Or, we support the following environment variables and defaults: +Or, we support the following environment variables and defaults (though, this is becoming unwieldy): - `AMCREST_HOSTS` (required, 1+ space-separated list of hostnames/ips) - `AMCREST_NAMES` (required, 1+ space-separated list of device names - must match count of AMCREST_HOSTS) - `AMCREST_PORT` (optional, default = 80) - `AMCREST_USERNAME` (optional, default = admin) - `AMCREST_PASSWORD` (required) -- `AMCREST_WEBRTC_HOST` (optional, webrtc hostname for link, along with...) + +- `AMCREST_WEBRTC_HOST` (optional, webrtc hostname for link, but then link/sources below become required:) - `AMCREST_WEBRTC_PORT` (webrtc port, default = 1984) - `AMCREST_WEBRTC_LINK` (webrtc stream link, default = 'stream.html') - `AMCREST_WEBRTC_SOURCES` (webrtc "Source" param for each camera, same count and order of AMCREST_HOSTS above) diff --git a/VERSION b/VERSION index 0b539a7..c970ff4 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.20 +0.99.21 diff --git a/amcrest_api.py b/amcrest_api.py index bd4d46d..18c1431 100644 --- a/amcrest_api.py +++ b/amcrest_api.py @@ -13,9 +13,13 @@ class AmcrestAPI(object): def __init__(self, config): self.logger = logging.getLogger(__name__) - # we don't want to get the .info HTTP Request logs from Amcrest + # we don't want to get this mess of deeper-level logging logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore.http11").setLevel(logging.WARNING) + logging.getLogger("httpcore.connection").setLevel(logging.WARNING) logging.getLogger("amcrest.http").setLevel(logging.ERROR) + logging.getLogger("amcrest.event").setLevel(logging.WARNING) + logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) self.last_call_date = '' self.timezone = config['timezone'] @@ -136,8 +140,11 @@ class AmcrestAPI(object): # Events -------------------------------------------------------------------------------------- async def collect_all_device_events(self): - tasks = [self.get_events_from_device(device_id) for device_id in self.devices] - await asyncio.gather(*tasks) + try: + tasks = [self.get_events_from_device(device_id) for device_id in self.devices] + await asyncio.gather(*tasks) + except Exception as err: + self.logger.error(err, exc_info=True) async def get_events_from_device(self, device_id): try: @@ -149,34 +156,42 @@ class AmcrestAPI(object): self.reset_connection(device_id) async def process_device_event(self, device_id, code, payload): - config = self.devices[device_id]['config'] - - self.logger.debug(f'Event on {config["host"]} - {code}: {payload}') - - if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) - or (code == "VideoMotion" and not config["is_ad110"])): - motion_payload = "on" if payload["action"] == "Start" else "off" - self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload }) - elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": - human_payload = "on" if payload["action"] == "Start" else "off" - self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload }) - elif code == "_DoTalkAction_": - doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" - self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload }) - elif code == "NewFile": - file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] } - self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload }) - # lets ignore the event codes we don't care about (for now) - elif code == "VideoMotionInfo": - pass - elif code == "TimeChange": - pass - elif code == "NTPAdjustTime": - pass - elif code == "RtspSessionDisconnect": - pass - else: - self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] }) + try: + config = self.devices[device_id]['config'] + + self.logger.debug(f'Event on {config["host"]} - {code}: {payload}') + + # VideoMotion: motion detection event + # VideoLoss: video loss detection event + # VideoBlind: video blind detection event + # AlarmLocal: alarm detection event + # StorageNotExist: storage not exist event + # StorageFailure: storage failure event + # StorageLowSpace: storage low space event + # AlarmOutput: alarm output event + # SmartMotionHuman: human detection event + # SmartMotionVehicle: vehicle detection event + + if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) + or (code == "VideoMotion" and not config["is_ad110"])): + motion_payload = "on" if payload["action"] == "Start" else "off" + self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload }) + elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": + human_payload = "on" if payload["action"] == "Start" else "off" + self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload }) + elif code == "_DoTalkAction_": + doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" + self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload }) + elif code == "NewFile": + # we don't care about recording events for snapshots being recorded every 1+ seconds! + if not payload["data"]["File"].endswith('.jpg'): + self.logger.info(payload["data"]) + file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] } + self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload }) + else: + self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] }) + except Exception as err: + self.logger.error(err, exc_info=True) def get_next_event(self): return self.events.pop(0) if len(self.events) > 0 else None \ No newline at end of file diff --git a/amcrest_mqtt.py b/amcrest_mqtt.py index d7d33ca..657ff87 100644 --- a/amcrest_mqtt.py +++ b/amcrest_mqtt.py @@ -28,13 +28,14 @@ class AmcrestMqtt(object): self.storage_update_interval = config['amcrest'].get('storage_update_interval', 900) self.snapshot_update_interval = config['amcrest'].get('snapshot_update_interval', 300) + self.discovery_complete = False self.client_id = self.get_new_client_id() self.service_name = self.mqtt_config['prefix'] + ' service' self.service_slug = self.mqtt_config['prefix'] + '-service' - self.devices = {} self.configs = {} + self.states = {} def __enter__(self): self.mqttc_create() @@ -124,7 +125,7 @@ class AmcrestMqtt(object): # if we just started, we might get messages immediately, lets # wait up to 3 min for devices to show up before we ignore the message checks = 0 - while device_id not in self.configs: + while device_id not in self.states: checks += 1 # we'll try for 3 min, and then give up if checks > 36: @@ -132,7 +133,7 @@ class AmcrestMqtt(object): return time.sleep(5) - self.logger.info(f'Got MQTT message for: {self.configs[device_id]["device"]["name"]} - {payload}') + self.logger.info(f'Got MQTT message for: {self.states[device_id]["device"]["name"]} - {payload}') # ok, lets format the device_id (not needed) and send to amcrest self.send_command(device_id, payload) @@ -234,14 +235,14 @@ class AmcrestMqtt(object): # Service Device ------------------------------------------------------------------------------ def publish_service_state(self): - if 'service' not in self.configs: - self.configs['service'] = { + if 'service' not in self.states: + self.states['service'] = { 'availability': 'online', 'state': { 'state': 'ON' }, 'intervals': {}, } - service_states = self.configs['service'] + service_states = self.states['service'] # update states service_states['state'] = { @@ -338,16 +339,16 @@ class AmcrestMqtt(object): if 'device_type' in config: first = False - if device_id not in self.devices: + if device_id not in self.configs: first = True - self.devices[device_id] = {} - self.configs[device_id] = config - self.devices[device_id]['qos'] = self.mqtt_config['qos'] - self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state') - self.devices[device_id]['availability_topic'] = self.get_discovery_topic('service', 'availability') - self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set') - - self.devices[device_id]['device'] = { + self.configs[device_id] = {} + self.states[device_id] = config + self.configs[device_id]['qos'] = self.mqtt_config['qos'] + self.configs[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state') + self.configs[device_id]['availability_topic'] = self.get_discovery_topic('service', 'availability') + self.configs[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set') + + self.configs[device_id]['device'] = { 'name': config['device_name'], 'manufacturer': config['vendor'], 'model': config['device_type'], @@ -362,39 +363,46 @@ class AmcrestMqtt(object): 'configuration_url': 'http://' + config['host'] + '/', 'via_device': self.service_slug, } - self.devices[device_id]['origin'] = { + self.configs[device_id]['origin'] = { 'name': self.service_name, 'sw_version': self.version, 'support_url': 'https://github.com/weirdtangent/amcrest2mqtt', } + + # setup initial satte + self.states[device_id]['state'] = { + 'state': 'ON', + 'last_update': None, + 'host': config['host'], + 'serial_number': config['serial_number'], + 'sw_version': config['software_version'], + } + self.add_components_to_device(device_id) if first: self.logger.info(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})') self.publish_device_discovery(device_id) else: - self.logger.debug(f'Updated device: {self.devices[device_id]['device']['name']}') + self.logger.debug(f'Updated device: {self.configs[device_id]['device']['name']}') - # device discovery sent, now it is save to add these to the - # dict (so they aren't included in device discovery object itself) - self.devices[device_id]['state'] = { - 'status': 'online', - 'host': config['host'], - 'serial_number': config['serial_number'], - 'sw_version': config['software_version'], - } else: if first_time_through: self.logger.info(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})') + # lets log our first time through and then release the hounds + if not self.discovery_complete: + self.logger.info('Device setup and discovery is done') + self.discovery_complete = True + # add amcrest components to devices def add_components_to_device(self, device_id): - device = self.devices[device_id] - config = self.configs[device_id] + device_config = self.configs[device_id] + device_states = self.states[device_id] components = {} - if config['is_doorbell']: - doorbell_name = 'Doorbell' if config['device_name'] == 'Doorbell' else f'{config["device_name"]} Doorbell' + if device_states['is_doorbell']: + doorbell_name = 'Doorbell' if device_states['device_name'] == 'Doorbell' else f'{device_states["device_name"]} Doorbell' components[self.get_slug(device_id, 'doorbell')] = { 'name': doorbell_name, 'platform': 'binary_sensor', @@ -406,14 +414,9 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.doorbell }}', 'unique_id': self.get_slug(device_id, 'doorbell'), } - self.mqttc.will_set( - self.get_discovery_topic(device_id,'doorbell'), - payload=None, - qos=self.mqtt_config['qos'], - retain=True - ) + device_states['doorbell'] = {} - if config['is_ad410']: + if device_states['is_ad410']: components[self.get_slug(device_id, 'human')] = { 'name': 'Human', 'platform': 'binary_sensor', @@ -424,19 +427,14 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.human }}', 'unique_id': self.get_slug(device_id, 'human'), } - self.mqttc.will_set( - self.get_discovery_topic(device_id,'human'), - payload=None, - qos=self.mqtt_config['qos'], - retain=True - ) + device_states['human'] = {} components[self.get_slug(device_id, 'camera')] = { 'name': 'Camera', 'platform': 'camera', 'topic': self.get_discovery_subtopic(device_id, 'camera','snapshot'), 'image_encoding': 'b64', - 'state_topic': device['state_topic'], + 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.state }}', 'unique_id': self.get_slug(device_id, 'camera'), } @@ -447,7 +445,8 @@ class AmcrestMqtt(object): rtc_link = webrtc_config['link'] rtc_source = webrtc_config['sources'].pop(0) rtc_url = f'http://{rtc_host}:{rtc_port}/{rtc_link}?src={rtc_source}' - device['device']['configuration_url'] = rtc_url + device_config['device']['configuration_url'] = rtc_url + device_states['camera'] = {'snapshot': None} components[self.get_slug(device_id, 'motion')] = { 'name': 'Motion', @@ -458,12 +457,13 @@ class AmcrestMqtt(object): 'state_topic': self.get_discovery_topic(device_id, 'motion'), 'unique_id': self.get_slug(device_id, 'motion'), } + device_states['motion'] = {} components[self.get_slug(device_id, 'version')] = { 'name': 'Version', 'platform': 'sensor', 'icon': 'mdi:package-up', - 'state_topic': device['state_topic'], + 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.sw_version }}', 'entity_category': 'diagnostic', 'unique_id': self.get_slug(device_id, 'sw_version'), @@ -473,7 +473,7 @@ class AmcrestMqtt(object): 'name': 'Serial Number', 'platform': 'sensor', 'icon': 'mdi:identifier', - 'state_topic': device['state_topic'], + 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.serial_number }}', 'entity_category': 'diagnostic', 'unique_id': self.get_slug(device_id, 'serial_number'), @@ -483,7 +483,7 @@ class AmcrestMqtt(object): 'name': 'Host', 'platform': 'sensor', 'icon': 'mdi:ip-network', - 'state_topic': device['state_topic'], + 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.host }}', 'entity_category': 'diagnostic', 'unique_id': self.get_slug(device_id, 'host'), @@ -495,6 +495,8 @@ class AmcrestMqtt(object): 'state_topic': self.get_discovery_topic(device_id, 'event'), 'unique_id': self.get_slug(device_id, 'event'), } + device_states['event'] = {} + device_states['recording'] = {} components[self.get_slug(device_id, 'storage_used_percent')] = { 'name': 'Storage Used %', @@ -523,61 +525,51 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.used }}', 'unique_id': self.get_slug(device_id, 'storage_used'), } + device_states['storage'] = {} + components[self.get_slug(device_id, 'last_update')] = { 'name': 'Last Update', 'platform': 'sensor', 'device_class': 'timestamp', 'entity_category': 'diagnostic', - 'state_topic': device['state_topic'], + 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.last_update }}', 'unique_id': self.get_slug(device_id, 'last_update'), } - device['components'] = components + device_config['components'] = components def publish_device_state(self, device_id): - device = self.devices[device_id] - config = self.configs[device_id] + device_states = self.states[device_id] for topic in ['state','storage','motion','human','doorbell','event','recording']: - if topic in device: - payload = json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic] + if topic in device_states: + payload = json.dumps(device_states[topic]) if isinstance(device_states[topic], dict) else device_states[topic] self.mqttc.publish(self.get_discovery_topic(device_id, topic), payload, qos=self.mqtt_config['qos'], retain=True) - if 'snapshot' in device['camera'] and device['camera']['snapshot'] is not None: - self.logger.info(f'Refreshing snapshot for {config["device_name"]}') - payload = device['camera']['snapshot'] + if 'snapshot' in device_states['camera'] and device_states['camera']['snapshot'] is not None: + payload = device_states['camera']['snapshot'] result = self.mqttc.publish(self.get_discovery_subtopic(device_id, 'camera','snapshot'), payload, qos=self.mqtt_config['qos'], retain=True) def publish_device_discovery(self, device_id): - device = self.devices[device_id] - self.mqttc.publish( - self.get_discovery_topic(device_id, 'config'), - json.dumps(device), - qos=self.mqtt_config['qos'], - retain=True - ) + device_config = self.configs[device_id] + payload = json.dumps(device_config) - # setup initial states - device['state'] = { 'state': 'ON' } - device['motion'] = 'off' - device['camera'] = { 'snapshot': None } + self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), payload, qos=self.mqtt_config['qos'], retain=True) # refresh * all devices ----------------------------------------------------------------------- def refresh_storage_all_devices(self): self.logger.info(f'Refreshing storage info for all devices (every {self.storage_update_interval} sec)') - for device_id in self.devices: + for device_id in self.configs: if not self.running: break - - device = self.devices[device_id] - config = self.configs[device_id] + device_states = self.states[device_id] # get the storage info, pull out last_update and save that to the device state storage = self.amcrestc.get_device_storage_stats(device_id) - device['state']['last_update'] = storage.pop('last_update', None) - device['storage'] = storage + device_states['state']['last_update'] = storage.pop('last_update', None) + device_states['storage'] = storage self.publish_service_state() self.publish_device_state(device_id) @@ -585,24 +577,27 @@ class AmcrestMqtt(object): def refresh_snapshot_all_devices(self): self.logger.info(f'Collecting snapshots for all devices (every {self.snapshot_update_interval} sec)') - for device_id in self.devices: + for device_id in self.configs: if not self.running: break + self.refresh_snapshot(device_id) - device = self.devices[device_id] - config = self.configs[device_id] - - device['camera']['snapshot'] = self.amcrestc.get_snapshot(device_id) + def refresh_snapshot(self, device_id): + device_states = self.states[device_id] + image = self.amcrestc.get_snapshot(device_id) + # only store and send to MQTT if the image has changed + if device_states['camera']['snapshot'] is None or device_states['camera']['snapshot'] != image: + device_states['camera']['snapshot'] = image self.publish_service_state() self.publish_device_state(device_id) # send command to Amcrest -------------------------------------------------------------------- def send_command(self, device_id, data): - device = self.devices[device_id] - config = self.configs[device_id] + device_config = self.configs[device_id] + device_states = self.states[device_id] - self.logger.info(f'COMMAND {config["device_name"]} = {data}') + self.logger.info(f'COMMAND {device_states["device_name"]} = {data}') if data == 'PRESS': pass @@ -623,28 +618,38 @@ class AmcrestMqtt(object): self.publish_service_state() - # check for events ---------------------------------------------------------------------------- + # collect events and then check queue of events ----------------------------------------------- + + async def collect_all_device_events(self): + await self.amcrestc.collect_all_device_events() def check_for_events(self): while device_event := self.amcrestc.get_next_event(): + if device_event is None: + break if 'device_id' not in device_event: self.logger(f'Got event, but missing device_id: {device_event}') continue + device_id = device_event['device_id'] event = device_event['event'] payload = device_event['payload'] - device = self.devices[device_id] - config = self.configs[device_id] + + device_states = self.states[device_id] # if one of our known sensors if event in ['motion','human','doorbell','recording']: - self.logger.info(f'Got event for {config["device_name"]}: {event}') - device[event] = payload + self.logger.info(f'Got event for {device_states["device_name"]}: {event}') + device_states[event] = payload + + # any of these could mean a new snapshot is available early, lets try to grab it + self.logger.debug(f'Refreshing snapshot for "{device_states["device_name"]}" early because of event') + self.refresh_snapshot(device_id) else: - self.logger.debug(f'Got "other" event for {config["device_name"]}: {event} {payload}') - device['event'] = event + self.logger.info(f'Got "other" event for "{device_states["device_name"]}": {event} {payload}') + device_states['event'] = event - self.refresh_device(device_id) + self.publish_device_state(device_id) # async loops and main loop ------------------------------------------------------------------- @@ -662,7 +667,11 @@ class AmcrestMqtt(object): async def collect_events(self): while self.running == True: - await self.amcrestc.collect_all_device_events() + await self.collect_all_device_events() + await asyncio.sleep(1) + + async def check_event_queue(self): + while self.running == True: self.check_for_events() await asyncio.sleep(1) @@ -680,6 +689,7 @@ class AmcrestMqtt(object): tasks = [ asyncio.create_task(self.collect_storage_info()), asyncio.create_task(self.collect_events()), + asyncio.create_task(self.check_event_queue()), asyncio.create_task(self.collect_snapshots()), ] diff --git a/app.py b/app.py index 2187149..bc1bc73 100644 --- a/app.py +++ b/app.py @@ -79,7 +79,7 @@ if 'debug' not in config: config['debug'] = os.getenv('DEBUG') or False logging.basicConfig( format = '%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s' if config['hide_ts'] == False else '[%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', - level=logging.INFO + level=logging.INFO if config['debug'] == False else logging.DEBUG ) logger = logging.getLogger(__name__) logger.info(f'Starting: amcrest2mqtt v{version}') @@ -120,6 +120,8 @@ if config['amcrest']['password'] is None: logger.error('Please set the AMCREST_PASSWORD environment variable') exit(1) +logger.debug("DEBUG logging is ON") + # Go! with AmcrestMqtt(config) as mqtt: asyncio.run(mqtt.main_loop()) \ No newline at end of file