]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
test: minimally functional test_storage
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import (
22     Any,
23     Callable,
24     Dict,
25     List,
26     Optional,
27     Tuple,
28     Type,
29     TYPE_CHECKING,
30     Union,
31 )
32
33 __all__ = (
34     "class_by_prefix",
35     "inline_response",
36     "parse_message",
37     "proto_by_name",
38     "DecodeError",
39     "Respond",
40     "GPS303Pkt",
41     "UNKNOWN",
42     "LOGIN",
43     "SUPERVISION",
44     "HEARTBEAT",
45     "GPS_POSITIONING",
46     "GPS_OFFLINE_POSITIONING",
47     "STATUS",
48     "HIBERNATION",
49     "RESET",
50     "WHITELIST_TOTAL",
51     "WIFI_OFFLINE_POSITIONING",
52     "TIME",
53     "PROHIBIT_LBS",
54     "GPS_LBS_SWITCH_TIMES",
55     "REMOTE_MONITOR_PHONE",
56     "SOS_PHONE",
57     "DAD_PHONE",
58     "MOM_PHONE",
59     "STOP_UPLOAD",
60     "GPS_OFF_PERIOD",
61     "DND_PERIOD",
62     "RESTART_SHUTDOWN",
63     "DEVICE",
64     "ALARM_CLOCK",
65     "STOP_ALARM",
66     "SETUP",
67     "SYNCHRONOUS_WHITELIST",
68     "RESTORE_PASSWORD",
69     "WIFI_POSITIONING",
70     "MANUAL_POSITIONING",
71     "BATTERY_CHARGE",
72     "CHARGER_CONNECTED",
73     "CHARGER_DISCONNECTED",
74     "VIBRATION_RECEIVED",
75     "POSITION_UPLOAD_INTERVAL",
76     "SOS_ALARM",
77     "UNKNOWN_B3",
78 )
79
80
81 class DecodeError(Exception):
82     def __init__(self, e: Exception, **kwargs: Any) -> None:
83         super().__init__(e)
84         for k, v in kwargs.items():
85             setattr(self, k, v)
86
87
88 def maybe_int(x: Optional[int]) -> Optional[int]:
89     return None if x is None else int(x)
90
91
92 def intx(x: Union[str, int]) -> int:
93     if isinstance(x, str):
94         x = int(x, 0)
95     return x
96
97
98 def boolx(x: Union[str, bool]) -> bool:
99     if isinstance(x, str):
100         if x.upper() in ("ON", "TRUE", "1"):
101             return True
102         if x.upper() in ("OFF", "FALSE", "0"):
103             return False
104         raise ValueError(str(x) + " could not be parsed as a Boolean")
105     return x
106
107
108 def hhmm(x: str) -> str:
109     """Check for the string that represents hours and minutes"""
110     if not isinstance(x, str) or len(x) != 4:
111         raise ValueError(str(x) + " is not a four-character string")
112     hh = int(x[:2])
113     mm = int(x[2:])
114     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
115         raise ValueError(str(x) + " does not contain valid hours and minutes")
116     return x
117
118
119 def hhmmhhmm(x: str) -> str:
120     """Check for the string that represents hours and minutes twice"""
121     if not isinstance(x, str) or len(x) != 8:
122         raise ValueError(str(x) + " is not an eight-character string")
123     return hhmm(x[:4]) + hhmm(x[4:])
124
125
126 def l3str(x: Union[str, List[str]]) -> List[str]:
127     if isinstance(x, str):
128         lx = x.split(",")
129     else:
130         lx = x
131     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
132         raise ValueError(str(lx) + " is not a list of three strings")
133     return lx
134
135
136 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
137     def alrmspec(sub: str) -> Tuple[int, str]:
138         if len(sub) != 7:
139             raise ValueError(sub + " does not represent day and time")
140         return (
141             {
142                 "MON": 1,
143                 "TUE": 2,
144                 "WED": 3,
145                 "THU": 4,
146                 "FRI": 5,
147                 "SAT": 6,
148                 "SUN": 7,
149             }[sub[:3].upper()],
150             sub[3:],
151         )
152
153     if isinstance(x, str):
154         lx = [alrmspec(sub) for sub in x.split(",")]
155     else:
156         lx = x
157     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
158     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
159         raise ValueError(str(lx) + " is a wrong alarms specification")
160     return [(d, hhmm(tm)) for d, tm in lx]
161
162
163 def l3int(x: Union[str, List[int]]) -> List[int]:
164     if isinstance(x, str):
165         lx = [int(el) for el in x.split(",")]
166     else:
167         lx = x
168     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
169         raise ValueError(str(lx) + " is not a list of three integers")
170     return lx
171
172
173 class MetaPkt(type):
174     """
175     For each class corresponding to a message, automatically create
176     two nested classes `In` and `Out` that also inherit from their
177     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
178     copied to the `In` nested class under the name `KWARGS`, and
179     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
180     to the nested class `Out`. In addition, method `encode` is
181     defined in both classes equal to `in_encode()` and `out_encode()`
182     respectively.
183     """
184
185     if TYPE_CHECKING:
186
187         def __getattr__(self, name: str) -> Any:
188             pass
189
190         def __setattr__(self, name: str, value: Any) -> None:
191             pass
192
193     def __new__(
194         cls: Type["MetaPkt"],
195         name: str,
196         bases: Tuple[type, ...],
197         attrs: Dict[str, Any],
198     ) -> "MetaPkt":
199         newcls = super().__new__(cls, name, bases, attrs)
200         newcls.In = super().__new__(
201             cls,
202             name + ".In",
203             (newcls,) + bases,
204             {
205                 "KWARGS": newcls.IN_KWARGS,
206                 "decode": newcls.in_decode,
207                 "encode": newcls.in_encode,
208             },
209         )
210         newcls.Out = super().__new__(
211             cls,
212             name + ".Out",
213             (newcls,) + bases,
214             {
215                 "KWARGS": newcls.OUT_KWARGS,
216                 "decode": newcls.out_decode,
217                 "encode": newcls.out_encode,
218             },
219         )
220         return newcls
221
222
223 class Respond(Enum):
224     NON = 0  # Incoming, no response needed
225     INL = 1  # Birirectional, use `inline_response()`
226     EXT = 2  # Birirectional, use external responder
227
228
229 class GPS303Pkt(metaclass=MetaPkt):
230     RESPOND = Respond.NON  # Do not send anything back by default
231     PROTO: int
232     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
233     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
234     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
235     In: Type["GPS303Pkt"]
236     Out: Type["GPS303Pkt"]
237
238     if TYPE_CHECKING:
239
240         def __getattr__(self, name: str) -> Any:
241             pass
242
243         def __setattr__(self, name: str, value: Any) -> None:
244             pass
245
246     def __init__(self, *args: Any, **kwargs: Any):
247         """
248         Construct the object _either_ from (length, payload),
249         _or_ from the values of individual fields
250         """
251         assert not args or (len(args) == 2 and not kwargs)
252         if args:  # guaranteed to be two arguments at this point
253             self.length, self.payload = args
254             try:
255                 self.decode(self.length, self.payload)
256             except error as e:
257                 raise DecodeError(e, obj=self)
258         else:
259             for kw, typ, dfl in self.KWARGS:
260                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
261             if kwargs:
262                 raise ValueError(
263                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
264                 )
265
266     def __repr__(self) -> str:
267         return "{}({})".format(
268             self.__class__.__name__,
269             ", ".join(
270                 "{}={}".format(
271                     k,
272                     'bytes.fromhex("{}")'.format(v.hex())
273                     if isinstance(v, bytes)
274                     else v.__repr__(),
275                 )
276                 for k, v in self.__dict__.items()
277                 if not k.startswith("_")
278             ),
279         )
280
281     decode: Callable[["GPS303Pkt", int, bytes], None]
282
283     def in_decode(self, length: int, packet: bytes) -> None:
284         # Overridden in subclasses, otherwise do not decode payload
285         return
286
287     def out_decode(self, length: int, packet: bytes) -> None:
288         # Overridden in subclasses, otherwise do not decode payload
289         return
290
291     encode: Callable[["GPS303Pkt"], bytes]
292
293     def in_encode(self) -> bytes:
294         # Necessary to emulate terminal, which is not implemented
295         raise NotImplementedError(
296             self.__class__.__name__ + ".encode() not implemented"
297         )
298
299     def out_encode(self) -> bytes:
300         # Overridden in subclasses, otherwise make empty payload
301         return b""
302
303     @property
304     def packed(self) -> bytes:
305         payload = self.encode()
306         length = len(payload) + 1
307         return pack("BB", length, self.PROTO) + payload
308
309
310 class UNKNOWN(GPS303Pkt):
311     PROTO = 256  # > 255 is impossible in real packets
312
313
314 class LOGIN(GPS303Pkt):
315     PROTO = 0x01
316     RESPOND = Respond.INL
317     # Default response for ACK, can also respond with STOP_UPLOAD
318     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
319
320     def in_decode(self, length: int, payload: bytes) -> None:
321         self.imei = payload[:8].ljust(8, b"\0").hex()
322         self.ver = payload[8]
323
324     def in_encode(self) -> bytes:
325         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
326             "B", self.ver
327         )
328
329
330 class SUPERVISION(GPS303Pkt):
331     PROTO = 0x05
332     OUT_KWARGS = (("status", int, 1),)
333
334     def out_encode(self) -> bytes:
335         # 1: The device automatically answers Pickup effect
336         # 2: Automatically Answering Two-way Calls
337         # 3: Ring manually answer the two-way call
338         return pack("B", self.status)
339
340
341 class HEARTBEAT(GPS303Pkt):
342     PROTO = 0x08
343     RESPOND = Respond.INL
344
345
346 class _GPS_POSITIONING(GPS303Pkt):
347     RESPOND = Respond.INL
348
349     def in_decode(self, length: int, payload: bytes) -> None:
350         self.dtime = payload[:6]
351         if self.dtime == b"\0\0\0\0\0\0":
352             self.devtime = None
353         else:
354             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
355             self.devtime = datetime(
356                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
357             )
358         self.gps_data_length = payload[6] >> 4
359         self.gps_nb_sat = payload[6] & 0x0F
360         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
361         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
362         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
363         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
364         self.heading = flags & 0b0000001111111111  # bits 6 - last
365         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
366         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
367         self.speed = speed
368         self.flags = flags
369
370     def out_encode(self) -> bytes:
371         tup = datetime.utcnow().timetuple()
372         ttup = (tup[0] % 100,) + tup[1:6]
373         return pack("BBBBBB", *ttup)
374
375
376 class GPS_POSITIONING(_GPS_POSITIONING):
377     PROTO = 0x10
378
379
380 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
381     PROTO = 0x11
382
383
384 class STATUS(GPS303Pkt):
385     PROTO = 0x13
386     RESPOND = Respond.EXT
387     IN_KWARGS = (
388         ("batt", int, 100),
389         ("ver", int, 0),
390         ("timezone", int, 0),
391         ("intvl", int, 0),
392         ("signal", maybe_int, None),
393     )
394     OUT_KWARGS = (("upload_interval", int, 25),)
395
396     def in_decode(self, length: int, payload: bytes) -> None:
397         self.batt, self.ver, self.timezone, self.intvl = unpack(
398             "BBBB", payload[:4]
399         )
400         if len(payload) > 4:
401             self.signal: Optional[int] = payload[4]
402         else:
403             self.signal = None
404
405     def in_encode(self) -> bytes:
406         return (
407             pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
408             if self.signal is None
409             else pack("B", self.signal)
410         )
411
412     def out_encode(self) -> bytes:  # Set interval in minutes
413         return pack("B", self.upload_interval)
414
415
416 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
417     PROTO = 0x14
418
419     def in_encode(self) -> bytes:
420         return b""
421
422
423 class RESET(GPS303Pkt):
424     # Device sends when it got reset SMS
425     # Server can send to initiate factory reset
426     PROTO = 0x15
427
428
429 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
430     PROTO = 0x16
431     OUT_KWARGS = (("number", int, 3),)
432
433     def out_encode(self) -> bytes:  # Number of whitelist entries
434         return pack("B", self.number)
435
436
437 class _WIFI_POSITIONING(GPS303Pkt):
438     def in_decode(self, length: int, payload: bytes) -> None:
439         self.dtime = payload[:6]
440         if self.dtime == b"\0\0\0\0\0\0":
441             self.devtime = None
442         else:
443             self.devtime = datetime.strptime(
444                 self.dtime.hex(), "%y%m%d%H%M%S"
445             ).astimezone(tz=timezone.utc)
446         self.wifi_aps = []
447         for i in range(self.length):  # length has special meaning here
448             slice = payload[6 + i * 7 : 13 + i * 7]
449             self.wifi_aps.append(
450                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
451             )
452         gsm_slice = payload[6 + self.length * 7 :]
453         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
454         self.gsm_cells = []
455         for i in range(ncells):
456             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
457             locac, cellid, sigstr = unpack(
458                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
459             )
460             self.gsm_cells.append((locac, cellid, -sigstr))
461
462
463 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
464     PROTO = 0x17
465     RESPOND = Respond.INL
466
467     def out_encode(self) -> bytes:
468         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
469
470
471 class TIME(GPS303Pkt):
472     PROTO = 0x30
473     RESPOND = Respond.INL
474
475     def out_encode(self) -> bytes:
476         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
477
478
479 class PROHIBIT_LBS(GPS303Pkt):
480     PROTO = 0x33
481     OUT_KWARGS = (("status", int, 1),)
482
483     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
484         return pack("B", self.status)
485
486
487 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
488     PROTO = 0x34
489
490     OUT_KWARGS = (
491         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
492         ("gps_interval_set", boolx, False),
493         ("gps_interval", hhmmhhmm, "00000000"),
494         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
495         ("boot_time_set", boolx, False),
496         ("boot_time", hhmm, "0000"),
497         ("shut_time_set", boolx, False),
498         ("shut_time", hhmm, "0000"),
499     )
500
501     def out_encode(self) -> bytes:
502         return (
503             pack("B", self.gps_off)
504             + pack("B", self.gps_interval_set)
505             + bytes.fromhex(self.gps_interval)
506             + pack("B", self.lbs_off)
507             + pack("B", self.boot_time_set)
508             + bytes.fromhex(self.boot_time)
509             + pack("B", self.shut_time_set)
510             + bytes.fromhex(self.shut_time)
511         )
512
513
514 class _SET_PHONE(GPS303Pkt):
515     OUT_KWARGS = (("phone", str, ""),)
516
517     def out_encode(self) -> bytes:
518         self.phone: str
519         return self.phone.encode("")
520
521
522 class REMOTE_MONITOR_PHONE(_SET_PHONE):
523     PROTO = 0x40
524
525
526 class SOS_PHONE(_SET_PHONE):
527     PROTO = 0x41
528
529
530 class DAD_PHONE(_SET_PHONE):
531     PROTO = 0x42
532
533
534 class MOM_PHONE(_SET_PHONE):
535     PROTO = 0x43
536
537
538 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
539     PROTO = 0x44
540
541
542 class GPS_OFF_PERIOD(GPS303Pkt):
543     PROTO = 0x46
544     OUT_KWARGS = (
545         ("onoff", int, 0),
546         ("fm", hhmm, "0000"),
547         ("to", hhmm, "2359"),
548     )
549
550     def out_encode(self) -> bytes:
551         return (
552             pack("B", self.onoff)
553             + bytes.fromhex(self.fm)
554             + bytes.fromhex(self.to)
555         )
556
557
558 class DND_PERIOD(GPS303Pkt):
559     PROTO = 0x47
560     OUT_KWARGS = (
561         ("onoff", int, 0),
562         ("week", int, 3),
563         ("fm1", hhmm, "0000"),
564         ("to1", hhmm, "2359"),
565         ("fm2", hhmm, "0000"),
566         ("to2", hhmm, "2359"),
567     )
568
569     def out_encode(self) -> bytes:
570         return (
571             pack("B", self.onoff)
572             + pack("B", self.week)
573             + bytes.fromhex(self.fm1)
574             + bytes.fromhex(self.to1)
575             + bytes.fromhex(self.fm2)
576             + bytes.fromhex(self.to2)
577         )
578
579
580 class RESTART_SHUTDOWN(GPS303Pkt):
581     PROTO = 0x48
582     OUT_KWARGS = (("flag", int, 0),)
583
584     def out_encode(self) -> bytes:
585         # 1 - restart
586         # 2 - shutdown
587         return pack("B", self.flag)
588
589
590 class DEVICE(GPS303Pkt):
591     PROTO = 0x49
592     OUT_KWARGS = (("flag", int, 0),)
593
594     # 0 - Stop looking for equipment
595     # 1 - Start looking for equipment
596     def out_encode(self) -> bytes:
597         return pack("B", self.flag)
598
599
600 class ALARM_CLOCK(GPS303Pkt):
601     PROTO = 0x50
602     OUT_KWARGS: Tuple[
603         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
604     ] = (
605         ("alarms", l3alarms, []),
606     )
607
608     def out_encode(self) -> bytes:
609         return b"".join(
610             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
611         )
612
613
614 class STOP_ALARM(GPS303Pkt):
615     PROTO = 0x56
616
617     def in_decode(self, length: int, payload: bytes) -> None:
618         self.flag = payload[0]
619
620
621 class SETUP(GPS303Pkt):
622     PROTO = 0x57
623     RESPOND = Respond.EXT
624     OUT_KWARGS = (
625         ("uploadintervalseconds", intx, 0x0300),
626         ("binaryswitch", intx, 0b00110001),
627         ("alarms", l3int, [0, 0, 0]),
628         ("dndtimeswitch", int, 0),
629         ("dndtimes", l3int, [0, 0, 0]),
630         ("gpstimeswitch", int, 0),
631         ("gpstimestart", int, 0),
632         ("gpstimestop", int, 0),
633         ("phonenumbers", l3str, ["", "", ""]),
634     )
635
636     def out_encode(self) -> bytes:
637         def pack3b(x: int) -> bytes:
638             return pack("!I", x)[1:]
639
640         return b"".join(
641             [
642                 pack("!H", self.uploadintervalseconds),
643                 pack("B", self.binaryswitch),
644             ]
645             + [pack3b(el) for el in self.alarms]
646             + [
647                 pack("B", self.dndtimeswitch),
648             ]
649             + [pack3b(el) for el in self.dndtimes]
650             + [
651                 pack("B", self.gpstimeswitch),
652                 pack("!H", self.gpstimestart),
653                 pack("!H", self.gpstimestop),
654             ]
655             + [b";".join([el.encode() for el in self.phonenumbers])]
656         )
657
658
659 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
660     PROTO = 0x58
661
662
663 class RESTORE_PASSWORD(GPS303Pkt):
664     PROTO = 0x67
665
666
667 class WIFI_POSITIONING(_WIFI_POSITIONING):
668     PROTO = 0x69
669     RESPOND = Respond.EXT
670     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
671
672     def out_encode(self) -> bytes:
673         if self.latitude is None or self.longitude is None:
674             return b""
675         return "{:+#010.8g},{:+#010.8g}".format(
676             self.latitude, self.longitude
677         ).encode()
678
679     def out_decode(self, length: int, payload: bytes) -> None:
680         lat, lon = payload.decode().split(",")
681         self.latitude = float(lat)
682         self.longitude = float(lon)
683
684
685 class MANUAL_POSITIONING(GPS303Pkt):
686     PROTO = 0x80
687
688     def in_decode(self, length: int, payload: bytes) -> None:
689         self.flag = payload[0] if len(payload) > 0 else -1
690         self.reason = {
691             1: "Incorrect time",
692             2: "LBS less",
693             3: "WiFi less",
694             4: "LBS search > 3 times",
695             5: "Same LBS and WiFi data",
696             6: "LBS prohibited, WiFi absent",
697             7: "GPS spacing < 50 m",
698         }.get(self.flag, "Unknown")
699
700
701 class BATTERY_CHARGE(GPS303Pkt):
702     PROTO = 0x81
703
704
705 class CHARGER_CONNECTED(GPS303Pkt):
706     PROTO = 0x82
707
708
709 class CHARGER_DISCONNECTED(GPS303Pkt):
710     PROTO = 0x83
711
712
713 class VIBRATION_RECEIVED(GPS303Pkt):
714     PROTO = 0x94
715
716
717 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
718     PROTO = 0x98
719     RESPOND = Respond.EXT
720     OUT_KWARGS = (("interval", int, 10),)
721
722     def in_decode(self, length: int, payload: bytes) -> None:
723         self.interval = unpack("!H", payload[:2])
724
725     def out_encode(self) -> bytes:
726         return pack("!H", self.interval)
727
728
729 class SOS_ALARM(GPS303Pkt):
730     PROTO = 0x99
731
732
733 class UNKNOWN_B3(GPS303Pkt):
734     PROTO = 0xB3
735     IN_KWARGS = (("asciidata", str, ""),)
736
737     def in_decode(self, length: int, payload: bytes) -> None:
738         self.asciidata = payload.decode()
739
740
741 # Build dicts protocol number -> class and class name -> protocol number
742 CLASSES = {}
743 PROTOS = {}
744 if True:  # just to indent the code, sorry!
745     for cls in [
746         cls
747         for name, cls in globals().items()
748         if isclass(cls)
749         and issubclass(cls, GPS303Pkt)
750         and not name.startswith("_")
751     ]:
752         if hasattr(cls, "PROTO"):
753             CLASSES[cls.PROTO] = cls
754             PROTOS[cls.__name__] = cls.PROTO
755
756
757 def class_by_prefix(
758     prefix: str,
759 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
760     lst = [
761         (name, proto)
762         for name, proto in PROTOS.items()
763         if name.upper().startswith(prefix.upper())
764     ]
765     if len(lst) != 1:
766         return lst
767     _, proto = lst[0]
768     return CLASSES[proto]
769
770
771 def proto_by_name(name: str) -> int:
772     return PROTOS.get(name, -1)
773
774
775 def proto_of_message(packet: bytes) -> int:
776     return packet[1]
777
778
779 def inline_response(packet: bytes) -> Optional[bytes]:
780     proto = proto_of_message(packet)
781     if proto in CLASSES:
782         cls = CLASSES[proto]
783         if cls.RESPOND is Respond.INL:
784             return cls.Out().packed
785     return None
786
787
788 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
789     """From a packet (without framing bytes) derive the XXX.In object"""
790     length, proto = unpack("BB", packet[:2])
791     payload = packet[2:]
792     if proto not in CLASSES:
793         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
794             f"Proto {proto} is unknown"
795         )
796     else:
797         try:
798             if is_incoming:
799                 return CLASSES[proto].In(length, payload)
800             else:
801                 return CLASSES[proto].Out(length, payload)
802         except (DecodeError, ValueError, IndexError) as e:
803             cause = e
804     if is_incoming:
805         retobj = UNKNOWN.In(length, payload)
806     else:
807         retobj = UNKNOWN.Out(length, payload)
808     retobj.PROTO = proto  # Override class attr with object attr
809     retobj.cause = cause
810     return retobj