]> www.average.org Git - loctrkd.git/commitdiff
wsgateway: switch to the use of cooked reports
authorEugene Crosser <crosser@average.org>
Tue, 2 Aug 2022 22:37:06 +0000 (00:37 +0200)
committerEugene Crosser <crosser@average.org>
Tue, 2 Aug 2022 22:37:06 +0000 (00:37 +0200)
loctrkd/evstore.py
loctrkd/wsgateway.py
loctrkd/zmsg.py

index b0265055dec85cd55785f626e6ae690aeb1599f5..85e8c9d25767de36fbf4e30444c9caa6f7bd6d5c 100644 (file)
@@ -1,8 +1,8 @@
 """ sqlite event store """
 
 from datetime import datetime
 """ sqlite event store """
 
 from datetime import datetime
-from json import dumps
-from sqlite3 import connect, OperationalError
+from json import dumps, loads
+from sqlite3 import connect, OperationalError, Row
 from typing import Any, Dict, List, Tuple
 
 __all__ = "fetch", "initdb", "stow", "stowloc"
 from typing import Any, Dict, List, Tuple
 
 __all__ = "fetch", "initdb", "stow", "stowloc"
@@ -32,14 +32,9 @@ SCHEMA = (
 def initdb(dbname: str) -> None:
     global DB
     DB = connect(dbname)
 def initdb(dbname: str) -> None:
     global DB
     DB = connect(dbname)
-    try:
-        DB.execute(
-            """alter table events add column
-                is_incoming int not null default TRUE"""
-        )
-    except OperationalError:
-        for stmt in SCHEMA:
-            DB.execute(stmt)
+    DB.row_factory = Row
+    for stmt in SCHEMA:
+        DB.execute(stmt)
 
 
 def stow(**kwargs: Any) -> None:
 
 
 def stow(**kwargs: Any) -> None:
@@ -91,23 +86,20 @@ def stowloc(**kwargs: Dict[str, Any]) -> None:
     DB.commit()
 
 
     DB.commit()
 
 
-def fetch(
-    imei: str, matchlist: List[Tuple[bool, str]], backlog: int
-) -> List[Tuple[bool, float, str, bytes]]:
-    # matchlist is a list of tuples (is_incoming, proto)
-    # returns a list of tuples (is_incoming, timestamp, packet)
+def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
     assert DB is not None
     assert DB is not None
-    selector = " or ".join(
-        (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
-    )
     cur = DB.cursor()
     cur.execute(
     cur = DB.cursor()
     cur.execute(
-        f"""select is_incoming, tstamp, proto, packet from events
-                    where ({selector}) and imei = ?
-                    order by tstamp desc limit ?""",
-        tuple(item for sublist in matchlist for item in sublist)
-        + (imei, backlog),
+        """select imei, devtime, accuracy, latitude, longitude, remainder
+                    from reports where imei = ?
+                    order by devtime desc limit ?""",
+        (imei, backlog),
     )
     )
-    result = list(cur)
+    result = []
+    for row in cur:
+        dic = dict(row)
+        remainder = loads(dic.pop("remainder"))
+        dic.update(remainder)
+        result.append(dic)
     cur.close()
     return list(reversed(result))
     cur.close()
     return list(reversed(result))
index b6d10e84798f89e7b36e322074f9afb115d50489..e568584054dd571e0a83c04039a0f2cbfeeaa828 100644 (file)
@@ -24,42 +24,20 @@ import zmq
 from . import common
 from .evstore import initdb, fetch
 from .protomodule import ProtoModule
 from . import common
 from .evstore import initdb, fetch
 from .protomodule import ProtoModule
-from .zmsg import Bcast, topic
+from .zmsg import Rept, rtopic
 
 log = getLogger("loctrkd/wsgateway")
 
 
 log = getLogger("loctrkd/wsgateway")
 
-
 htmlfile = None
 htmlfile = None
-pmods: List[ProtoModule] = []
-selector: List[Tuple[bool, str]] = []
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
 
 
 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
     result = []
-    for is_incoming, timestamp, proto, packet in fetch(
-        imei,
-        selector,
-        numback,
-    ):
-        for pmod in pmods:
-            if pmod.proto_handled(proto):
-                msg = pmod.parse_message(packet, is_incoming=is_incoming)
-        result.append(
-            {
-                "type": "location",
-                "imei": imei,
-                "timestamp": str(
-                    datetime.fromtimestamp(timestamp).astimezone(
-                        tz=timezone.utc
-                    )
-                ),
-                "longitude": msg.longitude,
-                "latitude": msg.latitude,
-                "accuracy": "gps"
-                if True  # TODO isinstance(msg, GPS_POSITIONING)
-                else "approximate",
-            }
-        )
+    for report in fetch(imei, numback):
+        report["type"] = "location"
+        timestamp = report.pop("devtime")
+        report["timestamp"] = timestamp
+        result.append(report)
     return result
 
 
     return result
 
 
@@ -258,21 +236,13 @@ class Clients:
 
 
 def runserver(conf: ConfigParser) -> None:
 
 
 def runserver(conf: ConfigParser) -> None:
-    global htmlfile, pmods, selector
-    pmods = [
-        cast(ProtoModule, import_module("." + modnm, __package__))
-        for modnm in conf.get("common", "protocols").split(",")
-    ]
-    for pmod in pmods:
-        for proto, is_incoming in pmod.exposed_protos():
-            if proto != "ZX:STATUS":  # TODO make it better
-                selector.append((is_incoming, proto))
+    global htmlfile
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
     initdb(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
+    zsub.connect(conf.get("rectifier", "publishurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -288,18 +258,10 @@ def runserver(conf: ConfigParser) -> None:
         towait: Set[int] = set()
         while True:
             neededsubs = clients.subs()
         towait: Set[int] = set()
         while True:
             neededsubs = clients.subs()
-            for pmod in pmods:
-                for proto, is_incoming in pmod.exposed_protos():
-                    for imei in neededsubs - activesubs:
-                        zsub.setsockopt(
-                            zmq.SUBSCRIBE,
-                            topic(proto, is_incoming, imei),
-                        )
-                    for imei in activesubs - neededsubs:
-                        zsub.setsockopt(
-                            zmq.UNSUBSCRIBE,
-                            topic(proto, is_incoming, imei),
-                        )
+            for imei in neededsubs - activesubs:
+                zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
+            for imei in activesubs - neededsubs:
+                zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
@@ -311,43 +273,11 @@ def runserver(conf: ConfigParser) -> None:
                 if sk is zsub:
                     while True:
                         try:
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
-                            for pmod in pmods:
-                                if pmod.proto_handled(zmsg.proto):
-                                    msg = pmod.parse_message(
-                                        zmsg.packet, zmsg.is_incoming
-                                    )
-                            log.debug("Got %s with %s", zmsg, msg)
-                            if zmsg.proto == "ZX:STATUS":
-                                tosend.append(
-                                    {
-                                        "type": "status",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "battery": msg.batt,
-                                    }
-                                )
-                            else:
-                                tosend.append(
-                                    {
-                                        "type": "location",
-                                        "imei": zmsg.imei,
-                                        "timestamp": str(
-                                            datetime.fromtimestamp(
-                                                zmsg.when
-                                            ).astimezone(tz=timezone.utc)
-                                        ),
-                                        "longitude": msg.longitude,
-                                        "latitude": msg.latitude,
-                                        "accuracy": "gps"
-                                        if zmsg.is_incoming
-                                        else "approximate",
-                                    }
-                                )
+                            zmsg = Rept(zsub.recv(zmq.NOBLOCK))
+                            msg = loads(zmsg.payload)
+                            msg["imei"] = zmsg.imei
+                            log.debug("Got %s, sending %s", zmsg, msg)
+                            tosend.append(msg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
index 4da88d2f6754e7e78db97ef8ecc9e989dcd1475c..da0dc77e39da6f38d7eaefe5adbdfe897f928b28 100644 (file)
@@ -4,7 +4,7 @@ import ipaddress as ip
 from struct import pack, unpack
 from typing import Any, cast, Optional, Tuple, Type, Union
 
 from struct import pack, unpack
 from typing import Any, cast, Optional, Tuple, Type, Union
 
-__all__ = "Bcast", "Resp", "topic"
+__all__ = "Bcast", "Resp", "topic", "rtopic"
 
 
 def pack_peer(  # 18 bytes
 
 
 def pack_peer(  # 18 bytes
@@ -100,6 +100,10 @@ def topic(
     )
 
 
     )
 
 
+def rtopic(imei: str) -> bytes:
+    return pack("16s", imei.encode())
+
+
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""
 
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""