From c41f98b94321d35cf7f54a9c659fc36b13990f25 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Sat, 7 May 2022 00:22:07 +0200 Subject: [PATCH] wsgateway aggregate subscriptions upstream --- gps303/wsgateway.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index f2c6596..f9f5c6a 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -220,6 +220,12 @@ class Clients: waiting.add(fd) return waiting + def subs(self): + result = set() + for clnt in self.by_fd.values(): + result |= clnt.imeis + return result + def runserver(conf): global htmlfile @@ -228,7 +234,6 @@ def runserver(conf): zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) zsub.connect(conf.get("lookaside", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -239,9 +244,17 @@ def runserver(conf): poller.register(zsub, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() + activesubs = set() try: towait = set() while True: + neededsubs = clients.subs() + for imei in neededsubs - activesubs: + zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + for imei in activesubs - neededsubs: + zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + activesubs = neededsubs + log.debug("Subscribed to: %s", activesubs) tosend = [] topoll = [] tostop = [] -- 2.39.2