# HA Spec -- http://www.zigbee.org/zigbee-for-developers/applicationstandards/zigbeehomeautomation/
# ZCL Spec -- http://www.zigbee.org/download/standards-zigbee-cluster-library/
# Zigbee & Zigbee Pro -- http://www.zigbee.org/zigbee-for-developers/network-specifications/zigbeepro/
# ZLL -- http://www.zigbee.org/zigbee-for-developers/applicationstandards/zigbee-light-link/
#
# Direct document downloads:
# HA Spec -- http://www.zigbee.org/?wpdmdl=2129
# Zigbee Pro Stack Profile -- http://www.zigbee.org/wp-content/uploads/2014/11/docs-07-4855-05-0csg-zigbee-pro-stack-profile-2.pdf
# Zigbee Spec -- http://www.zigbee.org/wp-content/uploads/2014/11/docs-05-3474-20-0csg-zigbee-specification.pdf
# ZCL Spec -- http://www.zigbee.org/~zigbeeor/wp-content/uploads/2014/10/07-5123-06-zigbee-cluster-library-specification.pdf
# ZLL -- http://www.zigbee.org/?wpdmdl=2132
#
# A ZigBee device is made up of endpoints, each of which corresponds
# to a single piece of functionality on the device. Endpoint IDs are
# 8-bit. Endpoints have an associated 16-bit profile ID, which defines
# the category of functionality on that endpoint (e.g. "Home
# Automation").
#
# Each endpoint contains a set of input and output cluster IDs (also
# 16-bit), which correspond to a specific function in that
# profile. These clusters are defined in the Zigbee Cluster Library.
# The profile defines which clusters are expected to be implemented
# for a given device type.
#
# Input clusters allow you to send a message to the device. For
# example, a light bulb has an endpoint on the ZLL or HA profile with
# a 'level control' input cluster to set brightness. Output clusters
# allow the device to send a message. For example, a button has an HA
# endpoint with an 'on off' output cluster.
#
# Clusters provide commands and attributes (e.g. the "on/off" cluster
# has "turn on", "turn off" and "toggle" commands and an "on/off"
# attribute).
#
# Devices can send commands from an endpoint to clusters on endpoints
# on other devices. There are two types of commands - profile
# commands and cluster commands. Profile commands are common to all
# clusters and allow you to perform actions such as querying an
# attribute from the specified cluster. Cluster commands are defined
# on each cluster.
#
# For example, to query the state of a bulb, you would send the "read
# attributes" profile command to the "on/off" cluster for the "on/off"
# attribute id. To turn on the bulb, you would send the "on" cluster
# command to the "on/off" cluster.
#
# There is a special "ZDO" endpoint (id=0) on all devices which uses
# the "ZigBee" profile, and this allows general device configuration
# and information. The ZDO clusters correspond to requests and
# responses. The response cluster IDs have the high bit set.
# Essentially each cluster only supports a single command, and there
# are only cluster commands, so the command id and command type are
# not included in the request.
#
# Some common ZDO requests are:
#   "device announce" (broadcast from a device on power on)
#   "bind" (allows a device to connect an output cluster from another
#           device to an input cluster).
#   "active endpoints" (get the list of endpoint IDs)
# t_int = 0

import enum
import struct


class DataType(enum.IntEnum):
    """Numeric identifiers of the attribute data types handled by this module."""

    # ZCL Spec -- "2.5.2 Data Types"
    NULL = 0x00
    BOOLEAN = 0x10
    BITMAP8 = 0x18
    BITMAP16 = 0x19
    BITMAP64 = 0x1f
    UINT8 = 0x20
    UINT16 = 0x21
    UINT64 = 0x27
    INT8 = 0x28
    INT16 = 0x29
    INT64 = 0x2f
    ENUM8 = 0x30
    ENUM16 = 0x31
    CHARACTER_STRING = 0x42


# The signed and unsigned integer members of DataType.
ANALOG_DATATYPES = {
    DataType.UINT8, DataType.UINT16, DataType.UINT64,
    DataType.INT8, DataType.INT16, DataType.INT64,
}


class Profile(enum.IntEnum):
    """16-bit profile identifiers."""

    ZIGBEE = 0x0000
    HOME_AUTOMATION = 0x0104
    ZIGBEE_LIGHT_LINK = 0xc05e


def get_profile_by_name(n):
    """Return the ``Profile`` attribute named ``n.upper()``, or None if absent.

    The lookup is case-insensitive because the name is upper-cased before
    the attribute lookup on ``Profile``.
    """
    return getattr(Profile, n.upper(), None)


class Endpoint(enum.IntEnum):
    """Well-known endpoint identifiers."""

    ZDO = 0x00


class ZclCommandType(enum.IntEnum):
    """Distinguishes profile-wide commands from cluster-specific commands."""

    PROFILE = 0
    CLUSTER = 1


class Status(enum.IntEnum):
    """Status codes (values follow the ZCL enumerated status table)."""

    SUCCESS = 0x00
    FAILURE = 0x01
    NOT_AUTHORIZED = 0x7E
    RESERVED_FIELD_NOT_ZERO = 0x7F
    MALFORMED_COMMAND = 0x80
    UNSUP_CLUSTER_COMMAND = 0x81
    UNSUP_GENERAL_COMMAND = 0x82
    UNSUP_MANUF_CLUSTER_COMMAND = 0x83
    UNSUP_MANUF_GENERAL_COMMAND = 0x84
    INVALID_FIELD = 0x85
    UNSUPPORTED_ATTRIBUTE = 0x86
    INVALID_VALUE = 0x87
    # 0x88 was missing from the table, so Status(0x88) raised ValueError.
    READ_ONLY = 0x88
    INSUFFICIENT_SPACE = 0x89
    DUPLICATE_EXISTS = 0x8A
    NOT_FOUND = 0x8B
    UNREPORTABLE_ATTRIBUTE = 0x8C
    INVALID_DATA_TYPE = 0x8D
    INVALID_SELECTOR = 0x8E
    WRITE_ONLY = 0x8F
    INCONSISTENT_STARTUP_STATE = 0x90
    DEFINED_OUT_OF_BAND = 0x91
    INCONSISTENT = 0x92
    ACTION_DENIED = 0x93
    TIMEOUT = 0x94
    ABORT = 0x95
    INVALID_IMAGE = 0x96
    WAIT_FOR_DATA = 0x97
    NO_IMAGE_AVAILABLE = 0x98
    REQUIRE_MORE_IMAGE = 0x99
    NOTIFICATION_PENDING = 0x9A
    HARDWARE_FAILURE = 0xC0
    SOFTWARE_FAILURE = 0xC1
    CALIBRATION_ERROR = 0xC2
    UNSUPPORTED_CLUSTER = 0xC3


# Maps a ZDO request/response name to (cluster id, field specs).  Each field
# spec is a 'name:type[:extra]' string in the format read by _decode_helper.
ZDO_BY_NAME = {
    # Zigbee Spec -- "2.4.3.1.5 Simple_Desc_req"
    'simple_desc': (0x0004, ('addr16:uint16', 'endpoint:uint8',),),
    # Zigbee Spec -- "2.4.4.1.5 Simple_Desc_resp"
    'simple_desc_resp': (0x8004, ('status:enum8:success,invalid_ep,not_active,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'b_simple_descriptors:uint8', 'simple_descriptors:#simple_descriptor',),),
    # Zigbee Spec -- "2.4.3.1.6 Active_EP_req"
    'active_ep': (0x0005, ('addr16:uint16',),),
    # Zigbee Spec -- "2.4.4.1.6 Active_EP_resp"
    'active_ep_resp': (0x8005, ('status:enum8:success,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'n_active_eps:uint8', 'active_eps:*uint8',),),
    # Zigbee Spec -- "2.4.3.1.7 Match_Desc_req"
    'match_desc': (0x0006, ('addr16:uint16', 'profile:uint16', 'n_in_clusters:uint8', 'in_clusters:*uint16', 'n_out_clusters:uint8', 'out_clusters:*uint16',),),
    # Zigbee Spec -- "2.4.4.1.7 Match_Desc_resp"
    'match_desc_resp': (0x8006, ('status:enum8:success,device_not_found,inv_requesttype,no_descriptor', 'addr16:uint16', 'n_match_list:uint8', 'match_list:*uint8',),),
    # Zigbee Spec -- "2.4.3.2.2 Bind_req"
    'bind': (0x0021, ('src_addr:uint64', 'src_ep:uint8', 'cluster:uint16', 'dst_addr_mode:enum8:_,addr16,_,addr64', 'dst_addr:uint64', 'dst_ep:uint8',),),
    # Zigbee Spec -- "2.4.3.2.3 Unbind_req"
    'unbind': (0x0022, ('src_addr:uint64', 'src_ep:uint8', 'cluster:uint16', 'dst_addr_mode:enum8:_,addr16,_,addr64', 'dst_addr:uint64', 'dst_ep:uint8',),),
    # Zigbee Spec -- "2.4.4.2.2 Bind_resp"
    'bind_resp': (0x8021, ('status:enum8:success,not_supported,invalid_ep,table_full,not_authorized',),),
    # Zigbee Spec -- "2.4.4.2.3 Unbind_resp"
    'unbind_resp': (0x8022, ('status:enum8:success,not_supported,invalid_ep,table_full,not_authorized',),),
    # Spec -- "2.4.3.1.11 Device_annce"
    'device_annce': (0x0013, ('addr16:uint16', 'addr64:uint64', 'capability:uint8'),),  # See Figure 2.17
    # Zigbee Spec -- "2.4.4.3.9 Mgmt_NWK_Update_notify"
    'mgmt_nwk_update_notify': (0x8038, ('status:uint8', 'scanned_channels:uint32', 'total_transmissions:uint16', 'transmisson_failures:uint16', 'n_energy_values:uint8', 'energy_values:*uint8',),),
}

# Reverse index of ZDO_BY_NAME: cluster id -> (name, field specs).
ZDO_BY_ID = {
    cluster: (name, args) for name, (cluster, args) in ZDO_BY_NAME.items()
}


def _resolve_decoder(type_name):
    """Return a ``decode(data, offset, obj)`` callable for *type_name*.

    ``STRUCT_TYPES[type_name]`` is a pair.  When its first element is
    callable it is returned as the decoder.  Otherwise the pair is treated
    as ``(struct format, byte count)`` and wrapped in a function that
    unpacks one value at ``offset`` and returns ``(value, next_offset)``.

    Raises:
        KeyError: if *type_name* is not a key of STRUCT_TYPES.
    """
    decode, encode = STRUCT_TYPES[type_name]
    if callable(decode):
        return decode
    fmt, nbytes = decode, encode

    def _unpack(buf, offset, _obj):
        return struct.unpack(fmt, buf[offset:offset + nbytes])[0], offset + nbytes

    return _unpack


def _decode_helper(args, data, i=0):
    """Decode consecutive fields from *data* according to the specs in *args*.

    Each spec is a colon-separated string; only the first two parts
    (``name`` and ``type``) are used here.

    Type prefixes:
        ``*``  decode the type ``n`` times (``n`` comes from a preceding
               ``n_`` field, default 1) and collect the values in a list.
        ``#``  decode records until ``b`` bytes have been consumed (``b``
               comes from a preceding ``b_`` field, default 0).
        ``%``  decode records until the end of *data*.

    Name prefixes:
        ``s_`` is stripped from the name; if the decoded value is not
               ``'SUCCESS'`` decoding stops after this field.
        ``n_`` / ``b_`` fields set the count / byte length for the next
               field and are not stored in the result.

    Args:
        args: iterable of field spec strings.
        data: buffer to decode from.
        i: offset in *data* at which to start.

    Returns:
        ``(fields, offset)``: dict of decoded values keyed by field name,
        and the offset just past the last consumed byte.
    """
    kwargs = {}
    n = 1  # repeat count applied to the next '*' field
    b = 0  # byte length applied to the next '#' field
    for arg in args:
        parts = arg.split(':')
        name, datatype = parts[0], parts[1]
        decode = _resolve_decoder(datatype.strip('*#%'))

        if datatype.startswith('*'):
            # Repeat n times
            v = []
            for _ in range(n):
                x, i = decode(data, i, kwargs)
                v.append(x)
        elif datatype.startswith('#'):
            # There are b bytes of records
            v = []
            end = i + b
            while i < end:
                x, i = decode(data, i, kwargs)
                v.append(x)
        elif datatype.startswith('%'):
            # Keep reading records until end of frame
            v = []
            end = len(data)
            while i < end:
                x, i = decode(data, i, kwargs)
                v.append(x)
        else:
            v, i = decode(data, i, kwargs)

        stop = False
        if name.startswith('s_'):
            name = name[2:]
            if v != 'SUCCESS':
                stop = True

        if name.startswith('n_'):
            n = v
        elif name.startswith('b_'):
            b = v
        else:
            kwargs[name] = v
            # Count and length apply only to the field right after them.
            n = 1
            b = 0

        if stop:
            break
    return kwargs, i


def _decode_simple_descriptor(data, i, obj):
    """Decode one simple descriptor at offset *i*; *obj* is unused.

    Returns ``(fields, next_offset)`` as produced by ``_decode_helper``.
    """
    return _decode_helper(('endpoint:uint8', 'profile:uint16', 'device_identifier:uint16', 'device_version:uint8', 'n_in_clusters:uint8', 'in_clusters:*uint16', 'n_out_clusters:uint8', 'out_clusters:*uint16',), data, i)


def _encode_simple_descriptor():
    """Placeholder: encoding is not implemented."""


def _decode_read_attr_status(data, i, obj):
    """Decode one read-attribute status record at offset *i*; *obj* is unused.

    The ``s_status`` field makes ``_decode_helper`` stop before the
    datatype and value fields when the status is not ``'SUCCESS'``.
    """
    return _decode_helper(('attribute:uint16', 's_status:status8', 'datatype:uint8', 'value:datatype',), data, i)


def _encode_read_attr_status():
    """Placeholder: encoding is not implemented."""


def _decode_datatype(data, i, obj):
    """Decode a value whose type is given by ``obj['datatype']``.

    Returns:
        ``(value, next_offset)``; ``(None, i)`` for ``DataType.NULL``.

    Raises:
        ValueError: if *obj* has no ``'datatype'`` key, or the datatype is
            not a key of DATATYPE_STRUCT_TYPES.
    """
    if 'datatype' not in obj:
        raise ValueError('Object needs datatype field.')
    datatype = obj['datatype']
    if datatype == DataType.NULL:
        return None, i
    if datatype not in DATATYPE_STRUCT_TYPES:
        raise ValueError('Unknown struct type')
    decode = _resolve_decoder(DATATYPE_STRUCT_TYPES[datatype])
    return decode(data, i, obj)


def _encode_datatype():
    """Placeholder: encoding is not implemented."""


def _decode_attr_reporting_config():
    """Placeholder: decoding is not implemented."""


def _encode_attr_reporting_config(obj):
    data = bytes()
    datatype = DATATYPES_BY_NAME[obj['datatype']]
    #
min=1s, max=60s data += struct.pack('