mostly rewritten to mimic my govee2mqtt fork

pull/106/head
Jeff Culverhouse 11 months ago
parent 346fa9cc7e
commit f06a9cac6d

@ -1,5 +1,9 @@
FROM python:3.14.0a5-alpine3.21
RUN python -m venv /usr/src/app
# Enable venv
ENV PATH="/usr/src/app/venv/bin:$PATH"
RUN python3 -m ensurepip
# Upgrade pip and setuptools
@ -10,8 +14,6 @@ WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
RUN pip3 check
COPY . .
ARG USER_ID=1000
@ -19,7 +21,10 @@ ARG GROUP_ID=1000
RUN addgroup -g $GROUP_ID appuser && \
adduser -u $USER_ID -G appuser --disabled-password --gecos "" appuser
RUN chown appuser:appuser /config/*
RUN chmod 0664 /config/*
USER appuser
CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ]
ENTRYPOINT [ "python", "-u", "./app.py" ]
CMD [ "-c", "/config" ]

@ -31,16 +31,19 @@ Or, we support the following environment variables and defaults:
- `MQTT_DISCOVERY_PREFIX` (optional, default = 'homeassistant')
- `TZ` (required, timezone identifier, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List)
- `STORAGE_POLL_INTERVAL` (optional, default = 3600) - how often to fetch storage data (in seconds) (set to 0 to disable functionality)
- `DEVICE_UPDATE_INTERVAL` (optional, default = 3600) - how often to fetch storage stats (in seconds)
It exposes events to the following topics:
It exposes through device discovery a `service` and a `device` with components for each camera:
- `amcrest2mqtt/broker` - broker config
- `amcrest2mqtt/[SERIAL_NUMBER]/event` - all events
- `amcrest2mqtt/[SERIAL_NUMBER]/doorbell` - doorbell status (if AD110 or AD410)
- `amcrest2mqtt/[SERIAL_NUMBER]/human` - human detection (if AD410)
- `amcrest2mqtt/[SERIAL_NUMBER]/motion` - motion events (if supported)
- `amcrest2mqtt/[SERIAL_NUMBER]/config` - device configuration information
- `homeassistant/device/amcrest-service` - service config
- `homeassistant/device/amcrest-[SERIAL_NUMBER]` per camera, with components:
- `event` - all events
- `doorbell` - doorbell status (if AD110 or AD410)
- `human` - human detection (if AD410)
- `motion` - motion events (if supported)
- `config` - device configuration information
- `storage` - storage stats
## Device Support
@ -48,15 +51,15 @@ The app supports events for any Amcrest device supported by [`python-amcrest`](h
## Home Assistant
The app has built-in support for Home Assistant discovery. Set the `HOME_ASSISTANT` environment variable to `true` to enable support.
If you are using a different MQTT prefix to the default, you will need to set the `HOME_ASSISTANT_PREFIX` environment variable.
The app has built-in support for Home Assistant discovery. Set the `MQTT_HOMEASSISTANT` environment variable to `true` to enable support.
If you are using a different MQTT prefix to the default, you will need to set the `MQTT_DISCOVERY_PREFIX` environment variable.
## Running the app
To run via env variables with Docker Compose, see docker-compose.yaml
or make sure you attach a volume with the config file and point to that directory, for example:
```
CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ]
CMD [ "python", "-u", "./app.py", "-c", "/config" ]
```
## Out of Scope

@ -1 +1 @@
0.99.10
0.99.11

@ -1,595 +0,0 @@
from amcrest import AmcrestCamera, AmcrestError
import argparse
import asyncio
from datetime import datetime, timezone
from json import dumps
import paho.mqtt.client as mqtt
import os
import signal
from slugify import slugify
import ssl
import sys
from threading import Timer
import time
import yaml
from zoneinfo import ZoneInfo
is_exiting = False
mqtt_client = None
config = { 'timezone': 'America/New_York' }
devices = {}
# Helper functions and callbacks
def log(msg, level='INFO'):
ts = datetime.now(tz=ZoneInfo(config['timezone'])).strftime('%Y-%m-%d %H:%M:%S')
if len(msg) > 20480:
raise ValueError('Log message exceeds max length')
if level != "DEBUG" or os.getenv('DEBUG'):
print(f'{ts} [{level}] {msg}')
def read_file(file_name):
with open(file_name, 'r') as file:
data = file.read().replace('\n', '')
return data
def read_version():
if os.path.isfile("./VERSION"):
return read_file("./VERSION")
return read_file("../VERSION")
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))
def signal_handler(sig, frame):
# exit immediately upon receiving a second SIGINT
global is_exiting
if is_exiting:
os._exit(1)
is_exiting = True
exit_gracefully(0)
def exit_gracefully(rc, skip_mqtt=False):
log("Exiting app...")
if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False:
# set cameras offline
for host in config['amcrest']['hosts']:
mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False)
# set broker offline
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline")
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline")
mqtt_client.disconnect()
# Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they
# occur on a separate thread
os._exit(rc)
# MQTT setup
def mqtt_connect():
global mqtt_client
if config['mqtt']['username'] is None:
log("Missing env vari: MQTT_USERNAME or mqtt.username in config", level="ERROR")
sys.exit(1)
mqtt_client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION1,
client_id=f'{config["mqtt"]["prefix"]}_broker',
clean_session=False
)
mqtt_client.on_connect = on_mqtt_connect
mqtt_client.on_disconnect = on_mqtt_disconnect
# send "will_set" for the broker and each connected camera
mqtt_client.will_set(f'{config["mqtt"]["prefix"]}/{via_device}', payload="offline", qos=config['mqtt']['qos'], retain=True)
for host in config['amcrest']['hosts']:
mqtt_client.will_set(devices[host]["topics"]["status"], payload="offline", qos=config['mqtt']['qos'], retain=True)
if config['mqtt']['tls_enabled']:
log(f"Setting up MQTT for TLS")
if config['mqtt']['tls_ca_cert'] is None:
log("Missing env var: MQTT_TLS_CA_CERT or mqtt.tls_ca_cert in config", level="ERROR")
sys.exit(1)
if config['mqtt']['tls_cert'] is None:
log("Missing env var: MQTT_TLS_CERT or mqtt.tls_cert in config", level="ERROR")
sys.exit(1)
if config['mqtt']['tls_cert'] is None:
log("Missing env var: MQTT_TLS_KEY or mqtt.tls_key in config", level="ERROR")
sys.exit(1)
mqtt_client.tls_set(
ca_certs=config['mqtt']['tls_ca_cert'],
certfile=config['mqtt']['tls_cert'],
keyfile=config['mqtt']['tls_key'],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
)
else:
mqtt_client.username_pw_set(config['mqtt']['username'], password=config['mqtt']['password'])
try:
mqtt_client.connect(
config['mqtt']['host'],
port=config['mqtt']['port'],
keepalive=60
)
mqtt_client.loop_start()
except ConnectionError as error:
log(f"Could not connect to MQTT server: {error}", level="ERROR")
sys.exit(1)
def on_mqtt_connect(mqtt_client, userdata, flags, rc):
if rc != 0:
log(f"MQTT Connection Issue: {rc}", level="ERROR")
exit_gracefully(rc, skip_mqtt=True)
log(f"MQTT Connected", level="INFO")
def on_mqtt_disconnect(mqtt_client, userdata, rc):
if rc != 0:
log(f"MQTT connection failed: {rc}", level="ERROR")
else:
log(f"MQTT connection closed successfully", level="INFO")
exit_gracefully(rc, skip_mqtt=True)
def mqtt_publish(topic, payload, exit_on_error=True, json=False):
msg = mqtt_client.publish(
topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True
)
if msg.rc == mqtt.MQTT_ERR_SUCCESS:
msg.wait_for_publish(2)
return
log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR")
if exit_on_error:
exit_gracefully(msg.rc, skip_mqtt=True)
# Amcrest Devices
def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, device_name):
log(f"Connecting to device and getting details for {amcrest_host}...")
camera = AmcrestCamera(
amcrest_host, amcrest_port, amcrest_username, amcrest_password
).camera
try:
device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = device_type == "AD110"
is_ad410 = device_type == "AD410"
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
if not isinstance(serial_number, str):
log(f"Error fetching serial number for {amcrest_host}", level="ERROR")
exit_gracefully(1)
sw_version = camera.software_information[0].replace("version=", "").strip()
build_version = camera.software_information[1].strip()
amcrest_version = f"{sw_version} ({build_version})"
device_slug = slugify(device_name, separator="_")
vendor = camera.vendor_information
hardware_version = camera.hardware_version
except AmcrestError as error:
log(f"Error fetching camera details for {amcrest_host}", level="ERROR")
exit_gracefully(1)
log(f" Vendor: {camera.vendor_information}")
log(f" Device name: {device_name}")
log(f" Device type: {device_type}")
log(f" Serial number: {serial_number}")
log(f" Software version: {amcrest_version}")
log(f" Hardware version: {camera.hardware_version}")
home_assistant_prefix = config['mqtt']['home_assistant_prefix']
return {
"camera": camera,
"config": {
"amcrest_host": amcrest_host,
"device_name": device_name,
"device_type": device_type,
"device_slug": device_slug,
"device_class": camera.device_class,
"is_ad110": is_ad110,
"is_ad410": is_ad410,
"is_doorbell": is_doorbell,
"serial_number": serial_number,
"amcrest_version": amcrest_version,
"hardware_version": hardware_version,
"vendor": vendor,
},
"telemetry": {
},
"topics": {
"config": f'{config["mqtt"]["prefix"]}/{serial_number}/config',
"status": f'{config["mqtt"]["prefix"]}/{serial_number}/status',
"telemetry": f'{config["mqtt"]["prefix"]}/{serial_number}/telemetry',
"event": f'{config["mqtt"]["prefix"]}/{serial_number}/event',
"motion": f'{config["mqtt"]["prefix"]}/{serial_number}/motion',
"doorbell": f'{config["mqtt"]["prefix"]}/{serial_number}/doorbell',
"human": f'{config["mqtt"]["prefix"]}/{serial_number}/human',
"storage_used": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/used',
"storage_used_percent": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/used_percent',
"storage_total": f'{config["mqtt"]["prefix"]}/{serial_number}/storage/total',
"home_assistant": {
"doorbell": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/doorbell/config",
"human": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/human/config",
"motion": f"{home_assistant_prefix}/binary_sensor/amcrest2mqtt-{serial_number}/motion/config",
"storage_used": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_used/config",
"storage_used_percent": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_used_percent/config",
"storage_total": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/storage_total/config",
"version": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/version/config",
"host": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/host/config",
"serial_number": f"{home_assistant_prefix}/sensor/amcrest2mqtt-{serial_number}/serial_number/config",
},
},
}
# MQTT messages
def send_broker_discovery():
mqtt_publish(f'{config["mqtt"]["home_assistant_prefix"]}/sensor/{via_device}/broker/config', {
"availability_topic": f'{config["mqtt"]["prefix"]}/{via_device}/availability',
"state_topic": f'{config["mqtt"]["prefix"]}/{via_device}/status',
"qos": config['mqtt']['qos'],
"device": {
"name": 'amcrest2mqtt broker',
"identifiers": via_device,
},
"icon": 'mdi:language-python',
"unique_id": via_device,
"name": "amcrest2mqtt broker",
},
json=True,
)
def send_device_discovery(device):
vendor = device["config"]["vendor"]
device_name = device["config"]["device_name"]
device_type = device["config"]["device_type"]
device_slug = device["config"]["device_slug"]
serial_number = device["config"]["serial_number"]
amcrest_version = device["config"]["amcrest_version"]
hw_version = device["config"]["hardware_version"]
base_config = {
"availability_topic": device["topics"]["status"],
"qos": config['mqtt']['qos'],
"device": {
"name": device_name,
"manufacturer": vendor,
"model": device_type,
"identifiers": serial_number,
"sw_version": amcrest_version,
"hw_version": hw_version,
"via_device": via_device,
},
}
if device["config"]["is_doorbell"]:
doorbell_name = "Doorbell" if device_name == "Doorbell" else f"{device_name} Doorbell"
mqtt_publish(
device["topics"]["home_assistant"]["doorbell"],
base_config
| {
"state_topic": device["topics"]["doorbell"],
"payload_on": "on",
"payload_off": "off",
"icon": "mdi:doorbell",
"name": doorbell_name,
"unique_id": f"{serial_number}.doorbell",
},
json=True,
)
if device["config"]["is_ad410"]:
mqtt_publish(
device["topics"]["home_assistant"]["human"],
base_config
| {
"state_topic": device["topics"]["human"],
"payload_on": "on",
"payload_off": "off",
"device_class": "motion",
"name": "Human",
"unique_id": f"{serial_number}.human",
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["motion"],
base_config
| {
"state_topic": device["topics"]["motion"],
"payload_on": "on",
"payload_off": "off",
"device_class": "motion",
"name": "Motion",
"unique_id": f"{serial_number}.motion",
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["version"],
base_config
| {
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.sw_version }}",
"icon": "mdi:package-up",
"name": "Version",
"unique_id": f"{serial_number}.version",
"entity_category": "diagnostic",
"enabled_by_default": False
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["serial_number"],
base_config
| {
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.serial_number }}",
"icon": "mdi:alphabetical-variant",
"name": "Serial Number",
"unique_id": f"{serial_number}.serial_number",
"entity_category": "diagnostic",
"enabled_by_default": False
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["host"],
base_config
| {
"state_topic": device["topics"]["config"],
"value_template": "{{ value_json.host }}",
"icon": "mdi:ip-network",
"name": "Host",
"unique_id": f"{serial_number}.host",
"entity_category": "diagnostic",
"enabled_by_default": False
},
json=True,
)
if config['amcrest']['storage_poll_interval'] > 0:
mqtt_publish(
device["topics"]["home_assistant"]["storage_used_percent"],
base_config
| {
"state_topic": device["topics"]["storage_used_percent"],
"unit_of_measurement": "%",
"icon": "mdi:micro-sd",
"name": f"Storage Used %",
"object_id": f"{device_slug}_storage_used_percent",
"unique_id": f"{serial_number}.storage_used_percent",
"entity_category": "diagnostic",
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["storage_used"],
base_config
| {
"state_topic": device["topics"]["storage_used"],
"unit_of_measurement": "GB",
"icon": "mdi:micro-sd",
"name": "Storage Used",
"unique_id": f"{serial_number}.storage_used",
"entity_category": "diagnostic",
},
json=True,
)
mqtt_publish(
device["topics"]["home_assistant"]["storage_total"],
base_config
| {
"state_topic": device["topics"]["storage_total"],
"unit_of_measurement": "GB",
"icon": "mdi:micro-sd",
"name": "Storage Total",
"unique_id": f"{serial_number}.storage_total",
"entity_category": "diagnostic",
},
json=True,
)
def refresh_broker():
Timer(60, refresh_broker).start()
log('Refreshing amcrest2mqtt broker, every 60 sec')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', 'online')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', 'online')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/config', {
'device_name': 'amcrest2mqtt broker',
'sw_version': version,
'origin': {
'name': 'amcrest2mqtt broker',
'sw_version': version,
'url': 'https://github.com/weirdtangent/amcrest2mqtt',
},
}, json=True)
def refresh_devices():
for host in config['amcrest']['hosts']:
refresh_device(devices[host])
def refresh_device(device):
mqtt_publish(device['topics']['status'], 'online')
mqtt_publish(device['topics']['config'], {
'device_type': device['config']['device_type'],
'device_name': device['config']['device_name'],
'sw_version': device['config']['amcrest_version'],
'hw_version': device['config']['hardware_version'],
'serial_number': device['config']['serial_number'],
'host': device['config']['amcrest_host'],
'configuration_url': 'http://' + device['config']['amcrest_host'] + '/',
'origin': {
'name': 'amcrest2mqtt broker',
'sw_version': version,
'url': 'https://github.com/weirdtangent/amcrest2mqtt',
},
}, json=True)
mqtt_publish(device['topics']['telemetry'],
dumps(device['telemetry']),
json=True)
def refresh_storage_sensors():
Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start()
log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)')
for host in config['amcrest']['hosts']:
device = devices[host]
topics = device["topics"]
try:
storage = device["camera"].storage_all
mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"]))
mqtt_publish(topics["storage_used"], to_gb(storage["used"]))
mqtt_publish(topics["storage_total"], to_gb(storage["total"]))
except AmcrestError as error:
log(f"Error fetching storage information for {host}: {error}", level="WARNING")
# cmd-line args
argparser = argparse.ArgumentParser()
argparser.add_argument(
"-c",
"--config",
required=False,
help="Directory holding config.yaml or full path to config file",
)
args = argparser.parse_args()
# load config file
configpath = args.config
if configpath:
if not configpath.endswith(".yaml"):
if not configpath.endswith("/"):
configpath += "/"
configpath += "config.yaml"
log(f"Trying to load config file {configpath}")
with open(configpath) as file:
config = yaml.safe_load(file)
# or check env vars
else:
log(f"INFO:root:No config file specified, checking ENV")
config = {
'mqtt': {
'host': os.getenv("MQTT_HOST") or 'localhost',
'port': int(os.getenv("MQTT_PORT") or 1883),
'username': os.getenv("MQTT_USERNAME"),
'password': os.getenv("MQTT_PASSWORD"), # can be None
'qos': int(os.getenv("MQTT_QOS") or 0),
'prefix': os.getenv("MQTT_PREFIX") or 'amcrest2mqtt',
'home_assistant_prefix': os.getenv("MQTT_HOME_ASSISTANT_PREFIX") or "homeassistant",
'tls_enabled': os.getenv("MQTT_TLS_ENABLED") == "true",
'tls_ca_cert': os.getenv("MQTT_TLS_CA_CERT"),
'tls_cert': os.getenv("MQTT_TLS_CERT"),
'tls_key': os.getenv("MQTT_TLS_KEY"),
},
'amcrest': {
'hosts': os.getenv("AMCREST_HOSTS"),
'names': os.getenv("AMCREST_NAMES"),
'port': int(os.getenv("AMCREST_PORT") or 80),
'username': os.getenv("AMCREST_USERNAME") or "admin",
'password': os.getenv("AMCREST_PASSWORD"),
'storage_poll_interval': int(os.getenv("STORAGE_POLL_INTERVAL") or 3600),
},
'home_assistant': os.getenv("HOME_ASSISTANT") == "true",
'debug': os.getenv("AMCREST_DEBUG") == "true",
'timezone': os.getenv("TZ") or 'utc',
}
# Exit if any of the required vars are not provided
if config['amcrest']['hosts'] is None:
log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR")
sys.exit(1)
config['amcrest']['host_count'] = len(config['amcrest']['hosts'])
if config['amcrest']['names'] is None:
log("Missing env var: AMCREST_NAMES or amcrest.names in config", level="ERROR")
sys.exit(1)
config['amcrest']['name_count'] = len(config['amcrest']['names'])
if config['amcrest']['host_count'] != config['amcrest']['name_count']:
log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR")
sys.exit(1)
log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor")
if config['amcrest']['password'] is None:
log("Please set the AMCREST_PASSWORD environment variable", level="ERROR")
sys.exit(1)
version = read_version()
via_device = config["mqtt"]["prefix"] + '-broker'
log(f"Starting: amcrest2mqtt v{version}")
# handle interruptions
signal.signal(signal.SIGINT, signal_handler)
# connect to each camera
amcrest_names = config['amcrest']['names']
for host in config['amcrest']['hosts']:
name = amcrest_names.pop(0)
log(f"Connecting host: {host} as {name}", level="INFO")
devices[host] = get_device(host, config['amcrest']['port'], config['amcrest']['username'], config['amcrest']['password'], name)
log(f"Connecting to hosts done.", level="INFO")
# connect to MQTT service
mqtt_connect()
# configure broker and devices in Home Assistant
if config['home_assistant']:
send_broker_discovery()
for host in config['amcrest']['hosts']:
send_device_discovery(devices[host])
refresh_broker()
refresh_devices()
# kick off storage refresh timer
if config['amcrest']['storage_poll_interval'] > 0:
refresh_storage_sensors()
log(f"Listening for events on {config['amcrest']['host_count']} host(s)", level="DEBUG")
async def main():
try:
for host in config['amcrest']['hosts']:
device = devices[host]
device_config = device["config"]
device_topics = device["topics"]
async for code, payload in device["camera"].async_event_actions("All"):
log(f"Event on {host} - {code}: {payload['action']}")
if ((code == "ProfileAlarmTransmit" and device_config["is_ad110"])
or (code == "VideoMotion" and not device_config["is_ad110"])):
motion_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(device_topics["motion"], motion_payload)
device[host]['telemetry']['last_motion_event'] = str(datetime.now(tz=ZoneInfo(config['timezone'])))
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(device_topics["human"], human_payload)
device[host]['telemetry']['last_human_event'] = str(datetime.now(tz=ZoneInfo(config['timezone'])))
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
mqtt_publish(device_topics["doorbell"], doorbell_payload)
device[host]['telemetry']['last_doorbell_event'] = str(datetime.now(tz=ZoneInfo(config['timezone'])))
mqtt_publish(device_topics["event"], payload, json=True)
refresh_device(device)
except AmcrestError as error:
log(f"Amcrest error while working on {host}: {AmcrestError}. Sleeping for 10 seconds.", level="ERROR")
time.sleep(10)
asyncio.run(main())

@ -0,0 +1,124 @@
from amcrest import AmcrestCamera, AmcrestError
import asyncio
from datetime import date
import time
from util import *
from zoneinfo import ZoneInfo
class AmcrestAPI(object):
def __init__(self, config):
self.last_call_date = ''
self.timezone = config['timezone']
self.hide_ts = config['hide_ts'] or False
self.amcrest_config = config['amcrest']
self.count = len(self.amcrest_config['hosts'])
self.devices = {}
def log(self, msg, level='INFO'):
app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts)
async def connect_to_devices(self):
self.log(f'Connecting to: {self.amcrest_config["hosts"]}')
tasks = []
device_names = self.amcrest_config['names']
for host in self.amcrest_config['hosts']:
task = asyncio.create_task(self.get_device(host, device_names.pop(0)))
tasks.append(task)
await asyncio.gather(*tasks)
self.log(f"Connecting to hosts done.", level="INFO")
return {d: self.devices[d]['config'] for d in self.devices.keys()}
def get_camera(self, host):
return AmcrestCamera(
host, self.amcrest_config['port'], self.amcrest_config['username'], self.amcrest_config['password']
).camera
async def get_device(self, host, device_name):
camera = self.get_camera(host)
try:
device_type = camera.device_type.replace("type=", "").strip()
is_ad110 = device_type == "AD110"
is_ad410 = device_type == "AD410"
is_doorbell = is_ad110 or is_ad410
serial_number = camera.serial_number
if not isinstance(serial_number, str):
raise Exception(f'Error fetching serial number for {host}: {error}')
sw_version = camera.software_information[0].replace("version=", "").strip()
build_version = camera.software_information[1].strip()
amcrest_version = f"{sw_version} ({build_version})"
vendor = camera.vendor_information
hardware_version = camera.hardware_version
except AmcrestError as error:
raise Exception(f'Error fetching camera details for {host}: {error}')
self.devices[serial_number] = {
"camera": camera,
"config": {
"host": host,
"device_name": device_name,
"device_type": device_type,
"device_class": camera.device_class,
"is_ad110": is_ad110,
"is_ad410": is_ad410,
"is_doorbell": is_doorbell,
"serial_number": serial_number,
"software_version": amcrest_version,
"hardware_version": hardware_version,
"vendor": vendor,
},
"storage": {},
}
def get_device_storage_stats(self, device_id):
if 'error' in self.devices[device_id]:
try:
self.devices[device_id]['camera'] = self.get_camera(self.devices[device_id]['config']['host'])
del self.devices[device_id]['error']
except Exception as err:
err_msg = f'Problem re-connecting to camera: {err}'
self.log(err_msg, level='ERROR')
self.devices[device_id]["error"] = err_msg
raise Exception(err_msg)
try:
storage = self.devices[device_id]["camera"].storage_all
except Exception as err:
err_msg = f'Problem connecting with camera to get storage stats: {err}'
self.log(err_msg, level='ERROR')
self.devices[device_id]["error"] = err_msg
raise Exception(err_msg)
return {
'last_update': str(datetime.now(ZoneInfo(self.timezone))),
'used_percent': str(storage['used_percent']),
'used': to_gb(storage['used']),
'total': to_gb(storage['total']),
}
async def get_device_event_actions(self, device_id):
events = []
device = self.devices[device_id]
config = device['config']
async for code, payload in device["camera"].async_event_actions("All"):
self.log(f"Event on {config['host']} - {code}: {payload['action']}")
if ((code == "ProfileAlarmTransmit" and config["is_ad110"])
or (code == "VideoMotion" and not config["is_ad110"])):
motion_payload = "on" if payload["action"] == "Start" else "off"
events.append({ 'event': 'motion', 'payload': motion_payload })
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
events.append({ 'event': 'human', 'payload': human_payload })
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
events.append({ 'event': 'doorbell', 'payload': doorbell_payload })
events.append({ 'event': 'event', 'payload': payload })
return events

@ -0,0 +1,631 @@
import asyncio
from datetime import date
import amcrest_api
import json
import paho.mqtt.client as mqtt
import random
import signal
import ssl
import string
import time
from util import *
from zoneinfo import ZoneInfo
class AmcrestMqtt(object):
def __init__(self, config):
self.running = False
self.timezone = config['timezone']
self.mqttc = None
self.mqtt_connect_time = None
self.config = config
self.mqtt_config = config['mqtt']
self.amcrest_config = config['amcrest']
self.client_id = self.get_new_client_id()
self.version = config['version']
self.hide_ts = config['hide_ts'] or False
self.device_update_interval = config['amcrest'].get('device_update_interval', 600)
self.service_name = self.mqtt_config['prefix'] + ' service'
self.service_slug = self.mqtt_config['prefix'] + '-service'
self.devices = {}
self.configs = {}
def log(self, msg, level='INFO'):
app_log(msg, level=level, tz=self.timezone, hide_ts=self.hide_ts)
async def _handle_sigterm(self, loop, tasks):
self.running = False
self.log('SIGTERM received, waiting for tasks to cancel...', level='WARN')
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def __enter__(self):
self.mqttc_create()
self.amcrestc = amcrest_api.AmcrestAPI(self.config)
self.running = True
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.running = False
self.log('Exiting gracefully')
if self.mqttc is not None and self.mqttc.is_connected():
for device_id in self.devices:
self.devices[device_id]['availability'] = 'offline'
if 'state' not in self.devices[device_id]:
self.devices[device_id]['state'] = {}
self.publish_device(device_id)
self.mqttc.disconnect()
else:
self.log('Lost connection to MQTT')
# MQTT Functions
def mqtt_on_connect(self, client, userdata, flags, rc, properties):
if rc != 0:
self.log(f'MQTT CONNECTION ISSUE ({rc})', level='ERROR')
exit()
self.log(f'MQTT connected as {self.client_id}')
client.subscribe(self.get_device_sub_topic())
client.subscribe(self.get_attribute_sub_topic())
def mqtt_on_disconnect(self, client, userdata, flags, rc, properties):
self.log('MQTT connection closed')
# if reconnect, lets use a new client_id
self.client_id = self.get_new_client_id()
if time.time() > self.mqtt_connect_time + 10:
self.mqttc_create()
else:
exit()
def mqtt_on_log(self, client, userdata, paho_log_level, msg):
level = None
if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR:
level = 'ERROR'
if paho_log_level == mqtt.LogLevel.MQTT_LOG_WARNING:
level = 'WARN'
if level:
self.log(f'MQTT LOG: {msg}', level=level)
def mqtt_on_message(self, client, userdata, msg):
if not msg or not msg.payload:
return
topic = msg.topic
payload = json.loads(msg.payload)
self.log(f'Got MQTT message for {topic} - {payload}')
# we might get:
# device/component/set
# device/component/set/attribute
# homeassistant/device/component/set
# homeassistant/device/component/set/attribute
components = topic.split('/')
# handle this message if it's for us, otherwise pass along to amcrest API
if components[-2] == self.get_component_slug('service'):
self.handle_service_message(None, payload)
elif components[-3] == self.get_component_slug('service'):
self.handle_service_message(components[-1], payload)
else:
if components[-1] == 'set':
mac = components[-2][-16:]
elif components[-2] == 'set':
mac = components[-3][-16:]
else:
self.log(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}', level='ERROR')
return
# ok, lets format the device_id and send to amcrest
device_id = ':'.join([mac[i:i+2] for i in range (0, len(mac), 2)])
self.send_command(device_id, payload)
def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties):
rc_list = map(lambda x: x.getName(), reason_code_list)
self.log(f'MQTT SUBSCRIBED: reason_codes - {'; '.join(rc_list)}', level='DEBUG')
# MQTT Helpers
def mqttc_create(self):
self.mqttc = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=self.client_id,
clean_session=False,
)
if self.mqtt_config.get('tls_enabled'):
self.mqttcnt.tls_set(
ca_certs=self.mqtt_config.get('tls_ca_cert'),
certfile=self.mqtt_config.get('tls_cert'),
keyfile=self.mqtt_config.get('tls_key'),
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
)
else:
self.mqttc.username_pw_set(
username=self.mqtt_config.get('username'),
password=self.mqtt_config.get('password'),
)
self.mqttc.on_connect = self.mqtt_on_connect
self.mqttc.on_disconnect = self.mqtt_on_disconnect
self.mqttc.on_message = self.mqtt_on_message
self.mqttc.on_subscribe = self.mqtt_on_subscribe
self.mqttc.on_log = self.mqtt_on_log
# self.mqttc.will_set(self.get_state_topic(self.service_slug) + '/availability', payload="offline", qos=0, retain=True)
try:
self.mqttc.connect(
self.mqtt_config.get('host'),
port=self.mqtt_config.get('port'),
keepalive=60,
)
self.mqtt_connect_time = time.time()
self.mqttc.loop_start()
except ConnectionError as error:
self.log(f'COULD NOT CONNECT TO MQTT {self.mqtt_config.get("host")}: {error}', level='ERROR')
exit(1)
# MQTT Topics
def get_new_client_id(self):
return self.mqtt_config['prefix'] + '-' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
def get_slug(self, device_id, type):
return f"amcrest_{device_id.replace(':','')}_{type}"
def get_device_sub_topic(self):
if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False:
return f"{self.mqtt_config['prefix']}/+/set"
return f"{self.mqtt_config['discovery_prefix']}/device/+/set"
def get_attribute_sub_topic(self):
if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False:
return f"{self.mqtt_config['prefix']}/+/set"
return f"{self.mqtt_config['discovery_prefix']}/device/+/set/+"
def get_component_slug(self, device_id):
return f"amcrest-{device_id.replace(':','')}"
def get_command_topic(self, device_id, attribute_name):
if attribute_name:
if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False:
return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/set/{attribute_name}"
return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/set/{attribute_name}"
else:
if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False:
return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/set"
return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/set"
def get_discovery_topic(self, device_id, topic):
if 'homeassistant' not in self.mqtt_config or self.mqtt_config['homeassistant'] == False:
return f"{self.mqtt_config['prefix']}/{self.get_component_slug(device_id)}/{topic}"
return f"{self.mqtt_config['discovery_prefix']}/device/{self.get_component_slug(device_id)}/{topic}"
# Service Device
def publish_service_device(self):
state_topic = self.get_discovery_topic('service', 'state')
command_topic = self.get_discovery_topic('service', 'set')
availability_topic = self.get_discovery_topic('service', 'availability')
self.mqttc.publish(
self.get_discovery_topic('service','config'),
json.dumps({
'qos': 0,
'state_topic': state_topic,
'availability_topic': availability_topic,
'device': {
'name': self.service_name,
'ids': self.service_slug,
'suggested_area': 'House',
'manufacturer': 'weirdTangent',
'model': self.version,
},
'origin': {
'name': self.service_name,
'sw_version': self.version,
'support_url': 'https://github.com/weirdtangent/amcrest2mqtt',
},
'components': {
self.service_slug + '_status': {
'name': 'Service',
'platform': 'binary_sensor',
'schema': 'json',
'payload_on': 'online',
'payload_off': 'offline',
'icon': 'mdi:language-python',
'state_topic': state_topic,
'availability_topic': availability_topic,
'value_template': '{{ value_json.status }}',
'unique_id': 'amcrest_service_status',
},
self.service_slug + '_device_refresh': {
'name': 'Device Refresh Interval',
'platform': 'number',
'schema': 'json',
'icon': 'mdi:numeric',
'min': 10,
'max': 3600,
'state_topic': state_topic,
'command_topic': self.get_command_topic('service', 'device_refresh'),
'availability_topic': availability_topic,
'value_template': '{{ value_json.device_refresh }}',
'unique_id': 'amcrest_service_device_refresh',
},
},
}),
retain=True
)
self.update_service_device()
def update_service_device(self):
self.mqttc.publish(self.get_discovery_topic('service','availability'), 'online', retain=True)
self.mqttc.publish(
self.get_discovery_topic('service','state'),
json.dumps({
'status': 'online',
'device_refresh': self.device_update_interval,
}),
retain=True
)
# amcrest Helpers
async def setup_devices(self):
self.log(f'Setup devices')
try:
devices = await self.amcrestc.connect_to_devices()
except Exception as err:
self.log(f'Failed to connect to 1 or more devices {err}', level='ERROR')
exit(1)
self.publish_service_device()
for device_id in devices:
config = devices[device_id]
if 'device_type' in config:
first = False
if device_id not in self.devices:
first = True
self.devices[device_id] = {}
self.configs[device_id] = config
self.devices[device_id]['qos'] = 0
self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state')
self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability')
self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set')
# self.mqttc.will_set(self.get_state_topic(device_id)+'/availability', payload="offline", qos=0, retain=True)
self.devices[device_id]['device'] = {
'name': config['device_name'],
'manufacturer': config['vendor'],
'model': config['device_type'],
'ids': device_id,
'sw_version': config['software_version'],
'hw_version': config['hardware_version'],
'via_device': self.service_slug,
}
self.devices[device_id]['origin'] = {
'name': self.service_name,
'sw_version': self.version,
'support_url': 'https://github.com/weirdtangent/amcrest2mqtt',
}
self.add_components_to_device(device_id)
if first:
self.log(f'Adding device: "{config['device_name']}" [Amcrest {config["device_type"]}] ({device_id})')
self.send_device_discovery(device_id)
else:
self.log(f'Updated device: {self.devices[device_id]['device']['name']}', level='DEBUG')
# device discovery sent, now it is save to add these to the dict
self.devices[device_id]['state'] = {}
self.devices[device_id]['availability'] = 'online'
self.devices[device_id]['storage'] = {}
self.devices[device_id]['motion'] = {}
self.devices[device_id]['event'] = {}
else:
if first_time_through:
self.log(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})')
# add amcrest components to devices
def add_components_to_device(self, device_id):
device = self.devices[device_id]
config = self.configs[device_id]
components = {}
if config['is_doorbell']:
doorbell_name = 'Doorbell' if config['device_name'] == 'Doorbell' else f'{config["device_name"]} Doorbell'
components[self.get_slug(device_id, 'doorbell')] = {
'name': doorbell_name,
'platform': 'binary_sensor',
'payload_on': 'on',
'payload_off': 'off',
'device_class': '',
'icon': 'mdi:doorbell',
'state_topic': self.get_discovery_topic(device_id, 'doorbell'),
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.doorbell }}',
'unique_id': self.get_slug(device_id, 'doorbell'),
}
if config['is_ad410']:
components[self.get_slug(device_id, 'human')] = {
'name': 'Human',
'platform': 'binary_sensor',
'payload_on': 'on',
'payload_off': 'off',
'device_class': 'motion',
'state_topic': self.get_discovery_topic(device_id, 'human'),
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.human }}',
'unique_id': self.get_slug(device_id, 'human'),
}
components[self.get_slug(device_id, 'motion')] = {
'name': 'Motion',
'platform': 'binary_sensor',
'payload_on': 'on',
'payload_off': 'off',
'device_class': 'motion',
'state_topic': self.get_discovery_topic(device_id, 'motion'),
'availability_topic': device['availability_topic'],
'unique_id': self.get_slug(device_id, 'motion'),
}
components[self.get_slug(device_id, 'version')] = {
'name': 'Version',
'platform': 'sensor',
'icon': 'mdi:package-up',
'state_topic': device['state_topic'],
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.sw_version }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'sw_version'),
}
components[self.get_slug(device_id, 'serial_number')] = {
'name': 'Serial Number',
'platform': 'sensor',
'icon': 'mdi:alphabetical-variant-up',
'state_topic': device['state_topic'],
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.serial_number }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'serial_number'),
}
components[self.get_slug(device_id, 'host')] = {
'name': 'Host',
'platform': 'sensor',
'icon': 'mdi:ip-network',
'state_topic': device['state_topic'],
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.host }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'host'),
}
components[self.get_slug(device_id, 'event')] = {
'name': 'Event',
'platform': 'sensor',
'state_topic': self.get_discovery_topic(device_id, 'event'),
'availability_topic': device['availability_topic'],
'unique_id': self.get_slug(device_id, 'event'),
}
components[self.get_slug(device_id, 'storage_used_percent')] = {
'name': 'Storage Used %',
'platform': 'sensor',
'icon': 'mdi:micro-sd',
'unit_of_measurement': '%',
'state_topic': self.get_discovery_topic(device_id, 'storage'),
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.used_percent }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_used_percent'),
}
components[self.get_slug(device_id, 'storage_total')] = {
'name': 'Storage Total',
'platform': 'sensor',
'icon': 'mdi:micro-sd',
'unit_of_measurement': 'GB',
'state_topic': self.get_discovery_topic(device_id, 'storage'),
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.total }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_total'),
}
components[self.get_slug(device_id, 'storage_used')] = {
'name': 'Storage Used',
'platform': 'sensor',
'icon': 'mdi:micro-sd',
'unit_of_measurement': 'GB',
'state_topic': self.get_discovery_topic(device_id, 'storage'),
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.used }}',
'entity_category': 'diagnostic',
'unique_id': self.get_slug(device_id, 'storage_used'),
}
components[self.get_slug(device_id, 'last_update')] = {
'name': 'Last Update',
'platform': 'sensor',
'device_class': 'timestamp',
'state_topic': device['state_topic'],
'availability_topic': device['availability_topic'],
'value_template': '{{ value_json.last_update }}',
'unique_id': self.get_slug(device_id, 'last_update'),
}
# since we always add at least `motion`, this should always be true
if len(components) > 0:
device['components'] = components
def send_device_discovery(self, device_id):
device = self.devices[device_id]
self.mqttc.publish(self.get_discovery_topic(device_id, 'config'), json.dumps(device), retain=True)
def refresh_all_devices(self):
self.log(f'Refreshing storage info for all devices (every {self.device_update_interval} sec)')
# refresh devices starting with the device updated the longest time ago
for each in sorted(self.devices.items(), key=lambda dt: (dt is None, dt)):
# break loop if we are ending
if not self.running:
break
device_id = each[0]
# all just to format the log record
last_updated = self.devices[device_id]['state']['last_update'][:19].replace('T',' ') if 'last_update' in self.devices[device_id]['state'] else 'server started'
self.log(f'Refreshing device "{self.devices[device_id]['device']['name']} ({device_id})", not updated since: {last_updated}')
self.configs[device_id]['last_update'] = datetime.now(ZoneInfo(self.timezone))
self.refresh_device(device_id)
def refresh_device(self, device_id):
# don't refresh the device until it has been published in device discovery
# and we can tell because it will be `online`
#if self.devices[device_id]['state']['status'] != 'online':
# return
config = self.configs[device_id]
result = self.amcrestc.get_device_storage_stats(device_id)
if result and 'last_update' in result:
self.devices[device_id]['storage'] = result
self.devices[device_id]['state'] = {
'status': 'online',
'host': config['host'],
'serial_number': config['serial_number'],
'sw_version': config['software_version'],
'last_update': config['last_update'].isoformat(),
}
self.update_service_device()
self.publish_device(device_id)
def publish_device(self, device_id):
self.mqttc.publish(
self.get_discovery_topic(device_id,'state'),
json.dumps(self.devices[device_id]['state']),
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'availability'),
self.devices[device_id]['availability'],
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'storage'),
json.dumps(self.devices[device_id]['storage']),
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'motion'),
json.dumps(self.devices[device_id]['motion']),
retain=True
)
self.mqttc.publish(
self.get_discovery_topic(device_id,'event'),
json.dumps(self.devices[device_id]['event']),
retain=True
)
def handle_service_message(self, attribute, message):
match attribute:
case 'device_refresh':
self.device_update_interval = message
self.log(f'Updated UPDATE_INTERVAL to be {message}')
case _:
self.log(f'IGNORED UNRECOGNIZED amcrest-service MESSAGE for {attribute}: {message}')
return
self.update_service_device()
def send_command(self, device_id, data):
caps = self.convert_attributes_to_capabilities(data)
sku = self.devices[device_id]['device']['model']
self.log(f'COMMAND {device_id} = {caps}', level='DEBUG')
first = True
for key in caps:
if not first:
time.sleep(1)
self.log(f'CMD DEVICE {self.devices[device_id]['device']['name']} ({device_id}) {key} = {caps[key]}', level='DEBUG')
self.amcrestc.send_command(device_id, sku, caps[key]['type'], caps[key]['instance'], caps[key]['value'])
self.update_service_device()
first = False
if device_id not in self.boosted:
self.boosted.append(device_id)
async def check_devices_for_events(self):
try:
for device_id in self.devices:
events = await self.amcrestc.get_device_event_actions(device_id)
log(f'Got events for {device_id}: {events.join(';')}')
for event in events:
self.devices[device_id][event] = events[event]
self.mqttc.publish(
self.get_discovery_topic(device_id,event),
json.dumps(self.devices[device_id][event]),
retain=True
)
self.refresh_device(device_id)
except Exception as err:
self.log(f'CAUGHT IN check_devices_for_events: {err}', level='ERROR')
# main loop
async def main_loop(self):
await self.setup_devices()
loop = asyncio.get_running_loop()
tasks = [
asyncio.create_task(self.device_loop()),
asyncio.create_task(self.device_actions()),
]
for signame in {'SIGINT','SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
lambda: asyncio.create_task(self._handle_sigterm(loop, tasks))
)
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
except Exception as err:
self.log(f'CAUGHT IN main_loop {err}', level='ERROR')
self.running = False
async def device_loop(self):
while self.running == True:
try:
self.refresh_all_devices()
await asyncio.sleep(self.device_update_interval)
except Exception as err:
self.log(f'CAUGHT IN device_loop {err}', level='ERROR')
self.running = False
async def device_actions(self):
while self.running == True:
try:
await self.check_devices_for_events()
except Exception as err:
self.log(f'CAUGHT IN device_actions {err}', level='ERROR')

115
app.py

@ -0,0 +1,115 @@
import asyncio
import argparse
from amcrest_mqtt import AmcrestMqtt
import os
import sys
import time
from util import *
import yaml
# Helper functions and callbacks
def read_file(file_name):
with open(file_name, 'r') as file:
data = file.read().replace('\n', '')
return data
def read_version():
if os.path.isfile('./VERSION'):
return read_file('./VERSION')
return read_file('../VERSION')
# Let's go!
version = read_version()
app_log(f'Starting: amcrest2mqtt v{version}')
# cmd-line args
argparser = argparse.ArgumentParser()
argparser.add_argument(
'-c',
'--config',
required=False,
help='Directory holding config.yaml or full path to config file',
)
args = argparser.parse_args()
# load config file
configpath = args.config or '/config'
try:
if not configpath.endswith('.yaml'):
if not configpath.endswith('/'):
configpath += '/'
configfile = configpath + 'config.yaml'
with open(configfile) as file:
config = yaml.safe_load(file)
app_log(f'Reading config file {configpath}')
config['config_from'] = 'file'
config['config_path'] = configpath
except:
app_log(f'config.yaml not found, checking ENV')
config = {
'mqtt': {
'host': os.getenv('MQTT_HOST') or 'localhost',
'qos': int(os.getenv('MQTT_QOS') or 0),
'port': int(os.getenv('MQTT_PORT') or 1883),
'username': os.getenv('MQTT_USERNAME'),
'password': os.getenv('MQTT_PASSWORD'), # can be None
'tls_enabled': os.getenv('MQTT_TLS_ENABLED') == 'true',
'tls_ca_cert': os.getenv('MQTT_TLS_CA_CERT'),
'tls_cert': os.getenv('MQTT_TLS_CERT'),
'tls_key': os.getenv('MQTT_TLS_KEY'),
'prefix': os.getenv('MQTT_PREFIX') or 'amcrest2mqtt',
'homeassistant': os.getenv('MQTT_HOMEASSISTANT') == True,
'discovery_prefix': os.getenv('MQTT_DISCOVERY_PREFIX') or 'homeassistant',
},
'amcrest': {
'hosts': os.getenv("AMCREST_HOSTS"),
'names': os.getenv("AMCREST_NAMES"),
'port': int(os.getenv("AMCREST_PORT") or 80),
'username': os.getenv("AMCREST_USERNAME") or "admin",
'password': os.getenv("AMCREST_PASSWORD"),
'device_update_interval': int(os.getenv("DEVICE_UPDATE_INTERVAL") or 600),
},
'debug': True if os.getenv('DEBUG') else False,
'hide_ts': True if os.getenv('HIDE_TS') else False,
'config_from': 'env',
'timezone': os.getenv('TZ'),
}
config['version'] = version
config['configpath'] = os.path.dirname(configpath)
if not 'hide_ts' in config:
config['hide_ts'] = False
# Exit if any of the required vars are not provided
if config['amcrest']['hosts'] is None:
app_log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR")
sys.exit(1)
config['amcrest']['host_count'] = len(config['amcrest']['hosts'])
if config['amcrest']['names'] is None:
app_log("Missing env var: AMCREST_NAMES or amcrest.names in config", level="ERROR")
sys.exit(1)
config['amcrest']['name_count'] = len(config['amcrest']['names'])
if config['amcrest']['host_count'] != config['amcrest']['name_count']:
app_log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR")
sys.exit(1)
app_log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor")
if config['amcrest']['password'] is None:
app_log("Please set the AMCREST_PASSWORD environment variable", level="ERROR")
sys.exit(1)
if not 'timezone' in config:
app_log('`timezone` required in config file or in TZ env var', level='ERROR', tz=timezone)
exit(1)
else:
app_log(f'TIMEZONE set as {config["timezone"]}', tz=config["timezone"])
try:
with AmcrestMqtt(config) as mqtt:
asyncio.run(mqtt.main_loop())
except Exception as err:
app_log(f'CAUGHT IN app: {err}', level='ERROR')

@ -16,6 +16,5 @@ services:
AMCREST_USERNAME: viewer
AMCREST_PASSWORD: password
HOME_ASSISTANT: true
STORAGE_POLL_INTERVAL: 600
DEVICE_UPDATE_INTERVAL: 600
DEBUG_MODE: false
command: python3 -u amcrest2mqtt.py -c /config

@ -0,0 +1,13 @@
from datetime import datetime, timezone
import os
from zoneinfo import ZoneInfo
def app_log(msg, level='INFO', tz='UTC', hide_ts=False):
ts = datetime.now(ZoneInfo(tz)).strftime('%Y-%m-%d %H:%M:%S %Z')
if len(msg) > 102400:
raise ValueError('Log message exceeds max length')
if level != 'DEBUG' or os.getenv('DEBUG'):
print(f'{ts + " " if not hide_ts else ""}[{level}] {msg}')
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))
Loading…
Cancel
Save