]> www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
1fe18123c441739a9891b1433ec8bc276b5c876a
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5
6 __all__ = "Bcast", "Resp", "topic"
7
8
9 def pack_peer(peeraddr):
10     try:
11         if peeraddr is None:
12             saddr = "::"
13             port = 0
14         else:
15             saddr, port, _x, _y = peeraddr
16         addr = ip.ip_address(saddr)
17     except ValueError:
18         saddr, port = peeraddr
19         a4 = ip.ip_address(saddr)
20         addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
21     return addr.packed + pack("!H", port)
22
23
24 def unpack_peer(buffer):
25     a6 = ip.IPv6Address(buffer[:16])
26     port = unpack("!H", buffer[16:])[0]
27     addr = a6.ipv4_mapped
28     if addr is None:
29         addr = a6
30     return (addr, port)
31
32
33 class _Zmsg:
34     def __init__(self, *args, **kwargs):
35         if len(args) == 1:
36             self.decode(args[0])
37         elif bool(kwargs):
38             for k, v in self.KWARGS:
39                 setattr(self, k, kwargs.get(k, v))
40         else:
41             raise RuntimeError(
42                 self.__class__.__name__
43                 + ": both args "
44                 + str(args)
45                 + " and kwargs "
46                 + str(kwargs)
47             )
48
49     def __repr__(self):
50         return "{}({})".format(
51             self.__class__.__name__,
52             ", ".join(
53                 [
54                     "{}={}".format(
55                         k,
56                         'bytes.fromhex("{}")'.format(getattr(self, k).hex())
57                         if isinstance(getattr(self, k), bytes)
58                         else getattr(self, k),
59                     )
60                     for k, _ in self.KWARGS
61                 ]
62             ),
63         )
64
65     def __eq__(self, other):
66         return all(
67             [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
68         )
69
70     def decode(self, buffer):
71         raise NotImplementedError(
72             self.__class__.__name__ + "must implement `decode()` method"
73         )
74
75     @property
76     def packed(self):
77         raise NotImplementedError(
78             self.__class__.__name__ + "must implement `packed()` property"
79         )
80
81
82 def topic(proto, is_incoming=True, imei=None):
83     return pack("BB", is_incoming, proto) + (
84         b"" if imei is None else pack("16s", imei.encode())
85     )
86
87
88 class Bcast(_Zmsg):
89     """Zmq message to broadcast what was received from the terminal"""
90
91     KWARGS = (
92         ("is_incoming", True),
93         ("proto", 256),
94         ("imei", None),
95         ("when", None),
96         ("peeraddr", None),
97         ("packet", b""),
98     )
99
100     @property
101     def packed(self):
102         return (
103             pack(
104                 "BB16s",
105                 int(self.is_incoming),
106                 self.proto,
107                 "0000000000000000"
108                 if self.imei is None
109                 else self.imei.encode(),
110             )
111             + (
112                 b"\0\0\0\0\0\0\0\0"
113                 if self.when is None
114                 else pack("!d", self.when)
115             )
116             + pack_peer(self.peeraddr)
117             + self.packet
118         )
119
120     def decode(self, buffer):
121         self.is_incoming = bool(buffer[0])
122         self.proto = buffer[1]
123         self.imei = buffer[2:18].decode()
124         if self.imei == "0000000000000000":
125             self.imei = None
126         self.when = unpack("!d", buffer[18:26])[0]
127         self.peeraddr = unpack_peer(buffer[26:44])
128         self.packet = buffer[44:]
129
130
131 class Resp(_Zmsg):
132     """Zmq message received from a third party to send to the terminal"""
133
134     KWARGS = (("imei", None), ("when", None), ("packet", b""))
135
136     @property
137     def packed(self):
138         return (
139             pack(
140                 "16s",
141                 "0000000000000000"
142                 if self.imei is None
143                 else self.imei.encode(),
144             )
145             + (
146                 b"\0\0\0\0\0\0\0\0"
147                 if self.when is None
148                 else pack("!d", self.when)
149             )
150             + self.packet
151         )
152
153     def decode(self, buffer):
154         self.imei = buffer[:16].decode()
155         self.when = unpack("!d", buffer[16:24])[0]
156         self.packet = buffer[24:]