From c3bc6d5bbdc0d0bf10e338c6e3bad1a519d5afa0 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Thu, 7 Jul 2022 23:28:38 +0200 Subject: [PATCH] Multiprotocol support in zmq messages and storage --- debian/gps303.conf | 2 +- gps303/collector.py | 2 +- gps303/evstore.py | 6 ++--- gps303/googlemaps.py | 2 +- gps303/gps303proto.py | 25 ++++++++++++++++----- gps303/lookaside.py | 4 ++-- gps303/mkgpx.py | 2 +- gps303/opencellid.py | 2 +- gps303/termconfig.py | 6 ++--- gps303/wsgateway.py | 18 +++++++++------ gps303/zmsg.py | 51 ++++++++++++++++++++----------------------- 11 files changed, 68 insertions(+), 52 deletions(-) diff --git a/debian/gps303.conf b/debian/gps303.conf index 8bfff8e..88bf74a 100644 --- a/debian/gps303.conf +++ b/debian/gps303.conf @@ -12,7 +12,7 @@ port = 5049 htmlfile = /var/lib/gps303/index.html [storage] -dbfn = /var/lib/gps303/gps303.sqlite +dbfn = /var/lib/gps303/trkloc.sqlite [lookaside] # "opencellid" and "googlemaps" can be here. Both require an access token, diff --git a/gps303/collector.py b/gps303/collector.py index 4fc946c..cb45d81 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -58,7 +58,7 @@ class ProtoModule: ... @staticmethod - def proto_of_message(packet: bytes) -> int: + def proto_of_message(packet: bytes) -> str: ... @staticmethod diff --git a/gps303/evstore.py b/gps303/evstore.py index 2b23e8e..07b6dc4 100644 --- a/gps303/evstore.py +++ b/gps303/evstore.py @@ -12,7 +12,7 @@ SCHEMA = """create table if not exists events ( imei text, peeraddr text not null, is_incoming int not null default TRUE, - proto int not null, + proto text not null, packet blob )""" @@ -38,7 +38,7 @@ def stow(**kwargs: Any) -> None: ("peeraddr", None), ("when", 0.0), ("imei", None), - ("proto", -1), + ("proto", "UNKNOWN"), ("packet", b""), ) } @@ -55,7 +55,7 @@ def stow(**kwargs: Any) -> None: def fetch( - imei: str, matchlist: List[Tuple[bool, int]], backlog: int + imei: str, matchlist: List[Tuple[bool, str]], backlog: int ) -> List[Tuple[bool, float, bytes]]: # matchlist is a list of tuples (is_incoming, proto) # returns a list of tuples (is_incoming, timestamp, packet) diff --git a/gps303/googlemaps.py b/gps303/googlemaps.py index d4deea2..dac6079 100644 --- a/gps303/googlemaps.py +++ b/gps303/googlemaps.py @@ -58,7 +58,7 @@ if __name__.endswith("__main__"): c.execute( """select tstamp, packet from events where proto in (?, ?)""", - (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO), + (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)), ) init({"googlemaps": {"accesstoken": sys.argv[2]}}) count = 0 diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index e3776c3..efb02d2 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -38,6 +38,7 @@ __all__ = ( "parse_message", "probe_buffer", "proto_by_name", + "proto_name", "DecodeError", "Respond", "GPS303Pkt", @@ -80,6 +81,8 @@ __all__ = ( "UNKNOWN_B3", ) +PROTO_PREFIX = "ZX" + ### Deframer ### MAXBUFFER: int = 4096 @@ -872,16 +875,28 @@ def class_by_prefix( return CLASSES[proto] +def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str: + return ( + PROTO_PREFIX + + ":" + + ( + obj.__class__.__name__ + if isinstance(obj, GPS303Pkt) + else obj.__name__ + ) + ).ljust(16, "\0")[:16] + + def proto_by_name(name: str) -> int: return PROTOS.get(name, -1) -def proto_of_message(packet: bytes) -> int: - return packet[1] +def proto_of_message(packet: bytes) -> str: + return proto_name(CLASSES.get(packet[1], UNKNOWN)) def imei_from_packet(packet: bytes) -> Optional[str]: - if proto_of_message(packet) == LOGIN.PROTO: + if packet[1] == LOGIN.PROTO: msg = parse_message(packet) if isinstance(msg, LOGIN): return msg.imei @@ -889,11 +904,11 @@ def imei_from_packet(packet: bytes) -> Optional[str]: def is_goodbye_packet(packet: bytes) -> bool: - return proto_of_message(packet) == HIBERNATION.PROTO + return packet[1] == HIBERNATION.PROTO def inline_response(packet: bytes) -> Optional[bytes]: - proto = proto_of_message(packet) + proto = packet[1] if proto in CLASSES: cls = CLASSES[proto] if cls.RESPOND is Respond.INL: diff --git a/gps303/lookaside.py b/gps303/lookaside.py index 1d21489..0c1e4cd 100644 --- a/gps303/lookaside.py +++ b/gps303/lookaside.py @@ -9,7 +9,7 @@ from struct import pack import zmq from . import common -from .gps303proto import parse_message, WIFI_POSITIONING +from .gps303proto import parse_message, proto_name, WIFI_POSITIONING from .zmsg import Bcast, Resp, topic log = getLogger("gps303/lookaside") @@ -22,7 +22,7 @@ def runserver(conf: ConfigParser) -> None: zctx = zmq.Context() # type: ignore zsub = zctx.socket(zmq.SUB) # type: ignore zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, topic(WIFI_POSITIONING.PROTO)) + zsub.setsockopt(zmq.SUBSCRIBE, topic(proto_name(WIFI_POSITIONING))) zpush = zctx.socket(zmq.PUSH) # type: ignore zpush.connect(conf.get("collector", "listenurl")) diff --git a/gps303/mkgpx.py b/gps303/mkgpx.py index ef63e04..456b94f 100644 --- a/gps303/mkgpx.py +++ b/gps303/mkgpx.py @@ -18,7 +18,7 @@ c.execute( and ((is_incoming = false and proto = ?) or (is_incoming = true and proto = ?)) order by tstamp""", - (sys.argv[2], WIFI_POSITIONING.PROTO, GPS_POSITIONING.PROTO), + (sys.argv[2], proto_name(WIFI_POSITIONING), proto_name(GPS_POSITIONING)), ) print( diff --git a/gps303/opencellid.py b/gps303/opencellid.py index 90a8a74..7b6e413 100644 --- a/gps303/opencellid.py +++ b/gps303/opencellid.py @@ -64,7 +64,7 @@ if __name__.endswith("__main__"): c.execute( """select tstamp, packet from events where proto in (?, ?)""", - (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO), + (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)), ) init({"opencellid": {"dbfn": sys.argv[2]}}) for timestamp, packet in c: diff --git a/gps303/termconfig.py b/gps303/termconfig.py index 723e8f6..6ec43fd 100644 --- a/gps303/termconfig.py +++ b/gps303/termconfig.py @@ -19,9 +19,9 @@ def runserver(conf: ConfigParser) -> None: zsub = zctx.socket(zmq.SUB) # type: ignore zsub.connect(conf.get("collector", "publishurl")) for proto in ( - STATUS.PROTO, - SETUP.PROTO, - POSITION_UPLOAD_INTERVAL.PROTO, + proto_name(STATUS), + proto_name(SETUP), + proto_name(POSITION_UPLOAD_INTERVAL), ): zsub.setsockopt(zmq.SUBSCRIBE, topic(proto)) zpush = zctx.socket(zmq.PUSH) # type: ignore diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 381a855..80fdac7 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -27,6 +27,7 @@ from .gps303proto import ( STATUS, WIFI_POSITIONING, parse_message, + proto_name, ) from .zmsg import Bcast, topic @@ -38,7 +39,10 @@ def backlog(imei: str, numback: int) -> List[Dict[str, Any]]: result = [] for is_incoming, timestamp, packet in fetch( imei, - [(True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)], + [ + (True, proto_name(GPS_POSITIONING)), + (False, proto_name(WIFI_POSITIONING)), + ], numback, ): msg = parse_message(packet, is_incoming=is_incoming) @@ -282,28 +286,28 @@ def runserver(conf: ConfigParser) -> None: for imei in neededsubs - activesubs: zsub.setsockopt( zmq.SUBSCRIBE, - topic(GPS_POSITIONING.PROTO, True, imei), + topic(proto_name(GPS_POSITIONING), True, imei), ) zsub.setsockopt( zmq.SUBSCRIBE, - topic(WIFI_POSITIONING.PROTO, False, imei), + topic(proto_name(WIFI_POSITIONING), False, imei), ) zsub.setsockopt( zmq.SUBSCRIBE, - topic(STATUS.PROTO, True, imei), + topic(proto_name(STATUS), True, imei), ) for imei in activesubs - neededsubs: zsub.setsockopt( zmq.UNSUBSCRIBE, - topic(GPS_POSITIONING.PROTO, True, imei), + topic(proto_name(GPS_POSITIONING), True, imei), ) zsub.setsockopt( zmq.UNSUBSCRIBE, - topic(WIFI_POSITIONING.PROTO, False, imei), + topic(proto_name(WIFI_POSITIONING), False, imei), ) zsub.setsockopt( zmq.UNSUBSCRIBE, - topic(STATUS.PROTO, True, imei), + topic(proto_name(STATUS), True, imei), ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 2d497c0..b6faa70 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -7,7 +7,7 @@ from typing import Any, cast, Optional, Tuple, Type, Union __all__ = "Bcast", "Resp", "topic" -def pack_peer( +def pack_peer( # 18 bytes peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] ) -> bytes: if peeraddr is None: @@ -93,9 +93,9 @@ class _Zmsg: def topic( - proto: int, is_incoming: bool = True, imei: Optional[str] = None + proto: str, is_incoming: bool = True, imei: Optional[str] = None ) -> bytes: - return pack("BB", is_incoming, proto) + ( + return pack("B16s", is_incoming, proto.encode()) + ( b"" if imei is None else pack("16s", imei.encode()) ) @@ -105,7 +105,7 @@ class Bcast(_Zmsg): KWARGS = ( ("is_incoming", True), - ("proto", 256), + ("proto", "UNKNOWN"), ("imei", None), ("when", None), ("peeraddr", None), @@ -116,31 +116,28 @@ class Bcast(_Zmsg): def packed(self) -> bytes: return ( pack( - "BB16s", + "!B16s16sd", int(self.is_incoming), - self.proto, + self.proto[:16].ljust(16, "\0").encode(), b"0000000000000000" if self.imei is None else self.imei.encode(), - ) - + ( - b"\0\0\0\0\0\0\0\0" - if self.when is None - else pack("!d", self.when) + 0 if self.when is None else self.when, ) + pack_peer(self.peeraddr) + self.packet ) def decode(self, buffer: bytes) -> None: - self.is_incoming = bool(buffer[0]) - self.proto = buffer[1] - self.imei: Optional[str] = buffer[2:18].decode() - if self.imei == "0000000000000000": - self.imei = None - self.when = unpack("!d", buffer[18:26])[0] - self.peeraddr = unpack_peer(buffer[26:44]) - self.packet = buffer[44:] + is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41]) + self.is_incoming = bool(is_incoming) + self.proto = proto.decode() + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + self.when = when + self.peeraddr = unpack_peer(buffer[41:59]) + self.packet = buffer[59:] class Resp(_Zmsg): @@ -152,20 +149,20 @@ class Resp(_Zmsg): def packed(self) -> bytes: return ( pack( - "16s", + "!16sd", "0000000000000000" if self.imei is None else self.imei.encode(), - ) - + ( - b"\0\0\0\0\0\0\0\0" - if self.when is None - else pack("!d", self.when) + 0 if self.when is None else self.when, ) + self.packet ) def decode(self, buffer: bytes) -> None: - self.imei = buffer[:16].decode() - self.when = unpack("!d", buffer[16:24])[0] + imei, when = unpack("!16sd", buffer[:24]) + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + + self.when = when self.packet = buffer[24:] -- 2.43.0