]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
Add some more commands
[loctrkd.git] / gps303 / gps303proto.py
index 6fa76550db1031ea901ff55d71998d1aeb9fd640..ad8fd7168fe4951d75eda0bf19f87e33bd9229e3 100755 (executable)
@@ -21,11 +21,11 @@ from struct import pack, unpack
 
 __all__ = (
     "handle_packet",
+    "inline_response",
     "make_object",
     "make_response",
     "parse_message",
     "proto_by_name",
-    "set_config",
     "GPS303Pkt",
     "UNKNOWN",
     "LOGIN",
@@ -58,7 +58,7 @@ log = getLogger("gps303")
 
 class GPS303Pkt:
     PROTO: int
-    CONFIG = None
+    INLINE = True
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
@@ -88,19 +88,24 @@ class GPS303Pkt:
         return pack("BB", self.length, self.PROTO) + self.payload
 
     @classmethod
-    def response(cls, *args):
-        if len(args) == 0:
-            return None
-        assert len(args) == 1 and isinstance(args[0], bytes)
-        payload = args[0]
+    def make_packet(cls, payload):
+        assert isinstance(payload, bytes)
         length = len(payload) + 1
         if length > 6:
             length -= 6
         return pack("BB", length, cls.PROTO) + payload
 
+    @classmethod
+    def inline_response(cls, packet):
+        if cls.INLINE:
+            return cls.make_packet(b"")
+        else:
+            return None
+
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
+    INLINE = False
 
 
 class LOGIN(GPS303Pkt):
@@ -113,20 +118,16 @@ class LOGIN(GPS303Pkt):
         self.ver = unpack("B", payload[-1:])[0]
         return self
 
-    @classmethod
-    def response(cls):
-        return super().response(b"")
-
 
 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
     PROTO = 0x05
+    INLINE = False
 
-    @classmethod
-    def response(cls, supnum=0):
+    def response(self, supnum=0):
         # 1: The device automatically answers Pickup effect
         # 2: Automatically Answering Two-way Calls
         # 3: Ring manually answer the two-way call
-        return super().response(b"")
+        return self.make_packet(pack("B", supnum))
 
 
 class HEARTBEAT(GPS303Pkt):
@@ -148,17 +149,18 @@ class _GPS_POSITIONING(GPS303Pkt):
         self.gps_nb_sat = payload[6] & 0x0F
         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
-        flip_lon =          bool(flags & 0b0000100000000000)  # bit 4
-        flip_lat = not      bool(flags & 0b0000010000000000)  # bit 5
-        self.heading =           flags & 0b0000001111111111   # bits 6 - last
+        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
+        self.heading = flags & 0b0000001111111111  # bits 6 - last
         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
         self.speed = speed
         self.flags = flags
         return self
 
-    def response(self):
-        return super().response(self.dtime)
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(packet[2:8])
 
 
 class GPS_POSITIONING(_GPS_POSITIONING):
@@ -171,6 +173,7 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
 
 class STATUS(GPS303Pkt):
     PROTO = 0x13
+    INLINE = False
 
     @classmethod
     def from_packet(cls, length, payload):
@@ -191,7 +194,7 @@ class STATUS(GPS303Pkt):
         return self
 
     def response(self, upload_interval=25):  # Set interval in minutes
-        return super().response(pack("B", upload_interval))
+        return self.make_packet(pack("B", upload_interval))
 
 
 class HIBERNATION(GPS303Pkt):
@@ -200,16 +203,18 @@ class HIBERNATION(GPS303Pkt):
 
 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
+    INLINE = False
 
     def response(self):  # Server can send to initiate factory reset
-        return super().response(b"")
+        return self.make_packet(b"")
 
 
 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
     PROTO = 0x16
+    INLINE = False
 
     def response(self, number=3):  # Number of whitelist entries
-        return super().response(pack("B", number))
+        return self.make_packet(pack("B", number))
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
@@ -244,35 +249,60 @@ class _WIFI_POSITIONING(GPS303Pkt):
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
-    def response(self):
-        return super().response(self.dtime)
+    @classmethod
+    def inline_response(cls, packet):
+        return cls.make_packet(packet[2:8])
 
 
 class TIME(GPS303Pkt):
     PROTO = 0x30
 
-    def response(self):
-        payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
-        return super().response(payload)
+    @classmethod
+    def inline_response(cls, packet):
+        return pack(
+            "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
+        )
 
 
 class PROHIBIT_LBS(GPS303Pkt):
     PROTO = 0x33
+    INLINE = False
 
     def response(self, status=1):  # Server sent, 0-off, 1-on
-        return super().response(pack("B", status))
+        return self.make_packet(pack("B", status))
+
+
+class LBS_SWITCH_TIMES(GPS303Pkt):
+    PROTO = 0x34
+    INLINE = False
+
+    def response(self):
+        return self.make_packet(b"")
+
+
+class REMOTE_MONITOR_PHONE(GPS303Pkt):
+    PROTO = 0x40
+    INLINE = False
+
+
+class SOS_PHONE(GPS303Pkt):
+    PROTO = 0x41
+    INLINE = False
+
+
+class DAD_PHONE(GPS303Pkt):
+    PROTO = 0x42
+    INLINE = False
 
 
 class MOM_PHONE(GPS303Pkt):
     PROTO = 0x43
+    INLINE = False
 
 
 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
     PROTO = 0x44
 
-    def response(self):
-        return super().response(b"")
-
 
 class STOP_ALARM(GPS303Pkt):
     PROTO = 0x56
@@ -280,6 +310,7 @@ class STOP_ALARM(GPS303Pkt):
 
 class SETUP(GPS303Pkt):
     PROTO = 0x57
+    INLINE = False
 
     def response(
         self,
@@ -313,7 +344,7 @@ class SETUP(GPS303Pkt):
             ]
             + [b";".join([el.encode() for el in phoneNumbers])]
         )
-        return super().response(payload)
+        return self.make_packet(payload)
 
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
@@ -326,6 +357,7 @@ class RESTORE_PASSWORD(GPS303Pkt):
 
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
+    INLINE = False
 
     def response(self, lat=None, lon=None):
         if lat is None or lon is None:
@@ -334,11 +366,12 @@ class WIFI_POSITIONING(_WIFI_POSITIONING):
             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
                 "ascii"
             )
-        return super().response(payload)
+        return self.make_packet(payload)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
     PROTO = 0x80
+    INLINE = False
 
 
 class BATTERY_CHARGE(GPS303Pkt):
@@ -359,6 +392,7 @@ class VIBRATION_RECEIVED(GPS303Pkt):
 
 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
     PROTO = 0x98
+    INLINE = False
 
     @classmethod
     def from_packet(cls, length, payload):
@@ -366,8 +400,8 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
         self.interval = unpack("!H", payload[:2])
         return self
 
-    def response(self):
-        return super().response(pack("!H", self.interval))
+    def response(self, interval=10):
+        return self.make_packet(pack("!H", interval))
 
 
 class SOS_ALARM(GPS303Pkt):
@@ -398,6 +432,14 @@ def proto_of_message(packet):
     return unpack("B", packet[1:2])[0]
 
 
+def inline_response(packet):
+    proto = proto_of_message(packet)
+    if proto in CLASSES:
+        return CLASSES[proto].inline_response(packet)
+    else:
+        return None
+
+
 def make_object(length, proto, payload):
     if proto in CLASSES:
         return CLASSES[proto].from_packet(length, payload)
@@ -412,8 +454,7 @@ def parse_message(packet):
     payload = packet[2:]
     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
     if (
-        proto
-        not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
+        proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
         and length > 1
         and len(payload) + adjust != length
     ):
@@ -436,7 +477,3 @@ def handle_packet(packet):  # DEPRECATED
 def make_response(msg, **kwargs):  # DEPRECATED
     inframe = msg.response(**kwargs)
     return None if inframe is None else b"xx" + inframe + b"\r\n"
-
-
-def set_config(config):  # Note that we are setting _class_ attribute
-    GPS303Pkt.CONFIG = config