]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/wsgateway.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / wsgateway.py
index 522cd59269d8380596c0befb7f95dffc058fffd9..c5dcf5a785a03d13e8748bbfb441afec5b21492d 100644 (file)
@@ -2,6 +2,7 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from importlib import import_module
 from json import dumps, loads
 from logging import getLogger
 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
@@ -21,47 +22,22 @@ from wsproto.utilities import RemoteProtocolError
 import zmq
 
 from . import common
-from .evstore import initdb, fetch
-from .zx303proto import (
-    GPS_POSITIONING,
-    STATUS,
-    WIFI_POSITIONING,
-    parse_message,
-    proto_name,
-)
-from .zmsg import Bcast, topic
+from .evstore import initdb, fetch, fetchpmod
+from .protomodule import ProtoModule
+from .zmsg import Rept, Resp, rtopic
 
 log = getLogger("loctrkd/wsgateway")
+
 htmlfile = None
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
-    for is_incoming, timestamp, packet in fetch(
-        imei,
-        [
-            (True, proto_name(GPS_POSITIONING)),
-            (False, proto_name(WIFI_POSITIONING)),
-        ],
-        numback,
-    ):
-        msg = parse_message(packet, is_incoming=is_incoming)
-        result.append(
-            {
-                "type": "location",
-                "imei": imei,
-                "timestamp": str(
-                    datetime.fromtimestamp(timestamp).astimezone(
-                        tz=timezone.utc
-                    )
-                ),
-                "longitude": msg.longitude,
-                "latitude": msg.latitude,
-                "accuracy": "gps"
-                if isinstance(msg, GPS_POSITIONING)
-                else "approximate",
-            }
-        )
+    for report in fetch(imei, numback):
+        report["type"] = "location"
+        timestamp = report.pop("devtime")
+        report["timestamp"] = timestamp
+        result.append(report)
     return result
 
 
@@ -125,6 +101,9 @@ class Client:
         self.ready = False
         self.imeis: Set[str] = set()
 
+    def __str__(self) -> str:
+        return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})"
+
     def close(self) -> None:
         log.debug("Closing fd %d", self.sock.fileno())
         self.sock.close()
@@ -233,16 +212,28 @@ class Clients:
         clnt.close()
         del self.by_fd[fd]
 
-    def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
+    def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
         clnt = self.by_fd[fd]
-        return clnt.recv()
+        return (clnt, clnt.recv())
 
-    def send(self, msg: Dict[str, Any]) -> Set[int]:
+    def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
         towrite = set()
-        for fd, clnt in self.by_fd.items():
-            if clnt.wants(msg["imei"]):
+        if clnt is None:
+            for fd, cl in self.by_fd.items():
+                if cl.wants(msg["imei"]):
+                    cl.send(msg)
+                    towrite.add(fd)
+        else:
+            fd = clnt.sock.fileno()
+            if self.by_fd.get(fd, None) == clnt:
                 clnt.send(msg)
                 towrite.add(fd)
+            else:
+                log.info(
+                    "Trying to send %s to client at %d, not in service",
+                    msg,
+                    fd,
+                )
         return towrite
 
     def write(self, towrite: Set[int]) -> Set[int]:
@@ -259,15 +250,52 @@ class Clients:
         return result
 
 
+def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]:
+    imei = wsmsg.pop("imei", None)
+    cmd = wsmsg.pop("type", None)
+    if imei is None or cmd is None:
+        log.info("Unhandled message %s %s %s", cmd, imei, wsmsg)
+        return {
+            "type": "cmdresult",
+            "imei": imei,
+            "result": "Did not get imei or cmd",
+        }
+    pmod = fetchpmod(imei)
+    if pmod is None:
+        log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg)
+        return {
+            "type": "cmdresult",
+            "imei": imei,
+            "result": "Type of the terminal is unknown",
+        }
+    tmsg = common.make_response(pmod, cmd, imei, **wsmsg)
+    if tmsg is None:
+        log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg)
+        return {
+            "type": "cmdresult",
+            "imei": imei,
+            "result": f"{cmd} unimplemented for terminal protocol {pmod}",
+        }
+    resp = Resp(imei=imei, when=time(), packet=tmsg.packed)
+    log.debug("Response: %s", resp)
+    zpush.send(resp.packed)
+    return {
+        "type": "cmdresult",
+        "imei": imei,
+        "result": f"{cmd} sent to {imei}",
+    }
+
+
 def runserver(conf: ConfigParser) -> None:
     global htmlfile
-
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
+    zsub.connect(conf.get("rectifier", "publishurl"))
+    zpush = zctx.socket(zmq.PUSH)  # type: ignore
+    zpush.connect(conf.get("collector", "listenurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -284,34 +312,12 @@ def runserver(conf: ConfigParser) -> None:
         while True:
             neededsubs = clients.subs()
             for imei in neededsubs - activesubs:
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.SUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
+                zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
             for imei in activesubs - neededsubs:
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(GPS_POSITIONING), True, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(WIFI_POSITIONING), False, imei),
-                )
-                zsub.setsockopt(
-                    zmq.UNSUBSCRIBE,
-                    topic(proto_name(STATUS), True, imei),
-                )
+                zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
-            tosend = []
+            tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
             topoll = []
             tostop = []
             towrite = set()
@@ -320,46 +326,18 @@ def runserver(conf: ConfigParser) -> None:
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            msg = parse_message(zmsg.packet, zmsg.is_incoming)
-                            log.debug("Got %s with %s", zmsg, msg)
-                            if isinstance(msg, STATUS):
-                                tosend.append(
-                                    {
-                                        "type": "status",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "battery": msg.batt,
-                                    }
-                                )
-                            else:
-                                tosend.append(
-                                    {
-                                        "type": "location",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "longitude": msg.longitude,
-                                        "latitude": msg.latitude,
-                                        "accuracy": "gps"
-                                        if zmsg.is_incoming
-                                        else "approximate",
-                                    }
-                                )
+                            zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+                            msg = loads(zmsg.payload)
+                            msg["imei"] = zmsg.imei
+                            log.debug("Got %s, sending %s", zmsg, msg)
+                            tosend.append((None, msg))
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
                 elif fl & zmq.POLLIN:
-                    received = clients.recv(sk)
+                    clnt, received = clients.recv(sk)
                     if received is None:
                         log.debug("Client gone from fd %d", sk)
                         tostop.append(sk)
@@ -372,7 +350,14 @@ def runserver(conf: ConfigParser) -> None:
                                 imeis = cast(List[str], wsmsg.get("imei"))
                                 numback: int = wsmsg.get("backlog", 5)
                                 for imei in imeis:
-                                    tosend.extend(backlog(imei, numback))
+                                    tosend.extend(
+                                        [
+                                            (clnt, msg)
+                                            for msg in backlog(imei, numback)
+                                        ]
+                                    )
+                            else:
+                                tosend.append((clnt, sendcmd(zpush, wsmsg)))
                         towrite.add(sk)
                 elif fl & zmq.POLLOUT:
                     log.debug("Write now open for fd %d", sk)
@@ -384,9 +369,9 @@ def runserver(conf: ConfigParser) -> None:
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
-            for wsmsg in tosend:
-                log.debug("Sending to the clients: %s", wsmsg)
-                towrite |= clients.send(wsmsg)
+            for towhom, wsmsg in tosend:
+                log.debug("Sending to the client %s: %s", towhom, wsmsg)
+                towrite |= clients.send(towhom, wsmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
                 poller.register(fd, flags=zmq.POLLIN)