]> www.average.org Git - loctrkd.git/commitdiff
WIP on websocket gateway
authorEugene Crosser <crosser@average.org>
Wed, 4 May 2022 21:28:59 +0000 (23:28 +0200)
committerEugene Crosser <crosser@average.org>
Wed, 4 May 2022 22:16:46 +0000 (00:16 +0200)
debian/control
debian/gps303.conf
debian/gps303.target
debian/gps303.wsgateway.service [new file with mode: 0644]
gps303/wsgateway.py [new file with mode: 0644]
webdemo/index.html [new file with mode: 0644]

index a8ba49bc2b305ac6440b8dcbbf2be8729ae080fb..5746142fe4460550183c528141faf81aa022e99e 100644 (file)
@@ -9,12 +9,13 @@ Build-Depends: debhelper-compat (= 12),
  dh-python,
  python3-all,
  python3-setuptools,
- python3-zmq,
 
 Package: python3-gps303
 Architecture: all
 Section: python
 Depends: adduser,
+         python3-wsproto,
+         python3-zmq,
          ${misc:Depends},
          ${python3:Depends}
 Description: Suite of modules to collect reports from xz303 GPS trackers
index 2ee044bb7afee117665787dfa2de42986364ced7..9b8ea5bab6b9513069dec159243633ee9eccceb4 100644 (file)
@@ -3,6 +3,9 @@ port = 4303
 publishurl = ipc:///var/lib/gps303/collected
 listenurl = ipc:///var/lib/gps303/responses
 
+[wsgateway]
+port = 5049
+
 [storage]
 dbfn = /var/lib/gps303/gps303.sqlite
 
index cab6d93f3ebaec3403c1c256060531ad33264135..ac1de37f968322c5d5ae63b358d67caccbceb364 100644 (file)
@@ -3,11 +3,13 @@ Description=GPS303 support suite
 Requires=gps303.collector.service \
         gps303.storage.service \
         gps303.termconfig.service \
-        gps303.lookaside.service
+        gps303.lookaside.service \
+       gps303.wsgateway.service
 After=gps303.collector.service \
       gps303.storage.service \
       gps303.termconfig.service \
-      gps303.lookaside.service
+      gps303.lookaside.service \
+      gps303.wsgateway.service
 
 [Install]
 WantedBy=multi-user.target
diff --git a/debian/gps303.wsgateway.service b/debian/gps303.wsgateway.service
new file mode 100644 (file)
index 0000000..3072f3b
--- /dev/null
@@ -0,0 +1,17 @@
+[Unit]
+Description=GPS303 Websocket Gateway Service
+PartOf=gps303.target
+
+[Service]
+Type=simple
+EnvironmentFile=-/etc/default/gps303
+ExecStart=python3 -m gps303.wsgateway $OPTIONS
+KillSignal=INT
+Restart=on-failure
+StandardOutput=journal
+StandardError=inherit
+User=gps303
+Group=gps303
+
+[Install]
+WantedBy=gps303.target
diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py
new file mode 100644 (file)
index 0000000..cc1cb7e
--- /dev/null
@@ -0,0 +1,176 @@
+""" Websocket Gateway """
+
+from logging import getLogger
+from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from time import time
+from wsproto import ConnectionType, WSConnection
+from wsproto.events import AcceptConnection, CloseConnection, Message, Ping, Request, TextMessage
+import zmq
+
+from . import common
+from .zmsg import LocEvt
+
+log = getLogger("gps303/wsgateway")
+
+
+class Client:
+    """Websocket connection to the client"""
+
+    def __init__(self, sock, addr):
+        self.sock = sock
+        self.addr = addr
+        self.ws = WSConnection(ConnectionType.SERVER)
+        self.ws_data = b""
+
+    def close(self):
+        log.debug("Closing fd %d", self.sock.fileno())
+        self.sock.close()
+
+    def recv(self):
+        try:
+            data = self.sock.recv(4096)
+        except OSError:
+            log.warning(
+                "Reading from fd %d: %s",
+                self.sock.fileno(),
+                e,
+            )
+            self.ws.receive_data(None)
+            return None
+        if not data:  # Client has closed connection
+            log.info(
+                "EOF reading from fd %d",
+                self.sock.fileno(),
+            )
+            self.ws.receive_data(None)
+            return None
+        self.ws.receive_data(data)
+        msgs = []
+        for event in self.ws.events():
+            if isinstance(event, Request):
+                log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
+                #self.ws_data += self.ws.send(event.response())  # Why not?!
+                self.ws_data += self.ws.send(AcceptConnection())
+            elif isinstance(event, (CloseConnection, Ping)):
+                log.debug("%s on fd %d", event, self.sock.fileno())
+                self.ws_data += self.ws.send(event.response())
+            elif isinstance(event, TextMessage):
+                # TODO: save imei "subscription"
+                log.debug("%s on fd %d", event, self.sock.fileno())
+                msgs.append(event.data)
+            else:
+                log.warning("%s on fd %d", event, self.sock.fileno())
+        if self.ws_data:  # Temp hack
+            self.write()
+        return msgs
+
+    def send(self, imei, message):
+        # TODO: filter only wanted imei got from the client
+        self.ws_data += self.ws.send(Message(data=message))
+
+    def write(self):
+        try:
+            sent = self.sock.send(self.ws_data)
+            self.ws_data = self.ws_data[sent:]
+        except OSError as e:
+            log.error(
+                "Sending to fd %d (IMEI %s): %s",
+                self.sock.fileno(),
+                self.imei,
+                e,
+            )
+            self.ws_data = b""
+
+
+class Clients:
+    def __init__(self):
+        self.by_fd = {}
+
+    def add(self, clntsock, clntaddr):
+        fd = clntsock.fileno()
+        log.info("Start serving fd %d from %s", fd, clntaddr)
+        self.by_fd[fd] = Client(clntsock, clntaddr)
+        return fd
+
+    def stop(self, fd):
+        clnt = self.by_fd[fd]
+        log.info("Stop serving fd %d", clnt.sock.fileno())
+        clnt.close()
+        del self.by_fd[fd]
+
+    def recv(self, fd):
+        clnt = self.by_fd[fd]
+        msgs = clnt.recv()
+        if msgs is None:
+            return None
+        result = []
+        for msg in msgs:
+            log.debug("Received: %s", msg)
+        return result
+
+    def send(self, msgs):
+        for clnt in self.by_fd.values():
+            clnt.send(msgs)
+            clnt.write()
+
+def runserver(conf):
+    zctx = zmq.Context()
+    zsub = zctx.socket(zmq.SUB)
+    zsub.connect(conf.get("lookaside", "publishurl"))
+    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+    tcpl = socket(AF_INET6, SOCK_STREAM)
+    tcpl.setblocking(False)
+    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+    tcpl.bind(("", conf.getint("wsgateway", "port")))
+    tcpl.listen(5)
+    tcpfd = tcpl.fileno()
+    poller = zmq.Poller()
+    poller.register(zsub, flags=zmq.POLLIN)
+    poller.register(tcpfd, flags=zmq.POLLIN)
+    clients = Clients()
+    try:
+        while True:
+            tosend = []
+            topoll = []
+            tostop = []
+            events = poller.poll(1000)
+            for sk, fl in events:
+                if sk is zsub:
+                    while True:
+                        try:
+                            msg = zsub.recv(zmq.NOBLOCK)
+                            tosend.append(LocEvt(msg))
+                        except zmq.Again:
+                            break
+                elif sk == tcpfd:
+                    clntsock, clntaddr = tcpl.accept()
+                    topoll.append((clntsock, clntaddr))
+                elif fl & zmq.POLLIN:
+                    received = clients.recv(sk)
+                    if received is None:
+                        log.debug(
+                            "Client gone from fd %d", sk
+                        )
+                        tostop.append(sk)
+                    else:
+                        for msg in received:
+                            log.debug("Received from %d: %s", sk, msg)
+                else:
+                    log.debug("Stray event: %s on socket %s", fl, sk)
+            # poll queue consumed, make changes now
+            for fd in tostop:
+                poller.unregister(fd)
+                clients.stop(fd)
+            for zmsg in tosend:
+                log.debug("Sending to the client: %s", zmsg)
+                clients.send(zmsg)
+            for clntsock, clntaddr in topoll:
+                fd = clients.add(clntsock, clntaddr)
+                poller.register(fd, flags=zmq.POLLIN)
+            # TODO: Handle write overruns (register for POLLOUT)
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__.endswith("__main__"):
+    runserver(common.init(log))
diff --git a/webdemo/index.html b/webdemo/index.html
new file mode 100644 (file)
index 0000000..d726b02
--- /dev/null
@@ -0,0 +1,82 @@
+<!DOCTYPE html>
+<html xmlns="http://www.w3.org/1999/xhtml" lang="en-US" xml:lang="en-US">
+<head>
+       <title>Location</title>
+       <script type="text/javascript">
+               var sts;
+               var log;
+               var ws;
+
+               function init() {
+                       sts = document.getElementById("sts");
+                       sts.innerHTML = "uninitialized";
+                       log = document.getElementById("log");
+                       log.innerHTML = "top of log<br>";
+                       imei = document.getElementById("imei");
+                       send = document.getElementById("send");
+                       open_ws();
+               }
+
+               function open_ws() {
+                       ws = new WebSocket("ws://localhost:5049");
+                       ws.onopen = ws_onopen;
+                       ws.onmessage = ws_onmessage;
+                       ws.onerror = ws_onerror;
+                       ws.onclose = ws_onclose;
+               }
+               function ws_onopen(event) {
+                       console.log("ws opened " + event);
+                       sts.innerHTML = "open";
+                       imei.disabled = false;
+                       send.disabled = false;
+               }
+               function ws_onmessage(event) {
+                       console.log("message " + event);
+                       log.innerHTML += "message " + event + "<br>";
+               }
+               function ws_onerror(event) {
+                       console.log("error " + event);
+                       sts.innerHTML = "error";
+               }
+               function ws_onclose(event) {
+                       console.log("close " + event);
+                       sts.innerHTML = "closed";
+                       imei.disabled = true;
+                       send.disabled = true;
+                       setTimeout(open_ws, 5000);
+               }
+
+               function sendIMEI() {
+                       console.log("sending " + imei.value);
+                       var msg = {
+                               imei: imei.value,
+                               type: "subscribe",
+                               date: Date.now()
+                       };
+                       ws.send(JSON.stringify(msg));
+                       document.title = imei.value;
+                       imei.value = "";
+               }
+
+               function handleKey(evt) {
+                       if (evt.keyCode === 13 || evt.keyCode === 14) {
+                               if (!imei.disabled) {
+                                       sendIMEI();
+                               }
+                       }
+               }
+       </script>
+
+</head>
+<body onload="init();">
+       <div style="width:100%; height:2%" id="hdr">
+               <input id="imei" type="text" name="imei"
+                size="16" maxlength="16" placeholder="Enter IMEI"
+                autocomplete="off" onkeyup="handleKey(event)">
+               <input type="button" id="send" name="send" value="Send"
+                onclick="sendIMEI()" disabled>
+       </div>
+       <div style="width:100%; height:96%" id="log"></div>
+       <div style="width:100%; height:2%" id="sts"></div>
+</body>
+</html>