]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
Convert recitifier to multiprotocol support
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from types import SimpleNamespace
23 from typing import (
24     Any,
25     Callable,
26     Dict,
27     List,
28     Optional,
29     Tuple,
30     Type,
31     TYPE_CHECKING,
32     Union,
33 )
34
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
37
38 __all__ = (
39     "Stream",
40     "class_by_prefix",
41     "enframe",
42     "exposed_protos",
43     "inline_response",
44     "proto_handled",
45     "parse_message",
46     "probe_buffer",
47     "proto_name",
48     "DecodeError",
49     "Respond",
50 )
51
52 PROTO_PREFIX = "ZX:"
53
54 ### Deframer ###
55
56 MAXBUFFER: int = 4096
57
58
59 class Stream:
60     def __init__(self) -> None:
61         self.buffer = b""
62
63     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
64         """
65         Process next segment of the stream. Return successfully deframed
66         packets as `bytes` and error messages as `str`.
67         """
68         when = time()
69         self.buffer += segment
70         if len(self.buffer) > MAXBUFFER:
71             # We are receiving junk. Let's drop it or we run out of memory.
72             self.buffer = b""
73             return [f"More than {MAXBUFFER} unparseable data, dropping"]
74         msgs: List[Union[bytes, str]] = []
75         while True:
76             framestart = self.buffer.find(b"xx")
77             if framestart == -1:  # No frames, return whatever we have
78                 break
79             if framestart > 0:  # Should not happen, report
80                 msgs.append(
81                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
82                 )
83                 self.buffer = self.buffer[framestart:]
84             # At this point, buffer starts with a packet
85             if len(self.buffer) < 6:  # no len and proto - cannot proceed
86                 break
87             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
88             frameend = 0
89             # Length field can legitimeely be much less than the
90             # length of the packet (e.g. WiFi positioning), but
91             # it _should not_ be greater. Still sometimes it is.
92             # Luckily, not by too much: by maybe two or three bytes?
93             # Do this embarrassing hack to avoid accidental match
94             # of some binary data in the packet against '\r\n'.
95             while True:
96                 frameend = self.buffer.find(b"\r\n", frameend + 1)
97                 if frameend == -1 or frameend >= (
98                     exp_end - 3
99                 ):  # Found realistic match or none
100                     break
101             if frameend == -1:  # Incomplete frame, return what we have
102                 break
103             packet = self.buffer[2:frameend]
104             self.buffer = self.buffer[frameend + 2 :]
105             if len(packet) < 2:  # frameend comes too early
106                 msgs.append(f"Packet too short: {packet.hex()}")
107             else:
108                 msgs.append(packet)
109         return msgs
110
111     def close(self) -> bytes:
112         ret = self.buffer
113         self.buffer = b""
114         return ret
115
116
117 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
118     return b"xx" + buffer + b"\r\n"
119
120
121 ### Parser/Constructor ###
122
123
124 class DecodeError(Exception):
125     def __init__(self, e: Exception, **kwargs: Any) -> None:
126         super().__init__(e)
127         for k, v in kwargs.items():
128             setattr(self, k, v)
129
130
131 def maybe(typ: type) -> Callable[[Any], Any]:
132     return lambda x: None if x is None else typ(x)
133
134
135 def intx(x: Union[str, int]) -> int:
136     if isinstance(x, str):
137         x = int(x, 0)
138     return x
139
140
141 def boolx(x: Union[str, bool]) -> bool:
142     if isinstance(x, str):
143         if x.upper() in ("ON", "TRUE", "1"):
144             return True
145         if x.upper() in ("OFF", "FALSE", "0"):
146             return False
147         raise ValueError(str(x) + " could not be parsed as a Boolean")
148     return x
149
150
151 def hhmm(x: str) -> str:
152     """Check for the string that represents hours and minutes"""
153     if not isinstance(x, str) or len(x) != 4:
154         raise ValueError(str(x) + " is not a four-character string")
155     hh = int(x[:2])
156     mm = int(x[2:])
157     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
158         raise ValueError(str(x) + " does not contain valid hours and minutes")
159     return x
160
161
162 def hhmmhhmm(x: str) -> str:
163     """Check for the string that represents hours and minutes twice"""
164     if not isinstance(x, str) or len(x) != 8:
165         raise ValueError(str(x) + " is not an eight-character string")
166     return hhmm(x[:4]) + hhmm(x[4:])
167
168
169 def l3str(x: Union[str, List[str]]) -> List[str]:
170     if isinstance(x, str):
171         lx = x.split(",")
172     else:
173         lx = x
174     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
175         raise ValueError(str(lx) + " is not a list of three strings")
176     return lx
177
178
179 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
180     def alrmspec(sub: str) -> Tuple[int, str]:
181         if len(sub) != 7:
182             raise ValueError(sub + " does not represent day and time")
183         return (
184             {
185                 "MON": 1,
186                 "TUE": 2,
187                 "WED": 3,
188                 "THU": 4,
189                 "FRI": 5,
190                 "SAT": 6,
191                 "SUN": 7,
192             }[sub[:3].upper()],
193             sub[3:],
194         )
195
196     if isinstance(x, str):
197         lx = [alrmspec(sub) for sub in x.split(",")]
198     else:
199         lx = x
200     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
201     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
202         raise ValueError(str(lx) + " is a wrong alarms specification")
203     return [(d, hhmm(tm)) for d, tm in lx]
204
205
206 def l3int(x: Union[str, List[int]]) -> List[int]:
207     if isinstance(x, str):
208         lx = [int(el) for el in x.split(",")]
209     else:
210         lx = x
211     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
212         raise ValueError(str(lx) + " is not a list of three integers")
213     return lx
214
215
216 class Respond(Enum):
217     NON = 0  # Incoming, no response needed
218     INL = 1  # Birirectional, use `inline_response()`
219     EXT = 2  # Birirectional, use external responder
220
221
222 class GPS303Pkt(ProtoClass):
223     RESPOND = Respond.NON  # Do not send anything back by default
224     PROTO: int
225     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
228     In: Type["GPS303Pkt"]
229     Out: Type["GPS303Pkt"]
230
231     if TYPE_CHECKING:
232
233         def __getattr__(self, name: str) -> Any:
234             pass
235
236         def __setattr__(self, name: str, value: Any) -> None:
237             pass
238
239     def __init__(self, *args: Any, **kwargs: Any):
240         """
241         Construct the object _either_ from (length, payload),
242         _or_ from the values of individual fields
243         """
244         assert not args or (len(args) == 2 and not kwargs)
245         if args:  # guaranteed to be two arguments at this point
246             self.length, self.payload = args
247             try:
248                 self.decode(self.length, self.payload)
249             except error as e:
250                 raise DecodeError(e, obj=self)
251         else:
252             for kw, typ, dfl in self.KWARGS:
253                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
254             if kwargs:
255                 raise ValueError(
256                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
257                 )
258
259     def __repr__(self) -> str:
260         return "{}({})".format(
261             self.__class__.__name__,
262             ", ".join(
263                 "{}={}".format(
264                     k,
265                     'bytes.fromhex("{}")'.format(v.hex())
266                     if isinstance(v, bytes)
267                     else v.__repr__(),
268                 )
269                 for k, v in self.__dict__.items()
270                 if not k.startswith("_")
271             ),
272         )
273
274     decode: Callable[["GPS303Pkt", int, bytes], None]
275
276     def in_decode(self, length: int, packet: bytes) -> None:
277         # Overridden in subclasses, otherwise do not decode payload
278         return
279
280     def out_decode(self, length: int, packet: bytes) -> None:
281         # Overridden in subclasses, otherwise do not decode payload
282         return
283
284     encode: Callable[["GPS303Pkt"], bytes]
285
286     def in_encode(self) -> bytes:
287         # Necessary to emulate terminal, which is not implemented
288         raise NotImplementedError(
289             self.__class__.__name__ + ".encode() not implemented"
290         )
291
292     def out_encode(self) -> bytes:
293         # Overridden in subclasses, otherwise make empty payload
294         return b""
295
296     @property
297     def packed(self) -> bytes:
298         payload = self.encode()
299         length = getattr(self, "length", len(payload) + 1)
300         return pack("BB", length, self.PROTO) + payload
301
302
303 class UNKNOWN(GPS303Pkt):
304     PROTO = 256  # > 255 is impossible in real packets
305
306
307 class LOGIN(GPS303Pkt):
308     PROTO = 0x01
309     RESPOND = Respond.INL
310     # Default response for ACK, can also respond with STOP_UPLOAD
311     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
312
313     def in_decode(self, length: int, payload: bytes) -> None:
314         self.imei = payload[:8].ljust(8, b"\0").hex()
315         self.ver = payload[8]
316
317     def in_encode(self) -> bytes:
318         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
319             "B", self.ver
320         )
321
322
323 class SUPERVISION(GPS303Pkt):
324     PROTO = 0x05
325     OUT_KWARGS = (("status", int, 1),)
326
327     def out_encode(self) -> bytes:
328         # 1: The device automatically answers Pickup effect
329         # 2: Automatically Answering Two-way Calls
330         # 3: Ring manually answer the two-way call
331         return pack("B", self.status)
332
333
334 class HEARTBEAT(GPS303Pkt):
335     PROTO = 0x08
336     RESPOND = Respond.INL
337
338
339 class _GPS_POSITIONING(GPS303Pkt):
340     RESPOND = Respond.INL
341
342     def in_decode(self, length: int, payload: bytes) -> None:
343         self.dtime = payload[:6]
344         if self.dtime == b"\0\0\0\0\0\0":
345             self.devtime = None
346         else:
347             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
348             self.devtime = datetime(
349                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
350             )
351         self.gps_data_length = payload[6] >> 4
352         self.gps_nb_sat = payload[6] & 0x0F
353         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
354         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
355         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
356         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
357         self.heading = flags & 0b0000001111111111  # bits 6 - last
358         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
359         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
360         self.speed = speed
361         self.flags = flags
362
363     def out_encode(self) -> bytes:
364         tup = datetime.utcnow().timetuple()
365         ttup = (tup[0] % 100,) + tup[1:6]
366         return pack("BBBBBB", *ttup)
367
368     def rectified(self) -> CoordReport:  # JSON-able dict
369         return CoordReport(
370             devtime=str(self.devtime),
371             battery_percentage=-1,
372             accuracy=-1.0,
373             altitude=-1.0,
374             speed=self.speed,
375             direction=self.heading,
376             latitude=self.latitude,
377             longitude=self.longitude,
378         )
379
380
381 class GPS_POSITIONING(_GPS_POSITIONING):
382     PROTO = 0x10
383
384
385 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
386     PROTO = 0x11
387
388
389 class STATUS(GPS303Pkt):
390     PROTO = 0x13
391     RESPOND = Respond.EXT
392     IN_KWARGS = (
393         ("batt", int, 100),
394         ("ver", int, 0),
395         ("timezone", int, 0),
396         ("intvl", int, 0),
397         ("signal", maybe(int), None),
398     )
399     OUT_KWARGS = (("upload_interval", int, 25),)
400
401     def in_decode(self, length: int, payload: bytes) -> None:
402         self.batt, self.ver, self.timezone, self.intvl = unpack(
403             "BBBB", payload[:4]
404         )
405         if len(payload) > 4:
406             self.signal: Optional[int] = payload[4]
407         else:
408             self.signal = None
409
410     def in_encode(self) -> bytes:
411         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
412             b"" if self.signal is None else pack("B", self.signal)
413         )
414
415     def out_encode(self) -> bytes:  # Set interval in minutes
416         return pack("B", self.upload_interval)
417
418     def rectified(self) -> StatusReport:
419         return StatusReport(battery_percentage=self.batt)
420
421
422 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
423     PROTO = 0x14
424
425     def in_encode(self) -> bytes:
426         return b""
427
428
429 class RESET(GPS303Pkt):
430     # Device sends when it got reset SMS
431     # Server can send to initiate factory reset
432     PROTO = 0x15
433
434
435 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
436     PROTO = 0x16
437     OUT_KWARGS = (("number", int, 3),)
438
439     def out_encode(self) -> bytes:  # Number of whitelist entries
440         return pack("B", self.number)
441
442
443 class _WIFI_POSITIONING(GPS303Pkt):
444     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
445         # IN_KWARGS = (
446         ("dtime", bytes, b"\0\0\0\0\0\0"),
447         ("wifi_aps", list, []),
448         ("mcc", int, 0),
449         ("mnc", int, 0),
450         ("gsm_cells", list, []),
451     )
452
453     def in_decode(self, length: int, payload: bytes) -> None:
454         self.dtime = payload[:6]
455         if self.dtime == b"\0\0\0\0\0\0":
456             self.devtime = None
457         else:
458             self.devtime = datetime.strptime(
459                 self.dtime.hex(), "%y%m%d%H%M%S"
460             ).astimezone(tz=timezone.utc)
461         self.wifi_aps = []
462         for i in range(self.length):  # length has special meaning here
463             slice = payload[6 + i * 7 : 13 + i * 7]
464             self.wifi_aps.append(
465                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
466             )
467         gsm_slice = payload[6 + self.length * 7 :]
468         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
469         self.gsm_cells = []
470         for i in range(ncells):
471             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
472             locac, cellid, sigstr = unpack(
473                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
474             )
475             self.gsm_cells.append((locac, cellid, -sigstr))
476
477     def in_encode(self) -> bytes:
478         self.length = len(self.wifi_aps)
479         return b"".join(
480             [
481                 self.dtime,
482                 b"".join(
483                     [
484                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
485                         + pack("B", -sigstr)
486                         for mac, sigstr in self.wifi_aps
487                     ]
488                 ),
489                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
490                 b"".join(
491                     [
492                         pack("!HHB", locac, cellid, -sigstr)
493                         for locac, cellid, sigstr in self.gsm_cells
494                     ]
495                 ),
496             ]
497         )
498
499     def rectified(self) -> HintReport:
500         return HintReport(
501             devtime=str(self.devtime),
502             battery_percentage=-1,
503             mcc=self.mcc,
504             mnc=self.mnc,
505             gsm_cells=self.gsm_cells,
506             wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
507         )
508
509
510 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
511     PROTO = 0x17
512     RESPOND = Respond.INL
513
514     def out_encode(self) -> bytes:
515         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
516
517
518 class TIME(GPS303Pkt):
519     PROTO = 0x30
520     RESPOND = Respond.INL
521
522     def out_encode(self) -> bytes:
523         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
524
525
526 class PROHIBIT_LBS(GPS303Pkt):
527     PROTO = 0x33
528     OUT_KWARGS = (("status", int, 1),)
529
530     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
531         return pack("B", self.status)
532
533
534 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
535     PROTO = 0x34
536
537     OUT_KWARGS = (
538         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
539         ("gps_interval_set", boolx, False),
540         ("gps_interval", hhmmhhmm, "00000000"),
541         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
542         ("boot_time_set", boolx, False),
543         ("boot_time", hhmm, "0000"),
544         ("shut_time_set", boolx, False),
545         ("shut_time", hhmm, "0000"),
546     )
547
548     def out_encode(self) -> bytes:
549         return (
550             pack("B", self.gps_off)
551             + pack("B", self.gps_interval_set)
552             + bytes.fromhex(self.gps_interval)
553             + pack("B", self.lbs_off)
554             + pack("B", self.boot_time_set)
555             + bytes.fromhex(self.boot_time)
556             + pack("B", self.shut_time_set)
557             + bytes.fromhex(self.shut_time)
558         )
559
560
561 class _SET_PHONE(GPS303Pkt):
562     OUT_KWARGS = (("phone", str, ""),)
563
564     def out_encode(self) -> bytes:
565         self.phone: str
566         return self.phone.encode("")
567
568
569 class REMOTE_MONITOR_PHONE(_SET_PHONE):
570     PROTO = 0x40
571
572
573 class SOS_PHONE(_SET_PHONE):
574     PROTO = 0x41
575
576
577 class DAD_PHONE(_SET_PHONE):
578     PROTO = 0x42
579
580
581 class MOM_PHONE(_SET_PHONE):
582     PROTO = 0x43
583
584
585 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
586     PROTO = 0x44
587
588
589 class GPS_OFF_PERIOD(GPS303Pkt):
590     PROTO = 0x46
591     OUT_KWARGS = (
592         ("onoff", int, 0),
593         ("fm", hhmm, "0000"),
594         ("to", hhmm, "2359"),
595     )
596
597     def out_encode(self) -> bytes:
598         return (
599             pack("B", self.onoff)
600             + bytes.fromhex(self.fm)
601             + bytes.fromhex(self.to)
602         )
603
604
605 class DND_PERIOD(GPS303Pkt):
606     PROTO = 0x47
607     OUT_KWARGS = (
608         ("onoff", int, 0),
609         ("week", int, 3),
610         ("fm1", hhmm, "0000"),
611         ("to1", hhmm, "2359"),
612         ("fm2", hhmm, "0000"),
613         ("to2", hhmm, "2359"),
614     )
615
616     def out_encode(self) -> bytes:
617         return (
618             pack("B", self.onoff)
619             + pack("B", self.week)
620             + bytes.fromhex(self.fm1)
621             + bytes.fromhex(self.to1)
622             + bytes.fromhex(self.fm2)
623             + bytes.fromhex(self.to2)
624         )
625
626
627 class RESTART_SHUTDOWN(GPS303Pkt):
628     PROTO = 0x48
629     OUT_KWARGS = (("flag", int, 0),)
630
631     def out_encode(self) -> bytes:
632         # 1 - restart
633         # 2 - shutdown
634         return pack("B", self.flag)
635
636
637 class DEVICE(GPS303Pkt):
638     PROTO = 0x49
639     OUT_KWARGS = (("flag", int, 0),)
640
641     # 0 - Stop looking for equipment
642     # 1 - Start looking for equipment
643     def out_encode(self) -> bytes:
644         return pack("B", self.flag)
645
646
647 class ALARM_CLOCK(GPS303Pkt):
648     PROTO = 0x50
649     OUT_KWARGS: Tuple[
650         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
651     ] = (
652         ("alarms", l3alarms, []),
653     )
654
655     def out_encode(self) -> bytes:
656         return b"".join(
657             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
658         )
659
660
661 class STOP_ALARM(GPS303Pkt):
662     PROTO = 0x56
663
664     def in_decode(self, length: int, payload: bytes) -> None:
665         self.flag = payload[0]
666
667
668 class SETUP(GPS303Pkt):
669     PROTO = 0x57
670     RESPOND = Respond.EXT
671     OUT_KWARGS = (
672         ("uploadintervalseconds", intx, 0x0300),
673         ("binaryswitch", intx, 0b00110001),
674         ("alarms", l3int, [0, 0, 0]),
675         ("dndtimeswitch", int, 0),
676         ("dndtimes", l3int, [0, 0, 0]),
677         ("gpstimeswitch", int, 0),
678         ("gpstimestart", int, 0),
679         ("gpstimestop", int, 0),
680         ("phonenumbers", l3str, ["", "", ""]),
681     )
682
683     def out_encode(self) -> bytes:
684         def pack3b(x: int) -> bytes:
685             return pack("!I", x)[1:]
686
687         return b"".join(
688             [
689                 pack("!H", self.uploadintervalseconds),
690                 pack("B", self.binaryswitch),
691             ]
692             + [pack3b(el) for el in self.alarms]
693             + [
694                 pack("B", self.dndtimeswitch),
695             ]
696             + [pack3b(el) for el in self.dndtimes]
697             + [
698                 pack("B", self.gpstimeswitch),
699                 pack("!H", self.gpstimestart),
700                 pack("!H", self.gpstimestop),
701             ]
702             + [b";".join([el.encode() for el in self.phonenumbers])]
703         )
704
705     def in_encode(self) -> bytes:
706         return b""
707
708
709 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
710     PROTO = 0x58
711
712
713 class RESTORE_PASSWORD(GPS303Pkt):
714     PROTO = 0x67
715
716
717 class WIFI_POSITIONING(_WIFI_POSITIONING):
718     PROTO = 0x69
719     RESPOND = Respond.EXT
720     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
721
722     def out_encode(self) -> bytes:
723         if self.latitude is None or self.longitude is None:
724             return b""
725         return "{:+#010.8g},{:+#010.8g}".format(
726             self.latitude, self.longitude
727         ).encode()
728
729     def out_decode(self, length: int, payload: bytes) -> None:
730         lat, lon = payload.decode().split(",")
731         self.latitude = float(lat)
732         self.longitude = float(lon)
733
734
735 class MANUAL_POSITIONING(GPS303Pkt):
736     PROTO = 0x80
737
738     def in_decode(self, length: int, payload: bytes) -> None:
739         self.flag = payload[0] if len(payload) > 0 else -1
740         self.reason = {
741             1: "Incorrect time",
742             2: "LBS less",
743             3: "WiFi less",
744             4: "LBS search > 3 times",
745             5: "Same LBS and WiFi data",
746             6: "LBS prohibited, WiFi absent",
747             7: "GPS spacing < 50 m",
748         }.get(self.flag, "Unknown")
749
750
751 class BATTERY_CHARGE(GPS303Pkt):
752     PROTO = 0x81
753
754
755 class CHARGER_CONNECTED(GPS303Pkt):
756     PROTO = 0x82
757
758
759 class CHARGER_DISCONNECTED(GPS303Pkt):
760     PROTO = 0x83
761
762
763 class VIBRATION_RECEIVED(GPS303Pkt):
764     PROTO = 0x94
765
766
767 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
768     PROTO = 0x98
769     RESPOND = Respond.EXT
770     OUT_KWARGS = (("interval", int, 10),)
771
772     def in_decode(self, length: int, payload: bytes) -> None:
773         self.interval = unpack("!H", payload[:2])
774
775     def out_encode(self) -> bytes:
776         return pack("!H", self.interval)
777
778
779 class SOS_ALARM(GPS303Pkt):
780     PROTO = 0x99
781
782
783 class UNKNOWN_B3(GPS303Pkt):
784     PROTO = 0xB3
785     IN_KWARGS = (("asciidata", str, ""),)
786
787     def in_decode(self, length: int, payload: bytes) -> None:
788         self.asciidata = payload.decode()
789
790
791 # Build dicts protocol number -> class and class name -> protocol number
792 CLASSES = {}
793 PROTOS = {}
794 if True:  # just to indent the code, sorry!
795     for cls in [
796         cls
797         for name, cls in globals().items()
798         if isclass(cls)
799         and issubclass(cls, GPS303Pkt)
800         and not name.startswith("_")
801     ]:
802         if hasattr(cls, "PROTO"):
803             CLASSES[cls.PROTO] = cls
804             PROTOS[cls.__name__] = cls.PROTO
805
806
807 def class_by_prefix(
808     prefix: str,
809 ) -> Union[Type[GPS303Pkt], List[str]]:
810     if prefix.startswith(PROTO_PREFIX):
811         pname = prefix[len(PROTO_PREFIX) :]
812     else:
813         raise KeyError(pname)
814     lst = [
815         (name, proto)
816         for name, proto in PROTOS.items()
817         if name.upper().startswith(prefix.upper())
818     ]
819     if len(lst) != 1:
820         return [name for name, _ in lst]
821     _, proto = lst[0]
822     return CLASSES[proto]
823
824
825 def proto_handled(proto: str) -> bool:
826     return proto.startswith(PROTO_PREFIX)
827
828
829 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
830     return PROTO_PREFIX + (
831         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
832     )
833
834
835 def proto_of_message(packet: bytes) -> str:
836     return proto_name(CLASSES.get(packet[1], UNKNOWN))
837
838
839 def imei_from_packet(packet: bytes) -> Optional[str]:
840     if packet[1] == LOGIN.PROTO:
841         msg = parse_message(packet)
842         if isinstance(msg, LOGIN):
843             return msg.imei
844     return None
845
846
847 def is_goodbye_packet(packet: bytes) -> bool:
848     return packet[1] == HIBERNATION.PROTO
849
850
851 def inline_response(packet: bytes) -> Optional[bytes]:
852     proto = packet[1]
853     if proto in CLASSES:
854         cls = CLASSES[proto]
855         if cls.RESPOND is Respond.INL:
856             return cls.Out().packed
857     return None
858
859
860 def probe_buffer(buffer: bytes) -> bool:
861     framestart = buffer.find(b"xx")
862     if framestart < 0:
863         return False
864     if len(buffer) - framestart < 6:
865         return False
866     return True
867
868
869 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
870     """From a packet (without framing bytes) derive the XXX.In object"""
871     length, proto = unpack("BB", packet[:2])
872     payload = packet[2:]
873     if proto not in CLASSES:
874         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
875             f"Proto {proto} is unknown"
876         )
877     else:
878         try:
879             if is_incoming:
880                 return CLASSES[proto].In(length, payload)
881             else:
882                 return CLASSES[proto].Out(length, payload)
883         except (DecodeError, ValueError, IndexError) as e:
884             cause = e
885     if is_incoming:
886         retobj = UNKNOWN.In(length, payload)
887     else:
888         retobj = UNKNOWN.Out(length, payload)
889     retobj.PROTO = proto  # Override class attr with object attr
890     retobj.cause = cause
891     return retobj
892
893
894 def exposed_protos() -> List[Tuple[str, bool]]:
895     return [
896         (proto_name(cls)[:16], cls.RESPOND is Respond.EXT)
897         for cls in CLASSES.values()
898         if hasattr(cls, "rectified")
899     ]