www.average.org Git - loctrkd.git/blob - gps303/googlemaps.py
typechecking: annotate googlemaps.py
[loctrkd.git] / gps303 / googlemaps.py
1 import googlemaps as gmaps
2 from typing import Any, Dict, List, Tuple
3
# Module-wide Google Maps client; remains None until init() has run.
gclient = None


def init(conf: Dict[str, Any]) -> None:
    """Create the module-wide Google Maps client.

    The API key is read from the file named by
    ``conf["googlemaps"]["accesstoken"]``.  The file is opened as ASCII and
    trailing whitespace (e.g. a final newline) is stripped before the key is
    handed to the client.
    """
    global gclient
    token_path = conf["googlemaps"]["accesstoken"]
    with open(token_path, encoding="ascii") as token_file:
        api_key = token_file.read().rstrip()
    gclient = gmaps.Client(key=api_key)
12
13
def lookup(
    mcc: int,
    mnc: int,
    gsm_cells: List[Tuple[int, int, int]],
    wifi_aps: List[Tuple[str, int]],
) -> Tuple[float, float]:
    """Ask the Google geolocation service for a position estimate.

    Args:
        mcc: sent as ``home_mobile_country_code``.
        mnc: sent as ``home_mobile_network_code``.
        gsm_cells: ``(location area code, cell id, signal strength)`` tuples.
        wifi_aps: ``(MAC address, signal strength)`` tuples.

    Returns:
        ``(lat, lng)`` taken from the ``location`` entry of the reply.

    Raises:
        RuntimeError: if the module-level client has not been created yet.
        ValueError: if the reply has no ``location`` entry; the message
            carries the stringified reply.
    """
    if gclient is None:
        # An explicit check instead of ``assert``: asserts are stripped under
        # ``python -O``, which would turn this into an opaque AttributeError
        # on None further down.
        raise RuntimeError(
            "google geolocation: client is not initialized, call init() first"
        )
    cell_towers = [
        {
            "locationAreaCode": loc,
            "cellId": cellid,
            "signalStrength": sig,
        }
        for loc, cellid, sig in gsm_cells
    ]
    wifi_access_points = [
        {"macAddress": mac, "signalStrength": sig} for mac, sig in wifi_aps
    ]
    # NOTE(review): radio type and carrier are hard-coded for every request.
    result = gclient.geolocate(
        home_mobile_country_code=mcc,
        home_mobile_network_code=mnc,
        radio_type="gsm",
        carrier="O2",
        consider_ip=False,
        cell_towers=cell_towers,
        wifi_access_points=wifi_access_points,
    )
    if "location" not in result:
        raise ValueError("google geolocation: " + str(result))
    location = result["location"]
    return location["lat"], location["lng"]
44
45
if __name__.endswith("__main__"):
    # Ad-hoc manual check: replay stored wifi positioning packets through
    # lookup() and print the resulting coordinates.
    #   argv[1] - sqlite database file with an "events" table
    #   argv[2] - file containing the Google Maps API key
    from datetime import datetime, timezone
    from sqlite3 import connect
    import sys
    from .gps303proto import *

    db = connect(sys.argv[1])
    try:
        c = db.cursor()
        c.execute(
            """select tstamp, packet from events
            where proto in (?, ?)""",
            (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
        )
        init({"googlemaps": {"accesstoken": sys.argv[2]}})
        for count, (timestamp, packet) in enumerate(c, start=1):
            obj = parse_message(packet)
            print(obj)
            avlat, avlon = lookup(
                obj.mcc, obj.mnc, obj.gsm_cells, obj.wifi_aps
            )
            # NOTE(review): fromtimestamp() without tz yields local time.
            print(
                "{} {:+#010.8g},{:+#010.8g}".format(
                    datetime.fromtimestamp(timestamp), avlat, avlon
                )
            )
            # The limit is checked after the row is processed, so at most
            # 11 rows are looked up per run.
            if count > 10:
                break
    finally:
        # Release the database handle even if parsing or a lookup fails.
        db.close()