From 88213d8cfd248de750f53ec11feb30473818ac13 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Sat, 7 May 2022 00:05:21 +0200 Subject: [PATCH] support subscriptions on the client side --- gps303/wsgateway.py | 27 +++++++++++++++++++------ webdemo/index.html | 49 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index c7f9652..f2c6596 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,5 +1,6 @@ """ Websocket Gateway """ +from json import loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -63,7 +64,7 @@ def try_http(data, fd, e): return ( f"{proto} 500 File not found\r\n" f"Content-Type: text/plain\r\n\r\n" - f'HTML file could not be opened\r\n'.encode() + f"HTML file could not be opened\r\n".encode() ) else: return ( @@ -90,6 +91,8 @@ class Client: self.addr = addr self.ws = WSConnection(ConnectionType.SERVER) self.ws_data = b"" + self.ready = False + self.imeis = set() def close(self): log.debug("Closing fd %d", self.sock.fileno()) @@ -122,6 +125,7 @@ class Client: e, ) self.ws_data = try_http(data, self.sock.fileno(), e) + self.write() # TODO this is a hack log.debug("Sending HTTP response to %d", self.sock.fileno()) msgs = None else: @@ -131,25 +135,33 @@ class Client: log.debug("WebSocket upgrade on fd %d", self.sock.fileno()) # self.ws_data += self.ws.send(event.response()) # Why not?! self.ws_data += self.ws.send(AcceptConnection()) + self.ready = True elif isinstance(event, (CloseConnection, Ping)): log.debug("%s on fd %d", event, self.sock.fileno()) self.ws_data += self.ws.send(event.response()) elif isinstance(event, TextMessage): # TODO: save imei "subscription" log.debug("%s on fd %d", event, self.sock.fileno()) - msgs.append(event.data) + msg = loads(event.data) + msgs.append(msg) + if msg.get("type", None) == "subscribe": + self.imeis = set(msg.get("imei", [])) + log.debug( + "subs list on fd %s is %s", + self.sock.fileno(), + self.imeis, + ) else: log.warning("%s on fd %d", event, self.sock.fileno()) - if self.ws_data: # Temp hack - self.write() return msgs def wants(self, imei): + log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno()) return True # TODO: check subscriptions def send(self, message): - # TODO: filter only wanted imei got from the client - self.ws_data += self.ws.send(Message(data=message.json)) + if self.ready and message.imei in self.imeis: + self.ws_data += self.ws.send(Message(data=message.json)) def write(self): if self.ws_data: @@ -190,6 +202,7 @@ class Clients: result = [] for msg in msgs: log.debug("Received: %s", msg) + result.append(msg) return result def send(self, msg): @@ -250,9 +263,11 @@ def runserver(conf): if received is None: log.debug("Client gone from fd %d", sk) tostop.append(sk) + towait.discard(fd) else: for msg in received: log.debug("Received from %d: %s", sk, msg) + towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk) towrite.add(sk) diff --git a/webdemo/index.html b/webdemo/index.html index f356f96..2b3edd7 100644 --- a/webdemo/index.html +++ b/webdemo/index.html @@ -8,12 +8,20 @@