]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zmsg.py
test: add fuzzer for beesure protocol
[loctrkd.git] / loctrkd / zmsg.py
index b6faa7025a1df310422e6e51603878d9c01254b1..da0dc77e39da6f38d7eaefe5adbdfe897f928b28 100644 (file)
@@ -4,7 +4,7 @@ import ipaddress as ip
 from struct import pack, unpack
 from typing import Any, cast, Optional, Tuple, Type, Union
 
 from struct import pack, unpack
 from typing import Any, cast, Optional, Tuple, Type, Union
 
-__all__ = "Bcast", "Resp", "topic"
+__all__ = "Bcast", "Resp", "topic", "rtopic"
 
 
 def pack_peer(  # 18 bytes
 
 
 def pack_peer(  # 18 bytes
@@ -100,6 +100,10 @@ def topic(
     )
 
 
     )
 
 
+def rtopic(imei: str) -> bytes:
+    return pack("16s", imei.encode())
+
+
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""
 
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""
 
@@ -131,7 +135,7 @@ class Bcast(_Zmsg):
     def decode(self, buffer: bytes) -> None:
         is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
         self.is_incoming = bool(is_incoming)
     def decode(self, buffer: bytes) -> None:
         is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41])
         self.is_incoming = bool(is_incoming)
-        self.proto = proto.decode()
+        self.proto = proto.decode().rstrip("\0")
         self.imei = (
             None if imei == b"0000000000000000" else imei.decode().strip("\0")
         )
         self.imei = (
             None if imei == b"0000000000000000" else imei.decode().strip("\0")
         )
@@ -166,3 +170,28 @@ class Resp(_Zmsg):
 
         self.when = when
         self.packet = buffer[24:]
 
         self.when = when
         self.packet = buffer[24:]
+
+
+class Rept(_Zmsg):
+    """Broadcast Zzmq message with "rectified" proto-agnostic json data"""
+
+    KWARGS = (("imei", None), ("payload", ""))
+
+    @property
+    def packed(self) -> bytes:
+        return (
+            pack(
+                "16s",
+                "0000000000000000"
+                if self.imei is None
+                else self.imei.encode(),
+            )
+            + self.payload.encode()
+        )
+
+    def decode(self, buffer: bytes) -> None:
+        imei = buffer[:16]
+        self.imei = (
+            None if imei == b"0000000000000000" else imei.decode().strip("\0")
+        )
+        self.payload = buffer[16:].decode()