X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=28ee4a8cd92321a8007747fea3dd7226066902b7;hb=a7065cf3a3dcd36d0b47e9c25acdf30189019f9c;hp=b8cc379a16a5cbac14615ead70e84b841e7425ab;hpb=2cc29ee67b6432e1cd74a21b3c9181b8b5b557f9;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index b8cc379..28ee4a8 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,10 +1,12 @@ """ TCP server that communicates with terminals """ +from configparser import ConfigParser from logging import getLogger from os import umask from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR -from time import time from struct import pack +from time import time +from typing import Dict, List, Optional, Tuple import zmq from . import common @@ -19,26 +21,28 @@ from .zmsg import Bcast, Resp log = getLogger("gps303/collector") +MAXBUFFER: int = 4096 + class Client: """Connected socket to the terminal plus buffer and metadata""" - def __init__(self, sock, addr): + def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr self.buffer = b"" - self.imei = None + self.imei: Optional[str] = None - def close(self): + def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() self.buffer = b"" - def recv(self): + def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" try: - segment = self.sock.recv(4096) - except OSError: + segment = self.sock.recv(MAXBUFFER) + except OSError as e: log.warning( "Reading from fd %d (IMEI %s): %s", self.sock.fileno(), @@ -55,6 +59,10 @@ class Client: return None when = time() self.buffer += segment + if len(self.buffer) > MAXBUFFER: + # We are receiving junk. Let's drop it or we run out of memory. + log.warning("More than %d unparseable data, dropping", MAXBUFFER) + self.buffer = b"" msgs = [] while True: framestart = self.buffer.find(b"xx") @@ -62,14 +70,30 @@ class Client: break if framestart > 0: # Should not happen, report log.warning( - 'Undecodable data "%s" from fd %d (IMEI %s)', - self.buffer[:framestart].hex(), + 'Undecodable data (%d) "%s" from fd %d (IMEI %s)', + framestart, + self.buffer[:framestart][:64].hex(), self.sock.fileno(), self.imei, ) self.buffer = self.buffer[framestart:] # At this point, buffer starts with a packet - frameend = self.buffer.find(b"\r\n", 4) + if len(self.buffer) < 6: # no len and proto - cannot proceed + break + exp_end = self.buffer[2] + 3 # Expect '\r\n' here + frameend = 0 + # Length field can legitimeely be much less than the + # length of the packet (e.g. WiFi positioning), but + # it _should not_ be greater. Still sometimes it is. + # Luckily, not by too much: by maybe two or three bytes? + # Do this embarrassing hack to avoid accidental match + # of some binary data in the packet against '\r\n'. + while True: + frameend = self.buffer.find(b"\r\n", frameend + 1) + if frameend == -1 or frameend >= ( + exp_end - 3 + ): # Found realistic match or none + break if frameend == -1: # Incomplete frame, return what we have break packet = self.buffer[2:frameend] @@ -82,7 +106,7 @@ class Client: msgs.append((when, self.addr, packet)) return msgs - def send(self, buffer): + def send(self, buffer: bytes) -> None: try: self.sock.send(b"xx" + buffer + b"\r\n") except OSError as e: @@ -95,17 +119,17 @@ class Client: class Clients: - def __init__(self): - self.by_fd = {} - self.by_imei = {} + def __init__(self) -> None: + self.by_fd: Dict[int, Client] = {} + self.by_imei: Dict[str, Client] = {} - def add(self, clntsock, clntaddr): + def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int: fd = clntsock.fileno() log.info("Start serving fd %d from %s", fd, clntaddr) self.by_fd[fd] = Client(clntsock, clntaddr) return fd - def stop(self, fd): + def stop(self, fd: int) -> None: clnt = self.by_fd[fd] log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei) clnt.close() @@ -113,7 +137,9 @@ class Clients: del self.by_imei[clnt.imei] del self.by_fd[fd] - def recv(self, fd): + def recv( + self, fd: int + ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]: clnt = self.by_fd[fd] msgs = clnt.recv() if msgs is None: @@ -121,7 +147,14 @@ class Clients: result = [] for when, peeraddr, packet in msgs: if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... - self.by_imei[clnt.imei] = clnt + if clnt.imei: + self.by_imei[clnt.imei] = clnt + else: + log.warning( + "Login message from %s: %s, but client imei unfilled", + peeraddr, + packet, + ) result.append((clnt.imei, when, peeraddr, packet)) log.debug( "Received from %s (IMEI %s): %s", @@ -131,17 +164,18 @@ class Clients: ) return result - def response(self, resp): + def response(self, resp: Resp) -> None: if resp.imei in self.by_imei: self.by_imei[resp.imei].send(resp.packet) else: log.info("Not connected (IMEI %s)", resp.imei) -def runserver(conf): - zctx = zmq.Context() - zpub = zctx.socket(zmq.PUB) - zpull = zctx.socket(zmq.PULL) +def runserver(conf: ConfigParser) -> None: + # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! + zctx = zmq.Context() # type: ignore + zpub = zctx.socket(zmq.PUB) # type: ignore + zpull = zctx.socket(zmq.PULL) # type: ignore oldmask = umask(0o117) zpub.bind(conf.get("collector", "publishurl")) zpull.bind(conf.get("collector", "listenurl")) @@ -151,7 +185,7 @@ def runserver(conf): tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) tcpfd = tcpl.fileno() - poller = zmq.Poller() + poller = zmq.Poller() # type: ignore poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() @@ -167,15 +201,6 @@ def runserver(conf): try: msg = zpull.recv(zmq.NOBLOCK) zmsg = Resp(msg) - zpub.send( - Bcast( - is_incoming=False, - proto=proto_of_message(zmsg.packet), - when=zmsg.when, - imei=zmsg.imei, - packet=zmsg.packet, - ).packed - ) tosend.append(zmsg) except zmq.Again: break @@ -185,9 +210,7 @@ def runserver(conf): elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: - log.debug( - "Terminal gone from fd %d (IMEI %s)", sk, imei - ) + log.debug("Terminal gone from fd %d", sk) tostop.append(sk) else: for imei, when, peeraddr, packet in received: @@ -210,16 +233,25 @@ def runserver(conf): tostop.append(sk) respmsg = inline_response(packet) if respmsg is not None: - clients.response( + tosend.append( Resp(imei=imei, when=when, packet=respmsg) ) else: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for fd in tostop: - poller.unregister(fd) + poller.unregister(fd) # type: ignore clients.stop(fd) for zmsg in tosend: + zpub.send( + Bcast( + is_incoming=False, + proto=proto_of_message(zmsg.packet), + when=zmsg.when, + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) for clntsock, clntaddr in topoll: