X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fgooglemaps.py;h=b84b64d18bdb1e19e4ca668f631dccd7ba051c26;hb=refs%2Fheads%2Fmaster;hp=418523d45eb9581dce89d07664c48a5eeb775926;hpb=dbdf9d63af31770ad57302e16b17a2fdc526773f;p=loctrkd.git diff --git a/loctrkd/googlemaps.py b/loctrkd/googlemaps.py index 418523d..b84b64d 100644 --- a/loctrkd/googlemaps.py +++ b/loctrkd/googlemaps.py @@ -1,10 +1,15 @@ +""" +Google Maps location service lookaside backend +""" + +from configparser import ConfigParser import googlemaps as gmaps -from typing import Any, Dict, List, Tuple +from typing import Any, Callable, Dict, List, Tuple gclient = None -def init(conf: Dict[str, Any]) -> None: +def init(conf: ConfigParser) -> None: global gclient with open(conf["googlemaps"]["accesstoken"], encoding="ascii") as fl: token = fl.read().rstrip() @@ -15,12 +20,12 @@ def shut() -> None: return -def lookup( +def _lookup( mcc: int, mnc: int, gsm_cells: List[Tuple[int, int, int]], wifi_aps: List[Tuple[str, int]], -) -> Tuple[float, float]: +) -> Any: assert gclient is not None kwargs = { "home_mobile_country_code": mcc, @@ -40,37 +45,51 @@ def lookup( {"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps ], } - result = gclient.geolocate(**kwargs) + return gclient.geolocate(**kwargs) + + +def lookup( + mcc: int, + mnc: int, + gsm_cells: List[Tuple[int, int, int]], + wifi_aps: List[Tuple[str, int]], +) -> Tuple[float, float, float]: + result = _lookup(mcc, mnc, gsm_cells, wifi_aps) if "location" in result: - return result["location"]["lat"], result["location"]["lng"] + return ( + result["location"]["lat"], + result["location"]["lng"], + result["accuracy"], + ) else: raise ValueError("google geolocation: " + str(result)) if __name__.endswith("__main__"): - from datetime import datetime, timezone - from sqlite3 import connect - import sys - from .zx303proto import * + from getopt import getopt + from json import loads + from logging import getLogger + from sys import argv + from . import common - db = connect(sys.argv[1]) - c = db.cursor() - c.execute( - """select tstamp, packet from events - where proto in (?, ?)""", - (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)), - ) - init({"googlemaps": {"accesstoken": sys.argv[2]}}) - count = 0 - for timestamp, packet in c: - obj = parse_message(packet) - print(obj) - avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps) - print( - "{} {:+#010.8g},{:+#010.8g}".format( - datetime.fromtimestamp(timestamp), avlat, avlon - ) - ) - count += 1 - if count > 10: - break + def cell_list(s: str) -> List[Tuple[int, int, int]]: + return [(int(ac), int(ci), int(sg)) for [ac, ci, sg] in loads(s)] + + def ap_list(s: str) -> List[Tuple[str, int]]: + return [(mac, int(sg)) for [mac, sg] in loads(s)] + + log = getLogger("loctrkd/googlemaps") + opts, args = getopt(argv[1:], "c:d") + conf = common.init(log, opts=opts) + init(conf) + parms = {} + needed: Dict[str, Callable[[Any], Any]] = { + "mcc": int, + "mnc": int, + "gsm_cells": cell_list, + "wifi_aps": ap_list, + } + parms = {k: needed.pop(k)(v) for k, v in [arg.split("=") for arg in args]} + if needed: + raise ValueError(f"still needed: {needed}") + print(_lookup(**parms))