X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=5215d7b42289762b8b8bc79b2da0de0ec2203e09;hb=2ad23fd8e9afe8d6c46263124ecd88ea28fd3059;hp=00594e1d9f180bd542cdc99823f917f016f67f5d;hpb=10665d8aca2b350782616da33888343e8fcc1e65;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 00594e1..5215d7b 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -3,7 +3,14 @@ from configparser import ConfigParser from logging import getLogger from os import umask -from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from socket import ( + socket, + AF_INET6, + SOCK_STREAM, + SOL_SOCKET, + SO_KEEPALIVE, + SO_REUSEADDR, +) from struct import pack from time import time from typing import Dict, List, Optional, Tuple @@ -11,6 +18,8 @@ import zmq from . import common from .gps303proto import ( + GPS303Conn, + StreamError, HIBERNATION, LOGIN, inline_response, @@ -21,6 +30,8 @@ from .zmsg import Bcast, Resp log = getLogger("gps303/collector") +MAXBUFFER: int = 4096 + class Client: """Connected socket to the terminal plus buffer and metadata""" @@ -28,18 +39,20 @@ class Client: def __init__(self, sock: socket, addr: Tuple[str, int]) -> None: self.sock = sock self.addr = addr - self.buffer = b"" + self.stream = GPS303Conn() self.imei: Optional[str] = None def close(self) -> None: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() - self.buffer = b"" + rest = self.stream.close() + if rest: + log.warning("%d bytes in buffer on close: %s", len(rest), rest) def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]: """Read from the socket and parse complete messages""" try: - segment = self.sock.recv(4096) + segment = self.sock.recv(MAXBUFFER) except OSError as e: log.warning( "Reading from fd %d (IMEI %s): %s", @@ -56,58 +69,31 @@ class Client: ) return None when = time() - self.buffer += segment - msgs = [] while True: - framestart = self.buffer.find(b"xx") - if framestart == -1: # No frames, return whatever we have - break - if framestart > 0: # Should not happen, report + try: + return [ + (when, self.addr, packet) + for packet in self.stream.recv(segment) + ] + except StreamError as e: log.warning( - 'Undecodable data "%s" from fd %d (IMEI %s)', - self.buffer[:framestart].hex(), - self.sock.fileno(), - self.imei, - ) - self.buffer = self.buffer[framestart:] - # At this point, buffer starts with a packet - if len(self.buffer) < 6: # no len and proto - cannot proceed - break - exp_end = self.buffer[2] + 3 # Expect '\r\n' here - frameend = 0 - # Length field can legitimeely be much less than the - # length of the packet (e.g. WiFi positioning), but - # it _should not_ be greater. Still sometimes it is. - # Luckily, not by too much: by maybe two or three bytes? - # Do this embarrassing hack to avoid accidental match - # of some binary data in the packet against '\r\n'. - while True: - frameend = self.buffer.find(b"\r\n", frameend) - if frameend >= (exp_end - 3): # Found realistic match - break - if frameend == -1: # Incomplete frame, return what we have - break - packet = self.buffer[2:frameend] - self.buffer = self.buffer[frameend + 2 :] - if proto_of_message(packet) == LOGIN.PROTO: - self.imei = parse_message(packet).imei - log.info( - "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei + "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei ) - msgs.append((when, self.addr, packet)) - return msgs def send(self, buffer: bytes) -> None: try: - self.sock.send(b"xx" + buffer + b"\r\n") + self.sock.send(self.stream.enframe(buffer)) except OSError as e: log.error( "Sending to fd %d (IMEI %s): %s", - self.sock.fileno, + self.sock.fileno(), self.imei, e, ) + def set_imei(self, imei: str) -> None: + self.imei = imei + class Clients: def __init__(self) -> None: @@ -137,8 +123,23 @@ class Clients: return None result = [] for when, peeraddr, packet in msgs: - if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... - if clnt.imei: + if proto_of_message(packet) == LOGIN.PROTO: + msg = parse_message(packet) + if isinstance(msg, LOGIN): # Can be unparseable + if clnt.imei is None: + clnt.imei = msg.imei + log.info( + "LOGIN from fd %d (IMEI %s)", + clnt.sock.fileno(), + clnt.imei, + ) + oldclnt = self.by_imei.get(clnt.imei) + if oldclnt is not None: + log.info( + "Orphaning fd %d with the same IMEI", + oldclnt.sock.fileno(), + ) + oldclnt.imei = None self.by_imei[clnt.imei] = clnt else: log.warning( @@ -162,7 +163,7 @@ class Clients: log.info("Not connected (IMEI %s)", resp.imei) -def runserver(conf: ConfigParser) -> None: +def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None: # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zpub = zctx.socket(zmq.PUB) # type: ignore @@ -197,6 +198,7 @@ def runserver(conf: ConfigParser) -> None: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() + clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) topoll.append((clntsock, clntaddr)) elif fl & zmq.POLLIN: received = clients.recv(sk) @@ -215,7 +217,7 @@ def runserver(conf: ConfigParser) -> None: packet=packet, ).packed ) - if proto == HIBERNATION.PROTO: + if proto == HIBERNATION.PROTO and handle_hibernate: log.debug( "HIBERNATION from fd %d (IMEI %s)", sk, @@ -230,9 +232,6 @@ def runserver(conf: ConfigParser) -> None: else: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now - for fd in tostop: - poller.unregister(fd) # type: ignore - clients.stop(fd) for zmsg in tosend: zpub.send( Bcast( @@ -245,11 +244,17 @@ def runserver(conf: ConfigParser) -> None: ) log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) + for fd in tostop: + poller.unregister(fd) # type: ignore + clients.stop(fd) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: - pass + zpub.close() + zpull.close() + zctx.destroy() # type: ignore + tcpl.close() if __name__.endswith("__main__"):