]> www.average.org Git - loctrkd.git/blobdiff - gps303/wsgateway.py
do not respond to hibernation; minor cleanup
[loctrkd.git] / gps303 / wsgateway.py
index f9f5c6a2ad936f33c9961419245c65de14414dcd..d66070e2f57a7781fe76a8249ac176536e3192d4 100644 (file)
@@ -1,6 +1,7 @@
 """ Websocket Gateway """
 
-from json import loads
+from datetime import datetime, timezone
+from json import dumps, loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
@@ -17,7 +18,13 @@ from wsproto.utilities import RemoteProtocolError
 import zmq
 
 from . import common
-from .zmsg import LocEvt
+from .backlog import blinit, backlog
+from .gps303proto import (
+    GPS_POSITIONING,
+    WIFI_POSITIONING,
+    parse_message,
+)
+from .zmsg import Bcast, topic
 
 log = getLogger("gps303/wsgateway")
 htmlfile = None
@@ -156,12 +163,17 @@ class Client:
         return msgs
 
     def wants(self, imei):
-        log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno())
+        log.debug(
+            "wants %s? set is %s on fd %d",
+            imei,
+            self.imeis,
+            self.sock.fileno(),
+        )
         return True  # TODO: check subscriptions
 
     def send(self, message):
-        if self.ready and message.imei in self.imeis:
-            self.ws_data += self.ws.send(Message(data=message.json))
+        if self.ready and message["imei"] in self.imeis:
+            self.ws_data += self.ws.send(Message(data=dumps(message)))
 
     def write(self):
         if self.ws_data:
@@ -196,19 +208,12 @@ class Clients:
 
     def recv(self, fd):
         clnt = self.by_fd[fd]
-        msgs = clnt.recv()
-        if msgs is None:
-            return None
-        result = []
-        for msg in msgs:
-            log.debug("Received: %s", msg)
-            result.append(msg)
-        return result
+        return clnt.recv()
 
     def send(self, msg):
         towrite = set()
         for fd, clnt in self.by_fd.items():
-            if clnt.wants(msg.imei):
+            if clnt.wants(msg["imei"]):
                 clnt.send(msg)
                 towrite.add(fd)
         return towrite
@@ -230,10 +235,11 @@ class Clients:
 def runserver(conf):
     global htmlfile
 
+    blinit(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile")
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("lookaside", "publishurl"))
+    zsub.connect(conf.get("collector", "publishurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -250,9 +256,23 @@ def runserver(conf):
         while True:
             neededsubs = clients.subs()
             for imei in neededsubs - activesubs:
-                zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+                zsub.setsockopt(
+                    zmq.SUBSCRIBE,
+                    topic(GPS_POSITIONING.PROTO, True, imei),
+                )
+                zsub.setsockopt(
+                    zmq.SUBSCRIBE,
+                    topic(WIFI_POSITIONING.PROTO, False, imei),
+                )
             for imei in activesubs - neededsubs:
-                zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+                zsub.setsockopt(
+                    zmq.UNSUBSCRIBE,
+                    topic(GPS_POSITIONING.PROTO, True, imei),
+                )
+                zsub.setsockopt(
+                    zmq.UNSUBSCRIBE,
+                    topic(WIFI_POSITIONING.PROTO, False, imei),
+                )
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
@@ -264,8 +284,21 @@ def runserver(conf):
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
-                            tosend.append(zmsg)
+                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
+                            msg = parse_message(zmsg.packet, zmsg.is_incoming)
+                            log.debug("Got %s with %s", zmsg, msg)
+                            tosend.append(
+                                {
+                                    "imei": zmsg.imei,
+                                    "timestamp": str(
+                                        datetime.fromtimestamp(
+                                            zmsg.when
+                                        ).astimezone(tz=timezone.utc)
+                                    ),
+                                    "longitude": msg.longitude,
+                                    "latitude": msg.latitude,
+                                }
+                            )
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
@@ -280,6 +313,11 @@ def runserver(conf):
                     else:
                         for msg in received:
                             log.debug("Received from %d: %s", sk, msg)
+                            if msg.get("type", None) == "subscribe":
+                                imei = msg.get("imei")
+                                numback = msg.get("backlog", 5)
+                                for elem in imei:
+                                    tosend.extend(backlog(elem, numback))
                         towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)