]> www.average.org Git - loctrkd.git/blob - loctrkd/watch.py
Update changelog for 2.00 release
[loctrkd.git] / loctrkd / watch.py
1 """ Watch for locevt and print them """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from logging import getLogger
7 from typing import Any, cast, List
8 import zmq
9
10 from . import common
11 from .protomodule import ProtoModule
12 from .zmsg import Bcast, Rept
13
# Module-level logger; it is also handed to common.init() at startup.
log = getLogger("loctrkd/watch")
15
16
def runserver(conf: ConfigParser) -> None:
    """Dump everything published by the collector and the rectifier.

    Subscribes (with an empty prefix, i.e. to all messages) to the URLs
    configured as ``publishurl`` in the ``[collector]`` and ``[rectifier]``
    sections, and prints every received message to stdout until the
    process is interrupted from the keyboard.

    The zmq context and its sockets are released on exit regardless of
    how the loop terminates.

    :param conf: parsed configuration providing both ``publishurl`` options
    """
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    try:
        zraw = zctx.socket(zmq.SUB)  # type: ignore
        zraw.connect(conf.get("collector", "publishurl"))
        zraw.setsockopt(zmq.SUBSCRIBE, b"")
        zrep = zctx.socket(zmq.SUB)  # type: ignore
        zrep.connect(conf.get("rectifier", "publishurl"))
        zrep.setsockopt(zmq.SUBSCRIBE, b"")
        poller = zmq.Poller()  # type: ignore
        poller.register(zraw, flags=zmq.POLLIN)
        poller.register(zrep, flags=zmq.POLLIN)

        while True:
            # Wake up at least once a second even when nothing arrives.
            for sk, _ in poller.poll(1000):
                if sk is zraw:
                    # Drain the socket: keep reading without blocking
                    # until zmq reports that nothing more is queued.
                    while True:
                        try:
                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        print(
                            "I" if zmsg.is_incoming else "O",
                            zmsg.proto,
                            zmsg.imei,
                        )
                        pmod = common.pmod_for_proto(zmsg.proto)
                        if pmod is not None:
                            msg = pmod.parse_message(
                                zmsg.packet, zmsg.is_incoming
                            )
                            print(msg)
                            # Only some parsed messages offer rectified().
                            if zmsg.is_incoming and hasattr(msg, "rectified"):
                                print("Rectified:", msg.rectified())
                elif sk is zrep:
                    while True:
                        try:
                            rept = Rept(zrep.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        print(rept)
                else:
                    print("what is this socket?!", sk)
    except KeyboardInterrupt:
        pass
    finally:
        # Closes every socket created from the context, then terminates
        # it.  linger=0: a passive watcher has nothing worth flushing, so
        # never block on shutdown.
        zctx.destroy(linger=0)  # type: ignore
64
65
# ``endswith`` rather than ``==``: presumably so the check also passes when
# the module name is a dotted path ending in ``__main__`` -- TODO confirm.
if __name__.endswith("__main__"):
    configuration = common.init(log)
    runserver(configuration)