]> www.average.org Git - loctrkd.git/blobdiff - gps303/wsgateway.py
support subscriptions on the client side
[loctrkd.git] / gps303 / wsgateway.py
index c7f96524e98573463ac25318d0599421ccab28cb..f2c6596c3e6a584c1280a9d3c2025e6ee265ebae 100644 (file)
@@ -1,5 +1,6 @@
 """ Websocket Gateway """
 
+from json import loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
@@ -63,7 +64,7 @@ def try_http(data, fd, e):
                     return (
                         f"{proto} 500 File not found\r\n"
                         f"Content-Type: text/plain\r\n\r\n"
-                        f'HTML file could not be opened\r\n'.encode()
+                        f"HTML file could not be opened\r\n".encode()
                     )
             else:
                 return (
@@ -90,6 +91,8 @@ class Client:
         self.addr = addr
         self.ws = WSConnection(ConnectionType.SERVER)
         self.ws_data = b""
+        self.ready = False
+        self.imeis = set()
 
     def close(self):
         log.debug("Closing fd %d", self.sock.fileno())
@@ -122,6 +125,7 @@ class Client:
                 e,
             )
             self.ws_data = try_http(data, self.sock.fileno(), e)
+            self.write()  # TODO this is a hack
             log.debug("Sending HTTP response to %d", self.sock.fileno())
             msgs = None
         else:
@@ -131,25 +135,33 @@ class Client:
                     log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
                     # self.ws_data += self.ws.send(event.response())  # Why not?!
                     self.ws_data += self.ws.send(AcceptConnection())
+                    self.ready = True
                 elif isinstance(event, (CloseConnection, Ping)):
                     log.debug("%s on fd %d", event, self.sock.fileno())
                     self.ws_data += self.ws.send(event.response())
                 elif isinstance(event, TextMessage):
                     # TODO: save imei "subscription"
                     log.debug("%s on fd %d", event, self.sock.fileno())
-                    msgs.append(event.data)
+                    msg = loads(event.data)
+                    msgs.append(msg)
+                    if msg.get("type", None) == "subscribe":
+                        self.imeis = set(msg.get("imei", []))
+                        log.debug(
+                            "subs list on fd %s is %s",
+                            self.sock.fileno(),
+                            self.imeis,
+                        )
                 else:
                     log.warning("%s on fd %d", event, self.sock.fileno())
-        if self.ws_data:  # Temp hack
-            self.write()
         return msgs
 
     def wants(self, imei):
+        log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno())
         return True  # TODO: check subscriptions
 
     def send(self, message):
-        # TODO: filter only wanted imei got from the client
-        self.ws_data += self.ws.send(Message(data=message.json))
+        if self.ready and message.imei in self.imeis:
+            self.ws_data += self.ws.send(Message(data=message.json))
 
     def write(self):
         if self.ws_data:
@@ -190,6 +202,7 @@ class Clients:
         result = []
         for msg in msgs:
             log.debug("Received: %s", msg)
+            result.append(msg)
         return result
 
     def send(self, msg):
@@ -250,9 +263,11 @@ def runserver(conf):
                     if received is None:
                         log.debug("Client gone from fd %d", sk)
                         tostop.append(sk)
+                        towait.discard(fd)
                     else:
                         for msg in received:
                             log.debug("Received from %d: %s", sk, msg)
+                        towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)
                     towrite.add(sk)