|
|
|
@ -138,23 +138,24 @@ class AmcrestAPI(object):
|
|
|
|
privacy = device["camera"].privacy_config().split()
|
|
|
|
privacy = device["camera"].privacy_config().split()
|
|
|
|
privacy_mode = True if privacy[0].split('=')[1] == 'true' else False
|
|
|
|
privacy_mode = True if privacy[0].split('=')[1] == 'true' else False
|
|
|
|
device['privacy_mode'] = privacy_mode
|
|
|
|
device['privacy_mode'] = privacy_mode
|
|
|
|
|
|
|
|
|
|
|
|
return privacy_mode
|
|
|
|
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get privacy mode')
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get privacy mode')
|
|
|
|
except LoginError as err:
|
|
|
|
except LoginError as err:
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to get privacy mode')
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to get privacy mode')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return privacy_mode
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_privacy_mode(self, device_id, switch):
|
|
|
|
def set_privacy_mode(self, device_id, switch):
|
|
|
|
device = self.devices[device_id]
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return device["camera"].set_privacy(switch).strip()
|
|
|
|
response = device["camera"].set_privacy(switch).strip()
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to set privacy mode')
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to set privacy mode')
|
|
|
|
except LoginError as err:
|
|
|
|
except LoginError as err:
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to set privacy mode')
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to set privacy mode')
|
|
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
# Motion detection config ---------------------------------------------------------------------
|
|
|
|
# Motion detection config ---------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
@ -163,23 +164,25 @@ class AmcrestAPI(object):
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
motion_detection = device["camera"].is_motion_detector_on()
|
|
|
|
motion_detection = device["camera"].is_motion_detector_on()
|
|
|
|
|
|
|
|
|
|
|
|
return motion_detection
|
|
|
|
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get motion detection')
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get motion detection')
|
|
|
|
except LoginError as err:
|
|
|
|
except LoginError as err:
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to get motion detection')
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to get motion detection')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return motion_detection
|
|
|
|
|
|
|
|
|
|
|
|
def set_motion_detection(self, device_id, switch):
|
|
|
|
def set_motion_detection(self, device_id, switch):
|
|
|
|
device = self.devices[device_id]
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return device["camera"].set_motion_detection(switch)
|
|
|
|
response = device["camera"].set_motion_detection(switch)
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to set motion detections')
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to set motion detections')
|
|
|
|
except LoginError as err:
|
|
|
|
except LoginError as err:
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to set motion detections')
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to set motion detections')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
# Snapshots -----------------------------------------------------------------------------------
|
|
|
|
# Snapshots -----------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
async def collect_all_device_snapshots(self):
|
|
|
|
async def collect_all_device_snapshots(self):
|
|
|
|
@ -189,17 +192,24 @@ class AmcrestAPI(object):
|
|
|
|
async def get_snapshot_from_device(self, device_id):
|
|
|
|
async def get_snapshot_from_device(self, device_id):
|
|
|
|
device = self.devices[device_id]
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
tries = 0
|
|
|
|
if 'privacy_mode' not in device or device['privacy_mode'] == False:
|
|
|
|
while tries < 3:
|
|
|
|
image = await device["camera"].async_snapshot()
|
|
|
|
try:
|
|
|
|
device['snapshot'] = base64.b64encode(image)
|
|
|
|
if 'privacy_mode' not in device or device['privacy_mode'] == False:
|
|
|
|
self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(device['snapshot'])} bytes base64')
|
|
|
|
image = await device["camera"].async_snapshot()
|
|
|
|
else:
|
|
|
|
device['snapshot'] = base64.b64encode(image)
|
|
|
|
self.logger.info(f'Skipped snapshot from ({device_id}) because "privacy mode" is ON')
|
|
|
|
self.logger.debug(f'Processed snapshot from ({device_id}) {len(image)} bytes raw, and {len(device['snapshot'])} bytes base64')
|
|
|
|
except CommError as err:
|
|
|
|
break
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
self.logger.info(f'Skipped snapshot from ({device_id}) because "privacy mode" is ON')
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
except CommError as err:
|
|
|
|
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
except LoginError as err:
|
|
|
|
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if tries == 3:
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get snapshot')
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get snapshot')
|
|
|
|
except LoginError as err:
|
|
|
|
|
|
|
|
self.logger.error(f'Failed to authenticate with device ({device_id}) to get snapshot')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_snapshot(self, device_id):
|
|
|
|
def get_snapshot(self, device_id):
|
|
|
|
return self.devices[device_id]['snapshot'] if 'snapshot' in self.devices[device_id] else None
|
|
|
|
return self.devices[device_id]['snapshot'] if 'snapshot' in self.devices[device_id] else None
|
|
|
|
@ -208,20 +218,26 @@ class AmcrestAPI(object):
|
|
|
|
def get_recorded_file(self, device_id, file):
|
|
|
|
def get_recorded_file(self, device_id, file):
|
|
|
|
device = self.devices[device_id]
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
tries = 0
|
|
|
|
data_raw = device["camera"].download_file(file)
|
|
|
|
while tries < 3:
|
|
|
|
if data_raw:
|
|
|
|
try:
|
|
|
|
data_base64 = base64.b64encode(data_raw)
|
|
|
|
data_raw = device["camera"].download_file(file)
|
|
|
|
self.logger.info(f'Processed recording from ({device_id}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64')
|
|
|
|
if data_raw:
|
|
|
|
if len(data_base64) < 100 * 1024 * 1024 * 1024:
|
|
|
|
data_base64 = base64.b64encode(data_raw)
|
|
|
|
return data_base64
|
|
|
|
self.logger.info(f'Processed recording from ({device_id}) {len(data_raw)} bytes raw, and {len(data_base64)} bytes base64')
|
|
|
|
else:
|
|
|
|
if len(data_base64) < 100 * 1024 * 1024 * 1024:
|
|
|
|
self.logger.error(f'Processed recording is too large')
|
|
|
|
return data_base64
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Processed recording is too large')
|
|
|
|
self.logger.error(f'Failed to download recorded file for device ({device_id})')
|
|
|
|
return
|
|
|
|
except LoginError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f'Failed to authenticate for recorded file for device ({device_id})')
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
except LoginError as err:
|
|
|
|
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if tries == 3:
|
|
|
|
|
|
|
|
self.logger.error(f'Failed to communicate with device ({device_id}) to get recorded file')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Events --------------------------------------------------------------------------------------
|
|
|
|
# Events --------------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
@ -232,13 +248,18 @@ class AmcrestAPI(object):
|
|
|
|
async def get_events_from_device(self, device_id):
|
|
|
|
async def get_events_from_device(self, device_id):
|
|
|
|
device = self.devices[device_id]
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
tries = 0
|
|
|
|
async for code, payload in device["camera"].async_event_actions("All"):
|
|
|
|
while tries < 3:
|
|
|
|
await self.process_device_event(device_id, code, payload)
|
|
|
|
try:
|
|
|
|
except CommError as err:
|
|
|
|
async for code, payload in device["camera"].async_event_actions("All"):
|
|
|
|
|
|
|
|
await self.process_device_event(device_id, code, payload)
|
|
|
|
|
|
|
|
except CommError as err:
|
|
|
|
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
except LoginError as err:
|
|
|
|
|
|
|
|
tries += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if tries == 3:
|
|
|
|
self.logger.error(f'Failed to communicate for events for device ({device_id})')
|
|
|
|
self.logger.error(f'Failed to communicate for events for device ({device_id})')
|
|
|
|
except LoginError as err:
|
|
|
|
|
|
|
|
self.logger.error(f'Failed to authenticate for events for device ({device_id})')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def process_device_event(self, device_id, code, payload):
|
|
|
|
async def process_device_event(self, device_id, code, payload):
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|