]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / zx303proto.py
index efb02d249f6c118185ce2a66b23675c9acf85efd..a7329c316326187c950580ec11a65843981ace2d 100755 (executable)
@@ -19,6 +19,7 @@ from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
+from types import SimpleNamespace
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,57 +32,24 @@ from typing import (
     Union,
 )
 
     Union,
 )
 
+from .common import CoordReport, HintReport, StatusReport
+from .protomodule import ProtoClass
+
 __all__ = (
     "Stream",
     "class_by_prefix",
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
     "inline_response",
+    "proto_handled",
     "parse_message",
     "probe_buffer",
     "parse_message",
     "probe_buffer",
-    "proto_by_name",
-    "proto_name",
     "DecodeError",
     "Respond",
     "DecodeError",
     "Respond",
-    "GPS303Pkt",
-    "UNKNOWN",
-    "LOGIN",
-    "SUPERVISION",
-    "HEARTBEAT",
-    "GPS_POSITIONING",
-    "GPS_OFFLINE_POSITIONING",
-    "STATUS",
-    "HIBERNATION",
-    "RESET",
-    "WHITELIST_TOTAL",
-    "WIFI_OFFLINE_POSITIONING",
-    "TIME",
-    "PROHIBIT_LBS",
-    "GPS_LBS_SWITCH_TIMES",
-    "REMOTE_MONITOR_PHONE",
-    "SOS_PHONE",
-    "DAD_PHONE",
-    "MOM_PHONE",
-    "STOP_UPLOAD",
-    "GPS_OFF_PERIOD",
-    "DND_PERIOD",
-    "RESTART_SHUTDOWN",
-    "DEVICE",
-    "ALARM_CLOCK",
-    "STOP_ALARM",
-    "SETUP",
-    "SYNCHRONOUS_WHITELIST",
-    "RESTORE_PASSWORD",
-    "WIFI_POSITIONING",
-    "MANUAL_POSITIONING",
-    "BATTERY_CHARGE",
-    "CHARGER_CONNECTED",
-    "CHARGER_DISCONNECTED",
-    "VIBRATION_RECEIVED",
-    "POSITION_UPLOAD_INTERVAL",
-    "SOS_ALARM",
-    "UNKNOWN_B3",
 )
 
 )
 
-PROTO_PREFIX = "ZX"
+MODNAME = __name__.split(".")[-1]
+PROTO_PREFIX: str = "ZX:"
 
 ### Deframer ###
 
 
 ### Deframer ###
 
@@ -92,10 +60,6 @@ class Stream:
     def __init__(self) -> None:
         self.buffer = b""
 
     def __init__(self) -> None:
         self.buffer = b""
 
-    @staticmethod
-    def enframe(buffer: bytes) -> bytes:
-        return b"xx" + buffer + b"\r\n"
-
     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
         """
         Process next segment of the stream. Return successfully deframed
     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
         """
         Process next segment of the stream. Return successfully deframed
@@ -150,6 +114,10 @@ class Stream:
         return ret
 
 
         return ret
 
 
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+    return b"xx" + buffer + b"\r\n"
+
+
 ### Parser/Constructor ###
 
 
 ### Parser/Constructor ###
 
 
@@ -245,63 +213,13 @@ def l3int(x: Union[str, List[int]]) -> List[int]:
     return lx
 
 
     return lx
 
 
-class MetaPkt(type):
-    """
-    For each class corresponding to a message, automatically create
-    two nested classes `In` and `Out` that also inherit from their
-    "nest". Class attribute `IN_KWARGS` defined in the "nest" is
-    copied to the `In` nested class under the name `KWARGS`, and
-    likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
-    to the nested class `Out`. In addition, method `encode` is
-    defined in both classes equal to `in_encode()` and `out_encode()`
-    respectively.
-    """
-
-    if TYPE_CHECKING:
-
-        def __getattr__(self, name: str) -> Any:
-            pass
-
-        def __setattr__(self, name: str, value: Any) -> None:
-            pass
-
-    def __new__(
-        cls: Type["MetaPkt"],
-        name: str,
-        bases: Tuple[type, ...],
-        attrs: Dict[str, Any],
-    ) -> "MetaPkt":
-        newcls = super().__new__(cls, name, bases, attrs)
-        newcls.In = super().__new__(
-            cls,
-            name + ".In",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.IN_KWARGS,
-                "decode": newcls.in_decode,
-                "encode": newcls.in_encode,
-            },
-        )
-        newcls.Out = super().__new__(
-            cls,
-            name + ".Out",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.OUT_KWARGS,
-                "decode": newcls.out_decode,
-                "encode": newcls.out_encode,
-            },
-        )
-        return newcls
-
-
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
-class GPS303Pkt(metaclass=MetaPkt):
+class GPS303Pkt(ProtoClass):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
@@ -375,6 +293,11 @@ class GPS303Pkt(metaclass=MetaPkt):
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
+    @classmethod
+    def proto_name(cls) -> str:
+        """Name of the command as used externally"""
+        return (PROTO_PREFIX + cls.__name__)[:16]
+
     @property
     def packed(self) -> bytes:
         payload = self.encode()
     @property
     def packed(self) -> bytes:
         payload = self.encode()
@@ -447,6 +370,18 @@ class _GPS_POSITIONING(GPS303Pkt):
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
+    def rectified(self) -> Tuple[str, CoordReport]:  # JSON-able dict
+        return MODNAME, CoordReport(
+            devtime=str(self.devtime),
+            battery_percentage=None,
+            accuracy=None,
+            altitude=None,
+            speed=self.speed,
+            direction=self.heading,
+            latitude=self.latitude,
+            longitude=self.longitude,
+        )
+
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
@@ -485,6 +420,9 @@ class STATUS(GPS303Pkt):
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
+    def rectified(self) -> Tuple[str, StatusReport]:
+        return MODNAME, StatusReport(battery_percentage=self.batt)
+
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
@@ -563,6 +501,16 @@ class _WIFI_POSITIONING(GPS303Pkt):
             ]
         )
 
             ]
         )
 
+    def rectified(self) -> Tuple[str, HintReport]:
+        return MODNAME, HintReport(
+            devtime=str(self.devtime),
+            battery_percentage=None,
+            mcc=self.mcc,
+            mnc=self.mnc,
+            gsm_cells=self.gsm_cells,
+            wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -863,36 +811,28 @@ if True:  # just to indent the code, sorry!
 
 def class_by_prefix(
     prefix: str,
 
 def class_by_prefix(
     prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+    if prefix.startswith(PROTO_PREFIX):
+        pname = prefix[len(PROTO_PREFIX) :]
+    else:
+        raise KeyError(pname)
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
-        return lst
+        return [name for name, _ in lst]
     _, proto = lst[0]
     return CLASSES[proto]
 
 
     _, proto = lst[0]
     return CLASSES[proto]
 
 
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
-    return (
-        PROTO_PREFIX
-        + ":"
-        + (
-            obj.__class__.__name__
-            if isinstance(obj, GPS303Pkt)
-            else obj.__name__
-        )
-    ).ljust(16, "\0")[:16]
-
-
-def proto_by_name(name: str) -> int:
-    return PROTOS.get(name, -1)
+def proto_handled(proto: str) -> bool:
+    return proto.startswith(PROTO_PREFIX)
 
 
 def proto_of_message(packet: bytes) -> str:
 
 
 def proto_of_message(packet: bytes) -> str:
-    return proto_name(CLASSES.get(packet[1], UNKNOWN))
+    return CLASSES.get(packet[1], UNKNOWN).proto_name()
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
@@ -948,3 +888,17 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (cls.proto_name(), cls.RESPOND is Respond.EXT)
+        for cls in CLASSES.values()
+        if hasattr(cls, "rectified")
+    ]
+
+
+def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]:
+    if cmd == "poweroff":
+        return HIBERNATION.Out()
+    return None