]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
full encoder/decoder for zmq messages
[loctrkd.git] / gps303 / collector.py
index 385285494d7f0782bfc3c8ab20eff1ece22bbf7e..4cb7956d2809c4dfd57782d7a15b9bf57abfdbeb 100644 (file)
@@ -14,33 +14,11 @@ from .gps303proto import (
     parse_message,
     proto_of_message,
 )
+from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 
 
-class Bcast:
-    """Zmq message to broadcast what was received from the terminal"""
-
-    def __init__(self, imei, msg):
-        self.as_bytes = (
-            pack("B", proto_of_message(msg))
-            + ("0000000000000000" if imei is None else imei).encode()
-            + msg
-        )
-
-
-class Resp:
-    """Zmq message received from a third party to send to the terminal"""
-
-    def __init__(self, *args, **kwargs):
-        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
-            self.imei = msg[:16].decode()
-            self.payload = msg[16:]
-        elif len(args) == 0:
-            self.imei = kwargs["imei"]
-            self.payload = kwargs["payload"]
-
-
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
@@ -101,7 +79,7 @@ class Client:
                 log.info(
                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                 )
-            msgs.append(packet)
+            msgs.append((when, self.addr, packet))
         return msgs
 
     def send(self, buffer):
@@ -140,15 +118,15 @@ class Clients:
         if msgs is None:
             return None
         result = []
-        for msg in msgs:
-            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
+        for when, peeraddr, packet in msgs:
+            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                 self.by_imei[clnt.imei] = clnt
-            result.append((clnt.imei, msg))
+            result.append((clnt.imei, when, peeraddr, packet))
         return result
 
     def response(self, resp):
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.payload)
+            self.by_imei[resp.imei].send(resp.packet)
 
 
 def runserver(conf):
@@ -191,19 +169,28 @@ def runserver(conf):
                         )
                         tostop.append(sk)
                     else:
-                        for imei, msg in received:
-                            zpub.send(Bcast(imei, msg).as_bytes)
-                            if proto_of_message(msg) == HIBERNATION.PROTO:
+                        for imei, when, peeraddr, packet in received:
+                            proto = proto_of_message(packet)
+                            zpub.send(
+                                Bcast(
+                                    proto=proto,
+                                    imei=imei,
+                                    when=when,
+                                    peeraddr=peeraddr,
+                                    packet=packet,
+                                ).packed
+                            )
+                            if proto == HIBERNATION.PROTO:
                                 log.debug(
                                     "HIBERNATION from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(msg)
+                            respmsg = inline_response(packet)
                             if respmsg is not None:
                                 clients.response(
-                                    Resp(imei=imei, payload=respmsg)
+                                    Resp(imei=imei, packet=respmsg)
                                 )
             # poll queue consumed, make changes now
             for fd in tostop: