@ -60,11 +60,11 @@
# t_int = 0
# t_int = 0
import enum
#import enum
import struct
import struct
class DataType ( enum . IntEnum ) :
class DataType ( ) :
# ZCL Spec -- "2.5.2 Data Types"
# ZCL Spec -- "2.5.2 Data Types"
NULL = 0x00
NULL = 0x00
BOOLEAN = 0x10
BOOLEAN = 0x10
@ -82,12 +82,12 @@ class DataType(enum.IntEnum):
CHARACTER_STRING = 0x42
CHARACTER_STRING = 0x42
ANALOG_DATATYPES = set ( [
ANALOG_DATATYPES = [
DataType . UINT8 , DataType . UINT16 , DataType . UINT64 , DataType . INT8 , DataType . INT16 , DataType . INT64
DataType . UINT8 , DataType . UINT16 , DataType . UINT64 , DataType . INT8 , DataType . INT16 , DataType . INT64
] )
]
class Profile ( enum . IntEnum ) :
class Profile ( ) :
ZIGBEE = 0x0000
ZIGBEE = 0x0000
HOME_AUTOMATION = 0x0104
HOME_AUTOMATION = 0x0104
ZIGBEE_LIGHT_LINK = 0xc05e
ZIGBEE_LIGHT_LINK = 0xc05e
@ -97,16 +97,16 @@ def get_profile_by_name(n):
return getattr ( Profile , n . upper ( ) , None )
return getattr ( Profile , n . upper ( ) , None )
class Endpoint ( enum . IntEnum ) :
class Endpoint ( ) :
ZDO = 0x00
ZDO = 0x00
class ZclCommandType ( enum . IntEnum ) :
class ZclCommandType ( ) :
PROFILE = 0
PROFILE = 0
CLUSTER = 1
CLUSTER = 1
class Status ( enum . IntEnum ) :
class Status ( ) :
SUCCESS = 0x00
SUCCESS = 0x00
FAILURE = 0x01
FAILURE = 0x01
NOT_AUTHORIZED = 0x7E
NOT_AUTHORIZED = 0x7E
@ -143,37 +143,37 @@ class Status(enum.IntEnum):
UNSUPPORTED_CLUSTER = 0xC3
UNSUPPORTED_CLUSTER = 0xC3
ZDO_BY_NAME = {
#ZDO_BY_NAME = {
# Zigbee Spec -- "2.4.3.1.5 Simple_Desc_req"
# # Zigbee Spec -- "2.4.3.1.5 Simple_Desc_req"
' simple_desc ' : ( 0x0004 , ( ' addr16:uint16 ' , ' endpoint:uint8 ' , ) , ) ,
# 'simple_desc': (0x0004, ('addr16:uint16', 'endpoint:uint8',),),
# Zigbee Spec -- "2.4.4.1.5 Simple_Desc_resp"
# # Zigbee Spec -- "2.4.4.1.5 Simple_Desc_resp"
' simple_desc_resp ' : ( 0x8004 , ( ' status:enum8:success,invalid_ep,not_active,device_not_found,inv_requesttype,no_descriptor ' , ' addr16:uint16 ' , ' b_simple_descriptors:uint8 ' , ' simple_descriptors:#simple_descriptor ' , ) , ) ,
# 'simple_desc_resp': (0x8004, ('status:enum8:success,invalid_ep,not_active,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'b_simple_descriptors:uint8', 'simple_descriptors:#simple_descriptor',),),
# Zigbee Spec -- "2.4.3.1.6 Active_EP_req"
# # Zigbee Spec -- "2.4.3.1.6 Active_EP_req"
' active_ep ' : ( 0x0005 , ( ' addr16:uint16 ' , ) , ) ,
# 'active_ep': (0x0005, ('addr16:uint16',),),
# Zigbee Spec -- "2.4.4.1.6 Active_EP_resp"
# # Zigbee Spec -- "2.4.4.1.6 Active_EP_resp"
' active_ep_resp ' : ( 0x8005 , ( ' status:enum8:success,device_not_found,inv_requesttype,no_descriptor ' , ' addr16:uint16 ' , ' n_active_eps:uint8 ' , ' active_eps:*uint8 ' , ) , ) ,
# 'active_ep_resp': (0x8005, ('status:enum8:success,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'n_active_eps:uint8', 'active_eps:*uint8',),),
# Zigbee Spec -- "2.4.3.1.7 Match_Desc_req"
# # Zigbee Spec -- "2.4.3.1.7 Match_Desc_req"
' match_desc ' : ( 0x0006 , ( ' addr16:uint16 ' , ' profile:uint16 ' , ' n_in_clusters:uint8 ' , ' in_clusters:*uint16 ' , ' n_out_clusters:uint8 ' , ' out_clusters:*uint16 ' , ) , ) ,
# 'match_desc': (0x0006, ('addr16:uint16', 'profile:uint16', 'n_in_clusters:uint8', 'in_clusters:*uint16', 'n_out_clusters:uint8', 'out_clusters:*uint16',),),
# Zigbee Spec -- "2.4.4.1.7 Match_Desc_resp"
# # Zigbee Spec -- "2.4.4.1.7 Match_Desc_resp"
' match_desc_resp ' : ( 0x8006 , ( ' status:enum8:success,device_not_found,inv_requesttype,no_descriptor ' , ' addr16:uint16 ' , ' n_match_list:uint8 ' , ' match_list:*uint8 ' , ) , ) ,
# 'match_desc_resp': (0x8006, ('status:enum8:success,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'n_match_list:uint8', 'match_list:*uint8',),),
# Zigbee Spec -- "2.4.3.2.2 Bind_req"
# # Zigbee Spec -- "2.4.3.2.2 Bind_req"
' bind ' : ( 0x0021 , ( ' src_addr:uint64 ' , ' src_ep:uint8 ' , ' cluster:uint16 ' , ' dst_addr_mode:enum8:_,addr16,_,addr64 ' , ' dst_addr:uint64 ' , ' dst_ep:uint8 ' , ) , ) ,
# 'bind': (0x0021, ('src_addr:uint64', 'src_ep:uint8', 'cluster:uint16', 'dst_addr_mode:enum8:_,addr16,_,addr64', 'dst_addr:uint64', 'dst_ep:uint8',),),
# Zigbee Spec -- "2.4.3.2.3 Unbind_req"
# # Zigbee Spec -- "2.4.3.2.3 Unbind_req"
' unbind ' : ( 0x0022 , ( ' src_addr:uint64 ' , ' src_ep:uint8 ' , ' cluster:uint16 ' , ' dst_addr_mode:enum8:_,addr16,_,addr64 ' , ' dst_addr:uint64 ' , ' dst_ep:uint8 ' , ) , ) ,
# 'unbind': (0x0022, ('src_addr:uint64', 'src_ep:uint8', 'cluster:uint16', 'dst_addr_mode:enum8:_,addr16,_,addr64', 'dst_addr:uint64', 'dst_ep:uint8',),),
# Zigbee Spec -- "2.4.4.2.2 Bind_resp"
# # Zigbee Spec -- "2.4.4.2.2 Bind_resp"
' bind_resp ' : ( 0x8021 , ( ' status:enum8:success,not_supported,invalid_ep,table_full,not_authorized ' , ) , ) ,
# 'bind_resp': (0x8021, ('status:enum8:success,not_supported,invalid_ep,table_full,not_authorized',),),
# Zigbee Spec -- "2.4.4.2.3 Unbind_resp"
# # Zigbee Spec -- "2.4.4.2.3 Unbind_resp"
' unbind_resp ' : ( 0x8022 , ( ' status:enum8:success,not_supported,invalid_ep,table_full,not_authorized ' , ) , ) ,
# 'unbind_resp': (0x8022, ('status:enum8:success,not_supported,invalid_ep,table_full,not_authorized',),),
# Spec -- "2.4.3.1.11 Device_annce"
# # Spec -- "2.4.3.1.11 Device_annce"
' device_annce ' : ( 0x0013 , ( ' addr16:uint16 ' , ' addr64:uint64 ' , ' capability:uint8 ' ) , ) , # See Figure 2.17
# 'device_annce': (0x0013, ('addr16:uint16', 'addr64:uint64', 'capability:uint8'),), # See Figure 2.17
# Zigbee Spec -- "2.4.4.3.9 Mgmt_NWK_Update_notify"
# # Zigbee Spec -- "2.4.4.3.9 Mgmt_NWK_Update_notify"
' mgmt_nwk_update_notify ' : ( 0x8038 , ( ' status:uint8 ' , ' scanned_channels:uint32 ' , ' total_transmissions:uint16 ' , ' transmisson_failures:uint16 ' , ' n_energy_values:uint8 ' , ' energy_values:*uint8 ' , ) , ) ,
# 'mgmt_nwk_update_notify': (0x8038, ('status:uint8', 'scanned_channels:uint32', 'total_transmissions:uint16', 'transmisson_failures:uint16', 'n_energy_values:uint8', 'energy_values:*uint8',),),
}
# }
ZDO_BY_ID = {
#ZDO_BY_ID = {
cluster : ( name , args ) for name , ( cluster , args ) in ZDO_BY_NAME . items ( )
# cluster: (name, args) for name, (cluster, args) in ZDO_BY_NAME.items()
}
# }
def _decode_helper ( args , data , i = 0 ) :
def _decode_helper ( args , data , i = 0 ) :
@ -240,15 +240,15 @@ def _decode_helper(args, data, i=0):
def _decode_simple_descriptor ( data , i , obj ) :
def _decode_simple_descriptor ( data , i , obj ) :
return _decode_helper ( ( ' endpoint:uint8 ' , ' profile:uint16 ' , ' device_identifier:uint16 ' , ' device_version:uint8 ' , ' n_in_clusters:uint8 ' , ' in_clusters:*uint16 ' , ' n_out_clusters:uint8 ' , ' out_clusters:*uint16 ' , ) , data , i )
return _decode_helper ( ( ' endpoint:uint8 ' , ' profile:uint16 ' , ' device_identifier:uint16 ' , ' device_version:uint8 ' , ' n_in_clusters:uint8 ' , ' in_clusters:*uint16 ' , ' n_out_clusters:uint8 ' , ' out_clusters:*uint16 ' , ) , data , i )
def _encode_simple_descriptor ( ) :
#def _encode_simple_descriptor() :
pass
# pass
def _decode_read_attr_status ( data , i , obj ) :
def _decode_read_attr_status ( data , i , obj ) :
return _decode_helper ( ( ' attribute:uint16 ' , ' s_status:status8 ' , ' datatype:uint8 ' , ' value:datatype ' , ) , data , i )
return _decode_helper ( ( ' attribute:uint16 ' , ' s_status:status8 ' , ' datatype:uint8 ' , ' value:datatype ' , ) , data , i )
def _encode_read_attr_status ( ) :
#def _encode_read_attr_status() :
pass
# pass
def _decode_datatype ( data , i , obj ) :
def _decode_datatype ( data , i , obj ) :
@ -269,23 +269,23 @@ def _decode_datatype(data, i, obj):
return decode ( data , i , obj )
return decode ( data , i , obj )
def _encode_datatype ( ) :
#def _encode_datatype() :
pass
# pass
def _decode_attr_reporting_config ( ) :
def _decode_attr_reporting_config ( ) :
pass
pass
def _encode_attr_reporting_config ( obj ) :
#def _encode_attr_reporting_config(obj) :
data = bytes ( )
# data = bytes()
datatype = DATATYPES_BY_NAME [ obj [ ' datatype ' ] ]
# datatype = DATATYPES_BY_NAME[obj['datatype']]
# min=1s, max=60s
# # min=1s, max=60s
data + = struct . pack ( ' <BHBHH ' , 0 , obj [ ' attribute ' ] , datatype , obj [ ' minimum ' ] , obj [ ' maximum ' ] )
# data += struct.pack('<BHBHH', 0, obj['attribute'], datatype, obj['minimum'], obj['maximum'])
if datatype in ANALOG_DATATYPES :
# if datatype in ANALOG_DATATYPES:
decode , encode = STRUCT_TYPES [ DATATYPE_STRUCT_TYPES [ datatype ] ]
# decode, encode = STRUCT_TYPES[DATATYPE_STRUCT_TYPES[datatype]]
fmt , _nbytes = decode , encode
# fmt, _nbytes = decode, encode
data + = struct . pack ( fmt , obj . get ( ' delta ' , 1 ) )
# data += struct.pack(fmt, obj.get('delta', 1))
return data
# return data
def _decode_attr_reporting_status ( data , i , obj ) :
def _decode_attr_reporting_status ( data , i , obj ) :
# Note that attribute status records are not included for successfully configured attributes, in order to save bandwidth. In the case of successful configuration of all attributes, only a single attribute status record SHALL be included in the command, with the status field set to SUCCESS and the direction and attribute identifier fields omitted.
# Note that attribute status records are not included for successfully configured attributes, in order to save bandwidth. In the case of successful configuration of all attributes, only a single attribute status record SHALL be included in the command, with the status field set to SUCCESS and the direction and attribute identifier fields omitted.
@ -296,15 +296,15 @@ def _decode_attr_reporting_status(data, i, obj):
return _decode_helper ( ( ' status:status8 ' , ' direction:uint8 ' , ' attribute:uint16 ' , ) , data , i )
return _decode_helper ( ( ' status:status8 ' , ' direction:uint8 ' , ' attribute:uint16 ' , ) , data , i )
def _encode_attr_reporting_status ( ) :
#def _encode_attr_reporting_status() :
pass
# pass
def _decode_reported_attribute ( data , i , obj ) :
def _decode_reported_attribute ( data , i , obj ) :
return _decode_helper ( ( ' attribute:uint16 ' , ' datatype:uint8 ' , ' value:datatype ' , ) , data , i )
return _decode_helper ( ( ' attribute:uint16 ' , ' datatype:uint8 ' , ' value:datatype ' , ) , data , i )
def _encode_reported_attribute ( ) :
#def _encode_reported_attribute() :
pass
# pass
def _decode_string ( data , i , obj ) :
def _decode_string ( data , i , obj ) :
@ -346,12 +346,12 @@ STRUCT_TYPES = {
' enum16 ' : ( ' <H ' , 2 , ) ,
' enum16 ' : ( ' <H ' , 2 , ) ,
' status8 ' : ( _decode_status , _encode_status , ) ,
' status8 ' : ( _decode_status , _encode_status , ) ,
' string ' : ( _decode_string , _encode_string , ) ,
' string ' : ( _decode_string , _encode_string , ) ,
' simple_descriptor ' : ( _decode_simple_descriptor , _encode_simple_descriptor , ) ,
#'simple_descriptor': (_decode_simple_descriptor, _encode_simple_descriptor,) ,
' read_attr_status ' : ( _decode_read_attr_status , _encode_read_attr_status , ) ,
#'read_attr_status': (_decode_read_attr_status, _encode_read_attr_status,) ,
' datatype ' : ( _decode_datatype , _encode_datatype , ) ,
#'datatype': (_decode_datatype, _encode_datatype,) ,
' attr_reporting_config ' : ( _decode_attr_reporting_config , _encode_attr_reporting_config , ) ,
#'attr_reporting_config': (_decode_attr_reporting_config, _encode_attr_reporting_config,) ,
' attr_reporting_status ' : ( _decode_attr_reporting_status , _encode_attr_reporting_status , ) ,
#'attr_reporting_status': (_decode_attr_reporting_status, _encode_attr_reporting_status,) ,
' reported_attribute ' : ( _decode_reported_attribute , _encode_reported_attribute , ) ,
#'reported_attribute': (_decode_reported_attribute, _encode_reported_attribute,) ,
}
}
@ -371,6 +371,7 @@ DATATYPE_STRUCT_TYPES = {
DataType . CHARACTER_STRING : ' string ' ,
DataType . CHARACTER_STRING : ' string ' ,
}
}
DATATYPES_BY_NAME = {
DATATYPES_BY_NAME = {
' bool ' : DataType . BOOLEAN ,
' bool ' : DataType . BOOLEAN ,
' bitmap8 ' : DataType . BITMAP8 ,
' bitmap8 ' : DataType . BITMAP8 ,
@ -388,62 +389,62 @@ DATATYPES_BY_NAME = {
}
}
def decode_zdo ( cluster , data ) :
#def decode_zdo(cluster, data) :
if cluster not in ZDO_BY_ID :
# if cluster not in ZDO_BY_ID:
raise ValueError ( ' Unknown ZDO 0x {:04x} ' . format ( cluster ) )
# raise ValueError('Unknown ZDO 0x{:04x}'.format(cluster))
#print([hex(b) for b in data])
#print([hex(b) for b in data])
cluster_name , args = ZDO_BY_ID [ cluster ]
# cluster_name, args = ZDO_BY_ID[cluster]
seq , = struct . unpack ( ' <B ' , data [ : 1 ] )
# seq, = struct.unpack('<B', data[:1])
data = data [ 1 : ]
# data = data[1:]
kwargs , _nbytes = _decode_helper ( args , data )
return cluster_name , seq , kwargs
def _encode_helper ( args , kwargs ) :
data = bytes ( )
for arg in args :
arg = arg . split ( ' : ' )
name , datatype = arg [ 0 ] , arg [ 1 ] ,
if name . startswith ( ' n_ ' ) :
name = name [ 2 : ]
values = [ len ( kwargs [ name ] ) ]
else :
values = [ kwargs [ name ] ]
if datatype . startswith ( ' * ' ) :
# kwargs, _nbytes =_decode_helper(args, data)
datatype = datatype [ 1 : ]
values = kwargs [ name ]
decode , encode = STRUCT_TYPES [ datatype ]
# return cluster_name, seq, kwargs
for value in values :
if not callable ( decode ) :
fmt , _nbytes = decode , encode
if datatype == ' uint64 ' and isinstance ( value , str ) :
value = int ( value , 16 )
data + = struct . pack ( fmt , value )
else :
data + = encode ( value )
return data
#def _encode_helper(args, kwargs):
# data = bytes()
#
# for arg in args:
# arg = arg.split(':')
# name, datatype = arg[0], arg[1],
#
# if name.startswith('n_'):
# name = name[2:]
# values = [len(kwargs[name])]
# else:
# values = [kwargs[name]]
#
# if datatype.startswith('*'):
# datatype = datatype[1:]
# values = kwargs[name]
#
# decode, encode = STRUCT_TYPES[datatype]
#
# for value in values:
# if not callable(decode):
# fmt, _nbytes = decode, encode
# if datatype == 'uint64' and isinstance(value, str):
# value = int(value, 16)
# data += struct.pack(fmt, value)
# else:
# data += encode(value)
#
# return data
def encode_zdo ( cluster_name , seq , * * kwargs ) :
#def encode_zdo(cluster_name, seq, **kwargs) :
if cluster_name not in ZDO_BY_NAME :
# if cluster_name not in ZDO_BY_NAME:
raise ValueError ( ' Unknown ZDO " {} " ' . format ( cluster_name ) )
# raise ValueError('Unknown ZDO "{}"'.format(cluster_name))
cluster , args = ZDO_BY_NAME [ cluster_name ]
# cluster, args = ZDO_BY_NAME[cluster_name]
data = struct . pack ( ' <B ' , seq ) + _encode_helper ( args , kwargs )
# data = struct.pack('<B', seq) + _encode_helper(args, kwargs)
return cluster , data
# return cluster, data
PROFILE_COMMANDS_BY_NAME = {
PROFILE_COMMANDS_BY_NAME = {
@ -577,32 +578,32 @@ CLUSTERS_BY_NAME = {
} , ) ,
} , ) ,
# ZCL Spec -- Chapter 5 -- Lighting
# ZCL Spec -- Chapter 5 -- Lighting
' color ' : ( 0x0300 , {
# 'color': (0x0300, {
' move_to_hue ' : ( 0x00 , ( ' hue:uint8 ' , ' dir:enum8:shortest,longest,up,down ' , ' time:uint16 ' , ) , ) ,
# 'move_to_hue': (0x00, ('hue:uint8', 'dir:enum8:shortest,longest,up,down', 'time:uint16',),),
' move_hue ' : ( 0x01 , ( ' mode:enum8:stop,up,_,down ' , ' rate:uint8 ' , ) , ) ,
# 'move_hue': (0x01, ('mode:enum8:stop,up,_,down', 'rate:uint8',),),
' step_hue ' : ( 0x02 , ( ' mode:enum8:_,up,_,down ' , ' size:uint8 ' , ' time:uint8 ' , ) , ) ,
# 'step_hue': (0x02, ('mode:enum8:_,up,_,down', 'size:uint8', 'time:uint8',),),
' move_to_saturation ' : ( 0x03 , ( ' saturation:uint8 ' , ' dir:enum8:shortest,longest,up,down ' , ' time:uint16 ' , ) , ) ,
# 'move_to_saturation': (0x03, ('saturation:uint8', 'dir:enum8:shortest,longest,up,down', 'time:uint16',),),
' move_saturation ' : ( 0x04 , ( ' mode:enum8:stop,up,_,down ' , ' rate:uint8 ' , ) , ) ,
# 'move_saturation': (0x04, ('mode:enum8:stop,up,_,down', 'rate:uint8',),),
' step_saturation ' : ( 0x05 , ( ' mode:enum8:_,up,_,down ' , ' size:uint8 ' , ' time:uint8 ' , ) , ) ,
# 'step_saturation': (0x05, ('mode:enum8:_,up,_,down', 'size:uint8', 'time:uint8',),),
' move_to_hue_saturation ' : ( 0x06 , ( ' hue:uint8 ' , ' saturation:uint8 ' , ' time:uint16 ' , ) , ) ,
# 'move_to_hue_saturation': (0x06, ('hue:uint8', 'saturation:uint8', 'time:uint16',),),
' move_to_color_temperature ' : ( 0x0a , ( ' mireds:uint16 ' , ' time:uint16 ' , ) , ) ,
# 'move_to_color_temperature': (0x0a, ('mireds:uint16', 'time:uint16',),),
} , {
# }, {
} , {
# }, {
' hue ' : ( 0x0000 , ' uint8 ' , ) ,
# 'hue': (0x0000, 'uint8',),
' saturation ' : ( 0x0001 , ' uint8 ' , ) ,
# 'saturation': (0x0001, 'uint8',),
' remaining_time ' : ( 0x0002 , ' uint16 ' , ) ,
# 'remaining_time': (0x0002, 'uint16',),
' temperature ' : ( 0x0007 , ' uint16 ' , ) , # mireds, e.g. 1e6/K
# 'temperature': (0x0007, 'uint16',), # mireds, e.g. 1e6/K
} , ) ,
# },),
# ZCL Spec -- Chapter 13 -- Commissioning
# ZCL Spec -- Chapter 13 -- Commissioning
' commissioning ' : ( 0x0015 , {
# 'commissioning': (0x0015, {
} , {
# }, {
} , {
# }, {
} , ) ,
# },),
' touchlink ' : ( 0x1000 , {
# 'touchlink': (0x1000, {
} , {
#}, {
} , {
#}, {
} , ) ,
#},) ,
}
}
@ -622,6 +623,11 @@ for cluster_name, (cluster, rx_commands, tx_commands, attributes,) in CLUSTERS_B
)
)
def sequence_number ( data ) :
seq , command = struct . unpack ( ' <BB ' , data [ 1 : 3 ] )
return seq
def decode_zcl ( cluster , data ) :
def decode_zcl ( cluster , data ) :
frame_control , = struct . unpack ( ' <B ' , data [ : 1 ] )
frame_control , = struct . unpack ( ' <B ' , data [ : 1 ] )
frame_type = frame_control & 1
frame_type = frame_control & 1
@ -665,6 +671,17 @@ def decode_zcl(cluster, data):
return cluster_name , seq , ZclCommandType . CLUSTER , command_name , not disable_default_response , kwargs
return cluster_name , seq , ZclCommandType . CLUSTER , command_name , not disable_default_response , kwargs
def return_attributes ( cluster , data ) :
seq , command = struct . unpack ( ' <BB ' , data [ 1 : 3 ] )
data = data [ 3 : ]
cluster_name , rx_commands , tx_commands , attributes = CLUSTERS_BY_ID [ cluster ]
commands = tx_commands
command_name , args = commands [ command ]
kwargs , _nbytes = _decode_helper ( args , data )
return kwargs
def get_cluster_by_name ( cluster_name ) :
def get_cluster_by_name ( cluster_name ) :
if cluster_name not in CLUSTERS_BY_NAME :
if cluster_name not in CLUSTERS_BY_NAME :
raise ValueError ( ' Unknown cluster " {} " ' . format ( cluster_name ) )
raise ValueError ( ' Unknown cluster " {} " ' . format ( cluster_name ) )
@ -686,95 +703,133 @@ def get_cluster_rx_command(cluster_name, command_name):
return cluster , command , args
return cluster , command , args
def encode_cluster_command ( cluster_name , command_name , seq , direction = 0 , default_response = True , manufacturer_code = None , * * kwargs ) :
#def encode_cluster_command(cluster_name, command_name, seq, direction=0, default_response=True, manufacturer_code=None, **kwargs):
cluster , command , args = get_cluster_rx_command ( cluster_name , command_name )
# cluster, command, args = get_cluster_rx_command(cluster_name, command_name)
#
# ZCL Spec - "2.4.1.1 Frame Control Field"
# # ZCL Spec - "2.4.1.1 Frame Control Field"
frame_control = 1 # Cluster command (command is specific to this cluster)
# frame_control = 1 # Cluster command (command is specific to this cluster)
if direction :
# if direction:
frame_control | = 1 << 3
# frame_control |= 1 << 3
if not default_response :
# if not default_response:
frame_control | = 1 << 4
# frame_control |= 1 << 4
#
if manufacturer_code is not None :
# if manufacturer_code is not None:
frame_control | = 1 << 2
# frame_control |= 1 << 2
data = struct . pack ( ' <BHBB ' , frame_control , manufacturer_code , seq , command )
# data = struct.pack('<BHBB', frame_control, manufacturer_code, seq, command)
else :
# else:
data = struct . pack ( ' <BBB ' , frame_control , seq , command )
# data = struct.pack('<BBB', frame_control, seq, command)
#
data + = _encode_helper ( args , kwargs )
# data += _encode_helper(args, kwargs)
#
return cluster , data
# return cluster, data
def encode_profile_command ( cluster_name , command_name , seq , direction = 0 , default_response = True , manufacturer_code = None , * * kwargs ) :
if cluster_name not in CLUSTERS_BY_NAME :
raise ValueError ( ' Unknown cluster " {} " ' . format ( cluster_name ) )
cluster , rx_commands , tx_commands , attributes = CLUSTERS_BY_NAME [ cluster_name ]
if command_name not in PROFILE_COMMANDS_BY_NAME :
raise ValueError ( ' Unknown command " {} " ' . format ( command_name ) )
command , args = PROFILE_COMMANDS_BY_NAME [ command_name ]
# ZCL Spec - "2.4.1.1 Frame Control Field"
frame_control = 0 # Profile command (command applies to all clusters)
data = struct . pack ( ' <BBB ' , frame_control , seq , command )
data + = _encode_helper ( args , kwargs )
return cluster , data
def get_json ( ) :
#def encode_profile_command(cluster_name, command_name, seq, direction=0, default_response=True, manufacturer_code=None, **kwargs):
return {
# if cluster_name not in CLUSTERS_BY_NAME:
' profile ' : [
# raise ValueError('Unknown cluster "{}"'.format(cluster_name))
{
#
' name ' : p . name ,
# cluster, rx_commands, tx_commands, attributes = CLUSTERS_BY_NAME[cluster_name]
' profile ' : p
#
} for p in Profile
# if command_name not in PROFILE_COMMANDS_BY_NAME:
] ,
# raise ValueError('Unknown command "{}"'.format(command_name))
' zdo ' : [
#
{
# command, args = PROFILE_COMMANDS_BY_NAME[command_name]
' cluster_name ' : cluster_name ,
#
' cluster ' : cluster ,
# # ZCL Spec - "2.4.1.1 Frame Control Field"
' args ' : args
# frame_control = 0 # Profile command (command applies to all clusters)
} for cluster_name , ( cluster , args ) in ZDO_BY_NAME . items ( )
# data = struct.pack('<BBB', frame_control, seq, command)
] ,
#
' status ' : [ s . name for s in Status ] ,
# data += _encode_helper(args, kwargs)
' profile_command ' : [
#
{
# return cluster, data
' name ' : command_name ,
' command ' : command ,
' args ' : args
#def get_json():
} for command_name , ( command , args ) in PROFILE_COMMANDS_BY_NAME . items ( )
# return {
] ,
# 'profile': [
' cluster ' : [
# {
{
# 'name': p.name,
' name ' : cluster_name ,
# 'profile': p
' cluster ' : cluster ,
# } for p in Profile
' rx_commands ' : [
# ],
{
# # 'zdo': [
' name ' : command_name ,
# # {
' command ' : command ,
# # 'cluster_name': cluster_name,
' args ' : args
# # 'cluster': cluster,
} for command_name , ( command , args ) in rx_commands . items ( )
# # 'args': args
] ,
# # } for cluster_name, (cluster, args) in ZDO_BY_NAME.items()
' tx_commands ' : [
# # ],
{
# 'status': [s.name for s in Status],
' name ' : command_name ,
# 'profile_command': [
' command ' : command ,
# {
' args ' : args
# 'name': command_name,
} for command_name , ( command , args ) in tx_commands . items ( )
# 'command': command,
] ,
# 'args': args
' attributes ' : [
# } for command_name, (command, args) in PROFILE_COMMANDS_BY_NAME.items()
{
# ],
' name ' : attribute_name ,
# 'cluster': [
' attribute ' : attribute ,
# {
' datatype ' : datatype
# 'name': cluster_name,
} for attribute_name , ( attribute , datatype ) in attributes . items ( )
# 'cluster': cluster,
] ,
# 'rx_commands': [
} for cluster_name , ( cluster , rx_commands , tx_commands , attributes ) in CLUSTERS_BY_NAME . items ( )
# {
]
# 'name': command_name,
}
# 'command': command,
# 'args': args
# } for command_name, (command, args) in rx_commands.items()
# ],
# 'tx_commands': [
# {
# 'name': command_name,
# 'command': command,
# 'args': args
# } for command_name, (command, args) in tx_commands.items()
# ],
# 'attributes': [
# {
# 'name': attribute_name,
# 'attribute': attribute,
# 'datatype': datatype
# } for attribute_name, (attribute, datatype) in attributes.items()
# ],
# } for cluster_name, (cluster, rx_commands, tx_commands, attributes) in CLUSTERS_BY_NAME.items()
# ]
# }
def attribute_result ( kwargs ) :
attr_list = kwargs [ ' attributes ' ]
return_array = ' '
for i in attr_list :
if i == 0 : #zcl version default 0x2
return_array = b ' \x00 \x00 '
return_array = return_array + b ' \x00 \x20 \x03 '
if i == 1 : #Application Version, gonna be \x01 didn't make it 0 didn't make it
return_array = b ' \x01 \x00 '
return_array = return_array + b ' \x00 \x20 \x01 '
if i == 2 : #stack version
return_array = b ' \x02 \x00 '
return_array = return_array + b ' \x00 \x20 \x03 '
if i == 3 : #hardware version
return_array = b ' \x03 \x00 '
return_array = return_array + b ' \x00 \x20 \x01 '
if i == 4 : #manufacturer name
return_array = b ' \x04 \x00 '
return_array = return_array + b ' \x00 \x42 \x0B \x44 \x69 \x67 \x69 \x20 \x4e \x61 \x74 \x68 \x61 \x46 '
if i == 5 : #model identifier this is used for z2m tie
return_array = b ' \x05 \x00 '
return_array = return_array + b ' \x00 \x42 \x0F \x44 \x69 \x67 \x69 \x20 \x47 \x61 \x72 \x61 \x67 \x65 \x44 \x6f \x6f \x72 '
if i == 6 : # dateCode
return_array = b ' \x06 \x00 '
return_array = return_array + b ' \x00 \x42 \x08 \x32 \x30 \x32 \x31 \x30 \x36 \x31 \x35 '
if i == 7 : #power source did make it
return_array = b ' \x07 \x00 '
return_array = return_array + b ' \x00 \x30 \x04 '
if i == 17 : #physical environment
return_array = b ' \x11 \x00 '
return_array = return_array + b ' \x00 \x30 \x19 '
if i == 16384 : #swbuildID , gonna be E didn't make it 24 didn't make it
return_array = b ' \x00 \x40 '
return_array = return_array + b ' \x00 \x42 \x01 \x45 '
return return_array