]> www.average.org Git - loctrkd.git/blobdiff - gps303/zmsg.py
Multiprotocol support in zmq messages and storage
[loctrkd.git] / gps303 / zmsg.py
index 28c233a3cafe7b45f89fc6b0c44f94c8a98d5251..b6faa7025a1df310422e6e51603878d9c01254b1 100644 (file)
@@ -1,35 +1,48 @@
 """ Zeromq messages """
 
-from datetime import datetime, timezone
-from json import dumps, loads
 import ipaddress as ip
 from struct import pack, unpack
+from typing import Any, cast, Optional, Tuple, Type, Union
 
-__all__ = "Bcast", "LocEvt", "Resp"
+__all__ = "Bcast", "Resp", "topic"
 
 
-def pack_peer(peeraddr):
-    try:
+def pack_peer(  # 18 bytes
+    peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
+) -> bytes:
+    if peeraddr is None:
+        addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
+        port = 0
+    elif len(peeraddr) == 2:
+        peeraddr = cast(Tuple[str, int], peeraddr)
+        saddr, port = peeraddr
+        addr = ip.ip_address(saddr)
+    elif len(peeraddr) == 4:
+        peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
         saddr, port, _x, _y = peeraddr
         addr = ip.ip_address(saddr)
-    except ValueError:
-        saddr, port = peeraddr
-        a4 = ip.ip_address(saddr)
-        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
+    if isinstance(addr, ip.IPv4Address):
+        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
     return addr.packed + pack("!H", port)
 
 
-def unpack_peer(buffer):
+def unpack_peer(
+    buffer: bytes,
+) -> Tuple[str, int]:
     a6 = ip.IPv6Address(buffer[:16])
     port = unpack("!H", buffer[16:])[0]
-    addr = a6.ipv4_mapped
-    if addr is None:
-        addr = a6
-    return (addr, port)
+    a4 = a6.ipv4_mapped
+    if a4 is not None:
+        return (str(a4), port)
+    elif a6 == ip.IPv6Address("::"):
+        return ("", 0)
+    return (str(a6), port)
 
 
 class _Zmsg:
-    def __init__(self, *args, **kwargs):
+    KWARGS: Tuple[Tuple[str, Any], ...]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         if len(args) == 1:
             self.decode(args[0])
         elif bool(kwargs):
@@ -44,7 +57,7 @@ class _Zmsg:
                 + str(kwargs)
             )
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({})".format(
             self.__class__.__name__,
             ", ".join(
@@ -60,28 +73,39 @@ class _Zmsg:
             ),
         )
 
-    def __eq__(self, other):
-        return all(
-            [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
-        )
+    def __eq__(self, other: object) -> bool:
+        if isinstance(other, self.__class__):
+            return all(
+                [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+            )
+        return NotImplemented
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `decode()` method"
         )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `packed()` property"
         )
 
 
+def topic(
+    proto: str, is_incoming: bool = True, imei: Optional[str] = None
+) -> bytes:
+    return pack("B16s", is_incoming, proto.encode()) + (
+        b"" if imei is None else pack("16s", imei.encode())
+    )
+
+
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""
 
     KWARGS = (
-        ("proto", 256),
+        ("is_incoming", True),
+        ("proto", "UNKNOWN"),
         ("imei", None),
         ("when", None),
         ("peeraddr", None),
@@ -89,90 +113,56 @@ class Bcast(_Zmsg):
     )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
-            pack("B", self.proto)
-            + ("0000000000000000" if self.imei is None else self.imei).encode()
-            + (
-                b"\0\0\0\0\0\0\0\0"
-                if self.when is None
-                else pack("!d", self.when)
+            pack(
+                "!B16s16sd",
+                int(self.is_incoming),
+                self.proto[:16].ljust(16, "\0").encode(),
+                b"0000000000000000"
+                if self.imei is None
+                else self.imei.encode(),
+                0 if self.when is None else self.when,
             )
             + pack_peer(self.peeraddr)
             + self.packet
         )
 
-    def decode(self, buffer):
-        self.proto = buffer[0]
-        self.imei = buffer[1:17].decode()
-        if self.imei == "0000000000000000":
-            self.imei = None
-        self.when = unpack("!d", buffer[17:25])[0]
-        self.peeraddr = unpack_peer(buffer[25:43])
-        self.packet = buffer[43:]
+    def decode(self, buffer: bytes) -> None:
+        is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
+        self.is_incoming = bool(is_incoming)
+        self.proto = proto.decode()
+        self.imei = (
+            None if imei == b"0000000000000000" else imei.decode().strip("\0")
+        )
+        self.when = when
+        self.peeraddr = unpack_peer(buffer[41:59])
+        self.packet = buffer[59:]
 
 
 class Resp(_Zmsg):
     """Zmq message received from a third party to send to the terminal"""
 
-    KWARGS = (("imei", None), ("packet", b""))
-
-    @property
-    def packed(self):
-        return (
-            "0000000000000000" if self.imei is None else self.imei.encode()
-        ) + self.packet
-
-    def decode(self, buffer):
-        self.imei = buffer[:16].decode()
-        self.packet = buffer[16:]
+    KWARGS = (("imei", None), ("when", None), ("packet", b""))
 
-
-class LocEvt(_Zmsg):
-    """Zmq message with original or approximated location from lookaside"""
-
-    KWARGS = (
-        ("imei", "0000000000000000"),
-        ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
-        ("lat", 0.0),
-        ("lon", 0.0),
-        ("is_gps", True),
-    )
-
-    # This message is for external consumption, so use json encoding,
-    # except imei that forms 16 byte prefix that can be used as the
-    # topic to subscribe.
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
-                ("0000000000000000" + self.imei)[-16:].encode()
-            + dumps(
-                {
-                    "devtime": str(self.devtime),
-                    "latitude": self.lat,
-                    "longitude": self.lon,
-                    "is-gps": self.is_gps,
-                }
-            ).encode()
+            pack(
+                "!16sd",
+                "0000000000000000"
+                if self.imei is None
+                else self.imei.encode(),
+                0 if self.when is None else self.when,
+            )
+            + self.packet
         )
 
-    # And this is full json that can be sent over websocket etc.
-    @property
-    def json(self):
-        return dumps(
-            {
-                "imei": self.imei,
-                "devtime": str(self.devtime),
-                "latitude": self.lat,
-                "longitude": self.lon,
-                "is-gps": self.is_gps,
-            }
+    def decode(self, buffer: bytes) -> None:
+        imei, when = unpack("!16sd", buffer[:24])
+        self.imei = (
+            None if imei == b"0000000000000000" else imei.decode().strip("\0")
         )
 
-    def decode(self, buffer):
-        self.imei = buffer[:16].decode()
-        json_data = loads(buffer[16:])
-        self.devtime = datetime.fromisoformat(json_data["devtime"])
-        self.lat = json_data["latitude"]
-        self.lon = json_data["longitude"]
-        self.is_gps = json_data["is-gps"]
+        self.when = when
+        self.packet = buffer[24:]