add will_sets; mqtt msg handling

pull/106/head
Jeff Culverhouse 11 months ago
parent e09809978b
commit d1c9120ab1

@ -1 +1 @@
0.99.12
0.99.13

@ -125,20 +125,20 @@ class AmcrestAPI(object):
tasks = [self.get_events_from_device(device_id) for device_id in self.devices]
await asyncio.gather(*tasks)
except Exception as err:
self.logger.error(f'collect_all_device_events: {err}')
self.logger.error(f'collect_all_device_events: {err}', exc_info=True)
async def get_events_from_device(self, device_id):
try:
async for code, payload in self.devices[device_id]["camera"].async_event_actions("All"):
await self.process_device_event(device_id, code, payload)
except Exception as err:
self.logger.error(f'get_events_from_device: {err}')
self.logger.error(f'Failed to get events from device ({device_id}): {err}', exc_info=True)
async def process_device_event(self, device_id, code, payload):
try:
config = self.devices[device_id]['config']
self.logger.debug(f'Event on {config["host"]} - {code}: {payload}')
self.logger.info(f'Event on {config["host"]} - {code}: {payload}')
if ((code == "ProfileAlarmTransmit" and config["is_ad110"])
or (code == "VideoMotion" and not config["is_ad110"])):
@ -150,10 +150,20 @@ class AmcrestAPI(object):
elif code == "_DoTalkAction_":
doorbell_payload = "on" if payload["data"]["Action"] == "Invite" else "off"
self.events.append({ 'device_id': device_id, 'event': 'doorbell', 'payload': doorbell_payload })
self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] })
elif code == "NewFile":
file_payload = { 'file': payload["data"]["File"], 'size': payload["data"]["Size"] }
self.events.append({ 'device_id': device_id, 'event': 'recording', 'payload': file_payload })
# lets ignore the event codes we don't care about (for now)
elif code == "VideoMotionInfo":
pass
elif code == "TimeChange":
pass
elif code == "NTPAdjustTime":
pass
else:
self.events.append({ 'device_id': device_id, 'event': code , 'payload': payload['action'] })
except Exception as err:
self.logger.error(f'process_device_event: {err}')
self.logger.error(f'process_device_event: {err}', exc_info=True)
def get_next_event(self):

@ -82,10 +82,10 @@ class AmcrestMqtt(object):
def mqtt_on_disconnect(self, client, userdata, flags, rc, properties):
self.logger.info('MQTT connection closed')
# if reconnect, lets use a new client_id
# if we try to reconnect, lets use a new client_id
self.client_id = self.get_new_client_id()
if time.time() > self.mqtt_connect_time + 10:
if time.time() > self.mqtt_connect_time + 20:
self.mqttc_create()
else:
exit()
@ -97,10 +97,14 @@ class AmcrestMqtt(object):
self.logger.warn(f'MQTT LOG: {msg}')
def mqtt_on_message(self, client, userdata, msg):
if not msg or not msg.payload:
try:
topic = msg.topic
payload = json.loads(msg.payload)
except json.JSONDecodeError:
payload = msg.payload.decode('utf-8')
except:
self.logger.error('Failed to understand MQTT message, ignoring')
return
topic = msg.topic
payload = json.loads(msg.payload)
self.logger.info(f'Got MQTT message for {topic} - {payload}')
@ -112,21 +116,25 @@ class AmcrestMqtt(object):
components = topic.split('/')
# handle this message if it's for us, otherwise pass along to amcrest API
if components[-2] == self.get_component_slug('service'):
self.handle_service_message(None, payload)
elif components[-3] == self.get_component_slug('service'):
self.handle_service_message(components[-1], payload)
else:
if components[-1] == 'set':
mac = components[-2][-16:]
elif components[-2] == 'set':
mac = components[-3][-16:]
try:
if components[-2] == self.get_component_slug('service'):
self.handle_service_message(None, payload)
elif components[-3] == self.get_component_slug('service'):
self.handle_service_message(components[-1], payload)
else:
self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}')
return
# ok, lets format the device_id and send to amcrest
device_id = ':'.join([mac[i:i+2] for i in range (0, len(mac), 2)])
self.send_command(device_id, payload)
if components[-1] == 'set':
device_id = components[-2].split('-')[1]
elif components[-2] == 'set':
device_id = components[-3].split('-')[1]
else:
self.logger.error(f'UNKNOWN MQTT MESSAGE STRUCTURE: {topic}')
return
# ok, lets format the device_id and send to amcrest
# for Amcrest devices, we use the string as-is (after the vendor name)
self.send_command(device_id, payload)
except Exception as err:
self.logger.error(f'Failed to understand MQTT message slug ({topic}): {err}, ignoring')
return
def mqtt_on_subscribe(self, client, userdata, mid, reason_code_list, properties):
rc_list = map(lambda x: x.getName(), reason_code_list)
@ -160,7 +168,7 @@ class AmcrestMqtt(object):
self.mqttc.on_subscribe = self.mqtt_on_subscribe
self.mqttc.on_log = self.mqtt_on_log
# self.mqttc.will_set(self.get_state_topic(self.service_slug) + '/availability', payload="offline", qos=0, retain=True)
self.mqttc.will_set(self.get_discovery_topic('service', 'availability'), payload="offline", qos=self.mqtt_config['qos'], retain=True)
try:
self.mqttc.connect(
@ -218,7 +226,7 @@ class AmcrestMqtt(object):
self.mqttc.publish(
self.get_discovery_topic('service','config'),
json.dumps({
'qos': 0,
'qos': self.mqtt_config['qos'],
'state_topic': state_topic,
'availability_topic': availability_topic,
'device': {
@ -297,11 +305,13 @@ class AmcrestMqtt(object):
first = True
self.devices[device_id] = {}
self.configs[device_id] = config
self.devices[device_id]['qos'] = 0
self.devices[device_id]['qos'] = self.mqtt_config['qos'],
self.devices[device_id]['state_topic'] = self.get_discovery_topic(device_id, 'state')
self.devices[device_id]['availability_topic'] = self.get_discovery_topic(device_id, 'availability')
self.devices[device_id]['command_topic'] = self.get_discovery_topic(device_id, 'set')
# self.mqttc.will_set(self.get_state_topic(device_id)+'/availability', payload="offline", qos=0, retain=True)
self.mqttc.will_set(self.get_discovery_topic(device_id,'state'), payload=json.dumps({'status': 'offline'}), qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(self.get_discovery_topic(device_id,'motion'), payload=None, qos=self.mqtt_config['qos'], retain=True)
self.mqttc.will_set(self.get_discovery_topic(device_id,'availability'), payload='offline', qos=self.mqtt_config['qos'], retain=True)
self.devices[device_id]['device'] = {
'name': config['device_name'],
@ -331,9 +341,11 @@ class AmcrestMqtt(object):
else:
self.logger.debug(f'Updated device: {self.devices[device_id]['device']['name']}')
# device discovery sent, now it is save to add these to the dict
# device discovery sent, now it is save to add these to the
# dict (so they aren't included in device discovery object itself)
self.devices[device_id]['state'] = {}
self.devices[device_id]['availability'] = 'online'
self.devices[device_id]['motion'] = 'off'
else:
if first_time_through:
self.logger.info(f'Saw device, but not supported yet: "{config["device_name"]}" [amcrest {config["device_type"]}] ({device_id})')
@ -357,6 +369,7 @@ class AmcrestMqtt(object):
'value_template': '{{ value_json.doorbell }}',
'unique_id': self.get_slug(device_id, 'doorbell'),
}
self.mqttc.will_set(self.get_discovery_topic(device_id,'doorbell'), payload=None, qos=self.mqtt_config['qos'], retain=True)
if config['is_ad410']:
components[self.get_slug(device_id, 'human')] = {
@ -369,6 +382,7 @@ class AmcrestMqtt(object):
'value_template': '{{ value_json.human }}',
'unique_id': self.get_slug(device_id, 'human'),
}
self.mqttc.will_set(self.get_discovery_topic(device_id,'human'), payload=None, qos=self.mqtt_config['qos'], retain=True)
components[self.get_slug(device_id, 'motion')] = {
'name': 'Motion',
@ -454,7 +468,6 @@ class AmcrestMqtt(object):
'unique_id': self.get_slug(device_id, 'last_update'),
}
# since we always add at least `motion`, this should always be true
if len(components) > 0:
device['components'] = components
@ -501,7 +514,7 @@ class AmcrestMqtt(object):
self.publish_device(device_id)
def publish_device(self, device_id):
for topic in ['state','availability','storage','motion','human','doorbell','event']:
for topic in ['state','availability','storage','motion','human','doorbell','event','recording']:
if topic in self.devices[device_id]:
self.mqttc.publish(
self.get_discovery_topic(device_id,topic),
@ -521,22 +534,14 @@ class AmcrestMqtt(object):
self.update_service_device()
def send_command(self, device_id, data):
caps = self.convert_attributes_to_capabilities(data)
sku = self.devices[device_id]['device']['model']
self.logger.debug(f'COMMAND {device_id} = {caps}')
device = self.devices[device_id]
self.logger.info(f'COMMAND {device_id} = {data}')
first = True
for key in caps:
if not first:
time.sleep(1)
self.logger.debug(f'CMD DEVICE {self.devices[device_id]['device']['name']} ({device_id}) {key} = {caps[key]}')
self.amcrestc.send_command(device_id, sku, caps[key]['type'], caps[key]['instance'], caps[key]['value'])
self.update_service_device()
first = False
if data == 'PRESS':
pass
else:
self.logger.error(f'We got a command ({data}), but do not know what to do')
if device_id not in self.boosted:
self.boosted.append(device_id)
def check_for_events(self):
while device_event := self.amcrestc.get_next_event():
@ -550,7 +555,7 @@ class AmcrestMqtt(object):
self.logger.info(f'Got event for {device_id}: {event} {payload}')
# if one of our known sensors
if event in ['motion','human','doorbell']:
if event in ['motion','human','doorbell','recording']:
device[event] = payload
# otherwise, just store generically
else:

Loading…
Cancel
Save