From: Eugene Crosser Date: Fri, 6 May 2022 23:50:49 +0000 (+0200) Subject: WIP on baclog fetcher X-Git-Tag: 0.01~3 X-Git-Url: http://www.average.org/gitweb/?p=loctrkd.git;a=commitdiff_plain;h=96538346bd332d76d2cac5d6a0ef2b4e4a40de30 WIP on baclog fetcher --- diff --git a/gps303/backlog.py b/gps303/backlog.py new file mode 100644 index 0000000..39dfcbc --- /dev/null +++ b/gps303/backlog.py @@ -0,0 +1,26 @@ +""" Get backlog from evstore """ + +from .opencellid import qry_cell +from .evstore import initdb, fetch +from .gps303proto import GPS_POSITIONING, WIFI_POSITIONING, parse_message +from .zmsg import LocEvt + +OCDB = None + +def blinit(evdb, ocdb): + global OCDB + OCDB = ocdb + initdb(evdb) + +def backlog(imei, backlog): + result = [] + for packet in fetch(imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog): + msg = parse_message(packet) + if isinstance(msg, GPS_POSITIONING): + result.append(LocEvt(devtime=msg.devtime, lon=msg.longitude, + lat=msg.latitude, is_gps=True, imei=imei)) + elif isinstance(msg, WIFI_POSITIONING): + lat, lon = qry_cell(OCDB, msg.mcc, msg.gsm_cells) + result.append(LocEvt(devtime=msg.devtime, lon=lon, + lat=lat, is_gps=False, imei=imei)) + return reversed(result) diff --git a/gps303/evstore.py b/gps303/evstore.py index 70a3ac6..c10ecb1 100644 --- a/gps303/evstore.py +++ b/gps303/evstore.py @@ -2,7 +2,7 @@ from sqlite3 import connect -__all__ = "initdb", "stow" +__all__ = "fetch", "initdb", "stow" DB = None @@ -43,3 +43,15 @@ def stow(**kwargs): parms, ) DB.commit() + +def fetch(imei, protos, backlog): + assert DB is not None + protosel = ", ".join(["?" for _ in range(len(protos))]) + cur = DB.cursor() + cur.execute(f"""select packet from events + where proto in ({protosel}) and imei = ? + order by tstamp desc limit ?""", + protos + (imei, backlog)) + result = [row[0] for row in cur] + cur.close() + return result diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index f9f5c6a..7756fe9 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -17,6 +17,7 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common +from .backlog import blinit, backlog from .zmsg import LocEvt log = getLogger("gps303/wsgateway") @@ -196,14 +197,7 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] - msgs = clnt.recv() - if msgs is None: - return None - result = [] - for msg in msgs: - log.debug("Received: %s", msg) - result.append(msg) - return result + return clnt.recv() def send(self, msg): towrite = set() @@ -230,6 +224,7 @@ class Clients: def runserver(conf): global htmlfile + blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) @@ -280,6 +275,10 @@ def runserver(conf): else: for msg in received: log.debug("Received from %d: %s", sk, msg) + if msg.get("type", None) == "subscribe": + imei = msg.get("imei") + if imei: + tosend.extend(backlog(imei[0], 5)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)