]> www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
Change LocEvt to use json encoding
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 from datetime import datetime, timezone
4 from json import dumps, loads
5 import ipaddress as ip
6 from struct import pack, unpack
7
8 __all__ = "Bcast", "LocEvt", "Resp"
9
10
def pack_peer(peeraddr):
    """Serialize a socket peer address into a fixed 18-byte field.

    The result is a 16-byte IPv6 address followed by the port as an
    unsigned 16-bit integer in network byte order.  IPv4 addresses are
    stored in IPv4-mapped form (``::ffff:a.b.c.d``).

    peeraddr may be:
      * ``None`` - no peer known; encoded as an all-zero address and port;
      * a 2-tuple ``(address, port)``;
      * a 4-tuple ``(address, port, x, y)``; the last two items are ignored.

    ``address`` is anything ``ipaddress.ip_address()`` accepts, including
    the address objects returned by ``unpack_peer()``.  The address family
    is taken from the address itself, not from the length of the tuple.

    Raises ValueError if the tuple has an unexpected length or the
    address cannot be parsed.
    """
    if peeraddr is None:
        # Keep the field width constant even when there is no peer.
        return bytes(16) + pack("!H", 0)
    if len(peeraddr) not in (2, 4):
        raise ValueError(
            "peer address must have 2 or 4 elements: " + repr(peeraddr)
        )
    saddr, port = peeraddr[0], peeraddr[1]
    addr = ip.ip_address(saddr)
    if addr.version == 4:
        # Embed the four IPv4 octets into the ::ffff:0:0/96 prefix.
        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
20
21
def unpack_peer(buffer):
    """Decode an 18-byte peer field into an ``(address, port)`` tuple.

    The first 16 bytes are read as an IPv6 address and the remaining
    bytes as an unsigned 16-bit port in network byte order.  If the
    address is IPv4-mapped, the embedded ``IPv4Address`` is returned,
    otherwise the ``IPv6Address`` itself.
    """
    v6 = ip.IPv6Address(buffer[:16])
    (port,) = unpack("!H", buffer[16:])
    # ipv4_mapped is None unless the address is of the ::ffff:a.b.c.d form.
    mapped = v6.ipv4_mapped
    return (v6 if mapped is None else mapped, port)
29
30
class _Zmsg:
    """Common base for the Zeromq message classes.

    A subclass declares ``KWARGS``, a tuple of ``(name, default)`` pairs
    naming its attributes, and implements ``decode()`` and ``packed``.

    An instance is built either from a single positional argument, which
    is passed to ``decode()``, or from keyword arguments, in which case
    every attribute listed in ``KWARGS`` is set from the keyword of the
    same name or from its default.
    """

    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            # A single positional argument is the wire representation.
            self.decode(args[0])
        elif bool(kwargs):
            for k, v in self.KWARGS:
                setattr(self, k, kwargs.get(k, v))
        else:
            # Neither exactly one positional argument nor any keywords.
            raise RuntimeError(
                self.__class__.__name__
                + ": both args "
                + str(args)
                + " and kwargs "
                + str(kwargs)
            )

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for the KWARGS attributes.

        Values of type ``bytes`` are shown as ``bytes.fromhex("...")``.
        """
        parts = []
        for name, _ in self.KWARGS:
            value = getattr(self, name)
            if isinstance(value, bytes):
                value = 'bytes.fromhex("{}")'.format(value.hex())
            parts.append("{}={}".format(name, value))
        return "{}({})".format(self.__class__.__name__, ", ".join(parts))

    def __eq__(self, other):
        """Compare all KWARGS attributes with another message of this class.

        Returns NotImplemented for objects that are not instances of this
        message class, so comparing with an unrelated object yields False
        instead of raising AttributeError.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return all(
            getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS
        )

    def decode(self, buffer):
        """Populate the attributes from ``buffer``; subclasses override."""
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `decode()` method"
        )

    @property
    def packed(self):
        """Wire representation of the message; subclasses override."""
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `packed()` property"
        )
78
79
class Bcast(_Zmsg):
    """Zmq message to broadcast what was received from the terminal

    Wire layout, as read back by ``decode()``:
      byte  0      proto
      bytes 1-16   imei as text (sixteen "0" characters stand for None)
      bytes 17-24  when, a network-order double
      bytes 25-42  peer address as produced by ``pack_peer()``
      bytes 43-    the packet itself
    """

    # NOTE(review): the default proto of 256 does not fit the one-byte "B"
    # field used by `packed`, so presumably proto must always be given
    # explicitly before packing - confirm.
    KWARGS = (
        ("proto", 256),
        ("imei", None),
        ("when", None),
        ("peeraddr", None),
        ("packet", b""),
    )

    @property
    def packed(self):
        """Return the message serialized to bytes."""
        head = pack("B", self.proto)
        imei = ("0000000000000000" if self.imei is None else self.imei).encode()
        if self.when is None:
            # Eight zero bytes occupy the timestamp slot when it is unset.
            stamp = b"\0\0\0\0\0\0\0\0"
        else:
            stamp = pack("!d", self.when)
        return head + imei + stamp + pack_peer(self.peeraddr) + self.packet

    def decode(self, buffer):
        """Fill the attributes from the serialized form in ``buffer``."""
        self.proto = buffer[0]
        imei = buffer[1:17].decode()
        # The all-zero placeholder means that no imei was supplied.
        self.imei = None if imei == "0000000000000000" else imei
        (self.when,) = unpack("!d", buffer[17:25])
        self.peeraddr = unpack_peer(buffer[25:43])
        self.packet = buffer[43:]
113
114
class Resp(_Zmsg):
    """Zmq message received from a third party to send to the terminal

    The wire format is a 16-byte imei prefix followed by the raw packet.
    """

    KWARGS = (("imei", None), ("packet", b""))

    @property
    def packed(self):
        """Return the imei prefix followed by the packet, as bytes.

        A missing imei (None) is sent as sixteen "0" characters.
        """
        # Encode whichever string was chosen: applying .encode() only to
        # the real imei left the placeholder as str, and adding it to the
        # bytes packet raised TypeError.
        imei = "0000000000000000" if self.imei is None else self.imei
        return imei.encode() + self.packet

    def decode(self, buffer):
        """Split ``buffer`` into the 16-byte imei prefix and the packet.

        The imei is stored exactly as received; an all-zero prefix is not
        converted back to None.
        """
        self.imei = buffer[:16].decode()
        self.packet = buffer[16:]
129
130
class LocEvt(_Zmsg):
    """Zmq message with original or approximated location from lookaside"""

    KWARGS = (
        ("imei", "0000000000000000"),
        ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
        ("lat", 0.0),
        ("lon", 0.0),
        ("is_gps", True),
    )

    # External consumers read this message, hence the json payload.  Only
    # the imei stays outside of the json: it forms a 16 byte prefix that
    # can be used as the topic to subscribe.
    @property
    def packed(self):
        """Return the imei prefix followed by the json-encoded location."""
        prefix = self.imei.encode()
        payload = {
            "devtime": str(self.devtime),
            "latitude": self.lat,
            "longitude": self.lon,
            "is-gps": self.is_gps,
        }
        return prefix + dumps(payload).encode()

    def decode(self, buffer):
        """Fill the attributes from the imei prefix and the json payload."""
        self.imei = buffer[:16].decode()
        fields = loads(buffer[16:])
        # devtime travels as text and is parsed back into a datetime.
        self.devtime = datetime.fromisoformat(fields["devtime"])
        self.lat = fields["latitude"]
        self.lon = fields["longitude"]
        self.is_gps = fields["is-gps"]