]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
make collector.py work
[loctrkd.git] / gps303 / collector.py
index 7ffa7526cbce36f3ff98d4d88570b97ee8c84e94..345d0b99a0de7048fc75f8a1a515007d2233318d 100644 (file)
@@ -9,7 +9,7 @@ import sys
 import zmq
 
 from .config import readconfig
-from .gps303proto import handle_packet, make_response, LOGIN, set_config
+from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config
 
 CONF = "/etc/gps303.conf"
 
@@ -19,7 +19,7 @@ log = getLogger("gps303/collector")
 class Bcast:
     """Zmq message to broadcast what was received from the terminal"""
     def __init__(self, imei, msg):
-        self.as_bytes = imei.encode() + msg.encode()
+        self.as_bytes = imei.encode() + msg.to_packet()
 
 
 class Resp:
@@ -70,7 +70,7 @@ class Client:
             frameend = self.buffer.find(b"\r\n", 4)
             if frameend == -1:  # Incomplete frame, return what we have
                 break
-            msg = parse_message(self.buffer[:frameend])
+            msg = parse_message(self.buffer[2:frameend])
             self.buffer = self.buffer[frameend+2:]
             if isinstance(msg, LOGIN):
                 self.imei = msg.imei
@@ -97,7 +97,7 @@ class Clients:
         return fd
 
     def stop(self, fd):
-        clnt = by_fd[fd]
+        clnt = self.by_fd[fd]
         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
         clnt.close()
         if clnt.imei:
@@ -105,13 +105,13 @@ class Clients:
         del self.by_fd[fd]
 
     def recv(self, fd):
-        clnt = by_fd[fd]
+        clnt = self.by_fd[fd]
         msgs = clnt.recv()
         result = []
         for msg in msgs:
             if isinstance(msg, LOGIN):
                 self.by_imei[clnt.imei] = clnt
-            result.append(clnt.imei, msg)
+            result.append((clnt.imei, msg))
         return result
 
     def response(self, resp):
@@ -149,23 +149,23 @@ def runserver(opts, conf):
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
-                    clntsock, clntaddr = ctlsock.accept()
+                    clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
                 else:
-                    imei, msg = clients.recv(sk)
-                    zpub.send(Bcast(imei, msg).as_bytes)
-                    if msg is None or isinstance(msg, HIBERNATION):
-                        log.debug("HIBERNATION from fd %d", sk)
-                        tostop.append(sk)
+                    for imei, msg in clients.recv(sk):
+                        zpub.send(Bcast(imei, msg).as_bytes)
+                        if msg is None or isinstance(msg, HIBERNATION):
+                            log.debug("HIBERNATION from fd %d", sk)
+                            tostop.append(sk)
             # poll queue consumed, make changes now
             for fd in tostop:
+                poller.unregister(fd)
                 clients.stop(fd)
-                pollset.unregister(fd)
             for zmsg in tosend:
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                pollset.register(fd)
+                poller.register(fd)
     except KeyboardInterrupt:
         pass