You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
amcrest2mqtt/amcrest_api.py

263 lines
12 KiB
Python

# This software is licensed under the MIT License, which allows you to use,
# copy, modify, merge, publish, distribute, and sell copies of the software,
# with the requirement to include the original copyright notice and this
# permission notice in all copies or substantial portions of the software.
#
# The software is provided 'as is', without any warranty.
from amcrest import AmcrestCamera, AmcrestError, CommError, LoginError, exceptions
import asyncio
from asyncio import timeout
import base64
from datetime import datetime
import httpx
import logging
import os
import time
from util import *
from zoneinfo import ZoneInfo
class AmcrestAPI(object):
def __init__(self, config):
self.logger = logging.getLogger(__name__)
# we don't want to get this mess of deeper-level logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)
logging.getLogger("amcrest.http").setLevel(logging.ERROR)
logging.getLogger("amcrest.event").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
self.last_call_date = ''
self.timezone = config['timezone']
self.amcrest_config = config['amcrest']
self.count = len(self.amcrest_config['hosts'])
self.devices = {}
self.events = []
async def connect_to_devices(self):
self.logger.info(f'Connecting to: {self.amcrest_config["hosts"]}')
tasks = []
for host in self.amcrest_config['hosts']:
device_name = self.amcrest_config['names'].pop(0)
task = asyncio.create_task(self.get_device(host, device_name))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
if len(self.devices) == 0:
self.logger.error('Failed to connect to all devices, exiting')
exit(1)
# return just the config of each device, not the camera object
return {d: self.devices[d]['config'] for d in self.devices.keys()}
def get_camera(self, host):
config = self.amcrest_config
return AmcrestCamera(host, config['port'], config['username'], config['password'], verbose=False).camera
async def get_device(self, host, device_name):
try:
camera = self.get_camera(host)
device_type = camera.device_type.replace('type=', '').strip()
is_ad110 = device_type == 'AD110'
is_ad410 = device_type == 'AD410'
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
if not isinstance(serial_number, str):
self.logger.error(f'Error fetching serial number for {host}: {camera.serial_number}')
exit(1)
version = camera.software_information[0].replace('version=', '').strip()
build = camera.software_information[1].strip()
sw_version = f'{version} ({build})'
network_config = dict(item.split('=') for item in camera.network_config.splitlines())
interface = network_config['table.Network.DefaultInterface']
ip_address = network_config[f'table.Network.{interface}.IPAddress']
mac_address = network_config[f'table.Network.{interface}.PhysicalAddress'].upper()
action = 'Connected' if camera.serial_number not in self.devices else 'Reconnected'
self.logger.info(f'{action} to {host} as {camera.serial_number}')
self.devices[serial_number] = {
'camera': camera,
'config': {
'host': host,
'device_name': device_name,
'device_type': device_type,
'device_class': camera.device_class,
'is_ad110': is_ad110,
'is_ad410': is_ad410,
'is_doorbell': is_doorbell,
'serial_number': serial_number,
'software_version': sw_version,
'hardware_version': camera.hardware_version,
'vendor': camera.vendor_information,
'network': {
'interface': interface,
'ip_address': ip_address,
'mac': mac_address,
}
},
}
self.get_privacy_mode(serial_number)
except LoginError as err:
self.logger.error(f'Invalid username/password to connect to device "{host}", fix in config.yaml')
except AmcrestError as err:
self.logger.error(f'Failed to connect to device "{host}", check config.yaml and restart to try again: {err}')
# Storage stats -------------------------------------------------------------------------------
def get_storage_stats(self, device_id):
try:
storage = self.devices[device_id]["camera"].storage_all
except CommError as err:
self.logger.error(f'Failed to communicate with device ({device_id}): No SD card?')
return {
'last_update': str(datetime.now(ZoneInfo(self.timezone))),
'used_percent': str(storage['used_percent']),
'used': to_gb(storage['used']),
'total': to_gb(storage['total']),
}
# Privacy config ------------------------------------------------------------------------------
def get_privacy_mode(self, device_id):
device = self.devices[device_id]
try:
privacy = device["camera"].privacy_config().split()
privacy_mode = True if privacy[0].split('=')[1] == 'true' else False
device['privacy_mode'] = privacy_mode
return privacy_mode
except CommError as err:
self.logger.error(f'Failed to communicate with device ({device_id}): {err}')
def set_privacy_mode(self, device_id, switch):
device = self.devices[device_id]
try:
return device["camera"].set_privacy(switch).strip()
except CommError as err:
self.logger.error(f'Failed to communicate with device ({device_id})')
# Snapshots -----------------------------------------------------------------------------------
async def collect_all_device_snapshots(self):
tasks = [self.get_snapshot_from_device(device_id) for device_id in self.devices]
await asyncio.gather(*tasks)
async def get_snapshot_from_device(self, device_id):
device = self.devices[device_id]
try:
if 'privacy_mode' not in device or device['privacy_mode'] == False:
image = await device["camera"].async_snapshot()
device['snapshot'] = base64.b64encode(image)
self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(device['snapshot'])} bytes base64')
else:
self.logger.info(f'Skipped snapshot from ({device_id}) because "privacy mode" is ON')
except CommError as err:
self.logger.error(f'Failed to communicate with device ({device_id}), maybe "Privacy Mode" is on? {err}')
def get_snapshot(self, device_id):
return self.devices[device_id]['snapshot'] if 'snapshot' in self.devices[device_id] else None
# Recorded file -------------------------------------------------------------------------------
def get_recorded_file(self, device_id, file):
device = self.devices[device_id]
data_raw = device["camera"].download_file(file)
if data_raw:
data_base64 = base64.b64encode(data_raw)
self.logger.debug(f'Processed recording from ({device_id}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64')
if data_base64 < 100 * 1024 * 1024 * 1024:
return data_base64
else:
self.logger.error(f'Processed recording is too large')
return None
return None
# Events --------------------------------------------------------------------------------------
async def collect_all_device_events(self):
try:
tasks = [self.get_events_from_device(device_id) for device_id in self.devices]
await asyncio.gather(*tasks)
except Exception as err:
self.logger.error(err, exc_info=True)
async def get_events_from_device(self, device_id):
device = self.devices[device_id]
try:
async for code, payload in device["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
except CommError as err:
self.logger.error(f'Failed to communicate with device ({device_id}): {err}')
except Exception as err:
self.logger.error(f'generic Failed to get events from device({device_id}: {err}', exc_info=True)
async def process_device_event(self, device_id, code, payload):
try:
device = self.devices[device_id]
config = device['config']
# self.logger.debug(f'Event on {device_id} - {code}: {payload}')
# VideoMotion: motion detection event
# VideoLoss: video loss detection event
# VideoBlind: video blind detection event
# AlarmLocal: alarm detection event
# StorageNotExist: storage not exist event
# StorageFailure: storage failure event
# StorageLowSpace: storage low space event
# AlarmOutput: alarm output event
# SmartMotionHuman: human detection event
# SmartMotionVehicle: vehicle detection event
if ((code == 'ProfileAlarmTransmit' and config['is_ad110'])
or (code == 'VideoMotion' and not config['is_ad110'])):
motion_payload = 'on' if payload['action'] == 'Start' else 'off'
self.events.append({ 'device_id': device_id, 'event': 'motion', 'payload': motion_payload })
elif code == 'CrossRegionDetection' and payload['data']['ObjectType'] == 'Human':
human_payload = 'on' if payload['action'] == 'Start' else 'off'
self.events.append({ 'device_id': device_id, 'event': 'human', 'payload': human_payload })
elif code == '_DoTalkAction_':
doorbell_payload = 'on' if payload['data']['Action'] == 'Invite' else 'off'
self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload })
elif code == 'NewFile':
# we don't care about recording events for general (non-saved) snapshots
if not payload['data']['StoragePoint'] == 'NULL':
file_payload = { 'file': payload['data']['File'], 'size': payload['data']['Size'], 'event': payload['data']['Event'] }
self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload })
elif code == 'LensMaskOpen':
device['privacy_mode'] = True
self.events.append({ 'device_id': device_id, 'event': 'privacy_mode', 'payload': 'on' })
elif code == 'LensMaskClose':
device['privacy_mode'] = False
self.events.append({ 'device_id': device_id, 'event': 'privacy_mode', 'payload': 'off' })
# lets just ignore these
elif code == 'InterVideoAccess': # I think this is US, accessing the API of the camera, lets not inception!
pass
elif code == 'VideoMotionInfo':
pass
# save everything else as a 'generic' event
else:
self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload })
except Exception as err:
self.logger.error(err, exc_info=True)
def get_next_event(self):
return self.events.pop(0) if len(self.events) > 0 else None