code cleanup

pull/106/head
Jeff Culverhouse 12 months ago
parent d322f9ad4e
commit d46be67b57

@ -4,6 +4,7 @@ from datetime import datetime, timezone
import paho.mqtt.client as mqtt
import os
import sys
import time
from json import dumps
import signal
from threading import Timer
@ -14,9 +15,7 @@ is_exiting = False
mqtt_client = None
config = {}
cameras = {}
camera_configs = {}
camera_topics = {}
devices = {}
# Read env variables
amcrest_hosts = os.getenv("AMCREST_HOSTS")
@ -82,14 +81,13 @@ def on_mqtt_disconnect(client, userdata, rc):
exit_gracefully(rc, skip_mqtt=True)
def exit_gracefully(rc, skip_mqtt=False):
global hosts, camera_topics, mqtt_client
global hosts, devices, mqtt_client
log("Exiting app...")
if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False:
for host in hosts:
topics = camera_topics[host]
mqtt_publish(topics["status"], "offline", exit_on_error=False)
mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False)
mqtt_client.disconnect()
# Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they
@ -97,21 +95,21 @@ def exit_gracefully(rc, skip_mqtt=False):
os._exit(rc)
def refresh_storage_sensors():
global hosts, camera, camera_topics, storage_poll_interval
global hosts, devices, storage_poll_interval
Timer(storage_poll_interval, refresh_storage_sensors).start()
log("Fetching storage sensors...")
for host in hosts:
topics = camera_topics[host]
topics = devices[host]["topics"]
try:
storage = cameras[host].storage_all
storage = devices[host]["camera"].storage_all
mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"]))
mqtt_publish(topics["storage_used"], to_gb(storage["used"]))
mqtt_publish(topics["storage_total"], to_gb(storage["total"]))
except AmcrestError as error:
log(f"Error fetching storage information {error}", level="WARNING")
log(f"Error fetching storage information for {host}: {error}", level="WARNING")
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))
@ -132,33 +130,27 @@ def get_camera(amcrest_host, amcrest_post, amcrest_username, amcrest_password, d
).camera
# Fetch camera details
log("Fetching camera details...")
log(f"Fetching camera details for {amcrest_host}...")
camera_config = {}
camera_config["device_name"] = device_name
camera_config["amcrest_host"] = amcrest_host
device_name = device_name
amcrest_host = amcrest_host
try:
camera_config["device_type"] = device_type = camera.device_type.replace("type=", "").strip()
camera_config["is_ad110"] = is_ad110 = camera_config["device_type"] == "AD110"
camera_config["is_ad410"] = is_ad410 = camera_config["device_type"] == "AD410"
camera_config["is_doorbell"] = is_doorbell = is_ad110 or is_ad410
camera_config["serial_number"] = serial_number = camera.serial_number
device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = device_type == "AD110"
is_ad410 = device_type == "AD410"
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
if not isinstance(serial_number, str):
log(f"Error fetching serial number", level="ERROR")
log(f"Error fetching serial number for {amcrest_host}", level="ERROR")
exit_gracefully(1)
sw_version = camera.software_information[0].replace("version=", "").strip()
build_version = camera.software_information[1].strip()
config["amcrest_version"] = amcrest_version = f"{sw_version} ({build_version})"
if not device_name:
device_name = camera.machine_name.replace("name=", "").strip()
camera_config["device_slug"] = device_slug = slugify(device_name, separator="_")
amcrest_version = f"{sw_version} ({build_version})"
device_slug = slugify(device_name, separator="_")
except AmcrestError as error:
log(f"Error fetching camera details", level="ERROR")
log(f"Error fetching camera details for {amcrest_host}", level="ERROR")
exit_gracefully(1)
log(f"Device type: {device_type}")
@ -166,10 +158,21 @@ def get_camera(amcrest_host, amcrest_post, amcrest_username, amcrest_password, d
log(f"Software version: {amcrest_version}")
log(f"Device name: {device_name}")
setup = {}
setup["camera"] = camera
setup["camera_config"] = camera_config
setup["camera_topic"] = {
setup = {
"camera": camera,
}
setup["config"] = {
"amcrest_host": amcrest_host,
"device_name": device_name,
"device_type": device_type,
"device_slug": device_slug,
"is_ad110": is_ad110,
"is_ad410": is_ad410,
"is_doorbell": is_doorbell,
"serial_number": serial_number,
"amcrest_version": amcrest_version,
}
setup["topics"] = {
"config": f"amcrest2mqtt/{serial_number}/config",
"status": f"amcrest2mqtt/{serial_number}/status",
"event": f"amcrest2mqtt/{serial_number}/event",
@ -205,15 +208,15 @@ def get_camera(amcrest_host, amcrest_post, amcrest_username, amcrest_password, d
return setup
def config_home_assistant(config, camera_config, topics):
amcrest_version = config["amcrest_version"]
device_name = camera_config["device_name"]
device_slug = camera_config["device_slug"]
device_type = camera_config["device_type"]
serial_number = camera_config["serial_number"]
def config_home_assistant(config, device):
device_name = device["config"]["device_name"]
device_type = device["config"]["device_type"]
device_slug = device["config"]["device_slug"]
serial_number = device["config"]["serial_number"]
amcrest_version = device["config"]["amcrest_version"]
base_config = {
"availability_topic": topics["status"],
"availability_topic": device["topics"]["status"],
"qos": config["mqtt_qos"],
"device": {
"name": f"Amcrest {device_type}",
@ -225,15 +228,15 @@ def config_home_assistant(config, camera_config, topics):
},
}
if camera_config["is_doorbell"]:
if device["config"]["is_doorbell"]:
doorbell_name = "Doorbell" if device_name == "Doorbell" else f"{device_name} Doorbell"
mqtt_publish(topics["home_assistant_legacy"]["doorbell"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["doorbell"], "")
mqtt_publish(
topics["home_assistant"]["doorbell"],
device["topics"]["home_assistant"]["doorbell"],
base_config
| {
"state_topic": topics["doorbell"],
"state_topic": device["topics"]["doorbell"],
"payload_on": "on",
"payload_off": "off",
"icon": "mdi:doorbell",
@ -243,13 +246,13 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
if camera_config["is_ad410"]:
mqtt_publish(topics["home_assistant_legacy"]["human"], "")
if device["config"]["is_ad410"]:
mqtt_publish(device["topics"]["home_assistant_legacy"]["human"], "")
mqtt_publish(
topics["home_assistant"]["human"],
device["topics"]["home_assistant"]["human"],
base_config
| {
"state_topic": topics["human"],
"state_topic": device["topics"]["human"],
"payload_on": "on",
"payload_off": "off",
"device_class": "motion",
@ -259,12 +262,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["motion"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["motion"], "")
mqtt_publish(
topics["home_assistant"]["motion"],
device["topics"]["home_assistant"]["motion"],
base_config
| {
"state_topic": topics["motion"],
"state_topic": device["topics"]["motion"],
"payload_on": "on",
"payload_off": "off",
"device_class": "motion",
@ -274,12 +277,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["version"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["version"], "")
mqtt_publish(
topics["home_assistant"]["version"],
device["topics"]["home_assistant"]["version"],
base_config
| {
"state_topic": topics["config"],
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.sw_version }}",
"icon": "mdi:package-up",
"name": f"{device_name} Version",
@ -290,12 +293,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["serial_number"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["serial_number"], "")
mqtt_publish(
topics["home_assistant"]["serial_number"],
device["topics"]["home_assistant"]["serial_number"],
base_config
| {
"state_topic": topics["config"],
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.serial_number }}",
"icon": "mdi:alphabetical-variant",
"name": f"{device_name} Serial Number",
@ -306,12 +309,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["host"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["host"], "")
mqtt_publish(
topics["home_assistant"]["host"],
device["topics"]["home_assistant"]["host"],
base_config
| {
"state_topic": topics["config"],
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.host }}",
"icon": "mdi:ip-network",
"name": f"{device_name} Host",
@ -323,12 +326,12 @@ def config_home_assistant(config, camera_config, topics):
)
if storage_poll_interval > 0:
mqtt_publish(topics["home_assistant_legacy"]["storage_used_percent"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["storage_used_percent"], "")
mqtt_publish(
topics["home_assistant"]["storage_used_percent"],
device["topics"]["home_assistant"]["storage_used_percent"],
base_config
| {
"state_topic": topics["storage_used_percent"],
"state_topic": device["topics"]["storage_used_percent"],
"unit_of_measurement": "%",
"icon": "mdi:micro-sd",
"name": f"{device_name} Storage Used %",
@ -339,12 +342,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["storage_used"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["storage_used"], "")
mqtt_publish(
topics["home_assistant"]["storage_used"],
device["topics"]["home_assistant"]["storage_used"],
base_config
| {
"state_topic": topics["storage_used"],
"state_topic": device["topics"]["storage_used"],
"unit_of_measurement": "GB",
"icon": "mdi:micro-sd",
"name": f"{device_name} Storage Used",
@ -354,12 +357,12 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
mqtt_publish(topics["home_assistant_legacy"]["storage_total"], "")
mqtt_publish(device["topics"]["home_assistant_legacy"]["storage_total"], "")
mqtt_publish(
topics["home_assistant"]["storage_total"],
device["topics"]["home_assistant"]["storage_total"],
base_config
| {
"state_topic": topics["storage_total"],
"state_topic": device["topics"]["storage_total"],
"unit_of_measurement": "GB",
"icon": "mdi:micro-sd",
"name": f"{device_name} Storage Total",
@ -369,22 +372,15 @@ def config_home_assistant(config, camera_config, topics):
json=True,
)
def camera_online(config, camera_config, topics):
amcrest_version = config["amcrest_version"]
host = camera_config["amcrest_host"]
device_name = camera_config["device_name"]
device_slug = camera_config["device_slug"]
device_type = camera_config["device_type"]
serial_number = camera_config["serial_number"]
mqtt_publish(topics["status"], "online")
mqtt_publish(topics["config"], {
"version": version,
"device_type": device_type,
"device_name": device_name,
"sw_version": amcrest_version,
"serial_number": serial_number,
"host": host,
def camera_online(device):
mqtt_publish(device["topics"]["status"], "online")
mqtt_publish(device["topics"]["config"], {
"version": device["config"]["amcrest_version"],
"device_type": device["config"]["device_type"],
"device_name": device["config"]["device_name"],
"sw_version": device["config"]["amcrest_version"],
"serial_number": device["config"]["serial_number"],
"host": device["config"]["amcrest_host"],
}, json=True)
# Exit if any of the required vars are not provided
@ -401,7 +397,7 @@ names = device_names.split()
name_count = len(names)
if host_count != name_count:
log("The AMCREST_HOSTS and DEVICE_NAMES must have the same number of space-delimited devices", level="ERROR")
log("The AMCREST_HOSTS and DEVICE_NAMES must have the same number of space-delimited hosts/names", level="ERROR")
sys.exit(1)
if amcrest_password is None:
@ -421,13 +417,10 @@ signal.signal(signal.SIGINT, signal_handler)
# Connect to each camera, if not already
for host in hosts:
log(f"Working host: {host}", level="INFO")
if host in cameras and camers[host].serial_number:
continue
setup = get_camera(host, amcrest_port, amcrest_username, amcrest_password, names.pop())
cameras[host] = setup["camera"]
camera_configs[host] = setup["camera_config"]
camera_topics[host] = setup["camera_topic"]
name = names.pop()
log(f"Connecting host: {host} as {name}", level="INFO")
devices[host] = get_camera(host, amcrest_port, amcrest_username, amcrest_password, name)
log(f"Connecting to hosts done.", level="INFO")
# Connect to MQTT
mqtt_client = mqtt.Client(
@ -436,19 +429,19 @@ mqtt_client = mqtt.Client(
mqtt_client.on_disconnect = on_mqtt_disconnect
# send "will_set" for each connected camera
for host in hosts:
if camera_topics[host]["status"]:
mqtt_client.will_set(camera_topics[host]["status"], payload="offline", qos=config["mqtt_qos"], retain=True)
if devices[host]["topics"]["status"]:
mqtt_client.will_set(devices[host]["topics"]["status"], payload="offline", qos=config["mqtt_qos"], retain=True)
if mqtt_tls_enabled:
log(f"Setting up MQTT for TLS")
if mqtt_tls_ca_cert is None:
log("Missing var: MQTT_TLS_CA_CERT", level="ERROR")
log("Missing env var: MQTT_TLS_CA_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_CERT", level="ERROR")
log("Missing env var: MQTT_TLS_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_KEY", level="ERROR")
log("Missing env var: MQTT_TLS_KEY", level="ERROR")
sys.exit(1)
mqtt_client.tls_set(
ca_certs=mqtt_tls_ca_cert,
@ -472,38 +465,41 @@ if home_assistant:
log("Writing Home Assistant discovery config...")
for host in hosts:
if host in camera_topics:
config_home_assistant(config, camera_configs[host], camera_topics[host])
config_home_assistant(config, devices[host])
# Main loop
for host in hosts:
if host in camera_topics:
camera_online(config, camera_configs[host], camera_topics[host])
camera_online(devices[host])
if storage_poll_interval > 0:
refresh_storage_sensors()
log("Listening for events...")
log(f"Listening for events on {len(hosts)} hosts...", level="INFO")
async def main():
try:
for host in hosts:
async for code, payload in cameras[host].async_event_actions("All"):
if (camera_configs[host]["is_ad110"] and code == "ProfileAlarmTransmit") or (code == "VideoMotion" and not camera_configs[host]["is_ad110"]):
device = devices[host]
config = device["config"]
camera = device["camera"]
topics = device["topics"]
async for code, payload in camera.async_event_actions("All"):
log(f"Event on {host}: {str(payload)}", error="INFO")
if ((code == "ProfileAlarmTransmit" and config["is_ad110"])
or (code == "VideoMotion" and not config["is_ad110"])):
motion_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(camera_topics[host]["motion"], motion_payload)
mqtt_publish(topics["motion"], motion_payload)
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(camera_topics[host]["human"], human_payload)
mqtt_publish(topics["human"], human_payload)
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
mqtt_publish(camera_topics[host]["doorbell"], doorbell_payload)
mqtt_publish(topics["doorbell"], doorbell_payload)
mqtt_publish(camera_topics[host]["event"], payload, json=True)
log(str(payload))
mqtt_publish(topics["event"], payload, json=True)
except AmcrestError as error:
log(f"Amcrest error: {AmcrestError}", level="ERROR")
log(f"Amcrest error while working on {host}: {AmcrestError}", level="ERROR")
time.sleep(10)
asyncio.run(main())

Loading…
Cancel
Save