]> www.average.org Git - loctrkd.git/blobdiff - gps303/wsgateway.py
reimplement backlog query again
[loctrkd.git] / gps303 / wsgateway.py
index 00770ebd88032c35f9b9e7172dba3c63233d7434..79937d8e5b1f5d1835e1dfab8a627f2844701f31 100644 (file)
@@ -1,6 +1,7 @@
 """ Websocket Gateway """
 
-from json import loads
+from datetime import datetime, timezone
+from json import dumps, loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
@@ -17,7 +18,7 @@ from wsproto.utilities import RemoteProtocolError
 import zmq
 
 from . import common
-from .backlog import blinit, backlog
+from .evstore import initdb, fetch
 from .gps303proto import (
     GPS_POSITIONING,
     WIFI_POSITIONING,
@@ -29,6 +30,29 @@ log = getLogger("gps303/wsgateway")
 htmlfile = None
 
 
+def backlog(imei, numback):
+    result = []
+    for is_incoming, timestamp, packet in fetch(
+        imei,
+        ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)),
+        numback,
+    ):
+        msg = parse_message(packet, is_incoming=is_incoming)
+        result.append(
+            {
+                "imei": imei,
+                "timestamp": str(
+                    datetime.fromtimestamp(timestamp).astimezone(
+                        tz=timezone.utc
+                    )
+                ),
+                "longitude": msg.longitude,
+                "latitude": msg.latitude,
+            }
+        )
+    return result
+
+
 def try_http(data, fd, e):
     global htmlfile
     try:
@@ -171,8 +195,8 @@ class Client:
         return True  # TODO: check subscriptions
 
     def send(self, message):
-        if self.ready and message.imei in self.imeis:
-            self.ws_data += self.ws.send(Message(data=message.json))
+        if self.ready and message["imei"] in self.imeis:
+            self.ws_data += self.ws.send(Message(data=dumps(message)))
 
     def write(self):
         if self.ws_data:
@@ -212,7 +236,7 @@ class Clients:
     def send(self, msg):
         towrite = set()
         for fd, clnt in self.by_fd.items():
-            if clnt.wants(msg.imei):
+            if clnt.wants(msg["imei"]):
                 clnt.send(msg)
                 towrite.add(fd)
         return towrite
@@ -234,7 +258,7 @@ class Clients:
 def runserver(conf):
     global htmlfile
 
-    blinit(conf.get("storage", "dbfn"))
+    initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile")
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
@@ -255,7 +279,6 @@ def runserver(conf):
         while True:
             neededsubs = clients.subs()
             for imei in neededsubs - activesubs:
-                log.debug("topics: %s", [tpc.hex() for tpc in [topic(GPS_POSITIONING.PROTO, True, imei), topic(WIFI_POSITIONING.PROTO, False, imei)]])
                 zsub.setsockopt(
                     zmq.SUBSCRIBE,
                     topic(GPS_POSITIONING.PROTO, True, imei),
@@ -284,13 +307,21 @@ def runserver(conf):
                 if sk is zsub:
                     while True:
                         try:
-                            buf = zsub.recv(zmq.NOBLOCK)
-                            zmsg = Bcast(buf)
-                            log.debug("zmq packet: %s", buf.hex())
-                            # zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            msg = parse_message(zmsg.packet)
-                            tosend.append(zmsg)
-                            log.debug("Got %s", zmsg)
+                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
+                            msg = parse_message(zmsg.packet, zmsg.is_incoming)
+                            log.debug("Got %s with %s", zmsg, msg)
+                            tosend.append(
+                                {
+                                    "imei": zmsg.imei,
+                                    "timestamp": str(
+                                        datetime.fromtimestamp(
+                                            zmsg.when
+                                        ).astimezone(tz=timezone.utc)
+                                    ),
+                                    "longitude": msg.longitude,
+                                    "latitude": msg.latitude,
+                                }
+                            )
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
@@ -306,10 +337,10 @@ def runserver(conf):
                         for msg in received:
                             log.debug("Received from %d: %s", sk, msg)
                             if msg.get("type", None) == "subscribe":
-                                imei = msg.get("imei")
+                                imeis = msg.get("imei")
                                 numback = msg.get("backlog", 5)
-                                for elem in imei:
-                                    tosend.extend(backlog(elem, numback))
+                                for imei in imeis:
+                                    tosend.extend(backlog(imei, numback))
                         towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)