X-Git-Url: http://www.average.org/gitweb/?p=loctrkd.git;a=blobdiff_plain;f=loctrkd%2Fzx303proto.py;h=a7329c316326187c950580ec11a65843981ace2d;hp=2a0ede6991c89b9c46ef56fe3ad344d5c7b506a3;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hpb=b965fecb08f4149f6f91770e9d6bb6d79f53f11e diff --git a/loctrkd/zx303proto.py b/loctrkd/zx303proto.py index 2a0ede6..a7329c3 100755 --- a/loctrkd/zx303proto.py +++ b/loctrkd/zx303proto.py @@ -44,12 +44,12 @@ __all__ = ( "proto_handled", "parse_message", "probe_buffer", - "proto_name", "DecodeError", "Respond", ) -PROTO_PREFIX = "ZX:" +MODNAME = __name__.split(".")[-1] +PROTO_PREFIX: str = "ZX:" ### Deframer ### @@ -293,6 +293,11 @@ class GPS303Pkt(ProtoClass): # Overridden in subclasses, otherwise make empty payload return b"" + @classmethod + def proto_name(cls) -> str: + """Name of the command as used externally""" + return (PROTO_PREFIX + cls.__name__)[:16] + @property def packed(self) -> bytes: payload = self.encode() @@ -365,12 +370,12 @@ class _GPS_POSITIONING(GPS303Pkt): ttup = (tup[0] % 100,) + tup[1:6] return pack("BBBBBB", *ttup) - def rectified(self) -> CoordReport: # JSON-able dict - return CoordReport( + def rectified(self) -> Tuple[str, CoordReport]: # JSON-able dict + return MODNAME, CoordReport( devtime=str(self.devtime), - battery_percentage=-1, - accuracy=-1.0, - altitude=-1.0, + battery_percentage=None, + accuracy=None, + altitude=None, speed=self.speed, direction=self.heading, latitude=self.latitude, @@ -415,8 +420,8 @@ class STATUS(GPS303Pkt): def out_encode(self) -> bytes: # Set interval in minutes return pack("B", self.upload_interval) - def rectified(self) -> StatusReport: - return StatusReport(battery_percentage=self.batt) + def rectified(self) -> Tuple[str, StatusReport]: + return MODNAME, StatusReport(battery_percentage=self.batt) class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep @@ -496,10 +501,10 @@ class _WIFI_POSITIONING(GPS303Pkt): ] ) - def rectified(self) -> HintReport: - return HintReport( + def rectified(self) -> Tuple[str, HintReport]: + return MODNAME, HintReport( devtime=str(self.devtime), - battery_percentage=-1, + battery_percentage=None, mcc=self.mcc, mnc=self.mnc, gsm_cells=self.gsm_cells, @@ -826,14 +831,8 @@ def proto_handled(proto: str) -> bool: return proto.startswith(PROTO_PREFIX) -def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str: - return PROTO_PREFIX + ( - obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__ - ) - - def proto_of_message(packet: bytes) -> str: - return proto_name(CLASSES.get(packet[1], UNKNOWN)) + return CLASSES.get(packet[1], UNKNOWN).proto_name() def imei_from_packet(packet: bytes) -> Optional[str]: @@ -893,7 +892,13 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt: def exposed_protos() -> List[Tuple[str, bool]]: return [ - (proto_name(cls)[:16], cls.RESPOND is Respond.EXT) + (cls.proto_name(), cls.RESPOND is Respond.EXT) for cls in CLASSES.values() if hasattr(cls, "rectified") ] + + +def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]: + if cmd == "poweroff": + return HIBERNATION.Out() + return None