]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
collector: fix problems found by fuzzer test
[loctrkd.git] / gps303 / collector.py
index 0efc669fd908e83a480c3e86839efda8b2254d45..c2efd79b82182e13319526a644abccb0e85071b9 100644 (file)
@@ -1,10 +1,12 @@
 """ TCP server that communicates with terminals """
 
+from configparser import ConfigParser
 from logging import getLogger
 from os import umask
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
-from time import time
 from struct import pack
+from time import time
+from typing import Dict, List, Optional, Tuple
 import zmq
 
 from . import common
@@ -19,25 +21,27 @@ from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 
+MAXBUFFER: int = 4096
+
 
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
-    def __init__(self, sock, addr):
+    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
         self.buffer = b""
-        self.imei = None
+        self.imei: Optional[str] = None
 
-    def close(self):
+    def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
 
-    def recv(self):
+    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
         """Read from the socket and parse complete messages"""
         try:
-            segment = self.sock.recv(4096)
+            segment = self.sock.recv(MAXBUFFER)
         except OSError as e:
             log.warning(
                 "Reading from fd %d (IMEI %s): %s",
@@ -55,6 +59,10 @@ class Client:
             return None
         when = time()
         self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            log.warning("More than %d unparseable data, dropping", MAXBUFFER)
+            self.buffer = b""
         msgs = []
         while True:
             framestart = self.buffer.find(b"xx")
@@ -62,8 +70,9 @@ class Client:
                 break
             if framestart > 0:  # Should not happen, report
                 log.warning(
-                    'Undecodable data "%s" from fd %d (IMEI %s)',
-                    self.buffer[:framestart].hex(),
+                    'Undecodable data (%d) "%s" from fd %d (IMEI %s)',
+                    framestart,
+                    self.buffer[:framestart][:64].hex(),
                     self.sock.fileno(),
                     self.imei,
                 )
@@ -80,8 +89,10 @@ class Client:
             # Do this embarrassing hack to avoid accidental match
             # of some binary data in the packet against '\r\n'.
             while True:
-                frameend = self.buffer.find(b"\r\n", frameend)
-                if frameend >= (exp_end - 3):  # Found realistic match
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
                     break
             if frameend == -1:  # Incomplete frame, return what we have
                 break
@@ -95,7 +106,7 @@ class Client:
             msgs.append((when, self.addr, packet))
         return msgs
 
-    def send(self, buffer):
+    def send(self, buffer: bytes) -> None:
         try:
             self.sock.send(b"xx" + buffer + b"\r\n")
         except OSError as e:
@@ -108,17 +119,17 @@ class Client:
 
 
 class Clients:
-    def __init__(self):
-        self.by_fd = {}
-        self.by_imei = {}
+    def __init__(self) -> None:
+        self.by_fd: Dict[int, Client] = {}
+        self.by_imei: Dict[str, Client] = {}
 
-    def add(self, clntsock, clntaddr):
+    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
         fd = clntsock.fileno()
         log.info("Start serving fd %d from %s", fd, clntaddr)
         self.by_fd[fd] = Client(clntsock, clntaddr)
         return fd
 
-    def stop(self, fd):
+    def stop(self, fd: int) -> None:
         clnt = self.by_fd[fd]
         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
         clnt.close()
@@ -126,7 +137,9 @@ class Clients:
             del self.by_imei[clnt.imei]
         del self.by_fd[fd]
 
-    def recv(self, fd):
+    def recv(
+        self, fd: int
+    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
         if msgs is None:
@@ -134,7 +147,14 @@ class Clients:
         result = []
         for when, peeraddr, packet in msgs:
             if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
-                self.by_imei[clnt.imei] = clnt
+                if clnt.imei:
+                    self.by_imei[clnt.imei] = clnt
+                else:
+                    log.warning(
+                        "Login message from %s: %s, but client imei unfilled",
+                        peeraddr,
+                        packet,
+                    )
             result.append((clnt.imei, when, peeraddr, packet))
             log.debug(
                 "Received from %s (IMEI %s): %s",
@@ -144,17 +164,18 @@ class Clients:
             )
         return result
 
-    def response(self, resp):
+    def response(self, resp: Resp) -> None:
         if resp.imei in self.by_imei:
             self.by_imei[resp.imei].send(resp.packet)
         else:
             log.info("Not connected (IMEI %s)", resp.imei)
 
 
-def runserver(conf):
-    zctx = zmq.Context()
-    zpub = zctx.socket(zmq.PUB)
-    zpull = zctx.socket(zmq.PULL)
+def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
+    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
+    zctx = zmq.Context()  # type: ignore
+    zpub = zctx.socket(zmq.PUB)  # type: ignore
+    zpull = zctx.socket(zmq.PULL)  # type: ignore
     oldmask = umask(0o117)
     zpub.bind(conf.get("collector", "publishurl"))
     zpull.bind(conf.get("collector", "listenurl"))
@@ -164,7 +185,7 @@ def runserver(conf):
     tcpl.bind(("", conf.getint("collector", "port")))
     tcpl.listen(5)
     tcpfd = tcpl.fileno()
-    poller = zmq.Poller()
+    poller = zmq.Poller()  # type: ignore
     poller.register(zpull, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
@@ -203,7 +224,7 @@ def runserver(conf):
                                     packet=packet,
                                 ).packed
                             )
-                            if proto == HIBERNATION.PROTO:
+                            if proto == HIBERNATION.PROTO and handle_hibernate:
                                 log.debug(
                                     "HIBERNATION from fd %d (IMEI %s)",
                                     sk,
@@ -219,7 +240,7 @@ def runserver(conf):
                     log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for fd in tostop:
-                poller.unregister(fd)
+                poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
             for zmsg in tosend:
                 zpub.send(
@@ -237,7 +258,10 @@ def runserver(conf):
                 fd = clients.add(clntsock, clntaddr)
                 poller.register(fd, flags=zmq.POLLIN)
     except KeyboardInterrupt:
-        pass
+        zpub.close()
+        zpull.close()
+        zctx.destroy()  # type: ignore
+        tcpl.close()
 
 
 if __name__.endswith("__main__"):