]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Implement remaining "Out" commands
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import (
22     Any,
23     Callable,
24     Dict,
25     List,
26     Optional,
27     Tuple,
28     Type,
29     TYPE_CHECKING,
30     Union,
31 )
32
33 __all__ = (
34     "class_by_prefix",
35     "inline_response",
36     "parse_message",
37     "proto_by_name",
38     "DecodeError",
39     "Respond",
40     "GPS303Pkt",
41     "UNKNOWN",
42     "LOGIN",
43     "SUPERVISION",
44     "HEARTBEAT",
45     "GPS_POSITIONING",
46     "GPS_OFFLINE_POSITIONING",
47     "STATUS",
48     "HIBERNATION",
49     "RESET",
50     "WHITELIST_TOTAL",
51     "WIFI_OFFLINE_POSITIONING",
52     "TIME",
53     "PROHIBIT_LBS",
54     "GPS_LBS_SWITCH_TIMES",
55     "REMOTE_MONITOR_PHONE",
56     "SOS_PHONE",
57     "DAD_PHONE",
58     "MOM_PHONE",
59     "STOP_UPLOAD",
60     "GPS_OFF_PERIOD",
61     "DND_PERIOD",
62     "RESTART_SHUTDOWN",
63     "DEVICE",
64     "ALARM_CLOCK",
65     "STOP_ALARM",
66     "SETUP",
67     "SYNCHRONOUS_WHITELIST",
68     "RESTORE_PASSWORD",
69     "WIFI_POSITIONING",
70     "MANUAL_POSITIONING",
71     "BATTERY_CHARGE",
72     "CHARGER_CONNECTED",
73     "CHARGER_DISCONNECTED",
74     "VIBRATION_RECEIVED",
75     "POSITION_UPLOAD_INTERVAL",
76     "SOS_ALARM",
77     "UNKNOWN_B3",
78 )
79
80
81 class DecodeError(Exception):
82     def __init__(self, e: Exception, **kwargs: Any) -> None:
83         super().__init__(e)
84         for k, v in kwargs.items():
85             setattr(self, k, v)
86
87
88 def intx(x: Union[str, int]) -> int:
89     if isinstance(x, str):
90         x = int(x, 0)
91     return x
92
93
94 def boolx(x: Union[str, bool]) -> bool:
95     if isinstance(x, str):
96         if x.upper() in ("ON", "TRUE", "1"):
97             return True
98         if x.upper() in ("OFF", "FALSE", "0"):
99             return False
100         raise ValueError(str(x) + " could not be parsed as a Boolean")
101     return x
102
103
104 def hhmm(x: str) -> str:
105     """Check for the string that represents hours and minutes"""
106     if not isinstance(x, str) or len(x) != 4:
107         raise ValueError(str(x) + " is not a four-character string")
108     hh = int(x[:2])
109     mm = int(x[2:])
110     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
111         raise ValueError(str(x) + " does not contain valid hours and minutes")
112     return x
113
114
115 def hhmmhhmm(x: str) -> str:
116     """Check for the string that represents hours and minutes twice"""
117     if not isinstance(x, str) or len(x) != 8:
118         raise ValueError(str(x) + " is not an eight-character string")
119     return hhmm(x[:4]) + hhmm(x[4:])
120
121
122 def l3str(x: Union[str, List[str]]) -> List[str]:
123     if isinstance(x, str):
124         lx = x.split(",")
125     else:
126         lx = x
127     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
128         raise ValueError(str(lx) + " is not a list of three strings")
129     return lx
130
131
132 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
133     def alrmspec(sub: str) -> Tuple[int, str]:
134         if len(sub) != 7:
135             raise ValueError(sub + " does not represent day and time")
136         return (
137             {
138                 "MON": 1,
139                 "TUE": 2,
140                 "WED": 3,
141                 "THU": 4,
142                 "FRI": 5,
143                 "SAT": 6,
144                 "SUN": 7,
145             }[sub[:3].upper()],
146             sub[3:],
147         )
148
149     if isinstance(x, str):
150         lx = [alrmspec(sub) for sub in x.split(",")]
151     else:
152         lx = x
153     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
154     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
155         raise ValueError(str(lx) + " is a wrong alarms specification")
156     return [(d, hhmm(tm)) for d, tm in lx]
157
158
159 def l3int(x: Union[str, List[int]]) -> List[int]:
160     if isinstance(x, str):
161         lx = [int(el) for el in x.split(",")]
162     else:
163         lx = x
164     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
165         raise ValueError(str(lx) + " is not a list of three integers")
166     return lx
167
168
169 class MetaPkt(type):
170     """
171     For each class corresponding to a message, automatically create
172     two nested classes `In` and `Out` that also inherit from their
173     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
174     copied to the `In` nested class under the name `KWARGS`, and
175     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
176     to the nested class `Out`. In addition, method `encode` is
177     defined in both classes equal to `in_encode()` and `out_encode()`
178     respectively.
179     """
180
181     if TYPE_CHECKING:
182
183         def __getattr__(self, name: str) -> Any:
184             pass
185
186         def __setattr__(self, name: str, value: Any) -> None:
187             pass
188
189     def __new__(
190         cls: Type["MetaPkt"],
191         name: str,
192         bases: Tuple[type, ...],
193         attrs: Dict[str, Any],
194     ) -> "MetaPkt":
195         newcls = super().__new__(cls, name, bases, attrs)
196         newcls.In = super().__new__(
197             cls,
198             name + ".In",
199             (newcls,) + bases,
200             {
201                 "KWARGS": newcls.IN_KWARGS,
202                 "decode": newcls.in_decode,
203                 "encode": newcls.in_encode,
204             },
205         )
206         newcls.Out = super().__new__(
207             cls,
208             name + ".Out",
209             (newcls,) + bases,
210             {
211                 "KWARGS": newcls.OUT_KWARGS,
212                 "decode": newcls.out_decode,
213                 "encode": newcls.out_encode,
214             },
215         )
216         return newcls
217
218
219 class Respond(Enum):
220     NON = 0  # Incoming, no response needed
221     INL = 1  # Birirectional, use `inline_response()`
222     EXT = 2  # Birirectional, use external responder
223
224
225 class GPS303Pkt(metaclass=MetaPkt):
226     RESPOND = Respond.NON  # Do not send anything back by default
227     PROTO: int
228     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
229     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
230     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
231     In: Type["GPS303Pkt"]
232     Out: Type["GPS303Pkt"]
233
234     if TYPE_CHECKING:
235
236         def __getattr__(self, name: str) -> Any:
237             pass
238
239         def __setattr__(self, name: str, value: Any) -> None:
240             pass
241
242     def __init__(self, *args: Any, **kwargs: Any):
243         """
244         Construct the object _either_ from (length, payload),
245         _or_ from the values of individual fields
246         """
247         assert not args or (len(args) == 2 and not kwargs)
248         if args:  # guaranteed to be two arguments at this point
249             self.length, self.payload = args
250             try:
251                 self.decode(self.length, self.payload)
252             except error as e:
253                 raise DecodeError(e, obj=self)
254         else:
255             for kw, typ, dfl in self.KWARGS:
256                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
257             if kwargs:
258                 raise ValueError(
259                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
260                 )
261
262     def __repr__(self) -> str:
263         return "{}({})".format(
264             self.__class__.__name__,
265             ", ".join(
266                 "{}={}".format(
267                     k,
268                     'bytes.fromhex("{}")'.format(v.hex())
269                     if isinstance(v, bytes)
270                     else v.__repr__(),
271                 )
272                 for k, v in self.__dict__.items()
273                 if not k.startswith("_")
274             ),
275         )
276
277     decode: Callable[["GPS303Pkt", int, bytes], None]
278
279     def in_decode(self, length: int, packet: bytes) -> None:
280         # Overridden in subclasses, otherwise do not decode payload
281         return
282
283     def out_decode(self, length: int, packet: bytes) -> None:
284         # Overridden in subclasses, otherwise do not decode payload
285         return
286
287     encode: Callable[["GPS303Pkt"], bytes]
288
289     def in_encode(self) -> bytes:
290         # Necessary to emulate terminal, which is not implemented
291         raise NotImplementedError(
292             self.__class__.__name__ + ".encode() not implemented"
293         )
294
295     def out_encode(self) -> bytes:
296         # Overridden in subclasses, otherwise make empty payload
297         return b""
298
299     @property
300     def packed(self) -> bytes:
301         payload = self.encode()
302         length = len(payload) + 1
303         return pack("BB", length, self.PROTO) + payload
304
305
306 class UNKNOWN(GPS303Pkt):
307     PROTO = 256  # > 255 is impossible in real packets
308
309
310 class LOGIN(GPS303Pkt):
311     PROTO = 0x01
312     RESPOND = Respond.INL
313     # Default response for ACK, can also respond with STOP_UPLOAD
314
315     def in_decode(self, length: int, payload: bytes) -> None:
316         self.imei = payload[:-1].hex()
317         self.ver = unpack("B", payload[-1:])[0]
318
319
320 class SUPERVISION(GPS303Pkt):
321     PROTO = 0x05
322     OUT_KWARGS = (("status", int, 1),)
323
324     def out_encode(self) -> bytes:
325         # 1: The device automatically answers Pickup effect
326         # 2: Automatically Answering Two-way Calls
327         # 3: Ring manually answer the two-way call
328         return pack("B", self.status)
329
330
331 class HEARTBEAT(GPS303Pkt):
332     PROTO = 0x08
333     RESPOND = Respond.INL
334
335
336 class _GPS_POSITIONING(GPS303Pkt):
337     RESPOND = Respond.INL
338
339     def in_decode(self, length: int, payload: bytes) -> None:
340         self.dtime = payload[:6]
341         if self.dtime == b"\0\0\0\0\0\0":
342             self.devtime = None
343         else:
344             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
345             self.devtime = datetime(
346                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
347             )
348         self.gps_data_length = payload[6] >> 4
349         self.gps_nb_sat = payload[6] & 0x0F
350         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
351         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
352         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
353         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
354         self.heading = flags & 0b0000001111111111  # bits 6 - last
355         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
356         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
357         self.speed = speed
358         self.flags = flags
359
360     def out_encode(self) -> bytes:
361         tup = datetime.utcnow().timetuple()
362         ttup = (tup[0] % 100,) + tup[1:6]
363         return pack("BBBBBB", *ttup)
364
365
366 class GPS_POSITIONING(_GPS_POSITIONING):
367     PROTO = 0x10
368
369
370 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
371     PROTO = 0x11
372
373
374 class STATUS(GPS303Pkt):
375     PROTO = 0x13
376     RESPOND = Respond.EXT
377     OUT_KWARGS = (("upload_interval", int, 25),)
378
379     def in_decode(self, length: int, payload: bytes) -> None:
380         self.batt, self.ver, self.timezone, self.intvl = unpack(
381             "BBBB", payload[:4]
382         )
383         if len(payload) > 4:
384             self.signal: Optional[int] = payload[4]
385         else:
386             self.signal = None
387
388     def out_encode(self) -> bytes:  # Set interval in minutes
389         return pack("B", self.upload_interval)
390
391
392 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
393     PROTO = 0x14
394
395
396 class RESET(GPS303Pkt):
397     # Device sends when it got reset SMS
398     # Server can send to initiate factory reset
399     PROTO = 0x15
400
401
402 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
403     PROTO = 0x16
404     OUT_KWARGS = (("number", int, 3),)
405
406     def out_encode(self) -> bytes:  # Number of whitelist entries
407         return pack("B", self.number)
408
409
410 class _WIFI_POSITIONING(GPS303Pkt):
411     def in_decode(self, length: int, payload: bytes) -> None:
412         self.dtime = payload[:6]
413         if self.dtime == b"\0\0\0\0\0\0":
414             self.devtime = None
415         else:
416             self.devtime = datetime.strptime(
417                 self.dtime.hex(), "%y%m%d%H%M%S"
418             ).astimezone(tz=timezone.utc)
419         self.wifi_aps = []
420         for i in range(self.length):  # length has special meaning here
421             slice = payload[6 + i * 7 : 13 + i * 7]
422             self.wifi_aps.append(
423                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
424             )
425         gsm_slice = payload[6 + self.length * 7 :]
426         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
427         self.gsm_cells = []
428         for i in range(ncells):
429             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
430             locac, cellid, sigstr = unpack(
431                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
432             )
433             self.gsm_cells.append((locac, cellid, -sigstr))
434
435
436 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
437     PROTO = 0x17
438     RESPOND = Respond.INL
439
440     def out_encode(self) -> bytes:
441         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
442
443
444 class TIME(GPS303Pkt):
445     PROTO = 0x30
446     RESPOND = Respond.INL
447
448     def out_encode(self) -> bytes:
449         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
450
451
452 class PROHIBIT_LBS(GPS303Pkt):
453     PROTO = 0x33
454     OUT_KWARGS = (("status", int, 1),)
455
456     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
457         return pack("B", self.status)
458
459
460 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
461     PROTO = 0x34
462
463     OUT_KWARGS = (
464         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
465         ("gps_interval_set", boolx, False),
466         ("gps_interval", hhmmhhmm, "00000000"),
467         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
468         ("boot_time_set", boolx, False),
469         ("boot_time", hhmm, "0000"),
470         ("shut_time_set", boolx, False),
471         ("shut_time", hhmm, "0000"),
472     )
473
474     def out_encode(self) -> bytes:
475         return (
476             pack("B", self.gps_off)
477             + pack("B", self.gps_interval_set)
478             + bytes.fromhex(self.gps_interval)
479             + pack("B", self.lbs_off)
480             + pack("B", self.boot_time_set)
481             + bytes.fromhex(self.boot_time)
482             + pack("B", self.shut_time_set)
483             + bytes.fromhex(self.shut_time)
484         )
485
486
487 class _SET_PHONE(GPS303Pkt):
488     OUT_KWARGS = (("phone", str, ""),)
489
490     def out_encode(self) -> bytes:
491         self.phone: str
492         return self.phone.encode("")
493
494
495 class REMOTE_MONITOR_PHONE(_SET_PHONE):
496     PROTO = 0x40
497
498
499 class SOS_PHONE(_SET_PHONE):
500     PROTO = 0x41
501
502
503 class DAD_PHONE(_SET_PHONE):
504     PROTO = 0x42
505
506
507 class MOM_PHONE(_SET_PHONE):
508     PROTO = 0x43
509
510
511 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
512     PROTO = 0x44
513
514
515 class GPS_OFF_PERIOD(GPS303Pkt):
516     PROTO = 0x46
517     OUT_KWARGS = (
518         ("onoff", int, 0),
519         ("fm", hhmm, "0000"),
520         ("to", hhmm, "2359"),
521     )
522
523     def out_encode(self) -> bytes:
524         return (
525             pack("B", self.onoff)
526             + bytes.fromhex(self.fm)
527             + bytes.fromhex(self.to)
528         )
529
530
531 class DND_PERIOD(GPS303Pkt):
532     PROTO = 0x47
533     OUT_KWARGS = (
534         ("onoff", int, 0),
535         ("week", int, 3),
536         ("fm1", hhmm, "0000"),
537         ("to1", hhmm, "2359"),
538         ("fm2", hhmm, "0000"),
539         ("to2", hhmm, "2359"),
540     )
541
542     def out_encode(self) -> bytes:
543         return (
544             pack("B", self.onoff)
545             + pack("B", self.week)
546             + bytes.fromhex(self.fm1)
547             + bytes.fromhex(self.to1)
548             + bytes.fromhex(self.fm2)
549             + bytes.fromhex(self.to2)
550         )
551
552
553 class RESTART_SHUTDOWN(GPS303Pkt):
554     PROTO = 0x48
555     OUT_KWARGS = (("flag", int, 0),)
556
557     def out_encode(self) -> bytes:
558         # 1 - restart
559         # 2 - shutdown
560         return pack("B", self.flag)
561
562
563 class DEVICE(GPS303Pkt):
564     PROTO = 0x49
565     OUT_KWARGS = (("flag", int, 0),)
566
567     # 0 - Stop looking for equipment
568     # 1 - Start looking for equipment
569     def out_encode(self) -> bytes:
570         return pack("B", self.flag)
571
572
573 class ALARM_CLOCK(GPS303Pkt):
574     PROTO = 0x50
575     OUT_KWARGS: Tuple[
576         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
577     ] = (
578         ("alarms", l3alarms, []),
579     )
580
581     def out_encode(self) -> bytes:
582         return b"".join(
583             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
584         )
585
586
587 class STOP_ALARM(GPS303Pkt):
588     PROTO = 0x56
589
590     def in_decode(self, length: int, payload: bytes) -> None:
591         self.flag = payload[0]
592
593
594 class SETUP(GPS303Pkt):
595     PROTO = 0x57
596     RESPOND = Respond.EXT
597     OUT_KWARGS = (
598         ("uploadintervalseconds", intx, 0x0300),
599         ("binaryswitch", intx, 0b00110001),
600         ("alarms", l3int, [0, 0, 0]),
601         ("dndtimeswitch", int, 0),
602         ("dndtimes", l3int, [0, 0, 0]),
603         ("gpstimeswitch", int, 0),
604         ("gpstimestart", int, 0),
605         ("gpstimestop", int, 0),
606         ("phonenumbers", l3str, ["", "", ""]),
607     )
608
609     def out_encode(self) -> bytes:
610         def pack3b(x: int) -> bytes:
611             return pack("!I", x)[1:]
612
613         return b"".join(
614             [
615                 pack("!H", self.uploadintervalseconds),
616                 pack("B", self.binaryswitch),
617             ]
618             + [pack3b(el) for el in self.alarms]
619             + [
620                 pack("B", self.dndtimeswitch),
621             ]
622             + [pack3b(el) for el in self.dndtimes]
623             + [
624                 pack("B", self.gpstimeswitch),
625                 pack("!H", self.gpstimestart),
626                 pack("!H", self.gpstimestop),
627             ]
628             + [b";".join([el.encode() for el in self.phonenumbers])]
629         )
630
631
632 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
633     PROTO = 0x58
634
635
636 class RESTORE_PASSWORD(GPS303Pkt):
637     PROTO = 0x67
638
639
640 class WIFI_POSITIONING(_WIFI_POSITIONING):
641     PROTO = 0x69
642     RESPOND = Respond.EXT
643     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
644
645     def out_encode(self) -> bytes:
646         if self.latitude is None or self.longitude is None:
647             return b""
648         return "{:+#010.8g},{:+#010.8g}".format(
649             self.latitude, self.longitude
650         ).encode()
651
652     def out_decode(self, length: int, payload: bytes) -> None:
653         lat, lon = payload.decode().split(",")
654         self.latitude = float(lat)
655         self.longitude = float(lon)
656
657
658 class MANUAL_POSITIONING(GPS303Pkt):
659     PROTO = 0x80
660
661     def in_decode(self, length: int, payload: bytes) -> None:
662         self.flag = payload[0] if len(payload) > 0 else -1
663         self.reason = {
664             1: "Incorrect time",
665             2: "LBS less",
666             3: "WiFi less",
667             4: "LBS search > 3 times",
668             5: "Same LBS and WiFi data",
669             6: "LBS prohibited, WiFi absent",
670             7: "GPS spacing < 50 m",
671         }.get(self.flag, "Unknown")
672
673
674 class BATTERY_CHARGE(GPS303Pkt):
675     PROTO = 0x81
676
677
678 class CHARGER_CONNECTED(GPS303Pkt):
679     PROTO = 0x82
680
681
682 class CHARGER_DISCONNECTED(GPS303Pkt):
683     PROTO = 0x83
684
685
686 class VIBRATION_RECEIVED(GPS303Pkt):
687     PROTO = 0x94
688
689
690 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
691     PROTO = 0x98
692     RESPOND = Respond.EXT
693     OUT_KWARGS = (("interval", int, 10),)
694
695     def in_decode(self, length: int, payload: bytes) -> None:
696         self.interval = unpack("!H", payload[:2])
697
698     def out_encode(self) -> bytes:
699         return pack("!H", self.interval)
700
701
702 class SOS_ALARM(GPS303Pkt):
703     PROTO = 0x99
704
705
706 class UNKNOWN_B3(GPS303Pkt):
707     PROTO = 0xB3
708     IN_KWARGS = (("asciidata", str, ""),)
709
710     def in_decode(self, length: int, payload: bytes) -> None:
711         self.asciidata = payload.decode()
712
713
714 # Build dicts protocol number -> class and class name -> protocol number
715 CLASSES = {}
716 PROTOS = {}
717 if True:  # just to indent the code, sorry!
718     for cls in [
719         cls
720         for name, cls in globals().items()
721         if isclass(cls)
722         and issubclass(cls, GPS303Pkt)
723         and not name.startswith("_")
724     ]:
725         if hasattr(cls, "PROTO"):
726             CLASSES[cls.PROTO] = cls
727             PROTOS[cls.__name__] = cls.PROTO
728
729
730 def class_by_prefix(
731     prefix: str,
732 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
733     lst = [
734         (name, proto)
735         for name, proto in PROTOS.items()
736         if name.upper().startswith(prefix.upper())
737     ]
738     if len(lst) != 1:
739         return lst
740     _, proto = lst[0]
741     return CLASSES[proto]
742
743
744 def proto_by_name(name: str) -> int:
745     return PROTOS.get(name, -1)
746
747
748 def proto_of_message(packet: bytes) -> int:
749     return packet[1]
750
751
752 def inline_response(packet: bytes) -> Optional[bytes]:
753     proto = proto_of_message(packet)
754     if proto in CLASSES:
755         cls = CLASSES[proto]
756         if cls.RESPOND is Respond.INL:
757             return cls.Out().packed
758     return None
759
760
761 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
762     """From a packet (without framing bytes) derive the XXX.In object"""
763     length, proto = unpack("BB", packet[:2])
764     payload = packet[2:]
765     if proto not in CLASSES:
766         cause: Union[DecodeError, ValueError] = ValueError(
767             f"Proto {proto} is unknown"
768         )
769     else:
770         try:
771             if is_incoming:
772                 return CLASSES[proto].In(length, payload)
773             else:
774                 return CLASSES[proto].Out(length, payload)
775         except DecodeError as e:
776             cause = e
777     if is_incoming:
778         retobj = UNKNOWN.In(length, payload)
779     else:
780         retobj = UNKNOWN.Out(length, payload)
781     retobj.PROTO = proto  # Override class attr with object attr
782     retobj.cause = cause
783     return retobj