]> www.average.org Git - loctrkd.git/blob - loctrkd/zmsg.py
Change reporting of pmod to events
[loctrkd.git] / loctrkd / zmsg.py
1 """ Zeromq messages """
2
3 import ipaddress as ip
4 from struct import pack, unpack
5 from typing import Any, cast, Optional, Tuple, Type, Union
6
7 __all__ = "Bcast", "Resp", "topic", "rtopic"
8
9
def pack_peer(  # 18 bytes
    peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
) -> bytes:
    """Serialize a peer address into a fixed-size 18-byte field.

    The result is a 16-byte IPv6 address followed by the port as an
    unsigned 16-bit integer in network byte order.  IPv4 addresses are
    stored in IPv4-mapped form (``::ffff:a.b.c.d``); ``None`` is stored
    as the all-zero address with port 0.

    Args:
        peeraddr: ``None``, a ``(host, port)`` pair, or a
            ``(host, port, x, y)`` 4-tuple whose last two items are
            ignored.

    Returns:
        Exactly 18 bytes.

    Raises:
        ValueError: if ``peeraddr`` is a tuple of any other length, or
            if the host is not a valid IP address literal.
        struct.error: if the port does not fit into 16 unsigned bits.
    """
    addr: Union[ip.IPv4Address, ip.IPv6Address]
    if peeraddr is None:
        addr = ip.IPv6Address(0)
        port = 0
    elif len(peeraddr) in (2, 4):
        # Only the first two items are used; the trailing pair of the
        # 4-tuple form is deliberately dropped.
        saddr, port = peeraddr[0], peeraddr[1]
        addr = ip.ip_address(saddr)
    else:
        # Without this branch `addr` would be unbound below and the
        # caller would get a confusing UnboundLocalError.
        raise ValueError(
            "peeraddr must be None, a 2-tuple or a 4-tuple, got "
            + repr(peeraddr)
        )
    if isinstance(addr, ip.IPv4Address):
        # Embed the IPv4 address into the IPv4-mapped IPv6 range so the
        # field always has the same width.
        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
    return addr.packed + pack("!H", port)
27
28
def unpack_peer(
    buffer: bytes,
) -> Tuple[str, int]:
    """Decode an 18-byte peer field into a ``(host, port)`` pair.

    The first 16 bytes are read as an IPv6 address and the remaining
    bytes as an unsigned 16-bit port in network byte order.

    Returns:
        ``(ipv4_text, port)`` when the address is IPv4-mapped,
        ``("", 0)`` when the address is all zeroes (the port bytes are
        disregarded in that case), and ``(ipv6_text, port)`` otherwise.
    """
    address = ip.IPv6Address(buffer[:16])
    (port,) = unpack("!H", buffer[16:])
    mapped = address.ipv4_mapped
    if mapped is not None:
        return (str(mapped), port)
    if int(address) == 0:
        # The unspecified address "::" stands for "no peer".
        return ("", 0)
    return (str(address), port)
40
41
42 class _Zmsg:
43     KWARGS: Tuple[Tuple[str, Any], ...]
44
45     def __init__(self, *args: Any, **kwargs: Any) -> None:
46         if len(args) == 1:
47             self.decode(args[0])
48         elif bool(kwargs):
49             for k, v in self.KWARGS:
50                 setattr(self, k, kwargs.get(k, v))
51         else:
52             raise RuntimeError(
53                 self.__class__.__name__
54                 + ": both args "
55                 + str(args)
56                 + " and kwargs "
57                 + str(kwargs)
58             )
59
60     def __repr__(self) -> str:
61         return "{}({})".format(
62             self.__class__.__name__,
63             ", ".join(
64                 [
65                     "{}={}".format(
66                         k,
67                         'bytes.fromhex("{}")'.format(getattr(self, k).hex())
68                         if isinstance(getattr(self, k), bytes)
69                         else getattr(self, k),
70                     )
71                     for k, _ in self.KWARGS
72                 ]
73             ),
74         )
75
76     def __eq__(self, other: object) -> bool:
77         if isinstance(other, self.__class__):
78             return all(
79                 [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
80             )
81         return NotImplemented
82
83     def decode(self, buffer: bytes) -> None:
84         raise NotImplementedError(
85             self.__class__.__name__ + "must implement `decode()` method"
86         )
87
88     @property
89     def packed(self) -> bytes:
90         raise NotImplementedError(
91             self.__class__.__name__ + "must implement `packed()` property"
92         )
93
94
def topic(
    proto: str, is_incoming: bool = True, imei: Optional[str] = None
) -> bytes:
    """Build a byte prefix from direction, protocol name and optional IMEI.

    The result starts with one byte holding ``is_incoming`` followed by
    ``proto`` encoded into a 16-byte field (null-padded or truncated).
    When ``imei`` is given, it is appended as another 16-byte field;
    otherwise the result ends after the protocol field.
    """
    prefix = pack("B16s", is_incoming, proto.encode())
    if imei is None:
        return prefix
    return prefix + pack("16s", imei.encode())
101
102
def rtopic(imei: str) -> bytes:
    """Return ``imei`` encoded into a 16-byte field.

    Shorter values are padded with null bytes, longer ones truncated.
    """
    encoded = imei.encode()
    return pack("16s", encoded)
105
106
class Bcast(_Zmsg):
    """Zmq message to broadcast what was received from the terminal

    Serialized layout, in network byte order:

    * 1 byte   - ``is_incoming`` flag
    * 16 bytes - ``proto``, null-padded
    * 16 bytes - ``imei`` (sixteen ASCII zeroes when unset)
    * 8 bytes  - ``when`` as a double (0 when unset)
    * 16 bytes - ``pmod`` (sixteen spaces when unset)
    * 18 bytes - ``peeraddr`` as produced by ``pack_peer()``
    * the rest - ``packet``, verbatim

    Decoding does not restore every "unset" value: ``when`` comes back
    as the number that was stored, and ``peeraddr`` as whatever
    ``unpack_peer()`` returns.
    """

    KWARGS = (
        ("is_incoming", True),
        ("proto", "UNKNOWN"),
        ("pmod", None),
        ("imei", None),
        ("when", None),
        ("peeraddr", None),
        ("packet", b""),
    )

    @property
    def packed(self) -> bytes:
        """Serialized form of the message (see the class description)."""
        direction = int(self.is_incoming)
        proto_field = self.proto[:16].ljust(16, "\0").encode()
        if self.imei is None:
            imei_field = b"0000000000000000"
        else:
            imei_field = self.imei.encode()
        timestamp = 0 if self.when is None else self.when
        if self.pmod is None:
            pmod_field = b"                "
        else:
            pmod_field = self.pmod.encode()
        header = pack(
            "!B16s16sd16s",
            direction,
            proto_field,
            imei_field,
            timestamp,
            pmod_field,
        )
        return header + pack_peer(self.peeraddr) + self.packet

    def decode(self, buffer: bytes) -> None:
        """Fill the attributes from a buffer produced by ``packed``."""
        # 57 bytes of fixed header, 18 bytes of peer address, then payload.
        fields = unpack("!B16s16sd16s", buffer[:57])
        raw_incoming, raw_proto, raw_imei, raw_when, raw_pmod = fields
        self.is_incoming = bool(raw_incoming)
        self.proto = raw_proto.decode().rstrip("\0")
        if raw_imei == b"0000000000000000":
            self.imei = None
        else:
            self.imei = raw_imei.decode().strip("\0")
        self.when = raw_when
        if raw_pmod == b"                ":
            self.pmod = None
        else:
            self.pmod = raw_pmod.decode().strip("\0")
        self.peeraddr = unpack_peer(buffer[57:75])
        self.packet = buffer[75:]
154
155
class Resp(_Zmsg):
    """Zmq message received from a third party to send to the terminal

    Serialized layout, in network byte order:

    * 16 bytes - ``imei`` (sixteen ASCII zeroes when unset)
    * 8 bytes  - ``when`` as a double (0 when unset)
    * the rest - ``packet``, verbatim
    """

    KWARGS = (("imei", None), ("when", None), ("packet", b""))

    @property
    def packed(self) -> bytes:
        """Serialized form of the message (see the class description)."""
        # The placeholder has to be `bytes`: struct's "s" format rejects
        # `str`, so a str placeholder makes packing fail whenever imei
        # is unset.
        imei = (
            b"0000000000000000" if self.imei is None else self.imei.encode()
        )
        when = 0 if self.when is None else self.when
        return pack("!16sd", imei, when) + self.packet

    def decode(self, buffer: bytes) -> None:
        """Fill the attributes from a buffer produced by ``packed``."""
        # 24 bytes of fixed header (16-byte imei + 8-byte double).
        imei, when = unpack("!16sd", buffer[:24])
        self.imei = (
            None if imei == b"0000000000000000" else imei.decode().strip("\0")
        )

        self.when = when
        self.packet = buffer[24:]
182
183
class Rept(_Zmsg):
    """Broadcast zmq message with "rectified" proto-agnostic json data

    Serialized layout: a 16-byte ``imei`` field (sixteen ASCII zeroes
    when unset) followed by the encoded ``payload`` text.
    """

    KWARGS = (("imei", None), ("payload", ""))

    @property
    def packed(self) -> bytes:
        """Serialized form of the message (see the class description)."""
        if self.imei is None:
            imei_field = b"0000000000000000"
        else:
            imei_field = self.imei.encode()
        return pack("16s", imei_field) + self.payload.encode()

    def decode(self, buffer: bytes) -> None:
        """Fill the attributes from a buffer produced by ``packed``."""
        head, tail = buffer[:16], buffer[16:]
        if head == b"0000000000000000":
            self.imei = None
        else:
            self.imei = head.decode().strip("\0")
        self.payload = tail.decode()