]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/wsgateway.py
Send backlog only the the ws client that requested
[loctrkd.git] / loctrkd / wsgateway.py
index 522cd59269d8380596c0befb7f95dffc058fffd9..e94845eec4215e04797bde4e0dce43b05e8780ec 100644 (file)
@@ -2,6 +2,7 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from importlib import import_module
 from json import dumps, loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
@@ -22,46 +23,21 @@ import zmq
 
 from . import common
 from .evstore import initdb, fetch
-from .zx303proto import (
-    GPS_POSITIONING,
-    STATUS,
-    WIFI_POSITIONING,
-    parse_message,
-    proto_name,
-)
-from .zmsg import Bcast, topic
+from .protomodule import ProtoModule
+from .zmsg import Rept, rtopic
 
 log = getLogger("loctrkd/wsgateway")
+
 htmlfile = None
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
-    for is_incoming, timestamp, packet in fetch(
-        imei,
-        [
-            (True, proto_name(GPS_POSITIONING)),
-            (False, proto_name(WIFI_POSITIONING)),
-        ],
-        numback,
-    ):
-        msg = parse_message(packet, is_incoming=is_incoming)
-        result.append(
-            {
-                "type": "location",
-                "imei": imei,
-                "timestamp": str(
-                    datetime.fromtimestamp(timestamp).astimezone(
-                        tz=timezone.utc
-                    )
-                ),
-                "longitude": msg.longitude,
-                "latitude": msg.latitude,
-                "accuracy": "gps"
-                if isinstance(msg, GPS_POSITIONING)
-                else "approximate",
-            }
-        )
+    for report in fetch(imei, numback):
+        report["type"] = "location"
+        timestamp = report.pop("devtime")
+        report["timestamp"] = timestamp
+        result.append(report)
     return result
 
 
@@ -233,16 +209,28 @@ class Clients:
         clnt.close()
         del self.by_fd[fd]
 
-    def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
+    def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
         clnt = self.by_fd[fd]
-        return clnt.recv()
+        return (clnt, clnt.recv())
 
-    def send(self, msg: Dict[str, Any]) -> Set[int]:
+    def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
         towrite = set()
-        for fd, clnt in self.by_fd.items():
-            if clnt.wants(msg["imei"]):
+        if clnt is None:
+            for fd, cl in self.by_fd.items():
+                if cl.wants(msg["imei"]):
+                    cl.send(msg)
+                    towrite.add(fd)
+        else:
+            fd = clnt.sock.fileno()
+            if self.by_fd.get(fd, None) == clnt:
                 clnt.send(msg)
                 towrite.add(fd)
+            else:
+                log.info(
+                    "Trying to send %s to client at %d, not in service",
+                    msg,
+                    fd,
+                )
         return towrite
 
     def write(self, towrite: Set[int]) -> Set[int]:
@@ -261,13 +249,12 @@ class Clients:
 
 def runserver(conf: ConfigParser) -> None:
     global htmlfile
-
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
+    zsub.connect(conf.get("rectifier", "publishurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -284,34 +271,12 @@ def runserver(conf: ConfigParser) -> None:
         while True:
             neededsubs = clients.subs()
             for imei in neededsubs - activesubs:
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
+                zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
             for imei in activesubs - neededsubs:
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
+                zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
-            tosend = []
+            tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
             topoll = []
             tostop = []
             towrite = set()
@@ -320,46 +285,18 @@ def runserver(conf: ConfigParser) -> None:
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            msg = parse_message(zmsg.packet, zmsg.is_incoming)
-                            log.debug("Got %s with %s", zmsg, msg)
-                            if isinstance(msg, STATUS):
-                                tosend.append(
-                                    {
-                                        "type": "status",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "battery": msg.batt,
-                                    }
-                                )
-                            else:
-                                tosend.append(
-                                    {
-                                        "type": "location",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "longitude": msg.longitude,
-                                        "latitude": msg.latitude,
-                                        "accuracy": "gps"
-                                        if zmsg.is_incoming
-                                        else "approximate",
-                                    }
-                                )
+                            zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+                            msg = loads(zmsg.payload)
+                            msg["imei"] = zmsg.imei
+                            log.debug("Got %s, sending %s", zmsg, msg)
+                            tosend.append((None, msg))
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
                 elif fl & zmq.POLLIN:
-                    received = clients.recv(sk)
+                    clnt, received = clients.recv(sk)
                     if received is None:
                         log.debug("Client gone from fd %d", sk)
                         tostop.append(sk)
@@ -372,7 +309,12 @@ def runserver(conf: ConfigParser) -> None:
                                 imeis = cast(List[str], wsmsg.get("imei"))
                                 numback: int = wsmsg.get("backlog", 5)
                                 for imei in imeis:
-                                    tosend.extend(backlog(imei, numback))
+                                    tosend.extend(
+                                        [
+                                            (clnt, msg)
+                                            for msg in backlog(imei, numback)
+                                        ]
+                                    )
                         towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)
@@ -384,9 +326,9 @@ def runserver(conf: ConfigParser) -> None:
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
-            for wsmsg in tosend:
-                log.debug("Sending to the clients: %s", wsmsg)
-                towrite |= clients.send(wsmsg)
+            for towhom, wsmsg in tosend:
+                log.debug("Sending to the client %s: %s", towhom, wsmsg)
+                towrite |= clients.send(towhom, wsmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
                 poller.register(fd, flags=zmq.POLLIN)