]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/watch.py
Update man pages to use correct name
[loctrkd.git] / loctrkd / watch.py
index b2b5c063f25ccc6e8eebf0e6c2044afd0f18ed40..5338d03495564d25bd5d100769c7294dfae944d3 100644 (file)
@@ -8,44 +8,57 @@ from typing import Any, cast, List
 import zmq
 
 from . import common
-from .zmsg import Bcast
+from .protomodule import ProtoModule
+from .zmsg import Bcast, Rept
 
 log = getLogger("loctrkd/watch")
 
 
-class ProtoModule:
-    @staticmethod
-    def proto_handled(proto: str) -> bool:
-        ...
-
-    @staticmethod
-    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
-        ...
-
-
-pmods: List[ProtoModule] = []
-
-
 def runserver(conf: ConfigParser) -> None:
-    global pmods
-    pmods = [
-        cast(ProtoModule, import_module("." + modnm, __package__))
-        for modnm in conf.get("collector", "protocols").split(",")
-    ]
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
-    zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+    zraw = zctx.socket(zmq.SUB)  # type: ignore
+    zraw.connect(conf.get("collector", "publishurl"))
+    zraw.setsockopt(zmq.SUBSCRIBE, b"")
+    zrep = zctx.socket(zmq.SUB)  # type: ignore
+    zrep.connect(conf.get("rectifier", "publishurl"))
+    zrep.setsockopt(zmq.SUBSCRIBE, b"")
+    poller = zmq.Poller()  # type: ignore
+    poller.register(zraw, flags=zmq.POLLIN)
+    poller.register(zrep, flags=zmq.POLLIN)
 
     try:
         while True:
-            zmsg = Bcast(zsub.recv())
-            print("I" if zmsg.is_incoming else "O", zmsg.proto, zmsg.imei)
-            for pmod in pmods:
-                if pmod.proto_handled(zmsg.proto):
-                    msg = pmod.parse_message(zmsg.packet, zmsg.is_incoming)
-                    print(msg)
+            events = poller.poll(1000)
+            for sk, fl in events:
+                if sk is zraw:
+                    while True:
+                        try:
+                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        print(
+                            "I" if zmsg.is_incoming else "O",
+                            zmsg.proto,
+                            zmsg.imei,
+                        )
+                        pmod = common.pmod_for_proto(zmsg.proto)
+                        if pmod is not None:
+                            msg = pmod.parse_message(
+                                zmsg.packet, zmsg.is_incoming
+                            )
+                            print(msg)
+                            if zmsg.is_incoming and hasattr(msg, "rectified"):
+                                print("Rectified:", msg.rectified())
+                elif sk is zrep:
+                    while True:
+                        try:
+                            rept = Rept(zrep.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        print(rept)
+                else:
+                    print("what is this socket?!", sk)
     except KeyboardInterrupt:
         pass