X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fzx303proto.py;h=a7329c316326187c950580ec11a65843981ace2d;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hp=4d5730304fbead5492db2c1b08f5bf86381a0618;hpb=b0bfb1a7b499ca18bf707858b0650e04acec9881;p=loctrkd.git diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index 4d57303..a7329c3 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -19,6 +19,7 @@ from enum import Enum from inspect import isclass from struct import error, pack, unpack from time import time +from types import SimpleNamespace from typing import ( Any, Callable, @@ -31,57 +32,24 @@ from typing import ( Union, ) +from .common import CoordReport, HintReport, StatusReport +from .protomodule import ProtoClass + __all__ = ( "Stream", "class_by_prefix", + "enframe", + "exposed_protos", "inline_response", "proto_handled", "parse_message", "probe_buffer", - "proto_name", "DecodeError", "Respond", - "GPS303Pkt", - "UNKNOWN", - "LOGIN", - "SUPERVISION", - "HEARTBEAT", - "GPS_POSITIONING", - "GPS_OFFLINE_POSITIONING", - "STATUS", - "HIBERNATION", - "RESET", - "WHITELIST_TOTAL", - "WIFI_OFFLINE_POSITIONING", - "TIME", - "PROHIBIT_LBS", - "GPS_LBS_SWITCH_TIMES", - "REMOTE_MONITOR_PHONE", - "SOS_PHONE", - "DAD_PHONE", - "MOM_PHONE", - "STOP_UPLOAD", - "GPS_OFF_PERIOD", - "DND_PERIOD", - "RESTART_SHUTDOWN", - "DEVICE", - "ALARM_CLOCK", - "STOP_ALARM", - "SETUP", - "SYNCHRONOUS_WHITELIST", - "RESTORE_PASSWORD", - "WIFI_POSITIONING", - "MANUAL_POSITIONING", - "BATTERY_CHARGE", - "CHARGER_CONNECTED", - "CHARGER_DISCONNECTED", - "VIBRATION_RECEIVED", - "POSITION_UPLOAD_INTERVAL", - "SOS_ALARM", - "UNKNOWN_B3", ) -PROTO_PREFIX = "ZX:" +MODNAME = __name__.split(".")[-1] +PROTO_PREFIX: str = "ZX:" ### Deframer ### @@ -245,63 +213,13 @@ def l3int(x: Union[str, List[int]]) -> List[int]: return lx -class MetaPkt(type): - """ - For each class corresponding to a message, automatically create - two nested classes `In` and `Out` that also inherit from their - "nest". Class attribute `IN_KWARGS` defined in the "nest" is - copied to the `In` nested class under the name `KWARGS`, and - likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS` - to the nested class `Out`. In addition, method `encode` is - defined in both classes equal to `in_encode()` and `out_encode()` - respectively. - """ - - if TYPE_CHECKING: - - def __getattr__(self, name: str) -> Any: - pass - - def __setattr__(self, name: str, value: Any) -> None: - pass - - def __new__( - cls: Type["MetaPkt"], - name: str, - bases: Tuple[type, ...], - attrs: Dict[str, Any], - ) -> "MetaPkt": - newcls = super().__new__(cls, name, bases, attrs) - newcls.In = super().__new__( - cls, - name + ".In", - (newcls,) + bases, - { - "KWARGS": newcls.IN_KWARGS, - "decode": newcls.in_decode, - "encode": newcls.in_encode, - }, - ) - newcls.Out = super().__new__( - cls, - name + ".Out", - (newcls,) + bases, - { - "KWARGS": newcls.OUT_KWARGS, - "decode": newcls.out_decode, - "encode": newcls.out_encode, - }, - ) - return newcls - - class Respond(Enum): NON = 0 # Incoming, no response needed INL = 1 # Birirectional, use `inline_response()` EXT = 2 # Birirectional, use external responder -class GPS303Pkt(metaclass=MetaPkt): +class GPS303Pkt(ProtoClass): RESPOND = Respond.NON # Do not send anything back by default PROTO: int IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = () @@ -375,6 +293,11 @@ class GPS303Pkt(metaclass=MetaPkt): # Overridden in subclasses, otherwise make empty payload return b"" + @classmethod + def proto_name(cls) -> str: + """Name of the command as used externally""" + return (PROTO_PREFIX + cls.__name__)[:16] + @property def packed(self) -> bytes: payload = self.encode() @@ -447,6 +370,18 @@ class _GPS_POSITIONING(GPS303Pkt): ttup = (tup[0] % 100,) + tup[1:6] return pack("BBBBBB", *ttup) + def rectified(self) -> Tuple[str, CoordReport]: # JSON-able dict + return MODNAME, CoordReport( + devtime=str(self.devtime), + battery_percentage=None, + accuracy=None, + altitude=None, + speed=self.speed, + direction=self.heading, + latitude=self.latitude, + longitude=self.longitude, + ) + class GPS_POSITIONING(_GPS_POSITIONING): PROTO = 0x10 @@ -485,6 +420,9 @@ class STATUS(GPS303Pkt): def out_encode(self) -> bytes: # Set interval in minutes return pack("B", self.upload_interval) + def rectified(self) -> Tuple[str, StatusReport]: + return MODNAME, StatusReport(battery_percentage=self.batt) + class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep PROTO = 0x14 @@ -563,6 +501,16 @@ class _WIFI_POSITIONING(GPS303Pkt): ] ) + def rectified(self) -> Tuple[str, HintReport]: + return MODNAME, HintReport( + devtime=str(self.devtime), + battery_percentage=None, + mcc=self.mcc, + mnc=self.mnc, + gsm_cells=self.gsm_cells, + wifi_aps=[("", mac, sig) for mac, sig in self.wifi_aps], + ) + class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): PROTO = 0x17 @@ -883,14 +831,8 @@ def proto_handled(proto: str) -> bool: return proto.startswith(PROTO_PREFIX) -def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str: - return PROTO_PREFIX + ( - obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__ - ) - - def proto_of_message(packet: bytes) -> str: - return proto_name(CLASSES.get(packet[1], UNKNOWN)) + return CLASSES.get(packet[1], UNKNOWN).proto_name() def imei_from_packet(packet: bytes) -> Optional[str]: @@ -946,3 +888,17 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: retobj.PROTO = proto # Override class attr with object attr retobj.cause = cause return retobj + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [ + (cls.proto_name(), cls.RESPOND is Respond.EXT) + for cls in CLASSES.values() + if hasattr(cls, "rectified") + ] + + +def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]: + if cmd == "poweroff": + return HIBERNATION.Out() + return None