add last_*_event telemetry; rearrange a bit; refresh broker every min

pull/106/head
Jeff Culverhouse 11 months ago
parent e3584cbc5b
commit 874d659ffa

@ -28,6 +28,7 @@ Or, we support the following environment variables and defaults:
- `MQTT_HOME_ASSISTANT_PREFIX` (optional, default = 'homeassistant')
- `HOME_ASSISTANT` (optional, default = false)
- `TZ` (required, timezone identifier, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List)
- `STORAGE_POLL_INTERVAL` (optional, default = 3600) - how often to fetch storage data (in seconds) (set to 0 to disable functionality)
It exposes events to the following topics:
@ -50,30 +51,7 @@ If you are using a different MQTT prefix to the default, you will need to set th
## Running the app
To run via env variables with Docker Compose
```yaml
version: "3.4"
services:
amcrest2mqtt:
image: weirdtangent/amcrest2mqtt:latest
container_name: amcrest2mqtt
restart: unless-stopped
environment:
MQTT_HOST: 10.10.10.1
MQTT_USERNAME: admin
MQTT_PASSWORD: password
MQTT_PREFIX: amcrest2mqtt
MQTT_HOMEASSISTANT_PREFIX: homeassistant
AMCREST_HOSTS: "10.10.10.20 camera2.local"
AMCREST_NAMES: "Front_Yard Patio"
AMCREST_USERNAME: viewer
AMCREST_PASSWORD: password
HOME_ASSISTANT: true
STORAGE_POLL_INTERVAL: 600
DEBUG_MODE: false
```
To run via env variables with Docker Compose, see docker-compose.yaml
or make sure you attach a volume with the config file and point to that directory, for example:
```
CMD [ "python", "-u", "./amcrest2mqtt.py", "-c", "/config" ]

@ -1 +1 @@
0.99.9
0.99.10

@ -12,15 +12,16 @@ import sys
from threading import Timer
import time
import yaml
from zoneinfo import ZoneInfo
is_exiting = False
mqtt_client = None
config = {}
config = { 'timezone': 'utc' }
devices = {}
# Helper functions and callbacks
def log(msg, level='INFO'):
ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
ts = datetime.now(tz=ZoneInfo(config['timezone'])).strftime('%Y-%m-%d %H:%M:%S')
if len(msg) > 20480:
raise ValueError('Log message exceeds max length')
if level != "DEBUG" or os.getenv('DEBUG'):
@ -38,20 +39,37 @@ def read_version():
return read_file("../VERSION")
def mqtt_publish(topic, payload, exit_on_error=True, json=False):
msg = mqtt_client.publish(
topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True
)
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))
if msg.rc == mqtt.MQTT_ERR_SUCCESS:
msg.wait_for_publish(2)
return
def signal_handler(sig, frame):
# exit immediately upon receiving a second SIGINT
global is_exiting
log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR")
if is_exiting:
os._exit(1)
if exit_on_error:
exit_gracefully(msg.rc, skip_mqtt=True)
is_exiting = True
exit_gracefully(0)
def exit_gracefully(rc, skip_mqtt=False):
log("Exiting app...")
if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False:
# set cameras offline
for host in config['amcrest']['hosts']:
mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False)
# set broker offline
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline")
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline")
mqtt_client.disconnect()
# Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they
# occur on a separate thread
os._exit(rc)
# MQTT setup
def mqtt_connect():
global mqtt_client
@ -59,7 +77,6 @@ def mqtt_connect():
log("Missing env vari: MQTT_USERNAME or mqtt.username in config", level="ERROR")
sys.exit(1)
# Connect to MQTT
mqtt_client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION1,
client_id=f'{config["mqtt"]["prefix"]}_broker',
@ -118,52 +135,21 @@ def on_mqtt_disconnect(mqtt_client, userdata, rc):
log(f"MQTT connection closed successfully", level="INFO")
exit_gracefully(rc, skip_mqtt=True)
def exit_gracefully(rc, skip_mqtt=False):
log("Exiting app...")
if mqtt_client is not None and mqtt_client.is_connected() and skip_mqtt == False:
# set cameras offline
for host in config['amcrest']['hosts']:
mqtt_publish(devices[host]["topics"]["status"], "offline", exit_on_error=False)
# set broker offline
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', "offline")
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', "offline")
mqtt_client.disconnect()
# Use os._exit instead of sys.exit to ensure an MQTT disconnect event causes the program to exit correctly as they
# occur on a separate thread
os._exit(rc)
def refresh_storage_sensors():
Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start()
log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)')
for host in config['amcrest']['hosts']:
device = devices[host]
topics = device["topics"]
try:
storage = device["camera"].storage_all
mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"]))
mqtt_publish(topics["storage_used"], to_gb(storage["used"]))
mqtt_publish(topics["storage_total"], to_gb(storage["total"]))
except AmcrestError as error:
log(f"Error fetching storage information for {host}: {error}", level="WARNING")
def to_gb(total):
return str(round(float(total[0]) / 1024 / 1024 / 1024, 2))
def mqtt_publish(topic, payload, exit_on_error=True, json=False):
msg = mqtt_client.publish(
topic, payload=(dumps(payload) if json else payload), qos=config['mqtt']['qos'], retain=True
)
def signal_handler(sig, frame):
# exit immediately upon receiving a second SIGINT
global is_exiting
if msg.rc == mqtt.MQTT_ERR_SUCCESS:
msg.wait_for_publish(2)
return
if is_exiting:
os._exit(1)
log(f"Error publishing MQTT message: {mqtt.error_string(msg.rc)}", level="ERROR")
is_exiting = True
exit_gracefully(0)
if exit_on_error:
exit_gracefully(msg.rc, skip_mqtt=True)
# Amcrest Devices
def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, device_name):
log(f"Connecting to device and getting details for {amcrest_host}...")
camera = AmcrestCamera(
@ -219,6 +205,7 @@ def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, d
"topics": {
"config": f'{config["mqtt"]["prefix"]}/{serial_number}/config',
"status": f'{config["mqtt"]["prefix"]}/{serial_number}/status',
"telemetry": f'{config["mqtt"]["prefix"]}/{serial_number}/telemetry',
"event": f'{config["mqtt"]["prefix"]}/{serial_number}/event',
"motion": f'{config["mqtt"]["prefix"]}/{serial_number}/motion',
"doorbell": f'{config["mqtt"]["prefix"]}/{serial_number}/doorbell',
@ -240,6 +227,7 @@ def get_device(amcrest_host, amcrest_port, amcrest_username, amcrest_password, d
},
}
# MQTT messages
def send_broker_discovery():
mqtt_publish(f'{config["mqtt"]["home_assistant_prefix"]}/sensor/{via_device}/broker/config', {
"availability_topic": f'{config["mqtt"]["prefix"]}/{via_device}/availability',
@ -415,6 +403,8 @@ def send_device_discovery(device):
)
def refresh_broker():
Timer(60, refresh_broker).start()
log('Refreshing amcrest2mqtt broker, every 60 sec')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/availability', 'online')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/status', 'online')
mqtt_publish(f'{config["mqtt"]["prefix"]}/{via_device}/config', {
@ -427,7 +417,11 @@ def refresh_broker():
},
}, json=True)
def refresh_camera(device):
def refresh_devices():
for host in config['amcrest']['hosts']:
refresh_device(devices[host])
def refresh_device(device):
mqtt_publish(device['topics']['status'], 'online')
mqtt_publish(device['topics']['config'], {
'device_type': device['config']['device_type'],
@ -435,13 +429,34 @@ def refresh_camera(device):
'sw_version': device['config']['amcrest_version'],
'hw_version': device['config']['hardware_version'],
'serial_number': device['config']['serial_number'],
'host': device["config"]["amcrest_host"],
'host': device['config']['amcrest_host'],
'configuration_url': 'http://' + device['config']['amcrest_host'] + '/',
'origin': {
'name': 'amcrest2mqtt broker',
'sw_version': version,
'url': 'https://github.com/weirdtangent/amcrest2mqtt',
},
}, json=True)
mqtt_publish(device['topics']['telemetry'],
device['telemetry'] if 'telemetry' in device else {},
json=True)
def refresh_storage_sensors():
Timer(config['amcrest']['storage_poll_interval'], refresh_storage_sensors).start()
log(f'Fetching storage sensors for {config["amcrest"]["host_count"]} host(s) (every {config["amcrest"]["storage_poll_interval"]} secs)')
for host in config['amcrest']['hosts']:
device = devices[host]
topics = device["topics"]
try:
storage = device["camera"].storage_all
mqtt_publish(topics["storage_used_percent"], str(storage["used_percent"]))
mqtt_publish(topics["storage_used"], to_gb(storage["used"]))
mqtt_publish(topics["storage_total"], to_gb(storage["total"]))
except AmcrestError as error:
log(f"Error fetching storage information for {host}: {error}", level="WARNING")
# cmd-line args
argparser = argparse.ArgumentParser()
@ -490,9 +505,9 @@ else:
},
'home_assistant': os.getenv("HOME_ASSISTANT") == "true",
'debug': os.getenv("AMCREST_DEBUG") == "true",
'timezone': os.getenv("TZ") or 'utc',
}
# Exit if any of the required vars are not provided
if config['amcrest']['hosts'] is None:
log("Missing env var: AMCREST_HOSTS or amcrest.hosts in config", level="ERROR")
@ -507,6 +522,7 @@ config['amcrest']['name_count'] = len(config['amcrest']['names'])
if config['amcrest']['host_count'] != config['amcrest']['name_count']:
log("The AMCREST_HOSTS and AMCREST_NAMES must have the same number of space-delimited hosts/names", level="ERROR")
sys.exit(1)
log(f"Found {config['amcrest']['host_count']} host(s) defined to monitor")
if config['amcrest']['password'] is None:
log("Please set the AMCREST_PASSWORD environment variable", level="ERROR")
@ -516,10 +532,10 @@ version = read_version()
via_device = config["mqtt"]["prefix"] + '-broker'
log(f"Starting: amcrest2mqtt v{version}")
# Handle interruptions
# handle interruptions
signal.signal(signal.SIGINT, signal_handler)
# Connect to each camera, if not already
# connect to each camera
amcrest_names = config['amcrest']['names']
for host in config['amcrest']['hosts']:
name = amcrest_names.pop(0)
@ -530,17 +546,16 @@ log(f"Connecting to hosts done.", level="INFO")
# connect to MQTT service
mqtt_connect()
# Configure Home Assistant
# configure broker and devices in Home Assistant
if config['home_assistant']:
send_broker_discovery()
for host in config['amcrest']['hosts']:
send_device_discovery(devices[host])
# Main loop
refresh_broker()
for host in config['amcrest']['hosts']:
refresh_camera(devices[host])
refresh_devices()
# kick off storage refresh timer
if config['amcrest']['storage_poll_interval'] > 0:
refresh_storage_sensors()
@ -553,20 +568,23 @@ async def main():
device_config = device["config"]
device_topics = device["topics"]
async for code, payload in device["camera"].async_event_actions("All"):
log(f"Event on {host}: {str(payload)}", level="DEBUG")
refresh_broker()
log(f"Event on {host}: {str(payload)}")
if ((code == "ProfileAlarmTransmit" and device_config["is_ad110"])
or (code == "VideoMotion" and not device_config["is_ad110"])):
motion_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(device_topics["motion"], motion_payload)
device[host]['telemetry']['last_motion_event'] =str(datetime.now(tz=ZoneInfo(config['timezone'])))
elif code == "CrossRegionDetection" and payload["data"]["ObjectType"] == "Human":
human_payload = "on" if payload["action"] == "Start" else "off"
mqtt_publish(device_topics["human"], human_payload)
device[host]['telemetry']['last_human_event'] =str(datetime.now(tz=ZoneInfo(config['timezone'])))
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
mqtt_publish(device_topics["doorbell"], doorbell_payload)
device[host]['telemetry']['last_doorbell_event'] =str(datetime.now(tz=ZoneInfo(config['timezone'])))
mqtt_publish(device_topics["event"], payload, json=True)
refresh_device(device)
except AmcrestError as error:
log(f"Amcrest error while working on {host}: {AmcrestError}. Sleeping for 10 seconds.", level="ERROR")

Loading…
Cancel
Save