]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
test: include lookaside and termconfig in the loop
[loctrkd.git] / gps303 / gps303proto.py
index 0d02b082c18688777aa9fa4a7bbbbc0ae3727276..0a222d05634930679995e6e4807d939d35104346 100755 (executable)
@@ -85,8 +85,8 @@ class DecodeError(Exception):
             setattr(self, k, v)
 
 
-def maybe_int(x: Optional[int]) -> Optional[int]:
-    return None if x is None else int(x)
+def maybe(typ: type) -> Callable[[Any], Any]:
+    return lambda x: None if x is None else typ(x)
 
 
 def intx(x: Union[str, int]) -> int:
@@ -303,7 +303,7 @@ class GPS303Pkt(metaclass=MetaPkt):
     @property
     def packed(self) -> bytes:
         payload = self.encode()
-        length = len(payload) + 1
+        length = getattr(self, "length", len(payload) + 1)
         return pack("BB", length, self.PROTO) + payload
 
 
@@ -389,7 +389,7 @@ class STATUS(GPS303Pkt):
         ("ver", int, 0),
         ("timezone", int, 0),
         ("intvl", int, 0),
-        ("signal", maybe_int, None),
+        ("signal", maybe(int), None),
     )
     OUT_KWARGS = (("upload_interval", int, 25),)
 
@@ -403,10 +403,8 @@ class STATUS(GPS303Pkt):
             self.signal = None
 
     def in_encode(self) -> bytes:
-        return (
-            pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
-            if self.signal is None
-            else pack("B", self.signal)
+        return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
+            b"" if self.signal is None else pack("B", self.signal)
         )
 
     def out_encode(self) -> bytes:  # Set interval in minutes
@@ -435,6 +433,15 @@ class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
+    IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
+        # IN_KWARGS = (
+        ("dtime", bytes, b"\0\0\0\0\0\0"),
+        ("wifi_aps", list, []),
+        ("mcc", int, 0),
+        ("mnc", int, 0),
+        ("gsm_cells", list, []),
+    )
+
     def in_decode(self, length: int, payload: bytes) -> None:
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
@@ -459,6 +466,28 @@ class _WIFI_POSITIONING(GPS303Pkt):
             )
             self.gsm_cells.append((locac, cellid, -sigstr))
 
+    def in_encode(self) -> bytes:
+        self.length = len(self.wifi_aps)
+        return b"".join(
+            [
+                self.dtime,
+                b"".join(
+                    [
+                        bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
+                        + pack("B", -sigstr)
+                        for mac, sigstr in self.wifi_aps
+                    ]
+                ),
+                pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
+                b"".join(
+                    [
+                        pack("!HHB", locac, cellid, -sigstr)
+                        for locac, cellid, sigstr in self.gsm_cells
+                    ]
+                ),
+            ]
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -655,6 +684,9 @@ class SETUP(GPS303Pkt):
             + [b";".join([el.encode() for el in self.phonenumbers])]
         )
 
+    def in_encode(self) -> bytes:
+        return b""
+
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
     PROTO = 0x58