X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=cb45d8144bb9ec132265f8bed3d18881b6dab6de;hb=c3bc6d5bbdc0d0bf10e338c6e3bad1a519d5afa0;hp=68c95bfb77d0ddbb75e8da64d25f6972d7e94847;hpb=4c173a5448990cd4da398be1bf3479bef17b1048;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 68c95bf..cb45d81 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,62 +1,101 @@ """ TCP server that communicates with terminals """ -from getopt import getopt +from configparser import ConfigParser +from importlib import import_module from logging import getLogger -from logging.handlers import SysLogHandler -from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR -from time import time +from os import umask +from socket import ( + socket, + AF_INET6, + SOCK_STREAM, + SOL_SOCKET, + SO_KEEPALIVE, + SO_REUSEADDR, +) from struct import pack +from time import time +from typing import Any, cast, Dict, List, Optional, Tuple, Union import zmq from . import common -from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message +from .zmsg import Bcast, Resp log = getLogger("gps303/collector") +MAXBUFFER: int = 4096 + + +class ProtoModule: + class Stream: + @staticmethod + def enframe(buffer: bytes) -> bytes: + ... + + def recv(self, segment: bytes) -> List[Union[bytes, str]]: + ... + + def close(self) -> bytes: + ... + + @staticmethod + def probe_buffer(buffer: bytes) -> bool: + ... -class Bcast: - """Zmq message to broadcast what was received from the terminal""" + @staticmethod + def parse_message(packet: bytes, is_incoming: bool = True) -> Any: + ... - def __init__(self, imei, msg): - self.as_bytes = ( - pack("B", proto_of_message(msg)) - + ("0000000000000000" if imei is None else imei).encode() - + msg - ) + @staticmethod + def inline_response(packet: bytes) -> Optional[bytes]: + ... + @staticmethod + def is_goodbye_packet(packet: bytes) -> bool: + ... -class Resp: - """Zmq message received from a third party to send to the terminal""" + @staticmethod + def imei_from_packet(packet: bytes) -> Optional[str]: + ... - def __init__(self, *args, **kwargs): - if not kwargs and len(args) == 1 and isinstance(args[0], bytes): - self.imei = msg[:16].decode() - self.payload = msg[16:] - elif len(args) == 0: - self.imei = kwargs["imei"] - self.payload = kwargs["payload"] + @staticmethod + def proto_of_message(packet: bytes) -> str: + ... + + @staticmethod + def proto_by_name(name: str) -> int: + ... + + +pmods: List[ProtoModule] = [] class Client: """Connected socket to the terminal plus buffer and metadata""" - def __init__(self, sock, addr): + def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.buffer = b"" - self.imei = None + self.pmod: Optional[ProtoModule] = None + self.stream: Optional[ProtoModule.Stream] = None + self.imei: Optional[str] = None - def close(self): + def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - self.buffer = b"" - self.imei = None + if self.stream: + rest = self.stream.close() + else: + rest = b"" + if rest: + log.warning( + "%d bytes in buffer on close: %s", len(rest), rest[:64].hex() + ) - def recv(self): + def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" try: - segment = self.sock.recv(4096) - except OSError: + segment = self.sock.recv(MAXBUFFER) + except OSError as e: log.warning( "Reading from fd %d (IMEI %s): %s", self.sock.fileno(), @@ -71,58 +110,59 @@ class Client: self.imei, ) return None + if self.stream is None: + for pmod in pmods: + if pmod.probe_buffer(segment): + self.pmod = pmod + self.stream = pmod.Stream() + break + if self.stream is None: + log.info( + "unrecognizable %d bytes of data %s from fd %d", + len(segment), + segment[:32].hex(), + self.sock.fileno(), + ) + return [] when = time() - self.buffer += segment msgs = [] - while True: - framestart = self.buffer.find(b"xx") - if framestart == -1: # No frames, return whatever we have - break - if framestart > 0: # Should not happen, report + for elem in self.stream.recv(segment): + if isinstance(elem, bytes): + msgs.append((when, self.addr, elem)) + else: log.warning( - 'Undecodable data "%s" from fd %d (IMEI %s)', - self.buffer[:framestart].hex(), + "%s from fd %d (IMEI %s)", + elem, self.sock.fileno(), self.imei, ) - self.buffer = self.buffer[framestart:] - # At this point, buffer starts with a packet - frameend = self.buffer.find(b"\r\n", 4) - if frameend == -1: # Incomplete frame, return what we have - break - packet = self.buffer[2:frameend] - self.buffer = self.buffer[frameend + 2 :] - if proto_of_message(packet) == LOGIN.PROTO: - self.imei = parse_message(packet).imei - log.info( - "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei - ) - msgs.append(packet) return msgs - def send(self, buffer): + def send(self, buffer: bytes) -> None: + assert self.stream is not None try: - self.sock.send(b"xx" + buffer + b"\r\n") + self.sock.send(self.stream.enframe(buffer)) except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", - self.sock.fileno, + self.sock.fileno(), self.imei, e, ) class Clients: - def __init__(self): - self.by_fd = {} - self.by_imei = {} + def __init__(self) -> None: + self.by_fd: Dict[int, Client] = {} + self.by_imei: Dict[str, Client] = {} - def add(self, clntsock, clntaddr): + def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int: fd = clntsock.fileno() + log.info("Start serving fd %d from %s", fd, clntaddr) self.by_fd[fd] = Client(clntsock, clntaddr) return fd - def stop(self, fd): + def stop(self, fd: int) -> None: clnt = self.by_fd[fd] log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei) clnt.close() @@ -130,36 +170,77 @@ class Clients: del self.by_imei[clnt.imei] del self.by_fd[fd] - def recv(self, fd): + def recv( + self, fd: int + ) -> Optional[ + List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]] + ]: clnt = self.by_fd[fd] msgs = clnt.recv() if msgs is None: return None result = [] - for msg in msgs: - if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... - self.by_imei[clnt.imei] = clnt - result.append((clnt.imei, msg)) + for when, peeraddr, packet in msgs: + assert clnt.pmod is not None + if clnt.imei is None: + imei = clnt.pmod.imei_from_packet(packet) + if imei is not None: + log.info("LOGIN from fd %d (IMEI %s)", fd, imei) + clnt.imei = imei + oldclnt = self.by_imei.get(clnt.imei) + if oldclnt is not None: + log.info( + "Orphaning fd %d with the same IMEI", + oldclnt.sock.fileno(), + ) + oldclnt.imei = None + self.by_imei[clnt.imei] = clnt + else: + log.warning( + "Login message from %s: %s, but client imei unfilled", + peeraddr, + packet, + ) + result.append((clnt.pmod, clnt.imei, when, peeraddr, packet)) + log.debug( + "Received from %s (IMEI %s): %s", + peeraddr, + clnt.imei, + packet.hex(), + ) return result - def response(self, resp): + def response(self, resp: Resp) -> Optional[ProtoModule]: if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.payload) + clnt = self.by_imei[resp.imei] + clnt.send(resp.packet) + return clnt.pmod + else: + log.info("Not connected (IMEI %s)", resp.imei) + return None -def runserver(conf): - zctx = zmq.Context() - zpub = zctx.socket(zmq.PUB) +def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: + global pmods + pmods = [ + cast(ProtoModule, import_module("." + modnm, __package__)) + for modnm in conf.get("collector", "protocols").split(",") + ] + # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! + zctx = zmq.Context() # type: ignore + zpub = zctx.socket(zmq.PUB) # type: ignore + zpull = zctx.socket(zmq.PULL) # type: ignore + oldmask = umask(0o117) zpub.bind(conf.get("collector", "publishurl")) - zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("collector", "listenurl")) - tcpl = socket(AF_INET, SOCK_STREAM) + zpull.bind(conf.get("collector", "listenurl")) + umask(oldmask) + tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) tcpfd = tcpl.fileno() - poller = zmq.Poller() - poller.register(zsub, flags=zmq.POLLIN) + poller = zmq.Poller() # type: ignore + poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() try: @@ -167,42 +248,79 @@ def runserver(conf): tosend = [] topoll = [] tostop = [] - events = poller.poll(10) + events = poller.poll(1000) for sk, fl in events: - if sk is zsub: + if sk is zpull: while True: try: - msg = zsub.recv(zmq.NOBLOCK) - tosend.append(Resp(msg)) + msg = zpull.recv(zmq.NOBLOCK) + zmsg = Resp(msg) + tosend.append(zmsg) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() + clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) topoll.append((clntsock, clntaddr)) - else: - for imei, msg in clients.recv(sk): - zpub.send(Bcast(imei, msg).as_bytes) - if ( - msg is None - or proto_of_message(msg) == HIBERNATION.PROTO - ): - log.debug( - "HIBERNATION from fd %d (IMEI %s)", sk, imei + elif fl & zmq.POLLIN: + received = clients.recv(sk) + if received is None: + log.debug("Terminal gone from fd %d", sk) + tostop.append(sk) + else: + for pmod, imei, when, peeraddr, packet in received: + proto = pmod.proto_of_message(packet) + zpub.send( + Bcast( + proto=proto, + imei=imei, + when=when, + peeraddr=peeraddr, + packet=packet, + ).packed ) - tostop.append(sk) - elif proto_of_message(msg) == LOGIN.PROTO: - clients.response(Resp(imei=imei, payload=LOGIN.response())) + if ( + pmod.is_goodbye_packet(packet) + and handle_hibernate + ): + log.debug( + "Goodbye from fd %d (IMEI %s)", + sk, + imei, + ) + tostop.append(sk) + respmsg = pmod.inline_response(packet) + if respmsg is not None: + tosend.append( + Resp(imei=imei, when=when, packet=respmsg) + ) + else: + log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now + for zmsg in tosend: + log.debug("Sending to the client: %s", zmsg) + rpmod = clients.response(zmsg) + if rpmod is not None: + zpub.send( + Bcast( + is_incoming=False, + proto=rpmod.proto_of_message(zmsg.packet), + when=zmsg.when, + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) for fd in tostop: - poller.unregister(fd) + poller.unregister(fd) # type: ignore clients.stop(fd) - for zmsg in tosend: - clients.response(zmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) - poller.register(fd) + poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: - pass + zpub.close() + zpull.close() + zctx.destroy() # type: ignore + tcpl.close() if __name__.endswith("__main__"):