1 """ Zeromq messages """
3 from datetime import datetime, timezone
5 from struct import pack, unpack
7 __all__ = "Bcast", "LocEvt", "Resp"
10 def pack_peer(peeraddr):
12 saddr, port, _x, _y = peeraddr
13 addr = ip.ip_address(saddr)
15 saddr, port = peeraddr
16 a4 = ip.ip_address(saddr)
17 addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
18 return addr.packed + pack("!H", port)
21 def unpack_peer(buffer):
22 a6 = ip.IPv6Address(buffer[:16])
23 port = unpack("!H", buffer[16:])[0]
31 def __init__(self, *args, **kwargs):
35 for k, v in self.KWARGS:
36 setattr(self, k, kwargs.get(k, v))
39 self.__class__.__name__
47 return "{}({})".format(
48 self.__class__.__name__,
53 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
54 if isinstance(getattr(self, k), bytes)
55 else getattr(self, k),
57 for k, _ in self.KWARGS
62 def decode(self, buffer):
64 self.__class__.__name__ + "must implement `encode()` method"
70 self.__class__.__name__ + "must implement `encode()` method"
75 """Zmq message to broadcast what was received from the terminal"""
89 + ("0000000000000000" if self.imei is None else self.imei).encode()
93 else pack("!d", self.when)
95 + pack_peer(self.peeraddr)
99 def decode(self, buffer):
100 self.proto = buffer[0]
101 self.imei = buffer[1:17].decode()
102 if self.imei == "0000000000000000":
104 self.when = unpack("!d", buffer[17:25])[0]
105 self.peeraddr = unpack_peer(buffer[25:43])
106 self.packet = buffer[43:]
110 """Zmq message received from a third party to send to the terminal"""
112 KWARGS = (("imei", None), ("packet", b""))
117 "0000000000000000" if self.imei is None else self.imei.encode()
120 def decode(self, buffer):
121 self.imei = buffer[:16].decode()
122 self.packet = buffer[16:]
126 """Zmq message with original or approximated location from lookaside"""
129 ("imei", "0000000000000000"),
130 ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
138 return self.imei.encode() + pack(
140 self.devtime.replace(tzinfo=timezone.utc).timestamp(),
146 def decode(self, buffer):
147 self.imei = buffer[:16].decode()
148 when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
149 self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc)
150 self.is_gps = bool(is_gps)