From 3ff6bd9d36ea227962ef5ced6179631b237caeea Mon Sep 17 00:00:00 2001 From: Jeff Culverhouse Date: Sun, 9 Mar 2025 20:40:44 -0400 Subject: [PATCH] add privacy mode; working on better event-snapshot --- VERSION | 2 +- amcrest_api.py | 221 +++++++++++++++++++++++++++++++----------------- amcrest_mqtt.py | 148 ++++++++++++++++++++++++-------- app.py | 7 ++ util.py | 7 ++ 5 files changed, 274 insertions(+), 111 deletions(-) diff --git a/VERSION b/VERSION index c970ff4..da45b88 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.99.21 +0.99.22 diff --git a/amcrest_api.py b/amcrest_api.py index f77a568..1e79f24 100644 --- a/amcrest_api.py +++ b/amcrest_api.py @@ -1,10 +1,18 @@ -from amcrest import AmcrestCamera, AmcrestError +# This software is licensed under the MIT License, which allows you to use, +# copy, modify, merge, publish, distribute, and sell copies of the software, +# with the requirement to include the original copyright notice and this +# permission notice in all copies or substantial portions of the software. +# +# The software is provided 'as is', without any warranty. + +from amcrest import AmcrestCamera, AmcrestError, CommError, LoginError, exceptions import asyncio from asyncio import timeout import base64 from datetime import datetime import httpx import logging +import os import time from util import * from zoneinfo import ZoneInfo @@ -32,85 +40,86 @@ class AmcrestAPI(object): async def connect_to_devices(self): self.logger.info(f'Connecting to: {self.amcrest_config["hosts"]}') - tasks = [] - device_names = self.amcrest_config['names'] + tasks = [] for host in self.amcrest_config['hosts']: - task = asyncio.create_task(self.get_device(host, device_names.pop(0))) + device_name = self.amcrest_config['names'].pop(0) + task = asyncio.create_task(self.get_device(host, device_name)) tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) + if len(self.devices) == 0: + self.logger.error('Failed to connect to all devices, exiting') + exit(1) + # return just the config of each device, not the camera object return {d: self.devices[d]['config'] for d in self.devices.keys()} - def reset_connection(self, device_id): - device = self.devices[device_id] - device['camera'] = self.get_camera(device['config']['host']) - def get_camera(self, host): - return AmcrestCamera( - host, - self.amcrest_config['port'], - self.amcrest_config['username'], - self.amcrest_config['password'], - verbose=False, - ).camera + config = self.amcrest_config + return AmcrestCamera(host, config['port'], config['username'], config['password'], verbose=False).camera async def get_device(self, host, device_name): - camera = self.get_camera(host) - try: - device_type = camera.device_type.replace("type=", "").strip() - is_ad110 = device_type == "AD110" - is_ad410 = device_type == "AD410" + camera = self.get_camera(host) + + device_type = camera.device_type.replace('type=', '').strip() + is_ad110 = device_type == 'AD110' + is_ad410 = device_type == 'AD410' is_doorbell = is_ad110 or is_ad410 - serial_number = camera.serial_number + serial_number = camera.serial_number if not isinstance(serial_number, str): - raise Exception(f'Error fetching serial number for {host}: {error}') + self.logger.error(f'Error fetching serial number for {host}: {camera.serial_number}') + exit(1) - sw_version = camera.software_information[0].replace("version=", "").strip() - build_version = camera.software_information[1].strip() - sw_version = f"{sw_version} ({build_version})" + version = camera.software_information[0].replace('version=', '').strip() + build = camera.software_information[1].strip() + sw_version = f'{version} ({build})' network_config = dict(item.split('=') for item in camera.network_config.splitlines()) interface = network_config['table.Network.DefaultInterface'] ip_address = network_config[f'table.Network.{interface}.IPAddress'] mac_address = network_config[f'table.Network.{interface}.PhysicalAddress'].upper() - except AmcrestError as error: - raise Exception(f'Error fetching camera details for {host}: {error}') - - self.devices[serial_number] = { - "camera": camera, - "config": { - "host": host, - "device_name": device_name, - "device_type": device_type, - "device_class": camera.device_class, - "is_ad110": is_ad110, - "is_ad410": is_ad410, - "is_doorbell": is_doorbell, - "serial_number": serial_number, - "software_version": sw_version, - "hardware_version": camera.hardware_version, - "vendor": camera.vendor_information, - "network": { - "interface": interface, - "ip_address": ip_address, - "mac": mac_address, - } - }, - } + action = 'Connected' if camera.serial_number not in self.devices else 'Reconnected' + self.logger.info(f'{action} to {host} as {camera.serial_number}') + + self.devices[serial_number] = { + 'camera': camera, + 'config': { + 'host': host, + 'device_name': device_name, + 'device_type': device_type, + 'device_class': camera.device_class, + 'is_ad110': is_ad110, + 'is_ad410': is_ad410, + 'is_doorbell': is_doorbell, + 'serial_number': serial_number, + 'software_version': sw_version, + 'hardware_version': camera.hardware_version, + 'vendor': camera.vendor_information, + 'network': { + 'interface': interface, + 'ip_address': ip_address, + 'mac': mac_address, + } + }, + } + self.get_privacy_mode(serial_number) + + except LoginError as err: + self.logger.error(f'Invalid username/password to connect to device "{host}", fix in config.yaml') + except AmcrestError as err: + self.logger.error(f'Failed to connect to device "{host}", check config.yaml and restart to try again: {err}') # Storage stats ------------------------------------------------------------------------------- - def get_device_storage_stats(self, device_id): + def get_storage_stats(self, device_id): try: storage = self.devices[device_id]["camera"].storage_all - except Exception as err: - self.logger.error(f'Problem connecting with camera to get storage stats: {err}') - return {} + except CommError as err: + self.logger.error(f'Failed to communicate with device ({device_id}): No SD card?') return { 'last_update': str(datetime.now(ZoneInfo(self.timezone))), @@ -119,6 +128,28 @@ class AmcrestAPI(object): 'total': to_gb(storage['total']), } + # Privacy config ------------------------------------------------------------------------------ + + def get_privacy_mode(self, device_id): + device = self.devices[device_id] + + try: + privacy = device["camera"].privacy_config().split() + privacy_mode = True if privacy[0].split('=')[1] == 'true' else False + device['privacy_mode'] = privacy_mode + + return privacy_mode + except CommError as err: + self.logger.error(f'Failed to communicate with device ({device_id}): {err}') + + def set_privacy_mode(self, device_id, switch): + device = self.devices[device_id] + + try: + return device["camera"].set_privacy(switch).strip() + except CommError as err: + self.logger.error(f'Failed to communicate with device ({device_id})') + # Snapshots ----------------------------------------------------------------------------------- async def collect_all_device_snapshots(self): @@ -126,17 +157,37 @@ class AmcrestAPI(object): await asyncio.gather(*tasks) async def get_snapshot_from_device(self, device_id): + device = self.devices[device_id] + try: - image = await self.devices[device_id]["camera"].async_snapshot() - self.devices[device_id]['snapshot'] = base64.b64encode(image) - self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(self.devices[device_id]['snapshot'])} bytes base64') - except Exception as err: - self.logger.error(f'Failed to get snapshot from device ({device_id})') - pass + if 'privacy_mode' not in device or device['privacy_mode'] == False: + image = await device["camera"].async_snapshot() + device['snapshot'] = base64.b64encode(image) + self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(device['snapshot'])} bytes base64') + else: + self.logger.info(f'Skipped snapshot from ({device_id}) because "privacy mode" is ON') + except CommError as err: + self.logger.error(f'Failed to communicate with device ({device_id}), maybe "Privacy Mode" is on? {err}') def get_snapshot(self, device_id): return self.devices[device_id]['snapshot'] if 'snapshot' in self.devices[device_id] else None + # Recorded file ------------------------------------------------------------------------------- + def get_recorded_file(self, device_id, file): + device = self.devices[device_id] + + data_raw = device["camera"].download_file(file) + if data_raw: + data_base64 = base64.b64encode(data_raw) + self.logger.debug(f'Processed recording from ({device_id}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64') + if data_base64 < 100 * 1024 * 1024 * 1024: + return data_base64 + else: + self.logger.error(f'Processed recording is too large') + return None + + return None + # Events -------------------------------------------------------------------------------------- async def collect_all_device_events(self): @@ -147,19 +198,22 @@ class AmcrestAPI(object): self.logger.error(err, exc_info=True) async def get_events_from_device(self, device_id): + device = self.devices[device_id] + try: - async for code, payload in self.devices[device_id]["camera"].async_event_actions("All"): + async for code, payload in device["camera"].async_event_actions("All"): await self.process_device_event(device_id, code, payload) + except CommError as err: + self.logger.error(f'Failed to communicate with device ({device_id}): {err}') except Exception as err: - self.logger.error(f'Failed to get events from device ({device_id}), sleeping 60 sec: {err}') - await asyncio.sleep(60) - self.reset_connection(device_id) + self.logger.error(f'generic Failed to get events from device({device_id}: {err}', exc_info=True) async def process_device_event(self, device_id, code, payload): try: - config = self.devices[device_id]['config'] + device = self.devices[device_id] + config = device['config'] - self.logger.debug(f'Event on {config["host"]} - {code}: {payload}') + # self.logger.debug(f'Event on {device_id} - {code}: {payload}') # VideoMotion: motion detection event # VideoLoss: video loss detection event @@ -172,23 +226,36 @@ class AmcrestAPI(object): # SmartMotionHuman: human detection event # SmartMotionVehicle: vehicle detection event - if ((code == "ProfileAlarmTransmit" and config["is_ad110"]) - or (code == "VideoMotion" and not config["is_ad110"])): - motion_payload = "on" if payload["action"] == "Start" else "off" + if ((code == 'ProfileAlarmTransmit' and config['is_ad110']) + or (code == 'VideoMotion' and not config['is_ad110'])): + motion_payload = 'on' if payload['action'] == 'Start' else 'off' self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload }) - elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human": - human_payload = "on" if payload["action"] == "Start" else "off" + elif code == 'CrossRegionDetection' and payload['data']['ObjectType'] == 'Human': + human_payload = 'on' if payload['action'] == 'Start' else 'off' self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload }) - elif code == "_DoTalkAction_": - doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off" + elif code == '_DoTalkAction_': + doorbell_payload = 'on' if payload['data']['Action'] == 'Invite' else 'off' self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload }) - elif code == "NewFile": - # we don't care about recording events for snapshots being recorded every 1+ seconds! - if not payload["data"]["File"].endswith('.jpg'): - file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] } + elif code == 'NewFile': + # we don't care about recording events for general (non-saved) snapshots + if not payload['data']['StoragePoint'] == 'NULL': + file_payload = { 'file': payload['data']['File'], 'size': payload['data']['Size'], 'event': payload['data']['Event'] } self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload }) + elif code == 'LensMaskOpen': + device['privacy_mode'] = True + self.events.append({ 'device_id': device_id, 'event': 'privacy_mode', 'payload': 'on' }) + elif code == 'LensMaskClose': + device['privacy_mode'] = False + self.events.append({ 'device_id': device_id, 'event': 'privacy_mode', 'payload': 'off' }) + # lets just ignore these + elif code == 'InterVideoAccess': # I think this is US, accessing the API of the camera, lets not inception! + pass + elif code == 'VideoMotionInfo': + pass + # save everything else as a 'generic' event else: - self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] }) + self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload }) + except Exception as err: self.logger.error(err, exc_info=True) diff --git a/amcrest_mqtt.py b/amcrest_mqtt.py index 657ff87..a212aa8 100644 --- a/amcrest_mqtt.py +++ b/amcrest_mqtt.py @@ -1,3 +1,10 @@ +# This software is licensed under the MIT License, which allows you to use, +# copy, modify, merge, publish, distribute, and sell copies of the software, +# with the requirement to include the original copyright notice and this +# permission notice in all copies or substantial portions of the software. +# +# The software is provided 'as is', without any warranty. + import asyncio from datetime import datetime import amcrest_api @@ -106,6 +113,7 @@ class AmcrestMqtt(object): vendor, device_id = components[-2].split('-') elif components[-2] == 'set': vendor, device_id = components[-3].split('-') + attribute = components[-1] else: self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}') return @@ -133,7 +141,7 @@ class AmcrestMqtt(object): return time.sleep(5) - self.logger.info(f'Got MQTT message for: {self.states[device_id]["device"]["name"]} - {payload}') + self.logger.info(f'Got MQTT message for: {self.get_device_name(device_id)} - {payload}') # ok, lets format the device_id (not needed) and send to amcrest self.send_command(device_id, payload) @@ -191,6 +199,11 @@ class AmcrestMqtt(object): def get_new_client_id(self): return self.mqtt_config['prefix'] + '-' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + def get_device_name(self, device_id): + if device_id not in self.configs or 'device' not in self.configs[device_id] or 'name' not in self.configs[device_id]['device']: + return f'<{device_id}>' + return self.configs[device_id]['device']['name'] + def get_slug(self, device_id, type): return f"amcrest_{device_id.replace(':','')}_{type}" @@ -331,7 +344,6 @@ class AmcrestMqtt(object): self.logger.info(f'Setup devices') devices = await self.amcrestc.connect_to_devices() - self.logger.info(f'Connected to: {list(devices.keys())}') self.publish_service_device() for device_id in devices: @@ -427,16 +439,16 @@ class AmcrestMqtt(object): 'value_template': '{{ value_json.human }}', 'unique_id': self.get_slug(device_id, 'human'), } - device_states['human'] = {} + device_states['human'] = 'off' - components[self.get_slug(device_id, 'camera')] = { - 'name': 'Camera', + components[self.get_slug(device_id, 'snapshot_camera')] = { + 'name': 'Latest Snapshot', 'platform': 'camera', 'topic': self.get_discovery_subtopic(device_id, 'camera','snapshot'), 'image_encoding': 'b64', 'state_topic': device_config['state_topic'], 'value_template': '{{ value_json.state }}', - 'unique_id': self.get_slug(device_id, 'camera'), + 'unique_id': self.get_slug(device_id, 'snapshot_camera'), } if 'webrtc' in self.amcrest_config: webrtc_config = self.amcrest_config['webrtc'] @@ -446,7 +458,28 @@ class AmcrestMqtt(object): rtc_source = webrtc_config['sources'].pop(0) rtc_url = f'http://{rtc_host}:{rtc_port}/{rtc_link}?src={rtc_source}' device_config['device']['configuration_url'] = rtc_url - device_states['camera'] = {'snapshot': None} + + # copy the snapshot camera for the eventshot camera, with a couple of changes + components[self.get_slug(device_id, 'event_camera')] = \ + components[self.get_slug(device_id, 'snapshot_camera')] | { + 'name': 'Motion Snapshot', + 'topic': self.get_discovery_subtopic(device_id, 'camera','eventshot'), + 'unique_id': self.get_slug(device_id, 'eventshot_camera'), + } + device_states['camera'] = {'snapshot': None, 'eventshot': None} + + components[self.get_slug(device_id, 'privacy_mode')] = { + 'name': 'Privacy mode', + 'platform': 'switch', + 'payload_on': 'on', + 'payload_off': 'off', + 'device_class': 'switch', + 'icon': 'mdi:camera-off', + 'state_topic': self.get_discovery_topic(device_id, 'privacy_mode'), + 'command_topic': self.get_command_topic(device_id, 'privacy_mode'), + 'unique_id': self.get_slug(device_id, 'privacy_mode'), + } + device_states['privacy_mode'] = None components[self.get_slug(device_id, 'motion')] = { 'name': 'Motion', @@ -457,7 +490,7 @@ class AmcrestMqtt(object): 'state_topic': self.get_discovery_topic(device_id, 'motion'), 'unique_id': self.get_slug(device_id, 'motion'), } - device_states['motion'] = {} + device_states['motion'] = 'off' components[self.get_slug(device_id, 'version')] = { 'name': 'Version', @@ -490,12 +523,12 @@ class AmcrestMqtt(object): } components[self.get_slug(device_id, 'event')] = { - 'name': 'Event', + 'name': 'Last Non-motion Event', 'platform': 'sensor', 'state_topic': self.get_discovery_topic(device_id, 'event'), 'unique_id': self.get_slug(device_id, 'event'), } - device_states['event'] = {} + device_states['event'] = None device_states['recording'] = {} components[self.get_slug(device_id, 'storage_used_percent')] = { @@ -542,14 +575,15 @@ class AmcrestMqtt(object): def publish_device_state(self, device_id): device_states = self.states[device_id] - for topic in ['state','storage','motion','human','doorbell','event','recording']: + for topic in ['state','storage','motion','human','doorbell','event','recording','privacy_mode']: if topic in device_states: payload = json.dumps(device_states[topic]) if isinstance(device_states[topic], dict) else device_states[topic] self.mqttc.publish(self.get_discovery_topic(device_id, topic), payload, qos=self.mqtt_config['qos'], retain=True) - if 'snapshot' in device_states['camera'] and device_states['camera']['snapshot'] is not None: - payload = device_states['camera']['snapshot'] - result = self.mqttc.publish(self.get_discovery_subtopic(device_id, 'camera','snapshot'), payload, qos=self.mqtt_config['qos'], retain=True) + for shot_type in ['snapshot','eventshot']: + if shot_type in device_states['camera'] and device_states['camera'][shot_type] is not None: + payload = device_states['camera'][shot_type] + result = self.mqttc.publish(self.get_discovery_subtopic(device_id, 'camera',shot_type), payload, qos=self.mqtt_config['qos'], retain=True) def publish_device_discovery(self, device_id): device_config = self.configs[device_id] @@ -566,28 +600,56 @@ class AmcrestMqtt(object): if not self.running: break device_states = self.states[device_id] + # update the privacy mode setting + # we don't need to verify this often since events should let us know + privacy_mode = self.amcrestc.get_privacy_mode(device_id) + if privacy_mode is not None: + device_states['privacy_mode'] = 'on' if privacy_mode == True else 'off' + # get the storage info, pull out last_update and save that to the device state - storage = self.amcrestc.get_device_storage_stats(device_id) - device_states['state']['last_update'] = storage.pop('last_update', None) - device_states['storage'] = storage + storage = self.amcrestc.get_storage_stats(device_id) + if storage is not None: + device_states['state']['last_update'] = storage.pop('last_update', None) + device_states['storage'] = storage - self.publish_service_state() - self.publish_device_state(device_id) + self.publish_service_state() + self.publish_device_state(device_id) def refresh_snapshot_all_devices(self): self.logger.info(f'Collecting snapshots for all devices (every {self.snapshot_update_interval} sec)') for device_id in self.configs: if not self.running: break - self.refresh_snapshot(device_id) + self.refresh_snapshot(device_id,'snapshot') - def refresh_snapshot(self, device_id): + # type is 'snapshot' for normal, or 'eventshot' for capturing an image immediately after a "movement" event + def refresh_snapshot(self, device_id, type): device_states = self.states[device_id] + image = self.amcrestc.get_snapshot(device_id) + if image is None: + return + # only store and send to MQTT if the image has changed - if device_states['camera']['snapshot'] is None or device_states['camera']['snapshot'] != image: - device_states['camera']['snapshot'] = image + if device_states['camera'][type] is None or device_states['camera'][type] != image: + device_states['camera'][type] = image + self.publish_service_state() + self.publish_device_state(device_id) + + def get_recorded_file(self, device_id, file, type): + device_states = self.states[device_id] + + self.logger.info(f'Getting recorded file {file}') + image = self.amcrestc.get_recorded_file(device_id, file) + self.logger.info(f'Got back base64 image of {len(image)} bytes') + + if image is None: + return + + # only store and send to MQTT if the image has changed + if device_states['camera'][type] is None or device_states['camera'][type] != image: + device_states['camera'][type] = image self.publish_service_state() self.publish_device_state(device_id) @@ -597,10 +659,21 @@ class AmcrestMqtt(object): device_config = self.configs[device_id] device_states = self.states[device_id] - self.logger.info(f'COMMAND {device_states["device_name"]} = {data}') - if data == 'PRESS': + self.logger.info(f'We got a PRESS command for {self.get_device_name(device_id)}') pass + elif 'privacy_mode' in data: + set_privacy_to = False if data['privacy_mode'] == 'off' else True + self.logger.info(f'Setting PRIVACY_MODE to {set_privacy_to} for {self.get_device_name(device_id)}') + + response = self.amcrestc.set_privacy_mode(device_id, set_privacy_to) + + # if Amcrest device was good with that command, lets update state and then MQTT + if response == 'OK': + device_states['privacy_mode'] = data['privacy_mode'] + self.publish_device_state(device_id) + else: + self.logger.error(f'Setting PRIVACY_MODE failed: {repr(response)}') else: self.logger.error(f'We got a command ({data}), but do not know what to do') @@ -638,15 +711,24 @@ class AmcrestMqtt(object): device_states = self.states[device_id] # if one of our known sensors - if event in ['motion','human','doorbell','recording']: - self.logger.info(f'Got event for {device_states["device_name"]}: {event}') - device_states[event] = payload - - # any of these could mean a new snapshot is available early, lets try to grab it - self.logger.debug(f'Refreshing snapshot for "{device_states["device_name"]}" early because of event') - self.refresh_snapshot(device_id) + if event in ['motion','human','doorbell','recording','privacy_mode']: + if event == 'recording' and payload['file'].endswith('.jpg'): + self.logger.info(f'{event} - {payload}') + self.get_recorded_file(device_id, payload['file'], 'eventshot') + else: + # only log details if not a recording + if event != 'recording': + self.logger.info(f'Got event for {self.get_device_name(device_id)}: {event} - {payload}') + device_states[event] = payload + + # other ways to infer "privacy mode" is off and needs updating + if event in ['motion','human','doorbell'] and device_states['privacy_mode'] == 'on': + device_states['privacy_mode'] = 'off' + # these, we don't are to log + elif event in ['TimeChange','NTPAdjustTime','RtspSessionDisconnect']: + pass else: - self.logger.info(f'Got "other" event for "{device_states["device_name"]}": {event} {payload}') + self.logger.info(f'Got "other" event for "{self.get_device_name(device_id)}": {event} - {payload}') device_states['event'] = event self.publish_device_state(device_id) diff --git a/app.py b/app.py index bc1bc73..544d2f8 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,10 @@ +# This software is licensed under the MIT License, which allows you to use, +# copy, modify, merge, publish, distribute, and sell copies of the software, +# with the requirement to include the original copyright notice and this +# permission notice in all copies or substantial portions of the software. +# +# The software is provided 'as is', without any warranty. + import asyncio import argparse from amcrest_mqtt import AmcrestMqtt diff --git a/util.py b/util.py index 18bd476..a7fbe1c 100644 --- a/util.py +++ b/util.py @@ -1,3 +1,10 @@ +# This software is licensed under the MIT License, which allows you to use, +# copy, modify, merge, publish, distribute, and sell copies of the software, +# with the requirement to include the original copyright notice and this +# permission notice in all copies or substantial portions of the software. +# +# The software is provided 'as is', without any warranty. + import os # Helper functions and callbacks