X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fstorage.py;h=843991502772c69693e77d34c5eba6737ebb2681;hb=HEAD;hp=473806e2154c36b2f9c445b35dba818e0441564c;hpb=dbdf9d63af31770ad57302e16b17a2fdc526773f;p=loctrkd.git diff --git a/loctrkd/storage.py b/loctrkd/storage.py index 473806e..8439915 100644 --- a/loctrkd/storage.py +++ b/loctrkd/storage.py @@ -2,48 +2,83 @@ from configparser import ConfigParser from datetime import datetime, timezone +from json import loads from logging import getLogger import zmq from . import common -from .evstore import initdb, stow -from .zx303proto import proto_of_message -from .zmsg import Bcast +from .evstore import initdb, stow, stowloc, stowpmod +from .zmsg import Bcast, Rept log = getLogger("loctrkd/storage") def runserver(conf: ConfigParser) -> None: + stowevents = conf.getboolean("storage", "events", fallback=False) dbname = conf.get("storage", "dbfn") log.info('Using Sqlite3 database "%s"', dbname) initdb(dbname) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore - zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zraw = zctx.socket(zmq.SUB) # type: ignore + zraw.connect(conf.get("collector", "publishurl")) + zraw.setsockopt(zmq.SUBSCRIBE, b"") + zrep = zctx.socket(zmq.SUB) # type: ignore + zrep.connect(conf.get("rectifier", "publishurl")) + zrep.setsockopt(zmq.SUBSCRIBE, b"") + poller = zmq.Poller() # type: ignore + poller.register(zraw, flags=zmq.POLLIN) + poller.register(zrep, flags=zmq.POLLIN) try: while True: - zmsg = Bcast(zsub.recv()) - log.debug( - "%s IMEI %s from %s at %s: %s", - "I" if zmsg.is_incoming else "O", - zmsg.imei, - zmsg.peeraddr, - datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), - zmsg.packet.hex(), - ) - stow( - is_incoming=zmsg.is_incoming, - peeraddr=str(zmsg.peeraddr), - when=zmsg.when, - imei=zmsg.imei, - proto=proto_of_message(zmsg.packet), - packet=zmsg.packet, - ) + events = poller.poll(1000) + for sk, fl in events: + if sk is zraw: + while True: + try: + zmsg = Bcast(zraw.recv(zmq.NOBLOCK)) + except zmq.Again: + break + log.debug( + "%s IMEI %s from %s at %s %s: %s", + "I" if zmsg.is_incoming else "O", + zmsg.imei, + zmsg.peeraddr, + zmsg.pmod, + datetime.fromtimestamp(zmsg.when).astimezone( + tz=timezone.utc + ), + zmsg.packet.hex(), + ) + if zmsg.imei is not None and zmsg.pmod is not None: + stowpmod(zmsg.imei, zmsg.pmod) + if stowevents: + stow( + is_incoming=zmsg.is_incoming, + peeraddr=str(zmsg.peeraddr), + when=zmsg.when, + imei=zmsg.imei, + proto=zmsg.proto, + packet=zmsg.packet, + ) + elif sk is zrep: + while True: + try: + rept = Rept(zrep.recv(zmq.NOBLOCK)) + except zmq.Again: + break + data = loads(rept.payload) + log.debug("R IMEI %s %s", rept.imei, data) + if data.pop("type") == "location": + data["imei"] = rept.imei + stowloc(**data) + + else: + log.error("Event %s on unknown socket %s", fl, sk) except KeyboardInterrupt: - zsub.close() + zrep.close() + zraw.close() zctx.destroy() # type: ignore