]> www.average.org Git - loctrkd.git/blob - loctrkd/googlemaps.py
rename gps303 -> loctrkd
[loctrkd.git] / loctrkd / googlemaps.py
1 import googlemaps as gmaps
2 from typing import Any, Dict, List, Tuple
3
4 gclient = None
5
6
7 def init(conf: Dict[str, Any]) -> None:
8     global gclient
9     with open(conf["googlemaps"]["accesstoken"], encoding="ascii") as fl:
10         token = fl.read().rstrip()
11     gclient = gmaps.Client(key=token)
12
13
14 def shut() -> None:
15     return
16
17
18 def lookup(
19     mcc: int,
20     mnc: int,
21     gsm_cells: List[Tuple[int, int, int]],
22     wifi_aps: List[Tuple[str, int]],
23 ) -> Tuple[float, float]:
24     assert gclient is not None
25     kwargs = {
26         "home_mobile_country_code": mcc,
27         "home_mobile_network_code": mnc,
28         "radio_type": "gsm",
29         "carrier": "O2",
30         "consider_ip": False,
31         "cell_towers": [
32             {
33                 "locationAreaCode": loc,
34                 "cellId": cellid,
35                 "signalStrength": sig,
36             }
37             for loc, cellid, sig in gsm_cells
38         ],
39         "wifi_access_points": [
40             {"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps
41         ],
42     }
43     result = gclient.geolocate(**kwargs)
44     if "location" in result:
45         return result["location"]["lat"], result["location"]["lng"]
46     else:
47         raise ValueError("google geolocation: " + str(result))
48
49
50 if __name__.endswith("__main__"):
51     from datetime import datetime, timezone
52     from sqlite3 import connect
53     import sys
54     from .zx303proto import *
55
56     db = connect(sys.argv[1])
57     c = db.cursor()
58     c.execute(
59         """select tstamp, packet from events
60             where proto in (?, ?)""",
61         (proto_name(WIFI_POSITIONING), proto_name(WIFI_OFFLINE_POSITIONING)),
62     )
63     init({"googlemaps": {"accesstoken": sys.argv[2]}})
64     count = 0
65     for timestamp, packet in c:
66         obj = parse_message(packet)
67         print(obj)
68         avlat, avlon = lookup(obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps)
69         print(
70             "{} {:+#010.8g},{:+#010.8g}".format(
71                 datetime.fromtimestamp(timestamp), avlat, avlon
72             )
73         )
74         count += 1
75         if count > 10:
76             break