www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
Broadcast location, gps and approximated
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 from datetime import datetime, timezone
4 import ipaddress as ip
5 from struct import pack, unpack
6
# Names exported by a star-import of this module: only the three message
# classes. pack_peer(), unpack_peer() and the _Zmsg base are not listed.
__all__ = "Bcast", "LocEvt", "Resp"
8
9
def pack_peer(peeraddr):
    """Serialize a peer address into a fixed-width 18-byte field.

    The field is a 16-byte IPv6 address followed by the port as an
    unsigned 16-bit integer in network byte order.  IPv4 hosts are stored
    in IPv4-mapped form (``::ffff:a.b.c.d``) so the width never varies.

    Args:
        peeraddr: ``(host, port)`` or ``(host, port, x, y)`` -- presumably
            the address tuples returned by AF_INET / AF_INET6 sockets.
            Only ``host`` and ``port`` are serialized.  ``None`` is packed
            as 18 zero bytes (address ``::``, port 0).

    Returns:
        bytes: exactly 18 bytes.

    Raises:
        ValueError: the tuple has neither 2 nor 4 items, or ``host`` is
            not a valid IP address.
        struct.error: ``port`` does not fit into 16 unsigned bits.
    """
    if peeraddr is None:
        return bytes(18)
    if len(peeraddr) not in (2, 4):
        raise ValueError(
            "peer address must have 2 or 4 items, got {!r}".format(peeraddr)
        )
    saddr, port = peeraddr[:2]
    addr = ip.ip_address(saddr)
    # Decide on the parsed address family rather than on the tuple length:
    # a 4-tuple may carry an IPv4 host and a 2-tuple may carry an IPv6 one.
    if isinstance(addr, ip.IPv4Address):
        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
19
20
def unpack_peer(buffer):
    """Parse an 18-byte peer field back into an ``(address, port)`` pair.

    The first 16 bytes are read as an IPv6 address and the remainder as a
    big-endian unsigned 16-bit port.  When the address is an IPv4-mapped
    one, the embedded ``IPv4Address`` is returned instead of the
    ``IPv6Address``.

    Returns:
        tuple: ``(ipaddress.IPv4Address | ipaddress.IPv6Address, int)``.
    """
    full = ip.IPv6Address(buffer[:16])
    (port,) = unpack("!H", buffer[16:])
    mapped = full.ipv4_mapped
    return (full if mapped is None else mapped, port)
28
29
class _Zmsg:
    """Common base of the message classes defined in this module.

    A subclass is expected to provide:

    * ``KWARGS`` -- a sequence of ``(attribute_name, default)`` pairs,
    * ``decode(buffer)`` -- fill the attributes from a serialized message,
    * ``packed`` -- a property returning the serialized message.

    An instance is created either from one positional argument (a
    serialized buffer handed to ``decode()``) or from keyword arguments
    (attribute values; attributes that are not given take their default
    from ``KWARGS``).
    """

    def __init__(self, *args, **kwargs):
        """Build the message from a buffer or from keyword arguments.

        Exactly one positional argument wins over keyword arguments, which
        are then ignored.

        Raises:
            RuntimeError: neither exactly one positional argument nor any
                keyword argument was supplied.
        """
        if len(args) == 1:
            self.decode(args[0])
        elif kwargs:
            for k, v in self.KWARGS:
                setattr(self, k, kwargs.get(k, v))
        else:
            raise RuntimeError(
                "{}: expected one positional buffer or keyword arguments,"
                " got args {} and kwargs {}".format(
                    self.__class__.__name__, args, kwargs
                )
            )

    def __repr__(self):
        """Render as ``ClassName(attr=value, ...)`` in ``KWARGS`` order.

        ``bytes`` values are shown as ``bytes.fromhex("...")``; every other
        value is formatted with ``str()``.
        """

        def show(value):
            if isinstance(value, bytes):
                return 'bytes.fromhex("{}")'.format(value.hex())
            return value

        return "{}({})".format(
            self.__class__.__name__,
            ", ".join(
                "{}={}".format(k, show(getattr(self, k)))
                for k, _ in self.KWARGS
            ),
        )

    def decode(self, buffer):
        """Placeholder; subclasses parse ``buffer`` into their attributes."""
        # NotImplementedError derives from RuntimeError, so code that
        # catches RuntimeError keeps working.
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `decode()` method"
        )

    @property
    def packed(self):
        """Placeholder; subclasses return the serialized message."""
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `packed` property"
        )
72
73
class Bcast(_Zmsg):
    """Zmq message to broadcast what was received from the terminal

    Serialized layout, as written by ``packed`` and read by ``decode()``:

    * byte 0       -- ``proto``, one unsigned byte
    * bytes 1-16   -- ``imei``, 16 characters; sixteen "0" stand for None
    * bytes 17-24  -- ``when``, big-endian double; zero bytes for None
    * bytes 25-42  -- ``peeraddr`` in the 18-byte ``pack_peer()`` format;
      zero bytes for None
    * bytes 43-    -- ``packet``, raw bytes

    Only the ``imei`` placeholder is turned back into None by ``decode()``.
    A ``when`` of None comes back as ``0.0``, and a ``peeraddr`` of None
    comes back as whatever ``unpack_peer()`` yields for 18 zero bytes.
    """

    KWARGS = (
        # 256 does not fit into the one-byte field, so `packed` raises
        # struct.error unless `proto` is given explicitly.
        # NOTE(review): looks like a deliberate "unset" marker -- confirm.
        ("proto", 256),
        ("imei", None),
        ("when", None),
        ("peeraddr", None),
        ("packet", b""),
    )

    @property
    def packed(self):
        """Return the serialized message as bytes."""
        return (
            pack("B", self.proto)
            + ("0000000000000000" if self.imei is None else self.imei).encode()
            + (
                b"\0\0\0\0\0\0\0\0"
                if self.when is None
                else pack("!d", self.when)
            )
            # `peeraddr` defaults to None just like `imei` and `when`, so
            # give it a zero-filled placeholder of the same 18-byte width.
            + (
                bytes(18)
                if self.peeraddr is None
                else pack_peer(self.peeraddr)
            )
            + self.packet
        )

    def decode(self, buffer):
        """Populate the attributes from a serialized message."""
        self.proto = buffer[0]
        self.imei = buffer[1:17].decode()
        if self.imei == "0000000000000000":
            self.imei = None
        self.when = unpack("!d", buffer[17:25])[0]
        self.peeraddr = unpack_peer(buffer[25:43])
        self.packet = buffer[43:]
107
108
class Resp(_Zmsg):
    """Zmq message received from a third party to send to the terminal

    Serialized layout: 16 bytes of ``imei`` (sixteen "0" characters when
    ``imei`` is None) followed by the raw ``packet`` bytes.  ``decode()``
    returns the first 16 bytes as a string as-is; it does not turn the
    all-zero placeholder back into None.
    """

    KWARGS = (("imei", None), ("packet", b""))

    @property
    def packed(self):
        """Return the serialized message as bytes."""
        # Encode after choosing between the placeholder and the real imei,
        # so the None case yields bytes too instead of `str + bytes`.
        imei = "0000000000000000" if self.imei is None else self.imei
        return imei.encode() + self.packet

    def decode(self, buffer):
        """Populate ``imei`` and ``packet`` from a serialized message."""
        self.imei = buffer[:16].decode()
        self.packet = buffer[16:]
123
124
class LocEvt(_Zmsg):
    """Zmq message with original or approximated location from lookaside

    Serialized layout: 16 bytes of ``imei`` followed by the struct
    ``!dddB`` -- ``devtime`` as a POSIX timestamp, ``lat``, ``lon`` and
    ``is_gps`` as one byte (0 or 1).
    """

    KWARGS = (
        ("imei", "0000000000000000"),
        ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
        ("lat", 0.0),
        ("lon", 0.0),
        ("is_gps", True),
    )

    @property
    def packed(self):
        """Return the serialized message as bytes.

        A naive ``devtime`` is interpreted as UTC.  An aware ``devtime``
        is converted using its own offset.
        """
        devtime = self.devtime
        if devtime.utcoffset() is None:
            # Naive value: pin it to UTC so the result does not depend on
            # the local timezone of the machine.
            devtime = devtime.replace(tzinfo=timezone.utc)
        return self.imei.encode() + pack(
            "!dddB",
            devtime.timestamp(),
            self.lat,
            self.lon,
            int(self.is_gps),
        )

    def decode(self, buffer):
        """Populate the attributes from a serialized message.

        ``devtime`` is restored as a timezone-aware datetime in UTC.
        """
        self.imei = buffer[:16].decode()
        when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
        # Build the aware UTC value directly instead of going through
        # naive local time.
        self.devtime = datetime.fromtimestamp(when, tz=timezone.utc)
        self.is_gps = bool(is_gps)