From 15537c8be40f3ba25c5a51af75d9bed53a8a215d Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Fri, 15 Jul 2022 00:04:10 +0200 Subject: [PATCH] WIP converting wsgateway to multiprotocols --- loctrkd/beesure.py | 8 ++++ loctrkd/evstore.py | 4 +- loctrkd/wsgateway.py | 96 +++++++++++++++++++++++-------------------- loctrkd/zx303proto.py | 10 +++++ 4 files changed, 72 insertions(+), 46 deletions(-) diff --git a/loctrkd/beesure.py b/loctrkd/beesure.py index 369abc2..9346b55 100755 --- a/loctrkd/beesure.py +++ b/loctrkd/beesure.py @@ -26,6 +26,7 @@ __all__ = ( "Stream", "class_by_prefix", "enframe", + "exposed_protos", "inline_response", "proto_handled", "parse_message", @@ -565,3 +566,10 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> BeeSurePkt: retobj.proto = proto # Override class attr with object attr retobj.cause = cause return retobj + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [ + (proto_name(UD), True), + (proto_name(UD2), False), + ] diff --git a/loctrkd/evstore.py b/loctrkd/evstore.py index 07b6dc4..da34cb9 100644 --- a/loctrkd/evstore.py +++ b/loctrkd/evstore.py @@ -56,7 +56,7 @@ def stow(**kwargs: Any) -> None: def fetch( imei: str, matchlist: List[Tuple[bool, str]], backlog: int -) -> List[Tuple[bool, float, bytes]]: +) -> List[Tuple[bool, float, str, bytes]]: # matchlist is a list of tuples (is_incoming, proto) # returns a list of tuples (is_incoming, timestamp, packet) assert DB is not None @@ -65,7 +65,7 @@ def fetch( ) cur = DB.cursor() cur.execute( - f"""select is_incoming, tstamp, packet from events + f"""select is_incoming, tstamp, proto, packet from events where ({selector}) and imei = ? order by tstamp desc limit ?""", tuple(item for sublist in matchlist for item in sublist) diff --git a/loctrkd/wsgateway.py b/loctrkd/wsgateway.py index 522cd59..8f2c648 100644 --- a/loctrkd/wsgateway.py +++ b/loctrkd/wsgateway.py @@ -2,6 +2,7 @@ from configparser import ConfigParser from datetime import datetime, timezone +from importlib import import_module from json import dumps, loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR @@ -22,30 +23,40 @@ import zmq from . import common from .evstore import initdb, fetch -from .zx303proto import ( - GPS_POSITIONING, - STATUS, - WIFI_POSITIONING, - parse_message, - proto_name, -) from .zmsg import Bcast, topic log = getLogger("loctrkd/wsgateway") + + +class ProtoModule: + @staticmethod + def parse_message(packet: bytes, is_incoming: bool = True) -> Any: + ... + + @staticmethod + def exposed_protos() -> List[Tuple[str, bool]]: + ... + + @staticmethod + def proto_handled(proto: str) -> bool: + ... + + htmlfile = None +pmods: List[ProtoModule] = [] +selector: List[Tuple[bool, str]] = [] def backlog(imei: str, numback: int) -> List[Dict[str, Any]]: result = [] - for is_incoming, timestamp, packet in fetch( + for is_incoming, timestamp, proto, packet in fetch( imei, - [ - (True, proto_name(GPS_POSITIONING)), - (False, proto_name(WIFI_POSITIONING)), - ], + selector, numback, ): - msg = parse_message(packet, is_incoming=is_incoming) + for pmod in pmods: + if pmod.proto_handled(proto): + msg = pmod.parse_message(packet, is_incoming=is_incoming) result.append( { "type": "location", @@ -58,7 +69,7 @@ def backlog(imei: str, numback: int) -> List[Dict[str, Any]]: "longitude": msg.longitude, "latitude": msg.latitude, "accuracy": "gps" - if isinstance(msg, GPS_POSITIONING) + if True # TODO isinstance(msg, GPS_POSITIONING) else "approximate", } ) @@ -260,8 +271,15 @@ class Clients: def runserver(conf: ConfigParser) -> None: - global htmlfile - + global htmlfile, pmods, selector + pmods = [ + cast(ProtoModule, import_module("." + modnm, __package__)) + for modnm in conf.get("collector", "protocols").split(",") + ] + for pmod in pmods: + for proto, is_incoming in pmod.exposed_protos(): + if proto != "ZX:STATUS": # TODO make it better + selector.append((is_incoming, proto)) initdb(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile", fallback=None) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! @@ -283,32 +301,18 @@ def runserver(conf: ConfigParser) -> None: towait: Set[int] = set() while True: neededsubs = clients.subs() - for imei in neededsubs - activesubs: - zsub.setsockopt( - zmq.SUBSCRIBE, - topic(proto_name(GPS_POSITIONING), True, imei), - ) - zsub.setsockopt( - zmq.SUBSCRIBE, - topic(proto_name(WIFI_POSITIONING), False, imei), - ) - zsub.setsockopt( - zmq.SUBSCRIBE, - topic(proto_name(STATUS), True, imei), - ) - for imei in activesubs - neededsubs: - zsub.setsockopt( - zmq.UNSUBSCRIBE, - topic(proto_name(GPS_POSITIONING), True, imei), - ) - zsub.setsockopt( - zmq.UNSUBSCRIBE, - topic(proto_name(WIFI_POSITIONING), False, imei), - ) - zsub.setsockopt( - zmq.UNSUBSCRIBE, - topic(proto_name(STATUS), True, imei), - ) + for pmod in pmods: + for proto, is_incoming in pmod.exposed_protos(): + for imei in neededsubs - activesubs: + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(proto, is_incoming, imei), + ) + for imei in activesubs - neededsubs: + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(proto, is_incoming, imei), + ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) tosend = [] @@ -321,9 +325,13 @@ def runserver(conf: ConfigParser) -> None: while True: try: zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) - msg = parse_message(zmsg.packet, zmsg.is_incoming) + for pmod in pmods: + if pmod.proto_handled(zmsg.proto): + msg = pmod.parse_message( + zmsg.packet, zmsg.is_incoming + ) log.debug("Got %s with %s", zmsg, msg) - if isinstance(msg, STATUS): + if zmsg.proto == "ZX:STATUS": tosend.append( { "type": "status", diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index 4d57303..bd19e10 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -34,6 +34,8 @@ from typing import ( __all__ = ( "Stream", "class_by_prefix", + "enframe", + "exposed_protos", "inline_response", "proto_handled", "parse_message", @@ -946,3 +948,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: retobj.PROTO = proto # Override class attr with object attr retobj.cause = cause return retobj + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [ + (proto_name(GPS_POSITIONING), True), + (proto_name(WIFI_POSITIONING), False), + (proto_name(STATUS), True), + ] -- 2.43.0