1 """ Zeromq messages """
4 from struct import pack, unpack
6 __all__ = "Bcast", "Resp"
9 def pack_peer(peeraddr):
11 saddr, port, _x, _y = peeraddr
12 addr = ip.ip_address(saddr)
14 saddr, port = peeraddr
15 a4 = ip.ip_address(saddr)
16 addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
17 return addr.packed + pack("!H", port)
20 def unpack_peer(buffer):
21 a6 = ip.IPv6Address(buffer[:16])
22 port = unpack("!H", buffer[16:])[0]
30 def __init__(self, *args, **kwargs):
34 for k, v in self.KWARGS:
35 setattr(self, k, kwargs.get(k, v))
38 self.__class__.__name__
46 return "{}({})".format(
47 self.__class__.__name__,
52 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
53 if isinstance(getattr(self, k), bytes)
54 else getattr(self, k),
56 for k, _ in self.KWARGS
61 def __eq__(self, other):
63 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
66 def decode(self, buffer):
67 raise NotImplementedError(
68 self.__class__.__name__ + "must implement `decode()` method"
73 raise NotImplementedError(
74 self.__class__.__name__ + "must implement `packed()` property"
79 """Zmq message to broadcast what was received from the terminal"""
93 + ("0000000000000000" if self.imei is None else self.imei).encode()
97 else pack("!d", self.when)
99 + pack_peer(self.peeraddr)
103 def decode(self, buffer):
104 self.proto = buffer[0]
105 self.imei = buffer[1:17].decode()
106 if self.imei == "0000000000000000":
108 self.when = unpack("!d", buffer[17:25])[0]
109 self.peeraddr = unpack_peer(buffer[25:43])
110 self.packet = buffer[43:]
114 """Zmq message received from a third party to send to the terminal"""
116 KWARGS = (("imei", None), ("packet", b""))
121 "0000000000000000" if self.imei is None else self.imei.encode()
124 def decode(self, buffer):
125 self.imei = buffer[:16].decode()
126 self.packet = buffer[16:]