]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
Cleanup some of the types
[loctrkd.git] / loctrkd / zx303proto.py
index a2132f6c3c658aee30995d5326cf4d8701f93ba3..63dc3f5f294149c48d6ea8e910e283c3f174cdb3 100755 (executable)
@@ -19,6 +19,7 @@ from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
 from inspect import isclass
 from struct import error, pack, unpack
 from time import time
+from types import SimpleNamespace
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,57 +32,23 @@ from typing import (
     Union,
 )
 
     Union,
 )
 
+from .common import CoordReport, HintReport, StatusReport
+from .protomodule import ProtoClass
+
 __all__ = (
     "Stream",
     "class_by_prefix",
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
     "proto_handled",
     "parse_message",
     "probe_buffer",
     "inline_response",
     "proto_handled",
     "parse_message",
     "probe_buffer",
-    "proto_name",
     "DecodeError",
     "Respond",
     "DecodeError",
     "Respond",
-    "GPS303Pkt",
-    "UNKNOWN",
-    "LOGIN",
-    "SUPERVISION",
-    "HEARTBEAT",
-    "GPS_POSITIONING",
-    "GPS_OFFLINE_POSITIONING",
-    "STATUS",
-    "HIBERNATION",
-    "RESET",
-    "WHITELIST_TOTAL",
-    "WIFI_OFFLINE_POSITIONING",
-    "TIME",
-    "PROHIBIT_LBS",
-    "GPS_LBS_SWITCH_TIMES",
-    "REMOTE_MONITOR_PHONE",
-    "SOS_PHONE",
-    "DAD_PHONE",
-    "MOM_PHONE",
-    "STOP_UPLOAD",
-    "GPS_OFF_PERIOD",
-    "DND_PERIOD",
-    "RESTART_SHUTDOWN",
-    "DEVICE",
-    "ALARM_CLOCK",
-    "STOP_ALARM",
-    "SETUP",
-    "SYNCHRONOUS_WHITELIST",
-    "RESTORE_PASSWORD",
-    "WIFI_POSITIONING",
-    "MANUAL_POSITIONING",
-    "BATTERY_CHARGE",
-    "CHARGER_CONNECTED",
-    "CHARGER_DISCONNECTED",
-    "VIBRATION_RECEIVED",
-    "POSITION_UPLOAD_INTERVAL",
-    "SOS_ALARM",
-    "UNKNOWN_B3",
 )
 
 )
 
-PROTO_PREFIX = "ZX:"
+PROTO_PREFIX: str = "ZX:"
 
 ### Deframer ###
 
 
 ### Deframer ###
 
@@ -245,63 +212,13 @@ def l3int(x: Union[str, List[int]]) -> List[int]:
     return lx
 
 
     return lx
 
 
-class MetaPkt(type):
-    """
-    For each class corresponding to a message, automatically create
-    two nested classes `In` and `Out` that also inherit from their
-    "nest". Class attribute `IN_KWARGS` defined in the "nest" is
-    copied to the `In` nested class under the name `KWARGS`, and
-    likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
-    to the nested class `Out`. In addition, method `encode` is
-    defined in both classes equal to `in_encode()` and `out_encode()`
-    respectively.
-    """
-
-    if TYPE_CHECKING:
-
-        def __getattr__(self, name: str) -> Any:
-            pass
-
-        def __setattr__(self, name: str, value: Any) -> None:
-            pass
-
-    def __new__(
-        cls: Type["MetaPkt"],
-        name: str,
-        bases: Tuple[type, ...],
-        attrs: Dict[str, Any],
-    ) -> "MetaPkt":
-        newcls = super().__new__(cls, name, bases, attrs)
-        newcls.In = super().__new__(
-            cls,
-            name + ".In",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.IN_KWARGS,
-                "decode": newcls.in_decode,
-                "encode": newcls.in_encode,
-            },
-        )
-        newcls.Out = super().__new__(
-            cls,
-            name + ".Out",
-            (newcls,) + bases,
-            {
-                "KWARGS": newcls.OUT_KWARGS,
-                "decode": newcls.out_decode,
-                "encode": newcls.out_encode,
-            },
-        )
-        return newcls
-
-
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
 class Respond(Enum):
     NON = 0  # Incoming, no response needed
     INL = 1  # Birirectional, use `inline_response()`
     EXT = 2  # Birirectional, use external responder
 
 
-class GPS303Pkt(metaclass=MetaPkt):
+class GPS303Pkt(ProtoClass):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
@@ -375,6 +292,11 @@ class GPS303Pkt(metaclass=MetaPkt):
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
         # Overridden in subclasses, otherwise make empty payload
         return b""
 
+    @classmethod
+    def proto_name(cls) -> str:
+        """Name of the command as used externally"""
+        return (PROTO_PREFIX + cls.__name__)[:16]
+
     @property
     def packed(self) -> bytes:
         payload = self.encode()
     @property
     def packed(self) -> bytes:
         payload = self.encode()
@@ -447,6 +369,18 @@ class _GPS_POSITIONING(GPS303Pkt):
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
         ttup = (tup[0] % 100,) + tup[1:6]
         return pack("BBBBBB", *ttup)
 
+    def rectified(self) -> CoordReport:  # JSON-able dict
+        return CoordReport(
+            devtime=str(self.devtime),
+            battery_percentage=-1,
+            accuracy=-1.0,
+            altitude=-1.0,
+            speed=self.speed,
+            direction=self.heading,
+            latitude=self.latitude,
+            longitude=self.longitude,
+        )
+
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
@@ -485,6 +419,9 @@ class STATUS(GPS303Pkt):
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
     def out_encode(self) -> bytes:  # Set interval in minutes
         return pack("B", self.upload_interval)
 
+    def rectified(self) -> StatusReport:
+        return StatusReport(battery_percentage=self.batt)
+
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
@@ -563,6 +500,16 @@ class _WIFI_POSITIONING(GPS303Pkt):
             ]
         )
 
             ]
         )
 
+    def rectified(self) -> HintReport:
+        return HintReport(
+            devtime=str(self.devtime),
+            battery_percentage=-1,
+            mcc=self.mcc,
+            mnc=self.mnc,
+            gsm_cells=self.gsm_cells,
+            wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
+        )
+
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
 
 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x17
@@ -863,14 +810,18 @@ if True:  # just to indent the code, sorry!
 
 def class_by_prefix(
     prefix: str,
 
 def class_by_prefix(
     prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+    if prefix.startswith(PROTO_PREFIX):
+        pname = prefix[len(PROTO_PREFIX) :]
+    else:
+        raise KeyError(pname)
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
-        return lst
+        return [name for name, _ in lst]
     _, proto = lst[0]
     return CLASSES[proto]
 
     _, proto = lst[0]
     return CLASSES[proto]
 
@@ -879,14 +830,8 @@ def proto_handled(proto: str) -> bool:
     return proto.startswith(PROTO_PREFIX)
 
 
     return proto.startswith(PROTO_PREFIX)
 
 
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
-    return PROTO_PREFIX + (
-        obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
-    )
-
-
 def proto_of_message(packet: bytes) -> str:
 def proto_of_message(packet: bytes) -> str:
-    return proto_name(CLASSES.get(packet[1], UNKNOWN))
+    return CLASSES.get(packet[1], UNKNOWN).proto_name()
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
 
 
 def imei_from_packet(packet: bytes) -> Optional[str]:
@@ -942,3 +887,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (cls.proto_name(), cls.RESPOND is Respond.EXT)
+        for cls in CLASSES.values()
+        if hasattr(cls, "rectified")
+    ]