www.average.org Git - loctrkd.git/blob - gps303/zmsg.py
fill in `when` in Resp packet
[loctrkd.git] / gps303 / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5
6 __all__ = "Bcast", "Resp", "topic"
7
8
def pack_peer(peeraddr):
    """Serialize a socket peer address into 18 bytes.

    The result is the 16-byte packed IPv6 address followed by the port as
    a big-endian unsigned 16-bit integer. IPv4 addresses are stored in
    IPv4-mapped IPv6 form (``::ffff:a.b.c.d``).

    Args:
        peeraddr: ``None`` (encoded as address ``::`` and port 0), or a
            sequence whose first two items are the host string and the
            port, e.g. ``(host, port)`` or ``(host, port, x, y)``. Any
            items after the port are ignored.

    Returns:
        An 18-byte ``bytes`` object.

    Raises:
        ValueError: if the host string is not a valid IP address.
        struct.error: if the port does not fit in 16 bits.
    """
    if peeraddr is None:
        saddr, port = "::", 0
    else:
        # Select host and port by position rather than by unpacking a fixed
        # tuple shape, so a tuple-length mismatch is never mistaken for an
        # address-parse failure (both surface as ValueError).
        saddr, port = peeraddr[0], peeraddr[1]
    addr = ip.ip_address(saddr)
    if addr.version == 4:
        # Always emit 16 address bytes, whatever tuple shape carried the
        # IPv4 host.
        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
22
23
def unpack_peer(buffer):
    """Decode an 18-byte peer record into ``(address, port)``.

    The first 16 bytes are read as a packed IPv6 address and the remaining
    bytes as a big-endian unsigned 16-bit port. When the address is an
    IPv4-mapped one, the embedded ``IPv4Address`` is returned instead of
    the ``IPv6Address``.
    """
    v6 = ip.IPv6Address(buffer[:16])
    (port,) = unpack("!H", buffer[16:])
    mapped = v6.ipv4_mapped
    return (v6 if mapped is None else mapped, port)
31
32
class _Zmsg:
    """Common base for the message classes in this module.

    A subclass provides ``KWARGS``, an iterable of ``(name, default)``
    pairs naming its attributes, and implements ``decode()`` and the
    ``packed`` property.

    An instance is built either from a single positional buffer, which is
    passed to ``decode()``, or from keyword arguments, where every name in
    ``KWARGS`` that is not supplied takes its default.
    """

    def __init__(self, *args, **kwargs):
        """Initialize from one positional buffer or from keyword arguments.

        Raises:
            RuntimeError: if neither exactly one positional argument nor
                any keyword argument is given.
        """
        if len(args) == 1:
            # A single positional argument wins; kwargs are not consulted.
            self.decode(args[0])
        elif kwargs:
            for k, v in self.KWARGS:
                setattr(self, k, kwargs.get(k, v))
        else:
            raise RuntimeError(
                "{}: expected one positional buffer or keyword arguments,"
                " got args {} and kwargs {}".format(
                    self.__class__.__name__, args, kwargs
                )
            )

    def __repr__(self):
        """Return ``ClassName(k=v, ...)``; bytes values are shown as hex."""
        fields = []
        for k, _ in self.KWARGS:
            value = getattr(self, k)
            if isinstance(value, bytes):
                value = 'bytes.fromhex("{}")'.format(value.hex())
            fields.append("{}={}".format(k, value))
        return "{}({})".format(self.__class__.__name__, ", ".join(fields))

    def __eq__(self, other):
        """Compare all ``KWARGS`` attributes with another message."""
        if not isinstance(other, type(self)):
            # Let Python fall back to its default handling instead of
            # raising AttributeError on unrelated objects.
            return NotImplemented
        return all(
            getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS
        )

    def decode(self, buffer):
        """Populate attributes from ``buffer``; subclasses must override."""
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `decode()` method"
        )

    @property
    def packed(self):
        """Serialized form of the message; subclasses must override."""
        raise NotImplementedError(
            self.__class__.__name__ + " must implement `packed()` property"
        )
80
81
def topic(proto, is_incoming=True, imei=None):
    """Build a byte prefix matching the start of a packed ``Bcast``.

    The prefix is the direction flag and the protocol number, one unsigned
    byte each, optionally followed by the encoded IMEI. This is the same
    field order that ``Bcast.packed`` starts with.

    Args:
        proto: protocol number, must fit in one unsigned byte.
        is_incoming: direction flag, packed as one byte.
        imei: optional IMEI string appended after the two header bytes.

    Returns:
        ``bytes`` of length 2 when ``imei`` is None, otherwise 2 plus the
        length of the encoded IMEI.
    """
    prefix = pack("BB", is_incoming, proto)
    if imei is None:
        return prefix
    # The conditional must apply only to the IMEI part: without explicit
    # grouping the whole prefix was dropped whenever an IMEI was supplied.
    return prefix + imei.encode()
86
87
class Bcast(_Zmsg):
    """Zmq message carrying a packet received from the terminal.

    Packed layout: direction flag (1 byte), protocol (1 byte), IMEI
    (16 characters, all zeros when unset), timestamp (8-byte big-endian
    double, all zero bytes when unset), peer address (18 bytes), and
    finally the packet itself.
    """

    KWARGS = (
        ("is_incoming", True),
        ("proto", 256),
        ("imei", None),
        ("when", None),
        ("peeraddr", None),
        ("packet", b""),
    )

    @property
    def packed(self):
        """Serialize the message to bytes in the layout described above."""
        header = pack("BB", int(self.is_incoming), self.proto)
        imei_text = "0000000000000000" if self.imei is None else self.imei
        imei_bytes = imei_text.encode()
        if self.when is None:
            stamp = b"\0\0\0\0\0\0\0\0"
        else:
            stamp = pack("!d", self.when)
        peer = pack_peer(self.peeraddr)
        return header + imei_bytes + stamp + peer + self.packet

    def decode(self, buffer):
        """Fill the attributes from a buffer produced by ``packed``."""
        self.is_incoming = bool(buffer[0])
        self.proto = buffer[1]
        imei_text = buffer[2:18].decode()
        # The all-zero placeholder stands for "no IMEI".
        self.imei = None if imei_text == "0000000000000000" else imei_text
        (self.when,) = unpack("!d", buffer[18:26])
        self.peeraddr = unpack_peer(buffer[26:44])
        self.packet = buffer[44:]
123
124
class Resp(_Zmsg):
    """Zmq message received from a third party to send to the terminal.

    Packed layout: IMEI (16 characters, all zeros when unset), timestamp
    (8-byte big-endian double, all zero bytes when unset), then the packet.
    """

    KWARGS = (("imei", None), ("when", None), ("packet", b""))

    @property
    def packed(self):
        """Serialize the message to bytes in the layout described above."""
        # Pick the text first and encode afterwards, so the placeholder is
        # bytes too; concatenating the str placeholder with bytes raised
        # TypeError whenever imei was None.
        imei_text = "0000000000000000" if self.imei is None else self.imei
        return (
            imei_text.encode()
            + (
                b"\0\0\0\0\0\0\0\0"
                if self.when is None
                else pack("!d", self.when)
            )
            + self.packet
        )

    def decode(self, buffer):
        """Fill the attributes from a buffer produced by ``packed``."""
        self.imei = buffer[:16].decode()
        if self.imei == "0000000000000000":
            # Mirror Bcast.decode: the all-zero placeholder means "no IMEI".
            self.imei = None
        self.when = unpack("!d", buffer[16:24])[0]
        self.packet = buffer[24:]
143         self.imei = buffer[:16].decode()
144         self.when = unpack("!d", buffer[16:24])[0]
145         self.packet = buffer[24:]