|
|
|
@ -23,6 +23,9 @@ class AmcrestAPIMixin:
|
|
|
|
def get_last_call_date(self: Amcrest2Mqtt) -> str:
|
|
|
|
def get_last_call_date(self: Amcrest2Mqtt) -> str:
|
|
|
|
return self.last_call_date
|
|
|
|
return self.last_call_date
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_last_call_date(self: Amcrest2Mqtt) -> None:
|
|
|
|
|
|
|
|
self.last_call_date = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
|
|
|
|
|
|
def is_rate_limited(self: Amcrest2Mqtt) -> bool:
|
|
|
|
def is_rate_limited(self: Amcrest2Mqtt) -> bool:
|
|
|
|
return self.rate_limited
|
|
|
|
return self.rate_limited
|
|
|
|
|
|
|
|
|
|
|
|
@ -58,6 +61,7 @@ class AmcrestAPIMixin:
|
|
|
|
host_ip = self.get_ip_address(host)
|
|
|
|
host_ip = self.get_ip_address(host)
|
|
|
|
device = self.get_camera(host_ip)
|
|
|
|
device = self.get_camera(host_ip)
|
|
|
|
camera = device.camera
|
|
|
|
camera = device.camera
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except LoginError:
|
|
|
|
except LoginError:
|
|
|
|
self.logger.error(f'invalid username/password to connect to device "{host}", fix in config.yaml')
|
|
|
|
self.logger.error(f'invalid username/password to connect to device "{host}", fix in config.yaml')
|
|
|
|
return
|
|
|
|
return
|
|
|
|
@ -122,6 +126,7 @@ class AmcrestAPIMixin:
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
storage = device["camera"].storage_all
|
|
|
|
storage = device["camera"].storage_all
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f"failed to get storage stats from ({self.get_device_name(device_id)}): {err}")
|
|
|
|
self.logger.error(f"failed to get storage stats from ({self.get_device_name(device_id)}): {err}")
|
|
|
|
return {}
|
|
|
|
return {}
|
|
|
|
@ -147,6 +152,7 @@ class AmcrestAPIMixin:
|
|
|
|
privacy = device["camera"].privacy_config().split()
|
|
|
|
privacy = device["camera"].privacy_config().split()
|
|
|
|
privacy_mode = True if privacy[0].split("=")[1] == "true" else False
|
|
|
|
privacy_mode = True if privacy[0].split("=")[1] == "true" else False
|
|
|
|
device["privacy_mode"] = privacy_mode
|
|
|
|
device["privacy_mode"] = privacy_mode
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f"failed to get privacy mode from ({self.get_device_name(device_id)}): {err}")
|
|
|
|
self.logger.error(f"failed to get privacy mode from ({self.get_device_name(device_id)}): {err}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
@ -164,6 +170,7 @@ class AmcrestAPIMixin:
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
response = cast(str, device["camera"].set_privacy(switch).strip())
|
|
|
|
response = cast(str, device["camera"].set_privacy(switch).strip())
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f"failed to set privacy mode on ({self.get_device_name(device_id)}): {err}")
|
|
|
|
self.logger.error(f"failed to set privacy mode on ({self.get_device_name(device_id)}): {err}")
|
|
|
|
return ""
|
|
|
|
return ""
|
|
|
|
@ -183,6 +190,7 @@ class AmcrestAPIMixin:
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
motion_detection = bool(device["camera"].is_motion_detector_on())
|
|
|
|
motion_detection = bool(device["camera"].is_motion_detector_on())
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except CommError as err:
|
|
|
|
except CommError as err:
|
|
|
|
self.logger.error(f"failed to get motion detection switch on ({self.get_device_name(device_id)}): {err}")
|
|
|
|
self.logger.error(f"failed to get motion detection switch on ({self.get_device_name(device_id)}): {err}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
@ -200,6 +208,7 @@ class AmcrestAPIMixin:
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
response = str(device["camera"].set_motion_detection(switch))
|
|
|
|
response = str(device["camera"].set_motion_detection(switch))
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
except CommError:
|
|
|
|
except CommError:
|
|
|
|
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to set motion detections")
|
|
|
|
self.logger.error(f"Failed to communicate with device ({self.get_device_name(device_id)}) to set motion detections")
|
|
|
|
return ""
|
|
|
|
return ""
|
|
|
|
@ -231,6 +240,7 @@ class AmcrestAPIMixin:
|
|
|
|
for attempt in range(1, SNAPSHOT_MAX_TRIES + 1):
|
|
|
|
for attempt in range(1, SNAPSHOT_MAX_TRIES + 1):
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
image_bytes = await asyncio.wait_for(camera.async_snapshot(), timeout=SNAPSHOT_TIMEOUT_S)
|
|
|
|
image_bytes = await asyncio.wait_for(camera.async_snapshot(), timeout=SNAPSHOT_TIMEOUT_S)
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
if not image_bytes:
|
|
|
|
if not image_bytes:
|
|
|
|
self.logger.warning(f"Snapshot: empty image from {self.get_device_name(device_id)}")
|
|
|
|
self.logger.warning(f"Snapshot: empty image from {self.get_device_name(device_id)}")
|
|
|
|
return None
|
|
|
|
return None
|
|
|
|
@ -285,6 +295,7 @@ class AmcrestAPIMixin:
|
|
|
|
while tries < 3:
|
|
|
|
while tries < 3:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
data_raw = cast(bytes, device["camera"].download_file(file))
|
|
|
|
data_raw = cast(bytes, device["camera"].download_file(file))
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
if data_raw:
|
|
|
|
if data_raw:
|
|
|
|
if not encode:
|
|
|
|
if not encode:
|
|
|
|
if len(data_raw) < self.mb_to_b(100):
|
|
|
|
if len(data_raw) < self.mb_to_b(100):
|
|
|
|
@ -326,6 +337,7 @@ class AmcrestAPIMixin:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
async for code, payload in device["camera"].async_event_actions("All"):
|
|
|
|
async for code, payload in device["camera"].async_event_actions("All"):
|
|
|
|
await self.process_device_event(device_id, code, payload)
|
|
|
|
await self.process_device_event(device_id, code, payload)
|
|
|
|
|
|
|
|
self.set_last_call_date()
|
|
|
|
return
|
|
|
|
return
|
|
|
|
except CommError:
|
|
|
|
except CommError:
|
|
|
|
tries += 1
|
|
|
|
tries += 1
|
|
|
|
|