]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
typechecking: annotate collector.py
[loctrkd.git] / gps303 / collector.py
index 345d0b99a0de7048fc75f8a1a515007d2233318d..f4d960eb228d7cb65c5249b90c7bb394160a438f 100644 (file)
@@ -1,59 +1,59 @@
 """ TCP server that communicates with terminals """
 
-from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
-from logging.handlers import SysLogHandler
-from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from configparser import ConfigParser
+from logging import getLogger
+from os import umask
+from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from struct import pack
 from time import time
-import sys
+from typing import Dict, List, Optional, Tuple
 import zmq
 
-from .config import readconfig
-from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import (
+    HIBERNATION,
+    LOGIN,
+    inline_response,
+    parse_message,
+    proto_of_message,
+)
+from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 
 
-class Bcast:
-    """Zmq message to broadcast what was received from the terminal"""
-    def __init__(self, imei, msg):
-        self.as_bytes = imei.encode() + msg.to_packet()
-
-
-class Resp:
-    """Zmq message received from a third party to send to the terminal"""
-    def __init__(self, msg):
-        self.imei = msg[:16].decode()
-        self.payload = msg[16:]
-
-
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
-    def __init__(self, sock, addr):
+
+    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
         self.buffer = b""
-        self.imei = None
+        self.imei: Optional[str] = None
 
-    def close(self):
+    def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
-        self.imei = None
 
-    def recv(self):
-        """ Read from the socket and parse complete messages """
+    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
+        """Read from the socket and parse complete messages"""
         try:
             segment = self.sock.recv(4096)
-        except OSError:
-            log.warning("Reading from fd %d (IMEI %s): %s",
-                    self.sock.fileno(), self.imei, e)
+        except OSError as e:
+            log.warning(
+                "Reading from fd %d (IMEI %s): %s",
+                self.sock.fileno(),
+                self.imei,
+                e,
+            )
             return None
         if not segment:  # Terminal has closed connection
-            log.info("EOF reading from fd %d (IMEI %s)",
-                    self.sock.fileno(), self.imei)
+            log.info(
+                "EOF reading from fd %d (IMEI %s)",
+                self.sock.fileno(),
+                self.imei,
+            )
             return None
         when = time()
         self.buffer += segment
@@ -63,40 +63,64 @@ class Client:
             if framestart == -1:  # No frames, return whatever we have
                 break
             if framestart > 0:  # Should not happen, report
-                log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
-                        self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+                log.warning(
+                    'Undecodable data "%s" from fd %d (IMEI %s)',
+                    self.buffer[:framestart].hex(),
+                    self.sock.fileno(),
+                    self.imei,
+                )
                 self.buffer = self.buffer[framestart:]
             # At this point, buffer starts with a packet
-            frameend = self.buffer.find(b"\r\n", 4)
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend)
+                if frameend >= (exp_end - 3):  # Found realistic match
+                    break
             if frameend == -1:  # Incomplete frame, return what we have
                 break
-            msg = parse_message(self.buffer[2:frameend])
-            self.buffer = self.buffer[frameend+2:]
-            if isinstance(msg, LOGIN):
-                self.imei = msg.imei
-                log.info("LOGIN from fd %d: IMEI %s",
-                        self.sock.fileno(), self.imei)
-            msgs.append(msg)
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if proto_of_message(packet) == LOGIN.PROTO:
+                self.imei = parse_message(packet).imei
+                log.info(
+                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+                )
+            msgs.append((when, self.addr, packet))
         return msgs
 
-    def send(self, buffer):
+    def send(self, buffer: bytes) -> None:
         try:
             self.sock.send(b"xx" + buffer + b"\r\n")
         except OSError as e:
-            log.error("Sending to fd %d (IMEI %s): %s",
-                    self.sock.fileno, self.imei, e)
+            log.error(
+                "Sending to fd %d (IMEI %s): %s",
+                self.sock.fileno,
+                self.imei,
+                e,
+            )
+
 
 class Clients:
-    def __init__(self):
-        self.by_fd = {}
-        self.by_imei = {}
+    def __init__(self) -> None:
+        self.by_fd: Dict[int, Client] = {}
+        self.by_imei: Dict[str, Client] = {}
 
-    def add(self, clntsock, clntaddr):
+    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
         fd = clntsock.fileno()
+        log.info("Start serving fd %d from %s", fd, clntaddr)
         self.by_fd[fd] = Client(clntsock, clntaddr)
         return fd
 
-    def stop(self, fd):
+    def stop(self, fd: int) -> None:
         clnt = self.by_fd[fd]
         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
         clnt.close()
@@ -104,34 +128,50 @@ class Clients:
             del self.by_imei[clnt.imei]
         del self.by_fd[fd]
 
-    def recv(self, fd):
+    def recv(self, fd: int) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
+        if msgs is None:
+            return None
         result = []
-        for msg in msgs:
-            if isinstance(msg, LOGIN):
-                self.by_imei[clnt.imei] = clnt
-            result.append((clnt.imei, msg))
+        for when, peeraddr, packet in msgs:
+            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
+                if clnt.imei:
+                    self.by_imei[clnt.imei] = clnt
+                else:
+                    log.warning("Login message from %s: %s, but client imei unfilled", peeraddr, packet)
+            result.append((clnt.imei, when, peeraddr, packet))
+            log.debug(
+                "Received from %s (IMEI %s): %s",
+                peeraddr,
+                clnt.imei,
+                packet.hex(),
+            )
         return result
 
-    def response(self, resp):
+    def response(self, resp: Resp) -> None:
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.payload)
+            self.by_imei[resp.imei].send(resp.packet)
+        else:
+            log.info("Not connected (IMEI %s)", resp.imei)
 
 
-def runserver(opts, conf):
-    zctx = zmq.Context()
-    zpub = zctx.socket(zmq.PUB)
+def runserver(conf: ConfigParser) -> None:
+    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
+    zctx = zmq.Context()  # type: ignore
+    zpub = zctx.socket(zmq.PUB)  # type: ignore
+    zpull = zctx.socket(zmq.PULL)  # type: ignore
+    oldmask = umask(0o117)
     zpub.bind(conf.get("collector", "publishurl"))
-    zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("collector", "listenurl"))
-    tcpl = socket(AF_INET, SOCK_STREAM)
+    zpull.bind(conf.get("collector", "listenurl"))
+    umask(oldmask)
+    tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     tcpl.bind(("", conf.getint("collector", "port")))
     tcpl.listen(5)
     tcpfd = tcpl.fileno()
-    poller = zmq.Poller()
-    poller.register(zsub, flags=zmq.POLLIN)
+    poller = zmq.Poller()  # type: ignore
+    poller.register(zpull, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
     try:
@@ -139,45 +179,72 @@ def runserver(opts, conf):
             tosend = []
             topoll = []
             tostop = []
-            events = poller.poll(10)
+            events = poller.poll(1000)
             for sk, fl in events:
-                if sk is zsub:
+                if sk is zpull:
                     while True:
                         try:
-                            msg = zsub.recv(zmq.NOBLOCK)
-                            tosend.append(Resp(msg))
+                            msg = zpull.recv(zmq.NOBLOCK)
+                            zmsg = Resp(msg)
+                            tosend.append(zmsg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
+                elif fl & zmq.POLLIN:
+                    received = clients.recv(sk)
+                    if received is None:
+                        log.debug("Terminal gone from fd %d", sk)
+                        tostop.append(sk)
+                    else:
+                        for imei, when, peeraddr, packet in received:
+                            proto = proto_of_message(packet)
+                            zpub.send(
+                                Bcast(
+                                    proto=proto,
+                                    imei=imei,
+                                    when=when,
+                                    peeraddr=peeraddr,
+                                    packet=packet,
+                                ).packed
+                            )
+                            if proto == HIBERNATION.PROTO:
+                                log.debug(
+                                    "HIBERNATION from fd %d (IMEI %s)",
+                                    sk,
+                                    imei,
+                                )
+                                tostop.append(sk)
+                            respmsg = inline_response(packet)
+                            if respmsg is not None:
+                                tosend.append(
+                                    Resp(imei=imei, when=when, packet=respmsg)
+                                )
                 else:
-                    for imei, msg in clients.recv(sk):
-                        zpub.send(Bcast(imei, msg).as_bytes)
-                        if msg is None or isinstance(msg, HIBERNATION):
-                            log.debug("HIBERNATION from fd %d", sk)
-                            tostop.append(sk)
+                    log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for fd in tostop:
-                poller.unregister(fd)
+                poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
             for zmsg in tosend:
+                zpub.send(
+                    Bcast(
+                        is_incoming=False,
+                        proto=proto_of_message(zmsg.packet),
+                        when=zmsg.when,
+                        imei=zmsg.imei,
+                        packet=zmsg.packet,
+                    ).packed
+                )
+                log.debug("Sending to the client: %s", zmsg)
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                poller.register(fd)
+                poller.register(fd, flags=zmq.POLLIN)
     except KeyboardInterrupt:
         pass
 
 
 if __name__.endswith("__main__"):
-    opts, _ = getopt(sys.argv[1:], "c:d")
-    opts = dict(opts)
-    conf = readconfig(opts["-c"] if "-c" in opts else CONF)
-    if sys.stdout.isatty():
-        log.addHandler(StreamHandler(sys.stderr))
-    else:
-        log.addHandler(SysLogHandler(address="/dev/log"))
-    log.setLevel(DEBUG if "-d" in opts else INFO)
-    log.info("starting with options: %s", opts)
-    runserver(opts, conf)
+    runserver(common.init(log))