X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=cb45d8144bb9ec132265f8bed3d18881b6dab6de;hb=bf48ccad4b4b91e7d7e09d1087f5953bc2db97d7;hp=9d694b0ae4c83180a7a718aa21be5daa4e7fe6f4;hpb=057cce452eb53d5fbe365a66669bd8dec7dfe989;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 9d694b0..cb45d81 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,6 +1,7 @@ """ TCP server that communicates with terminals """ from configparser import ConfigParser +from importlib import import_module from logging import getLogger from os import umask from socket import ( @@ -13,17 +14,10 @@ from socket import ( ) from struct import pack from time import time -from typing import Dict, List, Optional, Tuple +from typing import Any, cast, Dict, List, Optional, Tuple, Union import zmq from . import common -from .gps303proto import ( - HIBERNATION, - LOGIN, - inline_response, - parse_message, - proto_of_message, -) from .zmsg import Bcast, Resp log = getLogger("gps303/collector") @@ -31,19 +25,71 @@ log = getLogger("gps303/collector") MAXBUFFER: int = 4096 +class ProtoModule: + class Stream: + @staticmethod + def enframe(buffer: bytes) -> bytes: + ... + + def recv(self, segment: bytes) -> List[Union[bytes, str]]: + ... + + def close(self) -> bytes: + ... + + @staticmethod + def probe_buffer(buffer: bytes) -> bool: + ... + + @staticmethod + def parse_message(packet: bytes, is_incoming: bool = True) -> Any: + ... + + @staticmethod + def inline_response(packet: bytes) -> Optional[bytes]: + ... + + @staticmethod + def is_goodbye_packet(packet: bytes) -> bool: + ... + + @staticmethod + def imei_from_packet(packet: bytes) -> Optional[str]: + ... + + @staticmethod + def proto_of_message(packet: bytes) -> str: + ... + + @staticmethod + def proto_by_name(name: str) -> int: + ... + + +pmods: List[ProtoModule] = [] + + class Client: """Connected socket to the terminal plus buffer and metadata""" def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.buffer = b"" + self.pmod: Optional[ProtoModule] = None + self.stream: Optional[ProtoModule.Stream] = None self.imei: Optional[str] = None def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - self.buffer = b"" + if self.stream: + rest = self.stream.close() + else: + rest = b"" + if rest: + log.warning( + "%d bytes in buffer on close: %s", len(rest), rest[:64].hex() + ) def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" @@ -64,65 +110,38 @@ class Client: self.imei, ) return None + if self.stream is None: + for pmod in pmods: + if pmod.probe_buffer(segment): + self.pmod = pmod + self.stream = pmod.Stream() + break + if self.stream is None: + log.info( + "unrecognizable %d bytes of data %s from fd %d", + len(segment), + segment[:32].hex(), + self.sock.fileno(), + ) + return [] when = time() - self.buffer += segment - if len(self.buffer) > MAXBUFFER: - # We are receiving junk. Let's drop it or we run out of memory. - log.warning("More than %d unparseable data, dropping", MAXBUFFER) - self.buffer = b"" msgs = [] - while True: - framestart = self.buffer.find(b"xx") - if framestart == -1: # No frames, return whatever we have - break - if framestart > 0: # Should not happen, report + for elem in self.stream.recv(segment): + if isinstance(elem, bytes): + msgs.append((when, self.addr, elem)) + else: log.warning( - 'Undecodable data (%d) "%s" from fd %d (IMEI %s)', - framestart, - self.buffer[:framestart][:64].hex(), + "%s from fd %d (IMEI %s)", + elem, self.sock.fileno(), self.imei, ) - self.buffer = self.buffer[framestart:] - # At this point, buffer starts with a packet - if len(self.buffer) < 6: # no len and proto - cannot proceed - break - exp_end = self.buffer[2] + 3 # Expect '\r\n' here - frameend = 0 - # Length field can legitimeely be much less than the - # length of the packet (e.g. WiFi positioning), but - # it _should not_ be greater. Still sometimes it is. - # Luckily, not by too much: by maybe two or three bytes? - # Do this embarrassing hack to avoid accidental match - # of some binary data in the packet against '\r\n'. - while True: - frameend = self.buffer.find(b"\r\n", frameend + 1) - if frameend == -1 or frameend >= ( - exp_end - 3 - ): # Found realistic match or none - break - if frameend == -1: # Incomplete frame, return what we have - break - packet = self.buffer[2:frameend] - self.buffer = self.buffer[frameend + 2 :] - if len(packet) < 2: # frameend comes too early - log.warning("Packet too short: %s", packet) - break - if proto_of_message(packet) == LOGIN.PROTO: - msg = parse_message(packet) - if isinstance(msg, LOGIN): # Can be unparseable - self.imei = msg.imei - log.info( - "LOGIN from fd %d (IMEI %s)", - self.sock.fileno(), - self.imei, - ) - msgs.append((when, self.addr, packet)) return msgs def send(self, buffer: bytes) -> None: + assert self.stream is not None try: - self.sock.send(b"xx" + buffer + b"\r\n") + self.sock.send(self.stream.enframe(buffer)) except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", @@ -153,15 +172,28 @@ class Clients: def recv( self, fd: int - ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]: + ) -> Optional[ + List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]] + ]: clnt = self.by_fd[fd] msgs = clnt.recv() if msgs is None: return None result = [] for when, peeraddr, packet in msgs: - if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... - if clnt.imei: + assert clnt.pmod is not None + if clnt.imei is None: + imei = clnt.pmod.imei_from_packet(packet) + if imei is not None: + log.info("LOGIN from fd %d (IMEI %s)", fd, imei) + clnt.imei = imei + oldclnt = self.by_imei.get(clnt.imei) + if oldclnt is not None: + log.info( + "Orphaning fd %d with the same IMEI", + oldclnt.sock.fileno(), + ) + oldclnt.imei = None self.by_imei[clnt.imei] = clnt else: log.warning( @@ -169,7 +201,7 @@ class Clients: peeraddr, packet, ) - result.append((clnt.imei, when, peeraddr, packet)) + result.append((clnt.pmod, clnt.imei, when, peeraddr, packet)) log.debug( "Received from %s (IMEI %s): %s", peeraddr, @@ -178,14 +210,22 @@ class Clients: ) return result - def response(self, resp: Resp) -> None: + def response(self, resp: Resp) -> Optional[ProtoModule]: if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.packet) + clnt = self.by_imei[resp.imei] + clnt.send(resp.packet) + return clnt.pmod else: log.info("Not connected (IMEI %s)", resp.imei) + return None def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: + global pmods + pmods = [ + cast(ProtoModule, import_module("." + modnm, __package__)) + for modnm in conf.get("collector", "protocols").split(",") + ] # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zpub = zctx.socket(zmq.PUB) # type: ignore @@ -228,8 +268,8 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: log.debug("Terminal gone from fd %d", sk) tostop.append(sk) else: - for imei, when, peeraddr, packet in received: - proto = proto_of_message(packet) + for pmod, imei, when, peeraddr, packet in received: + proto = pmod.proto_of_message(packet) zpub.send( Bcast( proto=proto, @@ -239,14 +279,17 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: packet=packet, ).packed ) - if proto == HIBERNATION.PROTO and handle_hibernate: + if ( + pmod.is_goodbye_packet(packet) + and handle_hibernate + ): log.debug( - "HIBERNATION from fd %d (IMEI %s)", + "Goodbye from fd %d (IMEI %s)", sk, imei, ) tostop.append(sk) - respmsg = inline_response(packet) + respmsg = pmod.inline_response(packet) if respmsg is not None: tosend.append( Resp(imei=imei, when=when, packet=respmsg) @@ -255,17 +298,18 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for zmsg in tosend: - zpub.send( - Bcast( - is_incoming=False, - proto=proto_of_message(zmsg.packet), - when=zmsg.when, - imei=zmsg.imei, - packet=zmsg.packet, - ).packed - ) log.debug("Sending to the client: %s", zmsg) - clients.response(zmsg) + rpmod = clients.response(zmsg) + if rpmod is not None: + zpub.send( + Bcast( + is_incoming=False, + proto=rpmod.proto_of_message(zmsg.packet), + when=zmsg.when, + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) for fd in tostop: poller.unregister(fd) # type: ignore clients.stop(fd)