# -*- coding: utf-8 -*-
"""HiQnet protocol library."""
from __future__ import print_function
__author__ = 'Raphaël Doursenaud'
import itertools
import os
import socket
import binascii
from flags import *
from networkinfo import *
PROTOCOL_VERSION = 2
PROTOCOL_MIN_VERSION = 1
PROTOCOL_MAX_VERSION = 3
MIN_HEADER_LEN = 25 # bytes
DEFAULT_HOP_COUNTER = 5
DEFAULT_FLAG_MASK = b'\x01\xff'
SUPPORTED_FLAG_MASK = DEFAULT_FLAG_MASK
DEFAULT_KEEPALIVE = 10000 # ms
[docs]class Message(object):
"""HiQnet messages handling."""
MESSAGES = {
'DISCOINFO': b'\x00\x00',
'RESERVED0': b'\x00\x01',
'GETNETINFO': b'\x00\x02',
'RESERVED1': b'\x00\x03',
'REQADDR': b'\x00\x04',
'ADDRUSED': b'\x00\x05',
'SETADDR': b'\x00\x06',
'GOODBYE': b'\x00\x07',
'HELLO': b'\x00\x08',
'MULTPARMSET': b'\x01\x00',
'MULTOBJPARMSET': b'\x01\x01',
'PARMSETPCT': b'\x01\x02',
'MULTPARMGET': b'\x01\x03',
'GETATTR': b'\x01\x0d',
'SETATTR': b'\x01\x0e',
'MULTPARMSUB': b'\x01\x0f',
'PARMSUBPCT': b'\x01\x11',
'MULTPARMUNSUB': b'\x01\x12',
'PARMSUBALL': b'\x01\x13',
'PARMUNSUBALL': b'\x01\x14',
'SUBEVTLOGMSGS': b'\x01\x15',
'GETVDLIST': b'\x01\x1a',
'STORE': b'\x01\x24',
'RECALL': b'\x01\x25',
'LOCATE': b'\x01\x29',
'UNSUBEVTLOGMSGS': b'\x01\x2b',
'REQEVTLOG': b'\x01\x2c',
b'\x00\x00': 'DISCOINFO',
b'\x00\x01': 'RESERVED0',
b'\x00\x02': 'GETNETINFO',
b'\x00\x03': 'RESERVED1',
b'\x00\x04': 'REQADDR',
b'\x00\x05': 'ADDRUSED',
b'\x00\x06': 'SETADDR',
b'\x00\x07': 'GOODBYE',
b'\x00\x08': 'HELLO',
b'\x01\x00': 'MULTPARMSET',
b'\x01\x01': 'MULTOBJPARMSET',
b'\x01\x02': 'PARMSETPCT',
b'\x01\x03': 'MULTPARMGET',
b'\x01\x0d': 'GETATTR',
b'\x01\x0e': 'SETATTR',
b'\x01\x0f': 'MULTPARMSUB',
b'\x01\x11': 'PARMSUBPCT',
b'\x01\x12': 'MULTPARMUNSUB',
b'\x01\x13': 'PARMSUBALL',
b'\x01\x14': 'PARMUNSUBALL',
b'\x01\x15': 'SUBEVTLOGMSGS',
b'\x01\x1a': 'GETVDLIST',
b'\x01\x24': 'STORE',
b'\x01\x25': 'RECALL',
b'\x01\x29': 'LOCATE',
b'\x01\x2b': 'UNSUBEVTLOGMSGS',
b'\x01\x2c': 'REQEVTLOG',
}
identifier = None
name = None
def __init__(self, identifier=None, name=None):
"""Build a message.
:param identifier: The message ID
:type identifier: bytearray
:param name: The message name
:type name: str
"""
if identifier and name:
raise ValueError("You must no supply both a identifier and name.")
if identifier:
if identifier in self.MESSAGES:
self.identifier = identifier
self.name = self.MESSAGES[identifier]
else:
raise ValueError("Unknown message ID.")
if name:
if name in self.MESSAGES:
self.name = name
self.identifier = self.MESSAGES[name]
else:
raise ValueError("Unknown message name.")
def __str__(self):
return self.identifier
def __repr__(self):
return self.name
[docs]class FullyQualifiedAddress(object):
"""Fully Qualified HiQnet Address."""
device_address = None
vd_address = None
object_address = None
def __init__(self,
device_address=None,
vd_address=b'\x00',
object_address=b'\x00\x00\x00',
devicevdobject=None,
):
"""Build a Fully Qualified HiQnet Address.
:param devicevdobject: Full binary address
:type devicevdobject: bytearray
16 bits = Device address
8 bits = VD address
24 bits = Object address
:param device_address: Device address
:type device_address: int between 1 and 65535
:param vd_address: Virtual device address
:type vd_address: bytearray 8 bits
:param object_address: Object address
:type object_address: bytearray 24 bits
"""
if devicevdobject:
repr(devicevdobject)
self.device_address = struct.unpack('!H', devicevdobject[0:2])[0]
# TODO: make vd_address and object_address int rather than byte arrays
self.vd_address = devicevdobject[2]
self.object_address = devicevdobject[3:6]
else:
self.device_address = device_address
# TODO: make vd_address and object_address int rather than byte arrays
self.vd_address = vd_address
self.object_address = object_address
@classmethod
[docs] def broadcast_address(cls):
"""Get the Fully Qualified HiQnet Broadcast Address.
:type cls: FullyQualifiedAddress
:rtype: FullyQualifiedAddress
"""
# noinspection PyCallingNonCallable
return cls(device_address=65535, vd_address=b'\x00', object_address=b'\x00\x00\x00')
def __bytes__(self):
"""Get the address as bytes."""
# TODO: make vd_address and object_address int rather than byte arrays
return struct.pack('!H', self.device_address) + self.vd_address + self.object_address
def __str__(self):
"""Get the address in a printable format."""
return self.__bytes__()
def __repr__(self):
return str(self.device_address) + '.' + \
"%d" % struct.unpack('!B', self.vd_address) + \
'.' + "%d.%d.%d" % struct.unpack('!BBB', self.object_address)
[docs]class Command(object):
"""HiQnet command."""
# Placeholder, will be filled later
header = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
_version = PROTOCOL_VERSION # 1 byte
"""
The Version Number indicates the revision number of the entire protocol; it is not
used for differentiating between revisions of individual commands. HiQnet is
currently at revision 2. Devices that communicate with HiQnet version 1.0
include the dbx ZonePro family. All others use version 2.0.
It also seems that a version 3.0 is in the works for IEEE AVB integration.
:type: int
"""
_headerlen = MIN_HEADER_LEN # 1 byte
"""
The Header Length is the size in bytes of the entire command header, including any
additional headers such as 'Error' or 'Multi-part'.
:type: int
"""
_commandlen = 0 # 4 bytes
"""
The command length is the size in bytes of the entire command - from the
‘Version’ field through to the last byte of the payload.
:type: int
"""
source_address = FullyQualifiedAddress() # 6 byte (48 bits)
"""
The Source Address specifies the HiQnet address where the command has come
from; this is often used by the recipient for sending back reply commands.
"""
destination_address = FullyQualifiedAddress() # 6 byte (48 bits)
"""The Destination Address specifies where the command is to be delivered to."""
message = Message(identifier=b'\x00\x00') # 2 bytes
"""
The Message ID is a unique identifier that indicates the method that the
destination Device must perform. If there is a payload, it is usually specific to the
type of method indicated by the Message ID. Product-specific IDs may also exist
and will be documented appropriately.
"""
flags = DeviceFlags() # 2 bytes
"""
The Flags denote what kinds of options are active when set to ‘1’.
They are allocated in the following manner:
+----------+-------------------+
| Bit 15-9 | Bit 8 |
+==========+===================+
| Reserved | Session number |
| |(Header extension) |
+----------+-------------------+
+----------+--------------------+------------+----------+
| Bit 7 | Bit 6 | Bit 5 | Bit 4 |
+==========+====================+============+==========+
| Reserved | Multi-part message | Guaranteed | Reserved |
| |(Header extension) | | |
+----------+--------------------+------------+----------+
+--------------------+-------------+-----------------+-------------------------+
| Bit 3 | Bit 2 | Bit 1 | Bit 0 |
+====================+=============+=================+=========================+
| Error | Information | Acknowledgement | Request Acknowledgement |
| (Header extension) | | | |
+--------------------+-------------+-----------------+-------------------------+
Bit 5 must be set for any applications using TCP/IP only on the network
interface. This will ensure that any commands are sent guaranteed (TCP rather
than UDP).
"""
hop_counter = DEFAULT_HOP_COUNTER # 1 byte
"""
The Hop Count denotes the number of network hops that a command has traversed
and is used to stop broadcast loops. This field should generally be defaulted to
0x05.
:type: int
"""
new_sequence_number = itertools.count()
sequence_number = 0 # 2 bytes
"""
The Sequence number is used to uniquely identify each HiQnet command leaving a
Device. This is primarily used for diagnostic purposes. The sequence number
starts at 0 on power-up and increments for each successive command the Routing
Layer sends to the Packet Layer. The Sequence Number rolls over at the top of its
range.
:type: int
"""
optional_headers = b'' # Placeholder, may not be filled
# TODO: optional headers object?
error_code = 0
error_string = ''
start_seq_no = 0
bytes_remaining = 0
session_number = 0
payload = b'' # Placeholder, filled later, depends on the message
def __init__(self, source=None, destination=None, command=None):
"""Initiate an HiQnet command from source to destination.
:param source: Source of the command
:type source: FullyQualifiedAddress
:param destination: destination of the command
:type destination: FullyQualifiedAddress
:return:
"""
if command:
self.decode(command=command)
else:
self.source_address = source
self.destination_address = destination # TODO: use broadcast if not provided
self.sequence_number = next(self.new_sequence_number)
[docs] def decode(self, command):
"""Decodes a binary command.
:param command: The binary command to decode
"""
print("Real command length: ", len(command))
if len(command) < MIN_HEADER_LEN:
raise BufferError("Command too short")
self.version = struct.unpack('!B', command[0])[0]
self.headerlen = struct.unpack('!B', command[1])[0]
if len(command) < self.headerlen:
raise BufferError("Command is smaller than it's header length")
self.commandlen = struct.unpack('!L', command[2:6])[0]
if len(command) != self.commandlen:
raise BufferError("Command length header and actual length missmatch")
self.source_address = FullyQualifiedAddress(devicevdobject=command[6:12])
self.destination_address = FullyQualifiedAddress(devicevdobject=command[12:18])
self.message = Message(identifier=command[18:20])
self.flags.asByte = struct.unpack('!H', command[20:22])[0]
print("Flags: ", repr(self.flags))
self.hop_counter = struct.unpack('!B', command[22])[0]
self.sequence_number = struct.unpack('!H', command[23:25])[0]
if self.headerlen > MIN_HEADER_LEN:
index = MIN_HEADER_LEN
# Optional Headers are present, check the flags
if self.flags.error:
self.error_code = struct.unpack('!B', command[index])[0]
index += 1
self.error_string = struct.unpack('s', command[index])[0] # FIXME: detect string end
index += len(self.error_string)
raise NotImplementedError
if self.flags.multipart:
self.start_seq_no = struct.unpack('!B', command[index])[0]
index += 1
self.bytes_remaining = struct.unpack('!L', command[index])[0]
index += 4
raise NotImplementedError
if self.flags.session:
self.session_number = struct.unpack('!H', command[index:index + 2])[0]
index += 2
print(self.session_number) # DEBUG
self.payload = command[self.headerlen:self.commandlen]
# TODO: decode payload by message type
if self.message.name == 'DISCOINFO':
self.decode_discoinfo()
[docs] def decode_discoinfo(self):
"""Decode discovery information command payload.
Payload:
- HiQnet Device
- Cost
- Serial Number
- Max Message size
- Keep alive period
- NetwordID
- NetworkInfo
"""
# Message type
if self.flags.b.info:
print("DiscoInfo(I)")
else:
print("DiscoInfo(Q)")
index = 0
size = 2
print("Device address: ", end='')
print(struct.unpack('!H', self.payload[index:index + size])[0])
index += size
print("Cost: ", end='')
print(struct.unpack('!B', self.payload[index])[0])
index += 1
# TODO: extract this algo as the "BLOCK" type
size = 2
data_len = struct.unpack('!H', self.payload[index:index + size])[0]
index += size
size = data_len
print("Serial number: ", end='')
serial_number = ''
while size:
serial_number += struct.unpack('!s', self.payload[index])[0]
index += 1
size -= 1
print(serial_number)
size = 4
print("Max message size: ", end='')
print(struct.unpack('!L', self.payload[index:index + size])[0])
index += size
size = 2
print("Keep alive period: ", end='')
print(struct.unpack('!H', self.payload[index:index + size])[0])
index += size
print("NetworkID: ", end='')
network_id = struct.unpack('!B', self.payload[index])[0]
print(network_id)
index += 1
print("NetworkInfo: ", end='')
if network_id == NetworkInfo.NET_ID_TCP_IP:
# TCP/IP
size = 6
mac_address = "%02x:%02x:%02x:%02x:%02x:%02x" % struct.unpack("!BBBBBB", self.payload[index:index + size])
index += size
dhcp = bool(struct.unpack("B", self.payload[index])[0])
index += 1
size = 4
ip_address = "%d.%d.%d.%d" % struct.unpack('!BBBB', self.payload[index:index + size])
index += size
size = 4
subnet_mask = "%d.%d.%d.%d" % struct.unpack('!BBBB', self.payload[index:index + size])
index += size
size = 4
gateway_address = "%d.%d.%d.%d" % struct.unpack('!BBBB', self.payload[index:index + size])
index += size
network_info = IPNetworkInfo(mac_address=mac_address, dhcp=dhcp,
ip_address=ip_address, subnet_mask=subnet_mask,
gateway_address=gateway_address)
print(vars(network_info)) # DEBUG
elif network_id == NetworkInfo.NET_ID_RS232:
network_info = RS232NetworkInfo
raise NotImplementedError
else:
raise NotImplementedError
@property
def version(self):
return self._version
@version.setter
def version(self, version):
"""Set the version.
:param version: Version number.
:type version: int
"""
if not PROTOCOL_MIN_VERSION <= version <= PROTOCOL_MAX_VERSION:
raise ValueError("This HiQnet version is unknown.")
self._version = version
@property
def headerlen(self):
return self._headerlen
@headerlen.setter
def headerlen(self, headerlen):
"""Set the header length.
:param headerlen: Header length
:type headerlen: int
"""
if headerlen < MIN_HEADER_LEN:
raise ValueError("The header can't be smaller than " + str(MIN_HEADER_LEN))
self._headerlen = headerlen
@property
def commandlen(self):
return self._commandlen
@commandlen.setter
def commandlen(self, commandlen):
""" Set the command length
:param commandlen: Command lenght
:type commandlen: int
"""
if commandlen < self.headerlen or commandlen < MIN_HEADER_LEN:
raise ValueError("Command can't be smaller than the header")
self._commandlen = commandlen
[docs] def disco_info(self, device, disco_type='Q'):
"""Build a Discovery Information command.
:param device: The HiQnet device sending the discovery command
:type device: Device
:param disco_type: Discovery type message. I(nfo) or Q(uery)
:type disco_type: str
"""
if disco_type == 'I':
self.flags.info = 1
else:
self.flags.info = 0
self.message = Message(name='DISCOINFO')
# Payload
device_address = struct.pack('!H', self.source_address.device_address)
cost = b'\x01'
serial_number_len = struct.pack('!H', 16)
try:
serial_number = bytes(device.manager.serial_number.decode('ascii'))
except AttributeError:
# We are running Python 3
# noinspection PyArgumentList
serial_number = bytes(device.manager.serial_number, 'ascii')
serial_number = struct.pack('!16s', serial_number) # May use utf-16-be == UCS-2
max_message_size = struct.pack('!I', 65535) # FIXME: should really be the server's buffer size
keep_alive_period = struct.pack('!H', DEFAULT_KEEPALIVE)
network_id = struct.pack('!B', device.network_info.network_id)
mac_address = binascii.unhexlify(device.network_info.mac_address.replace(':', ''))
dhcp = struct.pack('!B', device.network_info.dhcp)
ip_address = socket.inet_aton(device.network_info.ip_address)
subnet_mask = socket.inet_aton(device.network_info.subnet_mask)
gateway_address = socket.inet_aton(device.network_info.gateway_address)
self.payload = device_address + cost + serial_number_len + serial_number \
+ max_message_size + keep_alive_period + network_id \
+ mac_address + dhcp + ip_address + subnet_mask + gateway_address
[docs] def request_address(self, req_addr):
"""Build a Request Address command.
:param req_addr:
:type req_addr: int
"""
self.message = Message(name='REQADDR')
self.payload = struct.pack('!H', req_addr)
[docs] def address_used(self):
"""Build an Address Used command."""
self.message = Message(name='ADDRUSED')
[docs] def hello(self):
"""Build an hello command.
Starts a session.
:return: The session number
:rtype: int
"""
self.message = Message(name='HELLO')
session_number = os.urandom(2)
flag_mask = SUPPORTED_FLAG_MASK
self.payload = session_number + flag_mask
return session_number
[docs] def get_attributes(self):
"""Build a Get Attributes command."""
self.message = Message(name='GETATTR')
raise NotImplementedError
[docs] def get_vd_list(self, workgroup=''):
"""Build a Get VD List command.
:param workgroup: The workgroup to get the VD list from.
:type workgroup: str
"""
self.message = Message(name='GETVDLIST')
self.payload = workgroup
# raise NotImplementedError
[docs] def store(self):
"""Build a Store command.
Stores current state to a preset.
"""
self.message = Message(name='STORE')
raise NotImplementedError
[docs] def recall(self):
"""Build a Recall command.
Recalls a preset.
"""
self.message = Message(name='RECALL')
raise NotImplementedError
[docs] def locate(self, time, serial_number):
"""Builds a Locate command.
The receiver makes itself visible. Usually this is done by flashing some LEDs on the front panel.
:param time: time the leds should flash in ms
0x0000 turns off locate led(s)
0xffff turns on locate led(s)
:type time: bytearray
:param serial_number: The target device's serial number
:type serial_number: str
.. seealso:: :py:func:`locate_on`, :py:func:`locate_off`
"""
self.message = Message(name='LOCATE')
serial_number_len = struct.pack('!H', len(serial_number))
self.payload = time + serial_number_len + serial_number
[docs] def locate_on(self, serial_number):
"""Builds a locate command asking for the visual clue to be active.
:param serial_number: The target device's serial number
:type serial_number: str
"""
self.locate(b'\xff\xff', serial_number)
[docs] def locate_off(self, serial_number):
"""Builds a locate command asking for the visual clue to be inactive.
:param serial_number: The target device's serial number
:type serial_number: str
"""
self.locate(b'\x00\x00', serial_number)
def _build_optional_headers(self):
"""Builds the optional command headers."""
# Optional error header
if self.flags.error:
error_code = b'\x02'
error_string = b''
self.optional_headers += error_code + error_string
raise NotImplementedError
# Optional multi-part header
if self.flags.multipart:
start_seq_no = b'\x02'
bytes_remaining = b'\x00\x00\x00\x00' # 4 bytes
self.optional_headers += start_seq_no + bytes_remaining
raise NotImplementedError
# Optional session number header
if self.flags.session:
session_number = b'\x00\x00' # 2 bytes
self.optional_headers += session_number
raise NotImplementedError
def _compute_headerlen(self):
"""Computes the header length."""
self.headerlen = MIN_HEADER_LEN + len(self.optional_headers)
def _compute_commandlen(self):
"""Computes the command length."""
self.commandlen = len(self.payload) + self.headerlen
def _build_header(self):
"""Builds the command header."""
self._build_optional_headers()
self._compute_headerlen()
self._compute_commandlen()
self.header = struct.pack('!B', self.version) \
+ struct.pack('!B', self.headerlen) \
+ struct.pack('!L', self.commandlen) \
+ bytes(self.source_address) + bytes(self.destination_address) \
+ bytes(self.message) \
+ bytes(self.flags) \
+ struct.pack('!B', self.hop_counter) \
+ struct.pack('!H', self.sequence_number) \
+ bytes(self.optional_headers)
def __bytes__(self):
"""Get the command as bytes."""
self._build_header()
return self.header + self.payload
def __str__(self):
"""Get the command in a printable form."""
return self.__bytes__()
def __repr__(self):
return vars(self)
# TODO: Event logs
# TODO: Session
# TODO: Device arrival announce