From f24ba18e34cbfe34bf648459f0e80bd874334fe4 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Fri, 29 Jul 2022 23:24:01 +0200 Subject: [PATCH] watch: watch for both raw and rectified feeds --- loctrkd/common.py | 4 +++- loctrkd/rectifier.py | 24 ++++++++++---------- loctrkd/watch.py | 52 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 56 insertions(+), 24 deletions(-) diff --git a/loctrkd/common.py b/loctrkd/common.py index 1257a72..81f8bb9 100644 --- a/loctrkd/common.py +++ b/loctrkd/common.py @@ -87,7 +87,9 @@ class Report: return ( self.__class__.__name__ + "(" - + ", ".join([f"{k}={v}" for k, v in self.__dict__.items()]) + + ", ".join( + [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()] + ) + ")" ) diff --git a/loctrkd/rectifier.py b/loctrkd/rectifier.py index 47d9d05..1da5752 100644 --- a/loctrkd/rectifier.py +++ b/loctrkd/rectifier.py @@ -51,7 +51,7 @@ def runserver(conf: ConfigParser) -> None: zpush = zctx.socket(zmq.PUSH) # type: ignore zpush.connect(conf.get("collector", "listenurl")) zpub = zctx.socket(zmq.PUB) # type: ignore - zpub.connect(conf.get("rectifier", "publishurl")) + zpub.bind(conf.get("rectifier", "publishurl")) try: while True: @@ -89,19 +89,21 @@ def runserver(conf: ConfigParser) -> None: ) log.debug("Sending reponse %s", resp) zpush.send(resp.packed) + rept = CoordReport( + devtime=rect.devtime, + battery_percentage=rect.battery_percentage, + accuracy=-1, + altitude=-1, + speed=-1, + direction=-1, + latitude=lat, + longitude=lon, + ) + log.debug("Sending report %s", rept) zpub.send( Rept( imei=zmsg.imei, - payload=CoordReport( - devtime=rect.devtime, - battery_percentage=rect.battery_percentage, - accuracy=-1, - altitude=-1, - speed=-1, - direction=-1, - latitude=lat, - longitude=lon, - ).json, + payload=rept.json, ).packed ) except Exception as e: diff --git a/loctrkd/watch.py b/loctrkd/watch.py index bda952c..5338d03 100644 --- a/loctrkd/watch.py +++ b/loctrkd/watch.py @@ -9,7 +9,7 @@ import zmq from . import common from .protomodule import ProtoModule -from .zmsg import Bcast +from .zmsg import Bcast, Rept log = getLogger("loctrkd/watch") @@ -17,20 +17,48 @@ log = getLogger("loctrkd/watch") def runserver(conf: ConfigParser) -> None: # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore - zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zraw = zctx.socket(zmq.SUB) # type: ignore + zraw.connect(conf.get("collector", "publishurl")) + zraw.setsockopt(zmq.SUBSCRIBE, b"") + zrep = zctx.socket(zmq.SUB) # type: ignore + zrep.connect(conf.get("rectifier", "publishurl")) + zrep.setsockopt(zmq.SUBSCRIBE, b"") + poller = zmq.Poller() # type: ignore + poller.register(zraw, flags=zmq.POLLIN) + poller.register(zrep, flags=zmq.POLLIN) try: while True: - zmsg = Bcast(zsub.recv()) - print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei) - pmod = common.pmod_for_proto(zmsg.proto) - if pmod is not None: - msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming) - print(msg) - if zmsg.is_incoming and hasattr(msg, "rectified"): - print(msg.rectified()) + events = poller.poll(1000) + for sk, fl in events: + if sk is zraw: + while True: + try: + zmsg = Bcast(zraw.recv(zmq.NOBLOCK)) + except zmq.Again: + break + print( + "I" if zmsg.is_incoming else "O", + zmsg.proto, + zmsg.imei, + ) + pmod = common.pmod_for_proto(zmsg.proto) + if pmod is not None: + msg = pmod.parse_message( + zmsg.packet, zmsg.is_incoming + ) + print(msg) + if zmsg.is_incoming and hasattr(msg, "rectified"): + print("Rectified:", msg.rectified()) + elif sk is zrep: + while True: + try: + rept = Rept(zrep.recv(zmq.NOBLOCK)) + except zmq.Again: + break + print(rept) + else: + print("what is this socket?!", sk) except KeyboardInterrupt: pass -- 2.43.0