# -*- coding: utf-8 -*-
"""HiQnet IP communication."""
from __future__ import print_function
__author__ = 'Raphaël Doursenaud'
import binascii
from twisted.internet import protocol
from ..protocol import Command
PORT = 3804 # IANA declared as IQnet. Go figure.
[docs]class Connection(object):
"""Handles HiQnet IP connection.
.. warning:: Other connection types such as RS232, RS485 or USB are not handled yet.
"""
udp_transport = None
tcp_transport = None
def __init__(self, udp_transport, tcp_transport):
"""Initiate a HiQnet IP connection over UDP and TCP.
:param udp_transport: Twisted UDP transport
:type udp_transport: twisted.internet.interfaces.IUDPTransport
:param tcp_transport: Twisted TCP transport
:type tcp_transport: twisted.internet.interfaces.ITCPTransport
:return:
"""
self.udp_transport = udp_transport
self.tcp_transport = tcp_transport
[docs] def sendto(self, command, destination='<broadcast>'):
"""Send command to the destination.
:param command: Message to send
:type command: Command
:param destination: Destination IPv4 address or '<broadcast>'
:type destination: str
"""
if command.flags.guaranteed:
# Send TCP message if the Guaranteed flag is set
# noinspection PyArgumentList
self.tcp_transport.write(bytes(command), (destination, PORT))
else:
# noinspection PyArgumentList
self.udp_transport.write(bytes(command), (destination, PORT))
print("=>") # DEBUG
print(vars(command)) # DEBUG
# noinspection PyClassHasNoInit
[docs]class TCPProtocol(protocol.Protocol):
"""HiQnet Twisted TCP protocol."""
name = "HiQnetTCP"
# noinspection PyPep8Naming
[docs] def startProtocol(self):
"""Called after protocol started listening."""
self.factory.app.tcp_transport = self.transport
[docs] def dataReceived(self, data):
"""Called when data is received.
:param data: Received binary data
:type data: bytearray
"""
# FIXME: debugging output should go into a logger
print("<=")
print(self.name + " data:")
print(binascii.hexlify(data))
command = Command(command=data)
print(vars(command)) # DEBUG
# TODO: Process some more :)
self.factory.app.handle_message(command, None, self.name)
[docs]class UDPProtocol(protocol.DatagramProtocol):
"""HiQnet Twisted UDP protocol."""
name = "HiQnetUDP"
def __init__(self, app):
self.app = app
[docs] def startProtocol(self):
"""Called after protocol started listening."""
self.transport.setBroadcastAllowed(True) # Some messages needs to be broadcasted
self.app.udp_transport = self.transport
[docs] def datagramReceived(self, data, addr):
"""Called when data is received.
:param data: Received binary data
:type data: bytearray
:param addr: IPv4 address and port of the sender
:type addr: tuple
"""
(host, port) = addr
# FIXME: debugging output should go into a logger
print("<=")
print(self.name + "data:")
print(binascii.hexlify(data))
print("from ", end="")
print(host, end="")
print(":", end="")
print(port)
command = Command(command=data)
print(vars(command)) # DEBUG
# TODO: Process some more :)
self.app.handle_message(command, host, self.name)
[docs]class Factory(protocol.Factory):
"""HiQnet Twisted Factory."""
protocol = TCPProtocol
def __init__(self, app):
self.app = app