]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
fill in `when` in Resp packet
[loctrkd.git] / gps303 / collector.py
index 385285494d7f0782bfc3c8ab20eff1ece22bbf7e..b8cc379a16a5cbac14615ead70e84b841e7425ab 100644 (file)
@@ -1,7 +1,8 @@
 """ TCP server that communicates with terminals """
 
 from logging import getLogger
-from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from os import umask
+from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
 from struct import pack
 import zmq
@@ -14,33 +15,11 @@ from .gps303proto import (
     parse_message,
     proto_of_message,
 )
+from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 
 
-class Bcast:
-    """Zmq message to broadcast what was received from the terminal"""
-
-    def __init__(self, imei, msg):
-        self.as_bytes = (
-            pack("B", proto_of_message(msg))
-            + ("0000000000000000" if imei is None else imei).encode()
-            + msg
-        )
-
-
-class Resp:
-    """Zmq message received from a third party to send to the terminal"""
-
-    def __init__(self, *args, **kwargs):
-        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
-            self.imei = msg[:16].decode()
-            self.payload = msg[16:]
-        elif len(args) == 0:
-            self.imei = kwargs["imei"]
-            self.payload = kwargs["payload"]
-
-
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
@@ -54,7 +33,6 @@ class Client:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
-        self.imei = None
 
     def recv(self):
         """Read from the socket and parse complete messages"""
@@ -101,7 +79,7 @@ class Client:
                 log.info(
                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                 )
-            msgs.append(packet)
+            msgs.append((when, self.addr, packet))
         return msgs
 
     def send(self, buffer):
@@ -123,6 +101,7 @@ class Clients:
 
     def add(self, clntsock, clntaddr):
         fd = clntsock.fileno()
+        log.info("Start serving fd %d from %s", fd, clntaddr)
         self.by_fd[fd] = Client(clntsock, clntaddr)
         return fd
 
@@ -140,30 +119,40 @@ class Clients:
         if msgs is None:
             return None
         result = []
-        for msg in msgs:
-            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
+        for when, peeraddr, packet in msgs:
+            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                 self.by_imei[clnt.imei] = clnt
-            result.append((clnt.imei, msg))
+            result.append((clnt.imei, when, peeraddr, packet))
+            log.debug(
+                "Received from %s (IMEI %s): %s",
+                peeraddr,
+                clnt.imei,
+                packet.hex(),
+            )
         return result
 
     def response(self, resp):
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.payload)
+            self.by_imei[resp.imei].send(resp.packet)
+        else:
+            log.info("Not connected (IMEI %s)", resp.imei)
 
 
 def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
+    zpull = zctx.socket(zmq.PULL)
+    oldmask = umask(0o117)
     zpub.bind(conf.get("collector", "publishurl"))
-    zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("collector", "listenurl"))
-    tcpl = socket(AF_INET, SOCK_STREAM)
+    zpull.bind(conf.get("collector", "listenurl"))
+    umask(oldmask)
+    tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     tcpl.bind(("", conf.getint("collector", "port")))
     tcpl.listen(5)
     tcpfd = tcpl.fileno()
     poller = zmq.Poller()
-    poller.register(zsub, flags=zmq.POLLIN)
+    poller.register(zpull, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
     try:
@@ -171,19 +160,29 @@ def runserver(conf):
             tosend = []
             topoll = []
             tostop = []
-            events = poller.poll(10)
+            events = poller.poll(1000)
             for sk, fl in events:
-                if sk is zsub:
+                if sk is zpull:
                     while True:
                         try:
-                            msg = zsub.recv(zmq.NOBLOCK)
-                            tosend.append(Resp(msg))
+                            msg = zpull.recv(zmq.NOBLOCK)
+                            zmsg = Resp(msg)
+                            zpub.send(
+                                Bcast(
+                                    is_incoming=False,
+                                    proto=proto_of_message(zmsg.packet),
+                                    when=zmsg.when,
+                                    imei=zmsg.imei,
+                                    packet=zmsg.packet,
+                                ).packed
+                            )
+                            tosend.append(zmsg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
-                else:
+                elif fl & zmq.POLLIN:
                     received = clients.recv(sk)
                     if received is None:
                         log.debug(
@@ -191,29 +190,41 @@ def runserver(conf):
                         )
                         tostop.append(sk)
                     else:
-                        for imei, msg in received:
-                            zpub.send(Bcast(imei, msg).as_bytes)
-                            if proto_of_message(msg) == HIBERNATION.PROTO:
+                        for imei, when, peeraddr, packet in received:
+                            proto = proto_of_message(packet)
+                            zpub.send(
+                                Bcast(
+                                    proto=proto,
+                                    imei=imei,
+                                    when=when,
+                                    peeraddr=peeraddr,
+                                    packet=packet,
+                                ).packed
+                            )
+                            if proto == HIBERNATION.PROTO:
                                 log.debug(
                                     "HIBERNATION from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(msg)
+                            respmsg = inline_response(packet)
                             if respmsg is not None:
                                 clients.response(
-                                    Resp(imei=imei, payload=respmsg)
+                                    Resp(imei=imei, when=when, packet=respmsg)
                                 )
+                else:
+                    log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for fd in tostop:
                 poller.unregister(fd)
                 clients.stop(fd)
             for zmsg in tosend:
+                log.debug("Sending to the client: %s", zmsg)
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                poller.register(fd)
+                poller.register(fd, flags=zmq.POLLIN)
     except KeyboardInterrupt:
         pass