]> www.average.org Git - loctrkd.git/blobdiff - gps303/wsgateway.py
WIP on supporting multiple markers
[loctrkd.git] / gps303 / wsgateway.py
index f2c6596c3e6a584c1280a9d3c2025e6ee265ebae..8a5af7f43ec37c94d1500c9ab3a46bf934991e33 100644 (file)
@@ -17,6 +17,7 @@ from wsproto.utilities import RemoteProtocolError
 import zmq
 
 from . import common
+from .backlog import blinit, backlog
 from .zmsg import LocEvt
 
 log = getLogger("gps303/wsgateway")
@@ -196,14 +197,7 @@ class Clients:
 
     def recv(self, fd):
         clnt = self.by_fd[fd]
-        msgs = clnt.recv()
-        if msgs is None:
-            return None
-        result = []
-        for msg in msgs:
-            log.debug("Received: %s", msg)
-            result.append(msg)
-        return result
+        return clnt.recv()
 
     def send(self, msg):
         towrite = set()
@@ -220,15 +214,21 @@ class Clients:
                 waiting.add(fd)
         return waiting
 
+    def subs(self):
+        result = set()
+        for clnt in self.by_fd.values():
+            result |= clnt.imeis
+        return result
+
 
 def runserver(conf):
     global htmlfile
 
+    blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile")
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
     zsub.connect(conf.get("lookaside", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -239,9 +239,17 @@ def runserver(conf):
     poller.register(zsub, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
+    activesubs = set()
     try:
         towait = set()
         while True:
+            neededsubs = clients.subs()
+            for imei in neededsubs - activesubs:
+                zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+            for imei in activesubs - neededsubs:
+                zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+            activesubs = neededsubs
+            log.debug("Subscribed to: %s", activesubs)
             tosend = []
             topoll = []
             tostop = []
@@ -267,6 +275,11 @@ def runserver(conf):
                     else:
                         for msg in received:
                             log.debug("Received from %d: %s", sk, msg)
+                            if msg.get("type", None) == "subscribe":
+                                imei = msg.get("imei")
+                                numback = msg.get("backlog", 5)
+                                for elem in imei:
+                                    tosend.extend(backlog(elem, numback))
                         towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)