]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
the whole shebang is working now
[loctrkd.git] / gps303 / collector.py
index be1bdeccc609c35b85bea167ceda15fed65bb6b9..921833c083be0ac8aecda838a338a4f72b5f818c 100644 (file)
@@ -32,7 +32,6 @@ class Client:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
-        self.imei = None
 
     def recv(self):
         """Read from the socket and parse complete messages"""
@@ -127,21 +126,23 @@ class Clients:
     def response(self, resp):
         if resp.imei in self.by_imei:
             self.by_imei[resp.imei].send(resp.packet)
+        else:
+            log.info("Not connected (IMEI %s)", resp.imei)
 
 
 def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
     zpub.bind(conf.get("collector", "publishurl"))
-    zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("collector", "listenurl"))
+    zpull = zctx.socket(zmq.PULL)
+    zpull.bind(conf.get("collector", "listenurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     tcpl.bind(("", conf.getint("collector", "port")))
     tcpl.listen(5)
     tcpfd = tcpl.fileno()
     poller = zmq.Poller()
-    poller.register(zsub, flags=zmq.POLLIN)
+    poller.register(zpull, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
     try:
@@ -149,19 +150,19 @@ def runserver(conf):
             tosend = []
             topoll = []
             tostop = []
-            events = poller.poll(10)
+            events = poller.poll(1000)
             for sk, fl in events:
-                if sk is zsub:
+                if sk is zpull:
                     while True:
                         try:
-                            msg = zsub.recv(zmq.NOBLOCK)
+                            msg = zpull.recv(zmq.NOBLOCK)
                             tosend.append(Resp(msg))
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
-                else:
+                elif fl & zmq.POLLIN:
                     received = clients.recv(sk)
                     if received is None:
                         log.debug(
@@ -192,15 +193,18 @@ def runserver(conf):
                                 clients.response(
                                     Resp(imei=imei, packet=respmsg)
                                 )
+                else:
+                    log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for fd in tostop:
                 poller.unregister(fd)
                 clients.stop(fd)
             for zmsg in tosend:
+                log.debug("Sending to the client: %s", zmsg)
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                poller.register(fd)
+                poller.register(fd, flags=zmq.POLLIN)
     except KeyboardInterrupt:
         pass