]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
test: include lookaside and termconfig in the loop
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import (
22     Any,
23     Callable,
24     Dict,
25     List,
26     Optional,
27     Tuple,
28     Type,
29     TYPE_CHECKING,
30     Union,
31 )
32
33 __all__ = (
34     "class_by_prefix",
35     "inline_response",
36     "parse_message",
37     "proto_by_name",
38     "DecodeError",
39     "Respond",
40     "GPS303Pkt",
41     "UNKNOWN",
42     "LOGIN",
43     "SUPERVISION",
44     "HEARTBEAT",
45     "GPS_POSITIONING",
46     "GPS_OFFLINE_POSITIONING",
47     "STATUS",
48     "HIBERNATION",
49     "RESET",
50     "WHITELIST_TOTAL",
51     "WIFI_OFFLINE_POSITIONING",
52     "TIME",
53     "PROHIBIT_LBS",
54     "GPS_LBS_SWITCH_TIMES",
55     "REMOTE_MONITOR_PHONE",
56     "SOS_PHONE",
57     "DAD_PHONE",
58     "MOM_PHONE",
59     "STOP_UPLOAD",
60     "GPS_OFF_PERIOD",
61     "DND_PERIOD",
62     "RESTART_SHUTDOWN",
63     "DEVICE",
64     "ALARM_CLOCK",
65     "STOP_ALARM",
66     "SETUP",
67     "SYNCHRONOUS_WHITELIST",
68     "RESTORE_PASSWORD",
69     "WIFI_POSITIONING",
70     "MANUAL_POSITIONING",
71     "BATTERY_CHARGE",
72     "CHARGER_CONNECTED",
73     "CHARGER_DISCONNECTED",
74     "VIBRATION_RECEIVED",
75     "POSITION_UPLOAD_INTERVAL",
76     "SOS_ALARM",
77     "UNKNOWN_B3",
78 )
79
80
81 class DecodeError(Exception):
82     def __init__(self, e: Exception, **kwargs: Any) -> None:
83         super().__init__(e)
84         for k, v in kwargs.items():
85             setattr(self, k, v)
86
87
88 def maybe(typ: type) -> Callable[[Any], Any]:
89     return lambda x: None if x is None else typ(x)
90
91
92 def intx(x: Union[str, int]) -> int:
93     if isinstance(x, str):
94         x = int(x, 0)
95     return x
96
97
98 def boolx(x: Union[str, bool]) -> bool:
99     if isinstance(x, str):
100         if x.upper() in ("ON", "TRUE", "1"):
101             return True
102         if x.upper() in ("OFF", "FALSE", "0"):
103             return False
104         raise ValueError(str(x) + " could not be parsed as a Boolean")
105     return x
106
107
108 def hhmm(x: str) -> str:
109     """Check for the string that represents hours and minutes"""
110     if not isinstance(x, str) or len(x) != 4:
111         raise ValueError(str(x) + " is not a four-character string")
112     hh = int(x[:2])
113     mm = int(x[2:])
114     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
115         raise ValueError(str(x) + " does not contain valid hours and minutes")
116     return x
117
118
119 def hhmmhhmm(x: str) -> str:
120     """Check for the string that represents hours and minutes twice"""
121     if not isinstance(x, str) or len(x) != 8:
122         raise ValueError(str(x) + " is not an eight-character string")
123     return hhmm(x[:4]) + hhmm(x[4:])
124
125
126 def l3str(x: Union[str, List[str]]) -> List[str]:
127     if isinstance(x, str):
128         lx = x.split(",")
129     else:
130         lx = x
131     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
132         raise ValueError(str(lx) + " is not a list of three strings")
133     return lx
134
135
136 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
137     def alrmspec(sub: str) -> Tuple[int, str]:
138         if len(sub) != 7:
139             raise ValueError(sub + " does not represent day and time")
140         return (
141             {
142                 "MON": 1,
143                 "TUE": 2,
144                 "WED": 3,
145                 "THU": 4,
146                 "FRI": 5,
147                 "SAT": 6,
148                 "SUN": 7,
149             }[sub[:3].upper()],
150             sub[3:],
151         )
152
153     if isinstance(x, str):
154         lx = [alrmspec(sub) for sub in x.split(",")]
155     else:
156         lx = x
157     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
158     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
159         raise ValueError(str(lx) + " is a wrong alarms specification")
160     return [(d, hhmm(tm)) for d, tm in lx]
161
162
163 def l3int(x: Union[str, List[int]]) -> List[int]:
164     if isinstance(x, str):
165         lx = [int(el) for el in x.split(",")]
166     else:
167         lx = x
168     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
169         raise ValueError(str(lx) + " is not a list of three integers")
170     return lx
171
172
173 class MetaPkt(type):
174     """
175     For each class corresponding to a message, automatically create
176     two nested classes `In` and `Out` that also inherit from their
177     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
178     copied to the `In` nested class under the name `KWARGS`, and
179     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
180     to the nested class `Out`. In addition, method `encode` is
181     defined in both classes equal to `in_encode()` and `out_encode()`
182     respectively.
183     """
184
185     if TYPE_CHECKING:
186
187         def __getattr__(self, name: str) -> Any:
188             pass
189
190         def __setattr__(self, name: str, value: Any) -> None:
191             pass
192
193     def __new__(
194         cls: Type["MetaPkt"],
195         name: str,
196         bases: Tuple[type, ...],
197         attrs: Dict[str, Any],
198     ) -> "MetaPkt":
199         newcls = super().__new__(cls, name, bases, attrs)
200         newcls.In = super().__new__(
201             cls,
202             name + ".In",
203             (newcls,) + bases,
204             {
205                 "KWARGS": newcls.IN_KWARGS,
206                 "decode": newcls.in_decode,
207                 "encode": newcls.in_encode,
208             },
209         )
210         newcls.Out = super().__new__(
211             cls,
212             name + ".Out",
213             (newcls,) + bases,
214             {
215                 "KWARGS": newcls.OUT_KWARGS,
216                 "decode": newcls.out_decode,
217                 "encode": newcls.out_encode,
218             },
219         )
220         return newcls
221
222
223 class Respond(Enum):
224     NON = 0  # Incoming, no response needed
225     INL = 1  # Birirectional, use `inline_response()`
226     EXT = 2  # Birirectional, use external responder
227
228
229 class GPS303Pkt(metaclass=MetaPkt):
230     RESPOND = Respond.NON  # Do not send anything back by default
231     PROTO: int
232     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
233     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
234     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
235     In: Type["GPS303Pkt"]
236     Out: Type["GPS303Pkt"]
237
238     if TYPE_CHECKING:
239
240         def __getattr__(self, name: str) -> Any:
241             pass
242
243         def __setattr__(self, name: str, value: Any) -> None:
244             pass
245
246     def __init__(self, *args: Any, **kwargs: Any):
247         """
248         Construct the object _either_ from (length, payload),
249         _or_ from the values of individual fields
250         """
251         assert not args or (len(args) == 2 and not kwargs)
252         if args:  # guaranteed to be two arguments at this point
253             self.length, self.payload = args
254             try:
255                 self.decode(self.length, self.payload)
256             except error as e:
257                 raise DecodeError(e, obj=self)
258         else:
259             for kw, typ, dfl in self.KWARGS:
260                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
261             if kwargs:
262                 raise ValueError(
263                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
264                 )
265
266     def __repr__(self) -> str:
267         return "{}({})".format(
268             self.__class__.__name__,
269             ", ".join(
270                 "{}={}".format(
271                     k,
272                     'bytes.fromhex("{}")'.format(v.hex())
273                     if isinstance(v, bytes)
274                     else v.__repr__(),
275                 )
276                 for k, v in self.__dict__.items()
277                 if not k.startswith("_")
278             ),
279         )
280
281     decode: Callable[["GPS303Pkt", int, bytes], None]
282
283     def in_decode(self, length: int, packet: bytes) -> None:
284         # Overridden in subclasses, otherwise do not decode payload
285         return
286
287     def out_decode(self, length: int, packet: bytes) -> None:
288         # Overridden in subclasses, otherwise do not decode payload
289         return
290
291     encode: Callable[["GPS303Pkt"], bytes]
292
293     def in_encode(self) -> bytes:
294         # Necessary to emulate terminal, which is not implemented
295         raise NotImplementedError(
296             self.__class__.__name__ + ".encode() not implemented"
297         )
298
299     def out_encode(self) -> bytes:
300         # Overridden in subclasses, otherwise make empty payload
301         return b""
302
303     @property
304     def packed(self) -> bytes:
305         payload = self.encode()
306         length = getattr(self, "length", len(payload) + 1)
307         return pack("BB", length, self.PROTO) + payload
308
309
310 class UNKNOWN(GPS303Pkt):
311     PROTO = 256  # > 255 is impossible in real packets
312
313
314 class LOGIN(GPS303Pkt):
315     PROTO = 0x01
316     RESPOND = Respond.INL
317     # Default response for ACK, can also respond with STOP_UPLOAD
318     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
319
320     def in_decode(self, length: int, payload: bytes) -> None:
321         self.imei = payload[:8].ljust(8, b"\0").hex()
322         self.ver = payload[8]
323
324     def in_encode(self) -> bytes:
325         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
326             "B", self.ver
327         )
328
329
330 class SUPERVISION(GPS303Pkt):
331     PROTO = 0x05
332     OUT_KWARGS = (("status", int, 1),)
333
334     def out_encode(self) -> bytes:
335         # 1: The device automatically answers Pickup effect
336         # 2: Automatically Answering Two-way Calls
337         # 3: Ring manually answer the two-way call
338         return pack("B", self.status)
339
340
341 class HEARTBEAT(GPS303Pkt):
342     PROTO = 0x08
343     RESPOND = Respond.INL
344
345
346 class _GPS_POSITIONING(GPS303Pkt):
347     RESPOND = Respond.INL
348
349     def in_decode(self, length: int, payload: bytes) -> None:
350         self.dtime = payload[:6]
351         if self.dtime == b"\0\0\0\0\0\0":
352             self.devtime = None
353         else:
354             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
355             self.devtime = datetime(
356                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
357             )
358         self.gps_data_length = payload[6] >> 4
359         self.gps_nb_sat = payload[6] & 0x0F
360         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
361         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
362         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
363         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
364         self.heading = flags & 0b0000001111111111  # bits 6 - last
365         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
366         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
367         self.speed = speed
368         self.flags = flags
369
370     def out_encode(self) -> bytes:
371         tup = datetime.utcnow().timetuple()
372         ttup = (tup[0] % 100,) + tup[1:6]
373         return pack("BBBBBB", *ttup)
374
375
376 class GPS_POSITIONING(_GPS_POSITIONING):
377     PROTO = 0x10
378
379
380 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
381     PROTO = 0x11
382
383
384 class STATUS(GPS303Pkt):
385     PROTO = 0x13
386     RESPOND = Respond.EXT
387     IN_KWARGS = (
388         ("batt", int, 100),
389         ("ver", int, 0),
390         ("timezone", int, 0),
391         ("intvl", int, 0),
392         ("signal", maybe(int), None),
393     )
394     OUT_KWARGS = (("upload_interval", int, 25),)
395
396     def in_decode(self, length: int, payload: bytes) -> None:
397         self.batt, self.ver, self.timezone, self.intvl = unpack(
398             "BBBB", payload[:4]
399         )
400         if len(payload) > 4:
401             self.signal: Optional[int] = payload[4]
402         else:
403             self.signal = None
404
405     def in_encode(self) -> bytes:
406         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
407             b"" if self.signal is None else pack("B", self.signal)
408         )
409
410     def out_encode(self) -> bytes:  # Set interval in minutes
411         return pack("B", self.upload_interval)
412
413
414 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
415     PROTO = 0x14
416
417     def in_encode(self) -> bytes:
418         return b""
419
420
421 class RESET(GPS303Pkt):
422     # Device sends when it got reset SMS
423     # Server can send to initiate factory reset
424     PROTO = 0x15
425
426
427 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
428     PROTO = 0x16
429     OUT_KWARGS = (("number", int, 3),)
430
431     def out_encode(self) -> bytes:  # Number of whitelist entries
432         return pack("B", self.number)
433
434
435 class _WIFI_POSITIONING(GPS303Pkt):
436     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
437         # IN_KWARGS = (
438         ("dtime", bytes, b"\0\0\0\0\0\0"),
439         ("wifi_aps", list, []),
440         ("mcc", int, 0),
441         ("mnc", int, 0),
442         ("gsm_cells", list, []),
443     )
444
445     def in_decode(self, length: int, payload: bytes) -> None:
446         self.dtime = payload[:6]
447         if self.dtime == b"\0\0\0\0\0\0":
448             self.devtime = None
449         else:
450             self.devtime = datetime.strptime(
451                 self.dtime.hex(), "%y%m%d%H%M%S"
452             ).astimezone(tz=timezone.utc)
453         self.wifi_aps = []
454         for i in range(self.length):  # length has special meaning here
455             slice = payload[6 + i * 7 : 13 + i * 7]
456             self.wifi_aps.append(
457                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
458             )
459         gsm_slice = payload[6 + self.length * 7 :]
460         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
461         self.gsm_cells = []
462         for i in range(ncells):
463             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
464             locac, cellid, sigstr = unpack(
465                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
466             )
467             self.gsm_cells.append((locac, cellid, -sigstr))
468
469     def in_encode(self) -> bytes:
470         self.length = len(self.wifi_aps)
471         return b"".join(
472             [
473                 self.dtime,
474                 b"".join(
475                     [
476                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
477                         + pack("B", -sigstr)
478                         for mac, sigstr in self.wifi_aps
479                     ]
480                 ),
481                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
482                 b"".join(
483                     [
484                         pack("!HHB", locac, cellid, -sigstr)
485                         for locac, cellid, sigstr in self.gsm_cells
486                     ]
487                 ),
488             ]
489         )
490
491
492 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
493     PROTO = 0x17
494     RESPOND = Respond.INL
495
496     def out_encode(self) -> bytes:
497         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
498
499
500 class TIME(GPS303Pkt):
501     PROTO = 0x30
502     RESPOND = Respond.INL
503
504     def out_encode(self) -> bytes:
505         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
506
507
508 class PROHIBIT_LBS(GPS303Pkt):
509     PROTO = 0x33
510     OUT_KWARGS = (("status", int, 1),)
511
512     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
513         return pack("B", self.status)
514
515
516 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
517     PROTO = 0x34
518
519     OUT_KWARGS = (
520         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
521         ("gps_interval_set", boolx, False),
522         ("gps_interval", hhmmhhmm, "00000000"),
523         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
524         ("boot_time_set", boolx, False),
525         ("boot_time", hhmm, "0000"),
526         ("shut_time_set", boolx, False),
527         ("shut_time", hhmm, "0000"),
528     )
529
530     def out_encode(self) -> bytes:
531         return (
532             pack("B", self.gps_off)
533             + pack("B", self.gps_interval_set)
534             + bytes.fromhex(self.gps_interval)
535             + pack("B", self.lbs_off)
536             + pack("B", self.boot_time_set)
537             + bytes.fromhex(self.boot_time)
538             + pack("B", self.shut_time_set)
539             + bytes.fromhex(self.shut_time)
540         )
541
542
543 class _SET_PHONE(GPS303Pkt):
544     OUT_KWARGS = (("phone", str, ""),)
545
546     def out_encode(self) -> bytes:
547         self.phone: str
548         return self.phone.encode("")
549
550
551 class REMOTE_MONITOR_PHONE(_SET_PHONE):
552     PROTO = 0x40
553
554
555 class SOS_PHONE(_SET_PHONE):
556     PROTO = 0x41
557
558
559 class DAD_PHONE(_SET_PHONE):
560     PROTO = 0x42
561
562
563 class MOM_PHONE(_SET_PHONE):
564     PROTO = 0x43
565
566
567 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
568     PROTO = 0x44
569
570
571 class GPS_OFF_PERIOD(GPS303Pkt):
572     PROTO = 0x46
573     OUT_KWARGS = (
574         ("onoff", int, 0),
575         ("fm", hhmm, "0000"),
576         ("to", hhmm, "2359"),
577     )
578
579     def out_encode(self) -> bytes:
580         return (
581             pack("B", self.onoff)
582             + bytes.fromhex(self.fm)
583             + bytes.fromhex(self.to)
584         )
585
586
587 class DND_PERIOD(GPS303Pkt):
588     PROTO = 0x47
589     OUT_KWARGS = (
590         ("onoff", int, 0),
591         ("week", int, 3),
592         ("fm1", hhmm, "0000"),
593         ("to1", hhmm, "2359"),
594         ("fm2", hhmm, "0000"),
595         ("to2", hhmm, "2359"),
596     )
597
598     def out_encode(self) -> bytes:
599         return (
600             pack("B", self.onoff)
601             + pack("B", self.week)
602             + bytes.fromhex(self.fm1)
603             + bytes.fromhex(self.to1)
604             + bytes.fromhex(self.fm2)
605             + bytes.fromhex(self.to2)
606         )
607
608
609 class RESTART_SHUTDOWN(GPS303Pkt):
610     PROTO = 0x48
611     OUT_KWARGS = (("flag", int, 0),)
612
613     def out_encode(self) -> bytes:
614         # 1 - restart
615         # 2 - shutdown
616         return pack("B", self.flag)
617
618
619 class DEVICE(GPS303Pkt):
620     PROTO = 0x49
621     OUT_KWARGS = (("flag", int, 0),)
622
623     # 0 - Stop looking for equipment
624     # 1 - Start looking for equipment
625     def out_encode(self) -> bytes:
626         return pack("B", self.flag)
627
628
629 class ALARM_CLOCK(GPS303Pkt):
630     PROTO = 0x50
631     OUT_KWARGS: Tuple[
632         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
633     ] = (
634         ("alarms", l3alarms, []),
635     )
636
637     def out_encode(self) -> bytes:
638         return b"".join(
639             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
640         )
641
642
643 class STOP_ALARM(GPS303Pkt):
644     PROTO = 0x56
645
646     def in_decode(self, length: int, payload: bytes) -> None:
647         self.flag = payload[0]
648
649
650 class SETUP(GPS303Pkt):
651     PROTO = 0x57
652     RESPOND = Respond.EXT
653     OUT_KWARGS = (
654         ("uploadintervalseconds", intx, 0x0300),
655         ("binaryswitch", intx, 0b00110001),
656         ("alarms", l3int, [0, 0, 0]),
657         ("dndtimeswitch", int, 0),
658         ("dndtimes", l3int, [0, 0, 0]),
659         ("gpstimeswitch", int, 0),
660         ("gpstimestart", int, 0),
661         ("gpstimestop", int, 0),
662         ("phonenumbers", l3str, ["", "", ""]),
663     )
664
665     def out_encode(self) -> bytes:
666         def pack3b(x: int) -> bytes:
667             return pack("!I", x)[1:]
668
669         return b"".join(
670             [
671                 pack("!H", self.uploadintervalseconds),
672                 pack("B", self.binaryswitch),
673             ]
674             + [pack3b(el) for el in self.alarms]
675             + [
676                 pack("B", self.dndtimeswitch),
677             ]
678             + [pack3b(el) for el in self.dndtimes]
679             + [
680                 pack("B", self.gpstimeswitch),
681                 pack("!H", self.gpstimestart),
682                 pack("!H", self.gpstimestop),
683             ]
684             + [b";".join([el.encode() for el in self.phonenumbers])]
685         )
686
687     def in_encode(self) -> bytes:
688         return b""
689
690
691 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
692     PROTO = 0x58
693
694
695 class RESTORE_PASSWORD(GPS303Pkt):
696     PROTO = 0x67
697
698
699 class WIFI_POSITIONING(_WIFI_POSITIONING):
700     PROTO = 0x69
701     RESPOND = Respond.EXT
702     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
703
704     def out_encode(self) -> bytes:
705         if self.latitude is None or self.longitude is None:
706             return b""
707         return "{:+#010.8g},{:+#010.8g}".format(
708             self.latitude, self.longitude
709         ).encode()
710
711     def out_decode(self, length: int, payload: bytes) -> None:
712         lat, lon = payload.decode().split(",")
713         self.latitude = float(lat)
714         self.longitude = float(lon)
715
716
717 class MANUAL_POSITIONING(GPS303Pkt):
718     PROTO = 0x80
719
720     def in_decode(self, length: int, payload: bytes) -> None:
721         self.flag = payload[0] if len(payload) > 0 else -1
722         self.reason = {
723             1: "Incorrect time",
724             2: "LBS less",
725             3: "WiFi less",
726             4: "LBS search > 3 times",
727             5: "Same LBS and WiFi data",
728             6: "LBS prohibited, WiFi absent",
729             7: "GPS spacing < 50 m",
730         }.get(self.flag, "Unknown")
731
732
733 class BATTERY_CHARGE(GPS303Pkt):
734     PROTO = 0x81
735
736
737 class CHARGER_CONNECTED(GPS303Pkt):
738     PROTO = 0x82
739
740
741 class CHARGER_DISCONNECTED(GPS303Pkt):
742     PROTO = 0x83
743
744
745 class VIBRATION_RECEIVED(GPS303Pkt):
746     PROTO = 0x94
747
748
749 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
750     PROTO = 0x98
751     RESPOND = Respond.EXT
752     OUT_KWARGS = (("interval", int, 10),)
753
754     def in_decode(self, length: int, payload: bytes) -> None:
755         self.interval = unpack("!H", payload[:2])
756
757     def out_encode(self) -> bytes:
758         return pack("!H", self.interval)
759
760
761 class SOS_ALARM(GPS303Pkt):
762     PROTO = 0x99
763
764
765 class UNKNOWN_B3(GPS303Pkt):
766     PROTO = 0xB3
767     IN_KWARGS = (("asciidata", str, ""),)
768
769     def in_decode(self, length: int, payload: bytes) -> None:
770         self.asciidata = payload.decode()
771
772
773 # Build dicts protocol number -> class and class name -> protocol number
774 CLASSES = {}
775 PROTOS = {}
776 if True:  # just to indent the code, sorry!
777     for cls in [
778         cls
779         for name, cls in globals().items()
780         if isclass(cls)
781         and issubclass(cls, GPS303Pkt)
782         and not name.startswith("_")
783     ]:
784         if hasattr(cls, "PROTO"):
785             CLASSES[cls.PROTO] = cls
786             PROTOS[cls.__name__] = cls.PROTO
787
788
789 def class_by_prefix(
790     prefix: str,
791 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
792     lst = [
793         (name, proto)
794         for name, proto in PROTOS.items()
795         if name.upper().startswith(prefix.upper())
796     ]
797     if len(lst) != 1:
798         return lst
799     _, proto = lst[0]
800     return CLASSES[proto]
801
802
803 def proto_by_name(name: str) -> int:
804     return PROTOS.get(name, -1)
805
806
807 def proto_of_message(packet: bytes) -> int:
808     return packet[1]
809
810
811 def inline_response(packet: bytes) -> Optional[bytes]:
812     proto = proto_of_message(packet)
813     if proto in CLASSES:
814         cls = CLASSES[proto]
815         if cls.RESPOND is Respond.INL:
816             return cls.Out().packed
817     return None
818
819
820 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
821     """From a packet (without framing bytes) derive the XXX.In object"""
822     length, proto = unpack("BB", packet[:2])
823     payload = packet[2:]
824     if proto not in CLASSES:
825         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
826             f"Proto {proto} is unknown"
827         )
828     else:
829         try:
830             if is_incoming:
831                 return CLASSES[proto].In(length, payload)
832             else:
833                 return CLASSES[proto].Out(length, payload)
834         except (DecodeError, ValueError, IndexError) as e:
835             cause = e
836     if is_incoming:
837         retobj = UNKNOWN.In(length, payload)
838     else:
839         retobj = UNKNOWN.Out(length, payload)
840     retobj.PROTO = proto  # Override class attr with object attr
841     retobj.cause = cause
842     return retobj