]> www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
6e591ab65c2fb9a8c4ec7ff7a9bcab8d69e7de08
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5
6 __all__ = "Bcast", "Resp"
7
8
9 def pack_peer(peeraddr):
10     saddr, port, _x, _y = peeraddr
11     addr6 = ip.ip_address(saddr)
12     addr = addr6.ipv4_mapped
13     if addr is None:
14         addr = addr6
15     return (
16         pack("B", addr.version)
17         + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16]
18         + pack("!H", port)
19     )
20
21
22 def unpack_peer(buffer):
23     version = buffer[0]
24     if version not in (4, 6):
25         return None
26     if version == 4:
27         addr = ip.IPv4Address(buffer[1:5])
28     else:
29         addr = ip.IPv6Address(buffer[1:17])
30     port = unpack("!H", buffer[17:19])[0]
31     return (addr, port)
32
33
34 class _Zmsg:
35     def __init__(self, *args, **kwargs):
36         if len(args) == 1:
37             self.decode(args[0])
38         elif bool(kwargs):
39             for k, v in self.KWARGS:
40                 setattr(self, k, kwargs.get(k, v))
41         else:
42             raise RuntimeError(
43                 self.__class__.__name__
44                 + ": both args "
45                 + str(args)
46                 + " and kwargs "
47                 + str(kwargs)
48             )
49
50     def __repr__(self):
51         return "{}({})".format(
52             self.__class__.__name__,
53             ", ".join(
54                 [
55                     "{}={}".format(
56                         k,
57                         'bytes.fromhex("{}")'.format(getattr(self, k).hex())
58                         if isinstance(getattr(self, k), bytes)
59                         else getattr(self, k),
60                     )
61                     for k, _ in self.KWARGS
62                 ]
63             ),
64         )
65
66     def decode(self, buffer):
67         raise RuntimeError(
68             self.__class__.__name__ + "must implement `encode()` method"
69         )
70
71     @property
72     def packed(self):
73         raise RuntimeError(
74             self.__class__.__name__ + "must implement `encode()` method"
75         )
76
77
78 class Bcast(_Zmsg):
79     """Zmq message to broadcast what was received from the terminal"""
80
81     KWARGS = (
82         ("proto", 256),
83         ("imei", None),
84         ("when", None),
85         ("peeraddr", None),
86         ("packet", b""),
87     )
88
89     @property
90     def packed(self):
91         return (
92             pack("B", self.proto)
93             + ("0000000000000000" if self.imei is None else self.imei).encode()
94             + (
95                 b"\0\0\0\0\0\0\0\0"
96                 if self.when is None
97                 else pack("!d", self.when)
98             )
99             + pack_peer(self.peeraddr)
100             + self.packet
101         )
102
103     def decode(self, buffer):
104         self.proto = buffer[0]
105         self.imei = buffer[1:17].decode()
106         if self.imei == "0000000000000000":
107             self.imei = None
108         self.when = unpack("!d", buffer[17:25])[0]
109         self.peeraddr = unpack_peer(buffer[25:44])
110         self.packet = buffer[44:]
111
112
113 class Resp(_Zmsg):
114     """Zmq message received from a third party to send to the terminal"""
115
116     KWARGS = (("imei", None), ("packet", b""))
117
118     @property
119     def packed(self):
120         return (
121             "0000000000000000" if self.imei is None else self.imei.encode()
122         ) + self.packet
123
124     def decode(self, buffer):
125         self.imei = buffer[:16].decode()
126         self.packet = buffer[16:]