]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / zx303proto.py
index e27eb5f72c985456b925594a30066973b252bb64..a7329c316326187c950580ec11a65843981ace2d 100755 (executable)
@@ -19,6 +19,7 @@ from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
+from types import SimpleNamespace
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,6 +32,7 @@ from typing import (
     Union,
 )
 
     Union,
 )
 
+from .common import CoordReport, HintReport, StatusReport
 from .protomodule import ProtoClass
 
 __all__ = (
 from .protomodule import ProtoClass
 
 __all__ = (
@@ -42,12 +44,12 @@ __all__ = (
     "proto_handled",
     "parse_message",
     "probe_buffer",
     "proto_handled",
     "parse_message",
     "probe_buffer",
-    "proto_name",
     "DecodeError",
     "Respond",
 )
 
     "DecodeError",
     "Respond",
 )
 
-PROTO_PREFIX = "ZX:"
+MODNAME = __name__.split(".")[-1]
+PROTO_PREFIX: str = "ZX:"
 
 ### Deframer ###
 
 
 ### Deframer ###
 
@@ -291,6 +293,11 @@ class GPS303Pkt(ProtoClass):
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
+    @classmethod
+    def proto_name(cls) -> str:
+        """Name of the command as used externally"""
+        return (PROTO_PREFIX + cls.__name__)[:16]
+
     @property
     def packed(self) -> bytes:
         payload = self.encode()
     @property
     def packed(self) -> bytes:
         payload = self.encode()
@@ -363,15 +370,17 @@ class _GPS_POSITIONING(GPS303Pkt):
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
-    def rectified(self) -> Dict[str, Any]:  # JSON-able dict
-        return {
-            "type": "location",
-            "devtime": str(self.devtime),
-            "speed": self.speed,
-            "direction": self.heading,
-            "latitude": self.latitude,
-            "longitude": self.longitude,
-        }
+    def rectified(self) -> Tuple[str, CoordReport]:  # JSON-able dict
+        return MODNAME, CoordReport(
+            devtime=str(self.devtime),
+            battery_percentage=None,
+            accuracy=None,
+            altitude=None,
+            speed=self.speed,
+            direction=self.heading,
+            latitude=self.latitude,
+            longitude=self.longitude,
+        )
 
 
 class GPS_POSITIONING(_GPS_POSITIONING):
 
 
 class GPS_POSITIONING(_GPS_POSITIONING):
@@ -411,6 +420,9 @@ class STATUS(GPS303Pkt):
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
+    def rectified(self) -> Tuple[str, StatusReport]:
+        return MODNAME, StatusReport(battery_percentage=self.batt)
+
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
@@ -489,15 +501,15 @@ class _WIFI_POSITIONING(GPS303Pkt):
             ]
         )
 
             ]
         )
 
-    def rectified(self) -> Dict[str, Any]:  # JSON-able dict
-        return {
-            "type": "approximate_location",
-            "devtime": str(self.devtime),
-            "mcc": self.mcc,
-            "mnc": self.mnc,
-            "base_stations": self.gsm_cells,
-            "wifi_aps": self.wifi_aps,
-        }
+    def rectified(self) -> Tuple[str, HintReport]:
+        return MODNAME, HintReport(
+            devtime=str(self.devtime),
+            battery_percentage=None,
+            mcc=self.mcc,
+            mnc=self.mnc,
+            gsm_cells=self.gsm_cells,
+            wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
+        )
 
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
 
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
@@ -819,14 +831,8 @@ def proto_handled(proto: str) -> bool:
     return proto.startswith(PROTO_PREFIX)
 
 
     return proto.startswith(PROTO_PREFIX)
 
 
-def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
-    return PROTO_PREFIX + (
-        obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
-    )
-
-
 def proto_of_message(packet: bytes) -> str:
 def proto_of_message(packet: bytes) -> str:
-    return proto_name(CLASSES.get(packet[1], UNKNOWN))
+    return CLASSES.get(packet[1], UNKNOWN).proto_name()
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
@@ -886,7 +892,13 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
 
 def exposed_protos() -> List[Tuple[str, bool]]:
     return [
 
 def exposed_protos() -> List[Tuple[str, bool]]:
     return [
-        (proto_name(cls), cls.RESPOND is Respond.EXT)
+        (cls.proto_name(), cls.RESPOND is Respond.EXT)
         for cls in CLASSES.values()
         if hasattr(cls, "rectified")
     ]
         for cls in CLASSES.values()
         if hasattr(cls, "rectified")
     ]
+
+
+def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]:
+    if cmd == "poweroff":
+        return HIBERNATION.Out()
+    return None