]> www.average.org Git - loctrkd.git/blobdiff - gps303/zmsg.py
Broadcast location, gps and approximated
[loctrkd.git] / gps303 / zmsg.py
index 7ab3ce4caf755271bd91531cc81f9103240d8b35..80cf0003cbb97eee3baf55684a9085cb010d2c3f 100644 (file)
@@ -1,27 +1,29 @@
 """ Zeromq messages """
 
+from datetime import datetime, timezone
 import ipaddress as ip
 from struct import pack, unpack
 
-__all__ = "Bcast", "Resp"
+__all__ = "Bcast", "LocEvt", "Resp"
+
 
 def pack_peer(peeraddr):
-    saddr, port, _x, _y = peeraddr
-    addr6 = ip.ip_address(saddr)
-    addr = addr6.ipv4_mapped
-    if addr is None:
-        addr = addr6
-    return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
+    try:
+        saddr, port, _x, _y = peeraddr
+        addr = ip.ip_address(saddr)
+    except ValueError:
+        saddr, port = peeraddr
+        a4 = ip.ip_address(saddr)
+        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
+    return addr.packed + pack("!H", port)
+
 
 def unpack_peer(buffer):
-    version = buffer[0]
-    if version not in (4, 6):
-        return None
-    if version == 4:
-        addr = ip.IPv4Address(buffer[1:5])
-    else:
-        addr = ip.IPv6Address(buffer[1:17])
-    port = unpack("!H", buffer[17:19])[0]
+    a6 = ip.IPv6Address(buffer[:16])
+    port = unpack("!H", buffer[16:])[0]
+    addr = a6.ipv4_mapped
+    if addr is None:
+        addr = a6
     return (addr, port)
 
 
@@ -41,6 +43,22 @@ class _Zmsg:
                 + str(kwargs)
             )
 
+    def __repr__(self):
+        return "{}({})".format(
+            self.__class__.__name__,
+            ", ".join(
+                [
+                    "{}={}".format(
+                        k,
+                        'bytes.fromhex("{}")'.format(getattr(self, k).hex())
+                        if isinstance(getattr(self, k), bytes)
+                        else getattr(self, k),
+                    )
+                    for k, _ in self.KWARGS
+                ]
+            ),
+        )
+
     def decode(self, buffer):
         raise RuntimeError(
             self.__class__.__name__ + "must implement `encode()` method"
@@ -69,7 +87,11 @@ class Bcast(_Zmsg):
         return (
             pack("B", self.proto)
             + ("0000000000000000" if self.imei is None else self.imei).encode()
-            + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
+            + (
+                b"\0\0\0\0\0\0\0\0"
+                if self.when is None
+                else pack("!d", self.when)
+            )
             + pack_peer(self.peeraddr)
             + self.packet
         )
@@ -80,8 +102,8 @@ class Bcast(_Zmsg):
         if self.imei == "0000000000000000":
             self.imei = None
         self.when = unpack("!d", buffer[17:25])[0]
-        self.peeraddr = unpack_peer(buffer[25:44])
-        self.packet = buffer[44:]
+        self.peeraddr = unpack_peer(buffer[25:43])
+        self.packet = buffer[43:]
 
 
 class Resp(_Zmsg):
@@ -98,3 +120,31 @@ class Resp(_Zmsg):
     def decode(self, buffer):
         self.imei = buffer[:16].decode()
         self.packet = buffer[16:]
+
+
+class LocEvt(_Zmsg):
+    """Zmq message with original or approximated location from lookaside"""
+
+    KWARGS = (
+        ("imei", "0000000000000000"),
+        ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
+        ("lat", 0.0),
+        ("lon", 0.0),
+        ("is_gps", True),
+    )
+
+    @property
+    def packed(self):
+        return self.imei.encode() + pack(
+            "!dddB",
+            self.devtime.replace(tzinfo=timezone.utc).timestamp(),
+            self.lat,
+            self.lon,
+            int(self.is_gps),
+        )
+
+    def decode(self, buffer):
+        self.imei = buffer[:16].decode()
+        when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
+        self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc)
+        self.is_gps = bool(is_gps)