]> www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
full encoder/decoder for zmq messages
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5
6 __all__ = "Bcast", "Resp"
7
8 def pack_peer(peeraddr):
9     saddr, port = peeraddr
10     addr = ip.ip_address(saddr)
11     return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
12
13 def unpack_peer(buffer):
14     version = buffer[0]
15     if version not in (4, 6):
16         return None
17     if version == 4:
18         addr = ip.IPv4Address(buffer[1:5])
19     else:
20         addr = ip.IPv6Address(buffer[1:17])
21     port = unpack("!H", buffer[17:19])[0]
22     return (addr, port)
23
24
25 class _Zmsg:
26     def __init__(self, *args, **kwargs):
27         if len(args) == 1:
28             self.decode(args[0])
29         elif bool(kwargs):
30             for k, v in self.KWARGS:
31                 setattr(self, k, kwargs.get(k, v))
32         else:
33             raise RuntimeError(
34                 self.__class__.__name__
35                 + ": both args "
36                 + str(args)
37                 + " and kwargs "
38                 + str(kwargs)
39             )
40
41     def decode(self, buffer):
42         raise RuntimeError(
43             self.__class__.__name__ + "must implement `encode()` method"
44         )
45
46     @property
47     def packed(self):
48         raise RuntimeError(
49             self.__class__.__name__ + "must implement `encode()` method"
50         )
51
52
53 class Bcast(_Zmsg):
54     """Zmq message to broadcast what was received from the terminal"""
55
56     KWARGS = (
57         ("proto", 256),
58         ("imei", None),
59         ("when", None),
60         ("peeraddr", None),
61         ("packet", b""),
62     )
63
64     @property
65     def packed(self):
66         return (
67             pack("B", self.proto)
68             + ("0000000000000000" if self.imei is None else self.imei).encode()
69             + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
70             + pack_peer(self.peeraddr)
71             + self.packet
72         )
73
74     def decode(self, buffer):
75         self.proto = buffer[0]
76         self.imei = buffer[1:17].decode()
77         if self.imei == "0000000000000000":
78             self.imei = None
79         self.when = unpack("!d", buffer[17:25])[0]
80         self.peeraddr = unpack_peer(buffer[25:44])
81         self.packet = buffer[44:]
82
83
84 class Resp(_Zmsg):
85     """Zmq message received from a third party to send to the terminal"""
86
87     KWARGS = (("imei", None), ("packet", b""))
88
89     @property
90     def packed(self):
91         return (
92             "0000000000000000" if self.imei is None else self.imei.encode()
93         ) + self.packet
94
95     def decode(self, buffer):
96         self.imei = buffer[:16].decode()
97         self.packet = buffer[16:]