]> www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
56392007f1ff74ecd61cf59b8a95b1b9c2c06c48
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5
6 __all__ = "Bcast", "Resp"
7
8
def pack_peer(peeraddr):
    """Serialize a peer address into the fixed 18-byte wire form.

    The result is a 16-byte IPv6 address followed by a 2-byte big-endian
    port. IPv4 addresses are stored in IPv4-mapped form
    (``::ffff:a.b.c.d``) so that every peer occupies the same width.

    Args:
        peeraddr: a ``(host, port)`` 2-tuple, a 4-tuple whose first two
            items are host and port (the remaining two are ignored), or
            ``None`` when the peer is unknown.

    Returns:
        18 bytes; all zeroes when *peeraddr* is ``None``.

    Raises:
        ValueError: if *peeraddr* has neither 2 nor 4 items, or the host
            is not a valid IP address literal.
    """
    if peeraddr is None:
        return bytes(18)
    if len(peeraddr) not in (2, 4):
        raise ValueError("unsupported peer address: {!r}".format(peeraddr))
    saddr, port = peeraddr[:2]
    addr = ip.ip_address(saddr)
    # Decide on the parsed address family, not on the tuple length: either
    # tuple shape may carry either family, and the output must be 18 bytes.
    if addr.version == 4:
        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
18
19
def unpack_peer(buffer):
    """Decode an 18-byte peer record into an ``(address, port)`` pair.

    The first 16 bytes are read as an IPv6 address and the rest as a
    big-endian unsigned 16-bit port. An IPv4-mapped address is returned
    as an ``IPv4Address``; anything else stays an ``IPv6Address``.
    """
    raw_addr, raw_port = buffer[:16], buffer[16:]
    v6 = ip.IPv6Address(raw_addr)
    (port,) = unpack("!H", raw_port)
    v4 = v6.ipv4_mapped
    return (v6 if v4 is None else v4, port)
27
28
29 class _Zmsg:
30     def __init__(self, *args, **kwargs):
31         if len(args) == 1:
32             self.decode(args[0])
33         elif bool(kwargs):
34             for k, v in self.KWARGS:
35                 setattr(self, k, kwargs.get(k, v))
36         else:
37             raise RuntimeError(
38                 self.__class__.__name__
39                 + ": both args "
40                 + str(args)
41                 + " and kwargs "
42                 + str(kwargs)
43             )
44
45     def __repr__(self):
46         return "{}({})".format(
47             self.__class__.__name__,
48             ", ".join(
49                 [
50                     "{}={}".format(
51                         k,
52                         'bytes.fromhex("{}")'.format(getattr(self, k).hex())
53                         if isinstance(getattr(self, k), bytes)
54                         else getattr(self, k),
55                     )
56                     for k, _ in self.KWARGS
57                 ]
58             ),
59         )
60
61     def decode(self, buffer):
62         raise RuntimeError(
63             self.__class__.__name__ + "must implement `encode()` method"
64         )
65
66     @property
67     def packed(self):
68         raise RuntimeError(
69             self.__class__.__name__ + "must implement `encode()` method"
70         )
71
72
class Bcast(_Zmsg):
    """Zmq message to broadcast what was received from the terminal

    Wire layout, as written by ``packed`` and read back by ``decode()``:
    1 byte proto, 16 bytes imei, 8 bytes big-endian double ``when``,
    18 bytes peer address, then the raw packet. ``decode()`` reads the
    imei from a fixed 16-byte slice, so an imei must encode to exactly
    16 bytes for the message to round-trip.
    """

    # NOTE: the default proto (256) does not fit the one-byte "B" field
    # (valid range 0..255), so `packed` raises struct.error unless a real
    # proto value was supplied.
    KWARGS = (
        ("proto", 256),
        ("imei", None),
        ("when", None),
        ("peeraddr", None),
        ("packet", b""),
    )

    @property
    def packed(self):
        """Encoded message bytes.

        Unset (``None``) fields are written as placeholders: sixteen ASCII
        zeroes for ``imei``, eight zero bytes for ``when`` and eighteen
        zero bytes for ``peeraddr``.
        """
        return (
            pack("B", self.proto)
            + ("0000000000000000" if self.imei is None else self.imei).encode()
            + (
                b"\0\0\0\0\0\0\0\0"
                if self.when is None
                else pack("!d", self.when)
            )
            # peeraddr defaults to None just like imei and `when`; emit a
            # fixed-width placeholder instead of failing inside pack_peer.
            + (
                bytes(18)
                if self.peeraddr is None
                else pack_peer(self.peeraddr)
            )
            + self.packet
        )

    def decode(self, buffer):
        """Populate the attributes from an encoded *buffer*.

        The all-zero imei placeholder is mapped back to ``None``. The
        ``when`` and ``peeraddr`` placeholders are not: eight zero bytes
        decode to ``0.0`` and eighteen zero bytes decode to the address
        ``::`` with port 0.
        """
        self.proto = buffer[0]
        self.imei = buffer[1:17].decode()
        if self.imei == "0000000000000000":
            self.imei = None
        self.when = unpack("!d", buffer[17:25])[0]
        self.peeraddr = unpack_peer(buffer[25:43])
        self.packet = buffer[43:]
106
107
class Resp(_Zmsg):
    """Zmq message received from a third party to send to the terminal

    Wire layout: 16 bytes imei followed by the raw packet. ``decode()``
    reads the imei from a fixed 16-byte slice, so an imei must encode to
    exactly 16 bytes for the message to round-trip.
    """

    KWARGS = (("imei", None), ("packet", b""))

    @property
    def packed(self):
        """Encoded message bytes.

        An unset (``None``) imei is written as sixteen ASCII zeroes.
        """
        # The placeholder must be bytes: adding a str to `self.packet`
        # raises TypeError.
        imei = b"0000000000000000" if self.imei is None else self.imei.encode()
        return imei + self.packet

    def decode(self, buffer):
        """Populate ``imei`` and ``packet`` from an encoded *buffer*.

        The all-zero imei placeholder is mapped back to ``None`` so that
        an encoded message decodes to the values it was built from.
        """
        self.imei = buffer[:16].decode()
        if self.imei == "0000000000000000":
            self.imei = None
        self.packet = buffer[16:]
121         self.packet = buffer[16:]