|
|
|
|
@ -47,12 +47,6 @@ class AmcrestMqtt(object):
|
|
|
|
|
self.logger.info('Exiting gracefully')
|
|
|
|
|
|
|
|
|
|
if self.mqttc is not None and self.mqttc.is_connected():
|
|
|
|
|
for device_id in self.devices:
|
|
|
|
|
self.devices[device_id]['availability'] = 'offline'
|
|
|
|
|
if 'state' not in self.devices[device_id]:
|
|
|
|
|
self.devices[device_id]['state'] = {}
|
|
|
|
|
self.publish_device_state(device_id)
|
|
|
|
|
|
|
|
|
|
self.mqttc.disconnect()
|
|
|
|
|
else:
|
|
|
|
|
self.logger.info('Lost connection to MQTT')
|
|
|
|
|
@ -70,10 +64,9 @@ class AmcrestMqtt(object):
|
|
|
|
|
def mqtt_on_disconnect(self, client, userdata, flags, rc, properties):
|
|
|
|
|
self.logger.info('MQTT connection closed')
|
|
|
|
|
|
|
|
|
|
# if we try to reconnect, lets use a new client_id
|
|
|
|
|
if self.running and time.time() > self.mqtt_connect_time + 10:
|
|
|
|
|
# lets use a new client_id for a reconnect
|
|
|
|
|
self.client_id = self.get_new_client_id()
|
|
|
|
|
|
|
|
|
|
if time.time() > self.mqtt_connect_time + 20:
|
|
|
|
|
self.mqttc_create()
|
|
|
|
|
else:
|
|
|
|
|
exit()
|
|
|
|
|
@ -160,12 +153,8 @@ class AmcrestMqtt(object):
|
|
|
|
|
self.mqttc.on_subscribe = self.mqtt_on_subscribe
|
|
|
|
|
self.mqttc.on_log = self.mqtt_on_log
|
|
|
|
|
|
|
|
|
|
self.mqttc.will_set(
|
|
|
|
|
self.get_discovery_topic('service', 'availability'),
|
|
|
|
|
payload="offline",
|
|
|
|
|
qos=self.mqtt_config['qos'],
|
|
|
|
|
retain=True
|
|
|
|
|
)
|
|
|
|
|
# will_set for service device
|
|
|
|
|
self.mqttc.will_set(self.get_discovery_topic('service', 'availability'), payload="offline", qos=self.mqtt_config['qos'], retain=True)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.mqttc.connect(
|
|
|
|
|
@ -311,26 +300,8 @@ class AmcrestMqtt(object):
|
|
|
|
|
self.configs[device_id] = config
|
|
|
|
|
self.devices[device_id]['qos'] = self.mqtt_config['qos']
|
|
|
|
|
self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state')
|
|
|
|
|
self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability')
|
|
|
|
|
self.devices[device_id]['availability_topic'] = self.get_discovery_topic('service', 'availability')
|
|
|
|
|
self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set')
|
|
|
|
|
self.mqttc.will_set(
|
|
|
|
|
self.get_discovery_topic(device_id,'state'),
|
|
|
|
|
json.dumps({'status': 'offline'}),
|
|
|
|
|
qos=self.mqtt_config['qos'],
|
|
|
|
|
retain=True
|
|
|
|
|
)
|
|
|
|
|
self.mqttc.will_set(
|
|
|
|
|
self.get_discovery_topic(device_id,'motion'),
|
|
|
|
|
None,
|
|
|
|
|
qos=self.mqtt_config['qos'],
|
|
|
|
|
retain=True
|
|
|
|
|
)
|
|
|
|
|
self.mqttc.will_set(
|
|
|
|
|
self.get_discovery_topic(device_id,'availability'),
|
|
|
|
|
'offline',
|
|
|
|
|
qos=self.mqtt_config['qos'],
|
|
|
|
|
retain=True
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.devices[device_id]['device'] = {
|
|
|
|
|
'name': config['device_name'],
|
|
|
|
|
@ -368,7 +339,6 @@ class AmcrestMqtt(object):
|
|
|
|
|
'serial_number': config['serial_number'],
|
|
|
|
|
'sw_version': config['software_version'],
|
|
|
|
|
}
|
|
|
|
|
self.devices[device_id]['availability'] = 'online'
|
|
|
|
|
self.devices[device_id]['motion'] = 'off'
|
|
|
|
|
else:
|
|
|
|
|
if first_time_through:
|
|
|
|
|
@ -507,7 +477,7 @@ class AmcrestMqtt(object):
|
|
|
|
|
def publish_device_state(self, device_id):
|
|
|
|
|
device = self.devices[device_id]
|
|
|
|
|
|
|
|
|
|
for topic in ['state','availability','storage','motion','human','doorbell','event','recording']:
|
|
|
|
|
for topic in ['state','storage','motion','human','doorbell','event','recording']:
|
|
|
|
|
if topic in device:
|
|
|
|
|
payload = json.dumps(device[topic]) if isinstance(device[topic], dict) else device[topic]
|
|
|
|
|
self.mqttc.publish(self.get_discovery_topic(device_id, topic), payload, qos=self.mqtt_config['qos'], retain=True)
|
|
|
|
|
@ -522,7 +492,6 @@ class AmcrestMqtt(object):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
device['state'] = { 'state': 'ON' }
|
|
|
|
|
device['availability'] = 'online'
|
|
|
|
|
|
|
|
|
|
self.publish_device_state(device_id)
|
|
|
|
|
|
|
|
|
|
@ -601,18 +570,14 @@ class AmcrestMqtt(object):
|
|
|
|
|
|
|
|
|
|
self.refresh_device(device_id)
|
|
|
|
|
|
|
|
|
|
# async functions -----------------------------------------------------------------------------
|
|
|
|
|
# async loops and main loop -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
async def _handle_signals(self, signame, loop, tasks):
|
|
|
|
|
async def _handle_signals(self, signame, loop):
|
|
|
|
|
self.running = False
|
|
|
|
|
self.logger.warn(f'{signame} received, waiting for tasks to cancel...')
|
|
|
|
|
|
|
|
|
|
for t in tasks:
|
|
|
|
|
if not t.done():
|
|
|
|
|
t.cancel()
|
|
|
|
|
|
|
|
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
loop.stop()
|
|
|
|
|
for task in asyncio.all_tasks():
|
|
|
|
|
if not task.done(): task.cancel(f'{signame} received')
|
|
|
|
|
|
|
|
|
|
async def device_loop(self):
|
|
|
|
|
while self.running == True:
|
|
|
|
|
@ -640,14 +605,15 @@ class AmcrestMqtt(object):
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# setup signal handling for tasks
|
|
|
|
|
for signame in {'SIGINT','SIGTERM'}:
|
|
|
|
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
|
|
|
loop.add_signal_handler(
|
|
|
|
|
getattr(signal, signame),
|
|
|
|
|
lambda: asyncio.create_task(self._handle_signals(signame, loop, tasks))
|
|
|
|
|
sig, lambda: asyncio.create_task(self._handle_signals(sig.name, loop))
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
exit(1)
|
|
|
|
|
except Exception as err:
|
|
|
|
|
self.running = False
|
|
|
|
|
self.log.error(f'Caught exception: {err}')
|