From ca67cd29fc86054f08bcbf4995d484bab77a4e60 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Wed, 3 Aug 2022 00:37:06 +0200 Subject: [PATCH] wsgateway: switch to the use of cooked reports --- loctrkd/evstore.py | 40 +++++++---------- loctrkd/wsgateway.py | 104 +++++++------------------------------------ loctrkd/zmsg.py | 6 ++- 3 files changed, 38 insertions(+), 112 deletions(-) diff --git a/loctrkd/evstore.py b/loctrkd/evstore.py index b026505..85e8c9d 100644 --- a/loctrkd/evstore.py +++ b/loctrkd/evstore.py @@ -1,8 +1,8 @@ """ sqlite event store """ from datetime import datetime -from json import dumps -from sqlite3 import connect, OperationalError +from json import dumps, loads +from sqlite3 import connect, OperationalError, Row from typing import Any, Dict, List, Tuple __all__ = "fetch", "initdb", "stow", "stowloc" @@ -32,14 +32,9 @@ SCHEMA = ( def initdb(dbname: str) -> None: global DB DB = connect(dbname) - try: - DB.execute( - """alter table events add column - is_incoming int not null default TRUE""" - ) - except OperationalError: - for stmt in SCHEMA: - DB.execute(stmt) + DB.row_factory = Row + for stmt in SCHEMA: + DB.execute(stmt) def stow(**kwargs: Any) -> None: @@ -91,23 +86,20 @@ def stowloc(**kwargs: Dict[str, Any]) -> None: DB.commit() -def fetch( - imei: str, matchlist: List[Tuple[bool, str]], backlog: int -) -> List[Tuple[bool, float, str, bytes]]: - # matchlist is a list of tuples (is_incoming, proto) - # returns a list of tuples (is_incoming, timestamp, packet) +def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]: assert DB is not None - selector = " or ".join( - (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist))) - ) cur = DB.cursor() cur.execute( - f"""select is_incoming, tstamp, proto, packet from events - where ({selector}) and imei = ? - order by tstamp desc limit ?""", - tuple(item for sublist in matchlist for item in sublist) - + (imei, backlog), + """select imei, devtime, accuracy, latitude, longitude, remainder + from reports where imei = ? + order by devtime desc limit ?""", + (imei, backlog), ) - result = list(cur) + result = [] + for row in cur: + dic = dict(row) + remainder = loads(dic.pop("remainder")) + dic.update(remainder) + result.append(dic) cur.close() return list(reversed(result)) diff --git a/loctrkd/wsgateway.py b/loctrkd/wsgateway.py index b6d10e8..e568584 100644 --- a/loctrkd/wsgateway.py +++ b/loctrkd/wsgateway.py @@ -24,42 +24,20 @@ import zmq from . import common from .evstore import initdb, fetch from .protomodule import ProtoModule -from .zmsg import Bcast, topic +from .zmsg import Rept, rtopic log = getLogger("loctrkd/wsgateway") - htmlfile = None -pmods: List[ProtoModule] = [] -selector: List[Tuple[bool, str]] = [] def backlog(imei: str, numback: int) -> List[Dict[str, Any]]: result = [] - for is_incoming, timestamp, proto, packet in fetch( - imei, - selector, - numback, - ): - for pmod in pmods: - if pmod.proto_handled(proto): - msg = pmod.parse_message(packet, is_incoming=is_incoming) - result.append( - { - "type": "location", - "imei": imei, - "timestamp": str( - datetime.fromtimestamp(timestamp).astimezone( - tz=timezone.utc - ) - ), - "longitude": msg.longitude, - "latitude": msg.latitude, - "accuracy": "gps" - if True # TODO isinstance(msg, GPS_POSITIONING) - else "approximate", - } - ) + for report in fetch(imei, numback): + report["type"] = "location" + timestamp = report.pop("devtime") + report["timestamp"] = timestamp + result.append(report) return result @@ -258,21 +236,13 @@ class Clients: def runserver(conf: ConfigParser) -> None: - global htmlfile, pmods, selector - pmods = [ - cast(ProtoModule, import_module("." + modnm, __package__)) - for modnm in conf.get("common", "protocols").split(",") - ] - for pmod in pmods: - for proto, is_incoming in pmod.exposed_protos(): - if proto != "ZX:STATUS": # TODO make it better - selector.append((is_incoming, proto)) + global htmlfile initdb(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile", fallback=None) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) + zsub.connect(conf.get("rectifier", "publishurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -288,18 +258,10 @@ def runserver(conf: ConfigParser) -> None: towait: Set[int] = set() while True: neededsubs = clients.subs() - for pmod in pmods: - for proto, is_incoming in pmod.exposed_protos(): - for imei in neededsubs - activesubs: - zsub.setsockopt( - zmq.SUBSCRIBE, - topic(proto, is_incoming, imei), - ) - for imei in activesubs - neededsubs: - zsub.setsockopt( - zmq.UNSUBSCRIBE, - topic(proto, is_incoming, imei), - ) + for imei in neededsubs - activesubs: + zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei)) + for imei in activesubs - neededsubs: + zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei)) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) tosend = [] @@ -311,43 +273,11 @@ def runserver(conf: ConfigParser) -> None: if sk is zsub: while True: try: - zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) - for pmod in pmods: - if pmod.proto_handled(zmsg.proto): - msg = pmod.parse_message( - zmsg.packet, zmsg.is_incoming - ) - log.debug("Got %s with %s", zmsg, msg) - if zmsg.proto == "ZX:STATUS": - tosend.append( - { - "type": "status", - "imei": zmsg.imei, - "timestamp": str( - datetime.fromtimestamp( - zmsg.when - ).astimezone(tz=timezone.utc) - ), - "battery": msg.batt, - } - ) - else: - tosend.append( - { - "type": "location", - "imei": zmsg.imei, - "timestamp": str( - datetime.fromtimestamp( - zmsg.when - ).astimezone(tz=timezone.utc) - ), - "longitude": msg.longitude, - "latitude": msg.latitude, - "accuracy": "gps" - if zmsg.is_incoming - else "approximate", - } - ) + zmsg = Rept(zsub.recv(zmq.NOBLOCK)) + msg = loads(zmsg.payload) + msg["imei"] = zmsg.imei + log.debug("Got %s, sending %s", zmsg, msg) + tosend.append(msg) except zmq.Again: break elif sk == tcpfd: diff --git a/loctrkd/zmsg.py b/loctrkd/zmsg.py index 4da88d2..da0dc77 100644 --- a/loctrkd/zmsg.py +++ b/loctrkd/zmsg.py @@ -4,7 +4,7 @@ import ipaddress as ip from struct import pack, unpack from typing import Any, cast, Optional, Tuple, Type, Union -__all__ = "Bcast", "Resp", "topic" +__all__ = "Bcast", "Resp", "topic", "rtopic" def pack_peer( # 18 bytes @@ -100,6 +100,10 @@ def topic( ) +def rtopic(imei: str) -> bytes: + return pack("16s", imei.encode()) + + class Bcast(_Zmsg): """Zmq message to broadcast what was received from the terminal""" -- 2.43.0