]> www.average.org Git - loctrkd.git/commitdiff
Work with cell location data; use opencellid
authorEugene Crosser <crosser@average.org>
Wed, 16 Mar 2022 22:47:51 +0000 (23:47 +0100)
committerEugene Crosser <crosser@average.org>
Wed, 16 Mar 2022 23:02:09 +0000 (00:02 +0100)
gps303/GT06mod.py
gps303/__main__.py
gps303/evstore.py
gps303/opencellid.py [new file with mode: 0644]
gps303/qry.py

index aa8246facd3823946cb6aaef89df746ab8d345d4..c3918c754b0e5350ad1601f9f66326b2a9758180 100755 (executable)
@@ -3,7 +3,7 @@ Implementation of the protocol used by zx303 GPS+GPRS module
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
 Description from https://github.com/tobadia/petGPS/tree/master/resources
 """
 
-from datetime import datetime
+from datetime import datetime, timezone
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
 from inspect import isclass
 from logging import getLogger
 from struct import pack, unpack
@@ -67,8 +67,8 @@ class _GT06pkt:
         )
 
     @classmethod
         )
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        return cls(proto=proto, payload=payload)
+    def from_packet(cls, length, proto, payload):
+        return cls(proto=proto, payload=payload, length=length)
 
     def response(self, *args):
         if len(args) == 0:
 
     def response(self, *args):
         if len(args) == 0:
@@ -89,8 +89,8 @@ class LOGIN(_GT06pkt):
     PROTO = 0x01
 
     @classmethod
     PROTO = 0x01
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
@@ -109,10 +109,26 @@ class HEARTBEAT(_GT06pkt):
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
 
 class _GPS_POSITIONING(_GT06pkt):
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.dtime = payload[:6]
         self.dtime = payload[:6]
-        # TODO parse the rest
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime(
+                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+            )
+        self.gps_data_length = payload[6] >> 4
+        self.gps_nb_sat = payload[6] & 0x0F
+        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
+        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
+        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.speed = speed
+        self.flags = flags
         return self
 
     def response(self):
         return self
 
     def response(self):
@@ -131,8 +147,8 @@ class STATUS(_GT06pkt):
     PROTO = 0x13
 
     @classmethod
     PROTO = 0x13
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         if len(payload) == 5:
             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
                 "BBBBB", payload
         if len(payload) == 5:
             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
                 "BBBBB", payload
@@ -155,9 +171,41 @@ class WHITELIST_TOTAL(_GT06pkt):
     PROTO = 0x16
 
 
     PROTO = 0x16
 
 
-class WIFI_OFFLINE_POSITIONING(_GT06pkt):
+class _WIFI_POSITIONING(_GT06pkt):
+    @classmethod
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
+        self.dtime = payload[:6]
+        if self.dtime == b"\0\0\0\0\0\0":
+            self.devtime = None
+        else:
+            self.devtime = datetime.strptime(
+                self.dtime.hex(), "%y%m%d%H%M%S"
+            ).astimezone(tz=timezone.utc)
+        self.wifi_aps = []
+        for i in range(self.length):  # length has special meaning here
+            slice = payload[6 + i * 7 : 13 + i * 7]
+            self.wifi_aps.append(
+                (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
+            )
+        gsm_slice = payload[6 + self.length * 7 :]
+        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
+        self.gsm_cells = []
+        for i in range(ncells):
+            slice = gsm_slice[4 + i * 5 : 9 + i * 5]
+            locac, cellid, sigstr = unpack(
+                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
+            )
+            self.gsm_cells.append((locac, cellid, -sigstr))
+        return self
+
+
+class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
     PROTO = 0x17
 
+    def response(self):
+        return super().response(self.dtime)
+
 
 class TIME(_GT06pkt):
     PROTO = 0x30
 
 class TIME(_GT06pkt):
     PROTO = 0x30
@@ -221,9 +269,13 @@ class RESTORE_PASSWORD(_GT06pkt):
     PROTO = 0x67
 
 
     PROTO = 0x67
 
 
-class WIFI_POSITIONING(_GT06pkt):
+class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
     PROTO = 0x69
 
+    def response(self):
+        payload = b""  # TODO fill payload
+        return super().response(payload)
+
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
 
 class MANUAL_POSITIONING(_GT06pkt):
     PROTO = 0x80
@@ -249,8 +301,8 @@ class POSITION_UPLOAD_INTERVAL(_GT06pkt):
     PROTO = 0x98
 
     @classmethod
     PROTO = 0x98
 
     @classmethod
-    def from_packet(cls, proto, payload):
-        self = super().from_packet(proto, payload)
+    def from_packet(cls, length, proto, payload):
+        self = super().from_packet(length, proto, payload)
         self.interval = unpack("!H", payload[:2])
         return self
 
         self.interval = unpack("!H", payload[:2])
         return self
 
@@ -271,28 +323,39 @@ if True:  # just to indent the code, sorry!
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
         if hasattr(cls, "PROTO"):
             CLASSES[cls.PROTO] = cls
 
-def make_object(proto, payload):
+
+def make_object(length, proto, payload):
     if proto in CLASSES:
     if proto in CLASSES:
-        return CLASSES[proto].from_packet(proto, payload)
+        return CLASSES[proto].from_packet(length, proto, payload)
     else:
     else:
-        return UNKNOWN.from_packet(proto, payload)
+        return UNKNOWN.from_packet(length, proto, payload)
+
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
 
 def handle_packet(packet, addr, when):
     if len(packet) < 6:
-        return UNKNOWN.from_packet(0, packet)
+        return UNKNOWN.from_packet(0, 0, packet)
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
     else:
         xx, length, proto = unpack("!2sBB", packet[:4])
         crlf = packet[-2:]
         payload = packet[4:-2]
         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-        if length > 1 and len(payload) + adjust != length:
+        if (
+            proto
+            not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+            and length > 1
+            and len(payload) + adjust != length
+        ):
             log.warning(
             log.warning(
-                "length is %d but payload length is %d", length, len(payload)
+                "With proto %d length is %d but payload length is %d+%d",
+                proto,
+                length,
+                len(payload),
+                adjust,
             )
         if xx != b"xx" or crlf != b"\r\n":
             )
         if xx != b"xx" or crlf != b"\r\n":
-            return UNKNOWN.from_packet(proto, packet)  # full packet as payload
+            return UNKNOWN.from_packet(length, proto, packet)  # full packet
         else:
         else:
-            return make_object(proto, payload)
+            return make_object(length, proto, payload)
 
 
 def make_response(msg):
 
 
 def make_response(msg):
index ef03c61eff92d9ef31dcdefba8f6920025dcb696..665615d81064d0ddde11bfe292d778f7c976f0f9 100755 (executable)
@@ -69,11 +69,18 @@ if __name__.endswith("__main__"):
                     when = time()
                     if packet:
                         msg = handle_packet(packet, clntaddr, when)
                     when = time()
                     if packet:
                         msg = handle_packet(packet, clntaddr, when)
-                        log.debug("%s from %s fd %d'", msg, clntaddr, fd)
+                        log.debug("%s from %s fd %d", msg, clntaddr, fd)
                         if isinstance(msg, LOGIN):
                             imei = msg.imei
                             clnt_dict[fd] = (clntsock, clntaddr, imei)
                         if isinstance(msg, LOGIN):
                             imei = msg.imei
                             clnt_dict[fd] = (clntsock, clntaddr, imei)
-                        stow(clntaddr, when, imei, msg.proto, msg.payload)
+                        stow(
+                            clntaddr,
+                            when,
+                            imei,
+                            msg.length,
+                            msg.proto,
+                            msg.payload,
+                        )
                         response = make_response(msg)
                         if response:
                             try:
                         response = make_response(msg)
                         if response:
                             try:
index 360ab359505c7a141cdf6a5857d7dbcc14c86b53..9bc60d5594a1f7656360fff741cbe1470ed346ab 100644 (file)
@@ -11,6 +11,7 @@ SCHEMA = """create table if not exists events (
     timestamp real not null,
     imei text,
     clntaddr text not null,
     timestamp real not null,
     imei text,
     clntaddr text not null,
+    length int,
     proto int not null,
     payload blob
 )"""
     proto int not null,
     payload blob
 )"""
@@ -18,23 +19,25 @@ SCHEMA = """create table if not exists events (
 
 def initdb(dbname):
     global DB
 
 def initdb(dbname):
     global DB
-    log.info("Using Sqlite3 database \"%s\"", dbname)
+    log.info('Using Sqlite3 database "%s"', dbname)
     DB = connect(dbname)
     DB.execute(SCHEMA)
 
 
     DB = connect(dbname)
     DB.execute(SCHEMA)
 
 
-def stow(clntaddr, timestamp, imei, proto, payload):
+def stow(clntaddr, timestamp, imei, length, proto, payload):
     assert DB is not None
     parms = dict(
         zip(
     assert DB is not None
     parms = dict(
         zip(
-            ("clntaddr", "timestamp", "imei", "proto", "payload"),
-            (str(clntaddr), timestamp, imei, proto, payload),
+            ("clntaddr", "timestamp", "imei", "length", "proto", "payload"),
+            (str(clntaddr), timestamp, imei, length, proto, payload),
         )
     )
     DB.execute(
         """insert or ignore into events
         )
     )
     DB.execute(
         """insert or ignore into events
-                (timestamp, imei, clntaddr, proto, payload)
-                values (:timestamp, :imei, :clntaddr, :proto, :payload)""",
+                (timestamp, imei, clntaddr, length, proto, payload)
+                values
+                (:timestamp, :imei, :clntaddr, :length, :proto, :payload)
+        """,
         parms,
     )
     DB.commit()
         parms,
     )
     DB.commit()
diff --git a/gps303/opencellid.py b/gps303/opencellid.py
new file mode 100644 (file)
index 0000000..5527a19
--- /dev/null
@@ -0,0 +1,56 @@
+"""
+Download csv for your carrier and your area from https://opencellid.org/
+$ sqlite3 <cell-database-file>
+sqlite> create table if not exists cells (
+  "radio" text,
+  "mcc" int,
+  "net" int,
+  "area" int,
+  "cell" int,
+  "unit" int,
+  "lon" int,
+  "lat" int,
+  "range" int,
+  "samples" int,
+  "changeable" int,
+  "created" int,
+  "updated" int,
+  "averageSignal" int
+);
+sqlite> .mode csv
+sqlite> .import <downloaded-file.csv> cells
+sqlite> create index if not exists cell_idx on cells (mcc, area, cell);
+"""
+
+from datetime import datetime, timezone
+from pprint import pprint
+from sqlite3 import connect
+import sys
+
+from .GT06mod import *
+
+db = connect(sys.argv[1])
+ldb = connect(sys.argv[2])
+lc = ldb.cursor()
+c = db.cursor()
+c.execute(
+    """select timestamp, imei, clntaddr, length, proto, payload from events
+        where proto in (?, ?)""",
+    (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO),
+)
+for timestamp, imei, clntaddr, length, proto, payload in c:
+    obj = make_object(length, proto, payload)
+    qry = """select lat, lon from cells
+             where mcc = {} and (area, cell) in ({})""".format(
+        obj.mcc,
+        ", ".join(
+            [
+                "({}, {})".format(locac, cellid)
+                for locac, cellid, _ in obj.gsm_cells
+            ]
+        ),
+    )
+    print(qry)
+    lc.execute(qry)
+    for lat, lon in lc:
+        print(lat, lon)
index 2d4889921334e452226ec09faa04826fcc66ec6a..2a8e266351d5a4c6932d7b16f61a59fe20ad45e8 100644 (file)
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timezone
 from sqlite3 import connect
 import sys
 
 from sqlite3 import connect
 import sys
 
@@ -6,7 +6,13 @@ from .GT06mod import *
 
 db = connect(sys.argv[1])
 c = db.cursor()
 
 db = connect(sys.argv[1])
 c = db.cursor()
-c.execute("select timestamp, imei, clntaddr, proto, payload from events")
-for timestamp, imei, clntaddr, proto, payload in c:
-    print(datetime.fromtimestamp(timestamp).isoformat(),
-            make_object(proto, payload))
+c.execute(
+    "select timestamp, imei, clntaddr, length, proto, payload from events"
+)
+for timestamp, imei, clntaddr, length, proto, payload in c:
+    print(
+        datetime.fromtimestamp(timestamp)
+        .astimezone(tz=timezone.utc)
+        .isoformat(),
+        make_object(length, proto, payload),
+    )