]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
implement `inline_response()`
[loctrkd.git] / gps303 / gps303proto.py
index 83ac19cd4d68a67ba06091c8b3e0c9274f027110..445c34ac21d1e7c3a9fc86637bec0398280a55f6 100755 (executable)
@@ -21,6 +21,7 @@ from struct import pack, unpack
 
 __all__ = (
     "handle_packet",
+    "inline_response",
     "make_object",
     "make_response",
     "parse_message",
@@ -86,20 +87,25 @@ class GPS303Pkt:
         return pack("BB", self.length, self.PROTO) + self.payload
 
     @classmethod
-    def response(cls, *args):
-        if len(args) == 0:
-            return None
-        assert len(args) == 1 and isinstance(args[0], bytes)
-        payload = args[0]
+    def make_packet(cls, payload):
+        assert isinstance(payload, bytes)
         length = len(payload) + 1
         if length > 6:
             length -= 6
         return pack("BB", length, cls.PROTO) + payload
 
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(b"")
+
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
 
+    @classmethod
+    def inline_response(cls, packet):
+        return None
+
 
 class LOGIN(GPS303Pkt):
     PROTO = 0x01
@@ -111,16 +117,14 @@ class LOGIN(GPS303Pkt):
         self.ver = unpack("B", payload[-1:])[0]
         return self
 
-    @classmethod
-    def response(cls):
-        return super().response(b"")
+    def response(self):
+        return self.make_packet(b"")
 
 
 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
     PROTO = 0x05
 
-    @classmethod
-    def response(cls, supnum=0):
+    def response(self, supnum=0):
         # 1: The device automatically answers Pickup effect
         # 2: Automatically Answering Two-way Calls
         # 3: Ring manually answer the two-way call
@@ -146,17 +150,21 @@ class _GPS_POSITIONING(GPS303Pkt):
         self.gps_nb_sat = payload[6] & 0x0F
         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
-        flip_lon =          bool(flags & 0b0000100000000000)  # bit 4
-        flip_lat = not      bool(flags & 0b0000010000000000)  # bit 5
-        self.heading =           flags & 0b0000001111111111   # bits 6 - last
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
         self.speed = speed
         self.flags = flags
         return self
 
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(packet[2:8])
+
     def response(self):
-        return super().response(self.dtime)
+        return self.make_packet(self.dtime)
 
 
 class GPS_POSITIONING(_GPS_POSITIONING):
@@ -188,6 +196,10 @@ class STATUS(GPS303Pkt):
             self.signal = None
         return self
 
+    @classmethod
+    def inline_response(cls, packet):
+        return None
+
     def response(self, upload_interval=25):  # Set interval in minutes
         return super().response(pack("B", upload_interval))
 
@@ -200,7 +212,7 @@ class RESET(GPS303Pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
 
     def response(self):  # Server can send to initiate factory reset
-        return super().response(b"")
+        return self.make_packet(b"")
 
 
 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
@@ -242,6 +254,10 @@ class _WIFI_POSITIONING(GPS303Pkt):
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(packet[2:8])
+
     def response(self):
         return super().response(self.dtime)
 
@@ -249,6 +265,12 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
 class TIME(GPS303Pkt):
     PROTO = 0x30
 
+    @classmethod
+    def inline_response(cls, packet):
+        return pack(
+            "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
+        )
+
     def response(self):
         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
         return super().response(payload)
@@ -325,6 +347,10 @@ class RESTORE_PASSWORD(GPS303Pkt):
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
 
+    @classmethod
+    def inline_response(cls, packet):
+        return None
+
     def response(self, lat=None, lon=None):
         if lat is None or lon is None:
             payload = b""
@@ -332,7 +358,7 @@ class WIFI_POSITIONING(_WIFI_POSITIONING):
             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
                 "ascii"
             )
-        return super().response(payload)
+        return self.make_packet(payload)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
@@ -364,8 +390,12 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
         self.interval = unpack("!H", payload[:2])
         return self
 
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(packet[2:4])
+
     def response(self):
-        return super().response(pack("!H", self.interval))
+        return self.make_packet(pack("!H", self.interval))
 
 
 class SOS_ALARM(GPS303Pkt):
@@ -396,6 +426,14 @@ def proto_of_message(packet):
     return unpack("B", packet[1:2])[0]
 
 
+def inline_response(packet):
+    proto = proto_of_message(packet)
+    if proto in CLASSES:
+        return CLASSES[proto].inline_response(packet)
+    else:
+        return None
+
+
 def make_object(length, proto, payload):
     if proto in CLASSES:
         return CLASSES[proto].from_packet(length, payload)
@@ -410,8 +448,7 @@ def parse_message(packet):
     payload = packet[2:]
     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
     if (
-        proto
-        not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+        proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
         and length > 1
         and len(payload) + adjust != length
     ):
@@ -434,4 +471,3 @@ def handle_packet(packet):  # DEPRECATED
 def make_response(msg, **kwargs):  # DEPRECATED
     inframe = msg.response(**kwargs)
     return None if inframe is None else b"xx" + inframe + b"\r\n"
-