From 21ab3e6ccd337ef7c69e149a4f21d6a6139557d8 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Wed, 4 May 2022 23:28:59 +0200 Subject: [PATCH] WIP on websocket gateway --- debian/control | 3 +- debian/gps303.conf | 3 + debian/gps303.target | 6 +- debian/gps303.wsgateway.service | 17 +++ gps303/wsgateway.py | 176 ++++++++++++++++++++++++++++++++ webdemo/index.html | 82 +++++++++++++++ 6 files changed, 284 insertions(+), 3 deletions(-) create mode 100644 debian/gps303.wsgateway.service create mode 100644 gps303/wsgateway.py create mode 100644 webdemo/index.html diff --git a/debian/control b/debian/control index a8ba49b..5746142 100644 --- a/debian/control +++ b/debian/control @@ -9,12 +9,13 @@ Build-Depends: debhelper-compat (= 12), dh-python, python3-all, python3-setuptools, - python3-zmq, Package: python3-gps303 Architecture: all Section: python Depends: adduser, + python3-wsproto, + python3-zmq, ${misc:Depends}, ${python3:Depends} Description: Suite of modules to collect reports from xz303 GPS trackers diff --git a/debian/gps303.conf b/debian/gps303.conf index 2ee044b..9b8ea5b 100644 --- a/debian/gps303.conf +++ b/debian/gps303.conf @@ -3,6 +3,9 @@ port = 4303 publishurl = ipc:///var/lib/gps303/collected listenurl = ipc:///var/lib/gps303/responses +[wsgateway] +port = 5049 + [storage] dbfn = /var/lib/gps303/gps303.sqlite diff --git a/debian/gps303.target b/debian/gps303.target index cab6d93..ac1de37 100644 --- a/debian/gps303.target +++ b/debian/gps303.target @@ -3,11 +3,13 @@ Description=GPS303 support suite Requires=gps303.collector.service \ gps303.storage.service \ gps303.termconfig.service \ - gps303.lookaside.service + gps303.lookaside.service \ + gps303.wsgateway.service After=gps303.collector.service \ gps303.storage.service \ gps303.termconfig.service \ - gps303.lookaside.service + gps303.lookaside.service \ + gps303.wsgateway.service [Install] WantedBy=multi-user.target diff --git a/debian/gps303.wsgateway.service b/debian/gps303.wsgateway.service new file mode 100644 index 0000000..3072f3b --- /dev/null +++ b/debian/gps303.wsgateway.service @@ -0,0 +1,17 @@ +[Unit] +Description=GPS303 Websocket Gateway Service +PartOf=gps303.target + +[Service] +Type=simple +EnvironmentFile=-/etc/default/gps303 +ExecStart=python3 -m gps303.wsgateway $OPTIONS +KillSignal=INT +Restart=on-failure +StandardOutput=journal +StandardError=inherit +User=gps303 +Group=gps303 + +[Install] +WantedBy=gps303.target diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py new file mode 100644 index 0000000..cc1cb7e --- /dev/null +++ b/gps303/wsgateway.py @@ -0,0 +1,176 @@ +""" Websocket Gateway """ + +from logging import getLogger +from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from time import time +from wsproto import ConnectionType, WSConnection +from wsproto.events import AcceptConnection, CloseConnection, Message, Ping, Request, TextMessage +import zmq + +from . import common +from .zmsg import LocEvt + +log = getLogger("gps303/wsgateway") + + +class Client: + """Websocket connection to the client""" + + def __init__(self, sock, addr): + self.sock = sock + self.addr = addr + self.ws = WSConnection(ConnectionType.SERVER) + self.ws_data = b"" + + def close(self): + log.debug("Closing fd %d", self.sock.fileno()) + self.sock.close() + + def recv(self): + try: + data = self.sock.recv(4096) + except OSError: + log.warning( + "Reading from fd %d: %s", + self.sock.fileno(), + e, + ) + self.ws.receive_data(None) + return None + if not data: # Client has closed connection + log.info( + "EOF reading from fd %d", + self.sock.fileno(), + ) + self.ws.receive_data(None) + return None + self.ws.receive_data(data) + msgs = [] + for event in self.ws.events(): + if isinstance(event, Request): + log.debug("WebSocket upgrade on fd %d", self.sock.fileno()) + #self.ws_data += self.ws.send(event.response()) # Why not?! + self.ws_data += self.ws.send(AcceptConnection()) + elif isinstance(event, (CloseConnection, Ping)): + log.debug("%s on fd %d", event, self.sock.fileno()) + self.ws_data += self.ws.send(event.response()) + elif isinstance(event, TextMessage): + # TODO: save imei "subscription" + log.debug("%s on fd %d", event, self.sock.fileno()) + msgs.append(event.data) + else: + log.warning("%s on fd %d", event, self.sock.fileno()) + if self.ws_data: # Temp hack + self.write() + return msgs + + def send(self, imei, message): + # TODO: filter only wanted imei got from the client + self.ws_data += self.ws.send(Message(data=message)) + + def write(self): + try: + sent = self.sock.send(self.ws_data) + self.ws_data = self.ws_data[sent:] + except OSError as e: + log.error( + "Sending to fd %d (IMEI %s): %s", + self.sock.fileno(), + self.imei, + e, + ) + self.ws_data = b"" + + +class Clients: + def __init__(self): + self.by_fd = {} + + def add(self, clntsock, clntaddr): + fd = clntsock.fileno() + log.info("Start serving fd %d from %s", fd, clntaddr) + self.by_fd[fd] = Client(clntsock, clntaddr) + return fd + + def stop(self, fd): + clnt = self.by_fd[fd] + log.info("Stop serving fd %d", clnt.sock.fileno()) + clnt.close() + del self.by_fd[fd] + + def recv(self, fd): + clnt = self.by_fd[fd] + msgs = clnt.recv() + if msgs is None: + return None + result = [] + for msg in msgs: + log.debug("Received: %s", msg) + return result + + def send(self, msgs): + for clnt in self.by_fd.values(): + clnt.send(msgs) + clnt.write() + +def runserver(conf): + zctx = zmq.Context() + zsub = zctx.socket(zmq.SUB) + zsub.connect(conf.get("lookaside", "publishurl")) + zsub.setsockopt(zmq.SUBSCRIBE, b"") + tcpl = socket(AF_INET6, SOCK_STREAM) + tcpl.setblocking(False) + tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + tcpl.bind(("", conf.getint("wsgateway", "port"))) + tcpl.listen(5) + tcpfd = tcpl.fileno() + poller = zmq.Poller() + poller.register(zsub, flags=zmq.POLLIN) + poller.register(tcpfd, flags=zmq.POLLIN) + clients = Clients() + try: + while True: + tosend = [] + topoll = [] + tostop = [] + events = poller.poll(1000) + for sk, fl in events: + if sk is zsub: + while True: + try: + msg = zsub.recv(zmq.NOBLOCK) + tosend.append(LocEvt(msg)) + except zmq.Again: + break + elif sk == tcpfd: + clntsock, clntaddr = tcpl.accept() + topoll.append((clntsock, clntaddr)) + elif fl & zmq.POLLIN: + received = clients.recv(sk) + if received is None: + log.debug( + "Client gone from fd %d", sk + ) + tostop.append(sk) + else: + for msg in received: + log.debug("Received from %d: %s", sk, msg) + else: + log.debug("Stray event: %s on socket %s", fl, sk) + # poll queue consumed, make changes now + for fd in tostop: + poller.unregister(fd) + clients.stop(fd) + for zmsg in tosend: + log.debug("Sending to the client: %s", zmsg) + clients.send(zmsg) + for clntsock, clntaddr in topoll: + fd = clients.add(clntsock, clntaddr) + poller.register(fd, flags=zmq.POLLIN) + # TODO: Handle write overruns (register for POLLOUT) + except KeyboardInterrupt: + pass + + +if __name__.endswith("__main__"): + runserver(common.init(log)) diff --git a/webdemo/index.html b/webdemo/index.html new file mode 100644 index 0000000..d726b02 --- /dev/null +++ b/webdemo/index.html @@ -0,0 +1,82 @@ + + + + Location + + + + +
+ + +
+
+
+ + -- 2.39.2