]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/googlemaps.py
Update changelog for 2.00 release
[loctrkd.git] / loctrkd / googlemaps.py
index 881a2bd0500f95e9b567c2436e77292b3e4c8c42..b84b64d18bdb1e19e4ca668f631dccd7ba051c26 100644 (file)
@@ -1,10 +1,15 @@
+"""
+Google Maps location service lookaside backend
+"""
+
+from configparser import ConfigParser
 import googlemaps as gmaps
-from typing import Any, Dict, List, Tuple
+from typing import Any, Callable, Dict, List, Tuple
 
 gclient = None
 
 
-def init(conf: Dict[str, Any]) -> None:
+def init(conf: ConfigParser) -> None:
     global gclient
     with open(conf["googlemaps"]["accesstoken"], encoding="ascii") as fl:
         token = fl.read().rstrip()
@@ -15,12 +20,12 @@ def shut() -> None:
     return
 
 
-def lookup(
+def _lookup(
     mcc: int,
     mnc: int,
     gsm_cells: List[Tuple[int, int, int]],
     wifi_aps: List[Tuple[str, int]],
-) -> Tuple[float, float]:
+) -> Any:
     assert gclient is not None
     kwargs = {
         "home_mobile_country_code": mcc,
@@ -40,38 +45,51 @@ def lookup(
             {"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps
         ],
     }
-    result = gclient.geolocate(**kwargs)
+    return gclient.geolocate(**kwargs)
+
+
+def lookup(
+    mcc: int,
+    mnc: int,
+    gsm_cells: List[Tuple[int, int, int]],
+    wifi_aps: List[Tuple[str, int]],
+) -> Tuple[float, float, float]:
+    result = _lookup(mcc, mnc, gsm_cells, wifi_aps)
     if "location" in result:
-        return result["location"]["lat"], result["location"]["lng"]
+        return (
+            result["location"]["lat"],
+            result["location"]["lng"],
+            result["accuracy"],
+        )
     else:
         raise ValueError("google geolocation: " + str(result))
 
 
 if __name__.endswith("__main__"):
-    from datetime import datetime, timezone
-    from sqlite3 import connect
-    import sys
-    from .zx303proto import *
-    from .zx303proto import WIFI_POSITIONING, WIFI_OFFLINE_POSITIONING
+    from getopt import getopt
+    from json import loads
+    from logging import getLogger
+    from sys import argv
+    from . import common
 
-    db = connect(sys.argv[1])
-    c = db.cursor()
-    c.execute(
-        """select tstamp, packet from events
-            where proto in (?, ?)""",
-        (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
-    )
-    init({"googlemaps": {"accesstoken": sys.argv[2]}})
-    count = 0
-    for timestamp, packet in c:
-        obj = parse_message(packet)
-        print(obj)
-        avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
-        print(
-            "{} {:+#010.8g},{:+#010.8g}".format(
-                datetime.fromtimestamp(timestamp), avlat, avlon
-            )
-        )
-        count += 1
-        if count > 10:
-            break
+    def cell_list(s: str) -> List[Tuple[int, int, int]]:
+        return [(int(ac), int(ci), int(sg)) for [ac, ci, sg] in loads(s)]
+
+    def ap_list(s: str) -> List[Tuple[str, int]]:
+        return [(mac, int(sg)) for [mac, sg] in loads(s)]
+
+    log = getLogger("loctrkd/googlemaps")
+    opts, args = getopt(argv[1:], "c:d")
+    conf = common.init(log, opts=opts)
+    init(conf)
+    parms = {}
+    needed: Dict[str, Callable[[Any], Any]] = {
+        "mcc": int,
+        "mnc": int,
+        "gsm_cells": cell_list,
+        "wifi_aps": ap_list,
+    }
+    parms = {k: needed.pop(k)(v) for k, v in [arg.split("=") for arg in args]}
+    if needed:
+        raise ValueError(f"still needed: {needed}")
+    print(_lookup(**parms))