try to op on multiple devices; how does this python stuff work??

pull/106/head
Jeff Culverhouse 12 months ago
parent f905880cdf
commit 4046eaff00

@ -14,16 +14,16 @@ is_exiting = False
mqtt_client = None
# Read env variables
amcrest_host = os.getenv("AMCREST_HOST")
amcrest_hosts = os.getenv("AMCREST_HOSTS")
amcrest_port = int(os.getenv("AMCREST_PORT") or 80)
amcrest_username = os.getenv("AMCREST_USERNAME") or "admin"
amcrest_password = os.getenv("AMCREST_PASSWORD")
storage_poll_interval = int(os.getenv("STORAGE_POLL_INTERVAL") or 3600)
device_name = os.getenv("DEVICE_NAME")
device_names = os.getenv("DEVICE_NAMES")
mqtt_host = os.getenv("MQTT_HOST") or "localhost"
mqtt_qos = int(os.getenv("MQTT_QOS") or 0)
config["mqtt_qos"] = int(os.getenv("MQTT_QOS") or 0)
mqtt_port = int(os.getenv("MQTT_PORT") or 1883)
mqtt_username = os.getenv("MQTT_USERNAME")
mqtt_password = os.getenv("MQTT_PASSWORD") # can be None
@ -56,7 +56,7 @@ def mqtt_publish(topic, payload, exit_on_error=True, json=False):
global mqtt_client
msg = mqtt_client.publish(
topic, payload=(dumps(payload) if json else payload), qos=mqtt_qos, retain=True
topic, payload=(dumps(payload) if json else payload), qos=config["mqtt_qos"], retain=True
)
if msg.rc == mqtt.MQTT_ERR_SUCCESS:
@ -114,40 +114,21 @@ def signal_handler(sig, frame):
is_exiting = True
exit_gracefully(0)
# Exit if any of the required vars are not provided
if amcrest_host is None:
log("Please set the AMCREST_HOST environment variable", level="ERROR")
sys.exit(1)
def get_camera(amcrest_host, amcrest_post, amcrest_username, amcrest_password):
camera |= AmcrestCamera(
amcrest_host, amcrest_port, amcrest_username, amcrest_password
).camera
if amcrest_password is None:
log("Please set the AMCREST_PASSWORD environment variable", level="ERROR")
sys.exit(1)
# Fetch camera details
log("Fetching camera details...")
if mqtt_username is None:
log("Please set the MQTT_USERNAME environment variable", level="ERROR")
sys.exit(1)
version = read_version()
log(f"App Version: {version}")
# Handle interruptions
signal.signal(signal.SIGINT, signal_handler)
# Connect to camera
camera = AmcrestCamera(
amcrest_host, amcrest_port, amcrest_username, amcrest_password
).camera
# Fetch camera details
log("Fetching camera details...")
try:
device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = device_type == "AD110"
is_ad410 = device_type == "AD410"
camera_config["amcrest_host"] = amcrest_host
try:
camera_config["device_type"] = device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = camera_config["device_type"] == "AD110"
is_ad410 = camera_config["device_type"] == "AD410"
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
camera_config["serial_number"] = serial_number = camera.serial_number
if not isinstance(serial_number, str):
log(f"Error fetching serial number", level="ERROR")
@ -156,23 +137,23 @@ try:
sw_version = camera.software_information[0].replace("version=", "").strip()
build_version = camera.software_information[1].strip()
amcrest_version = f"{sw_version} ({build_version})"
config["amcrest_version"] = amcrest_version = f"{sw_version} ({build_version})"
if not device_name:
device_name = camera.machine_name.replace("name=", "").strip()
device_slug = slugify(device_name, separator="_")
except AmcrestError as error:
except AmcrestError as error:
log(f"Error fetching camera details", level="ERROR")
exit_gracefully(1)
log(f"Device type: {device_type}")
log(f"Serial number: {serial_number}")
log(f"Software version: {amcrest_version}")
log(f"Device name: {device_name}")
log(f"Device type: {device_type}")
log(f"Serial number: {serial_number}")
log(f"Software version: {amcrest_version}")
log(f"Device name: {device_name}")
# MQTT topics
topics = {
# MQTT topics
topics = {
"config": f"amcrest2mqtt/{serial_number}/config",
"status": f"amcrest2mqtt/{serial_number}/status",
"event": f"amcrest2mqtt/{serial_number}/event",
@ -204,49 +185,17 @@ topics = {
"host": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/host/config",
"serial_number": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/serial_number/config",
},
}
# Connect to MQTT
mqtt_client = mqtt.Client(
client_id=f"amcrest2mqtt_{serial_number}", clean_session=False
)
mqtt_client.on_disconnect = on_mqtt_disconnect
mqtt_client.will_set(topics["status"], payload="offline", qos=mqtt_qos, retain=True)
if mqtt_tls_enabled:
log(f"Setting up MQTT for TLS")
if mqtt_tls_ca_cert is None:
log("Missing var: MQTT_TLS_CA_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_KEY", level="ERROR")
sys.exit(1)
mqtt_client.tls_set(
ca_certs=mqtt_tls_ca_cert,
certfile=mqtt_tls_cert,
keyfile=mqtt_tls_key,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
)
else:
mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
}
try:
mqtt_client.connect(mqtt_host, port=mqtt_port)
mqtt_client.loop_start()
except ConnectionError as error:
log(f"Could not connect to MQTT server: {error}", level="ERROR")
sys.exit(1)
return camera, camera_config, topics
# Configure Home Assistant
if home_assistant:
log("Writing Home Assistant discovery config...")
def config_home_assistant(config, camera_config, topics):
device_type = camera_config["device_type"]
serial_number = camera_config["serial_number"]
base_config = {
"availability_topic": topics["status"],
"qos": mqtt_qos,
"qos": config["mqtt_qos"],
"device": {
"name": f"Amcrest {device_type}",
"manufacturer": "Amcrest",
@ -401,16 +350,106 @@ if home_assistant:
json=True,
)
def camera_online(config, camera_config, topics):
mqtt_publish(topics["status"], "online")
mqtt_publish(topics["config"], {
"version": version,
"device_type": camera_config["device_type"],
"device_name": device_name,
"sw_version": config["amcrest_version"],
"serial_number": camera_config["serial_number"],
"host": camera_config["amcrest_host"],
}, json=True)
# Exit if any of the required vars are not provided
if amcrest_hosts is None:
log("Please set the AMCREST_HOSTS environment variable", level="ERROR")
sys.exit(1)
hosts = amcrest_hosts.split()
host_count = len(hosts)
if device_names is None:
log("Please set the DEVICE_NAMES environment variable", level="ERROR")
sys.exit(1)
names = device_names.split()
name_count = len(names)
if host_count != name_count:
log("The AMCREST_HOSTS and DEVICE_NAMES must have the same number of space-delimited devices", level="ERROR")
sys.exit(1)
if amcrest_password is None:
log("Please set the AMCREST_PASSWORD environment variable", level="ERROR")
sys.exit(1)
if mqtt_username is None:
log("Please set the MQTT_USERNAME environment variable", level="ERROR")
sys.exit(1)
version = read_version()
log(f"App Version: {version}")
# Handle interruptions
signal.signal(signal.SIGINT, signal_handler)
# Connect to each camera, if not already
for x in range(1..host_count):
if cameras[x] and camers[x].serial_number:
continue
cameras[x], camera_configs[x], camera_topics[x] = get_camera(hosts[x], amcrest_port, amcrest_username, amcrest_password)
camera_count = len(cameras)
# Connect to MQTT
mqtt_client = mqtt.Client(
client_id=f"amcrest2mqtt_broker", clean_session=False
)
mqtt_client.on_disconnect = on_mqtt_disconnect
# send "will_set" for each connected camera
for x in range(1..camera_count):
if camera_topics[x]["status"]:
mqtt_client.will_set(camera_topics[x]["status"], payload="offline", qos=config["mqtt_qos"], retain=True)
if mqtt_tls_enabled:
log(f"Setting up MQTT for TLS")
if mqtt_tls_ca_cert is None:
log("Missing var: MQTT_TLS_CA_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_CERT", level="ERROR")
sys.exit(1)
if mqtt_tls_cert is None:
log("Missing var: MQTT_TLS_KEY", level="ERROR")
sys.exit(1)
mqtt_client.tls_set(
ca_certs=mqtt_tls_ca_cert,
certfile=mqtt_tls_cert,
keyfile=mqtt_tls_key,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
)
else:
mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
try:
mqtt_client.connect(mqtt_host, port=mqtt_port)
mqtt_client.loop_start()
except ConnectionError as error:
log(f"Could not connect to MQTT server: {error}", level="ERROR")
sys.exit(1)
# Configure Home Assistant
if home_assistant:
log("Writing Home Assistant discovery config...")
for x in range(1..camera_count):
if camera_topics[x]["status"]:
config_home_assistant(camera_configs[x], camera_topics[x])
# Main loop
mqtt_publish(topics["status"], "online")
mqtt_publish(topics["config"], {
"version": version,
"device_type": device_type,
"device_name": device_name,
"sw_version": amcrest_version,
"serial_number": serial_number,
"host": amcrest_host,
}, json=True)
for x in range(1..camera_count):
if camera_topics[x]["status"]:
camera_online(config, camera_configs[x], camera_topics[x])
if storage_poll_interval > 0:
refresh_storage_sensors()
@ -419,19 +458,20 @@ log("Listening for events...")
async def main():
try:
async for code, payload in camera.async_event_actions("All"):
if (is_ad110 and code == "ProfileAlarmTransmit") or (code == "VideoMotion" and not is_ad110):
motion_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(topics["motion"], motion_payload)
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(topics["human"], human_payload)
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
mqtt_publish(topics["doorbell"], doorbell_payload)
mqtt_publish(topics["event"], payload, json=True)
log(str(payload))
for x in range(1..camera_count):
async for code, payload in cameras[x].async_event_actions("All"):
if (camera_config[x].is_ad110 and code == "ProfileAlarmTransmit") or (code == "VideoMotion" and not camera_config[x].is_ad110):
motion_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(camera_topics[x]["motion"], motion_payload)
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(camera_topics[x]["human"], human_payload)
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
mqtt_publish(camera_topics[x]["doorbell"], doorbell_payload)
mqtt_publish(camera_topics[x]["event"], payload, json=True)
log(str(payload))
except AmcrestError as error:
log(f"Amcrest error: {AmcrestError}", level="ERROR")

Loading…
Cancel
Save