]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
rectifier: lookaside based on rectified objects
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from types import SimpleNamespace
23 from typing import (
24     Any,
25     Callable,
26     Dict,
27     List,
28     Optional,
29     Tuple,
30     Type,
31     TYPE_CHECKING,
32     Union,
33 )
34
35 from .protomodule import ProtoClass
36
37 __all__ = (
38     "Stream",
39     "class_by_prefix",
40     "enframe",
41     "exposed_protos",
42     "inline_response",
43     "proto_handled",
44     "parse_message",
45     "probe_buffer",
46     "proto_name",
47     "DecodeError",
48     "Respond",
49 )
50
51 PROTO_PREFIX = "ZX:"
52
53 ### Deframer ###
54
55 MAXBUFFER: int = 4096
56
57
58 class Stream:
59     def __init__(self) -> None:
60         self.buffer = b""
61
62     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
63         """
64         Process next segment of the stream. Return successfully deframed
65         packets as `bytes` and error messages as `str`.
66         """
67         when = time()
68         self.buffer += segment
69         if len(self.buffer) > MAXBUFFER:
70             # We are receiving junk. Let's drop it or we run out of memory.
71             self.buffer = b""
72             return [f"More than {MAXBUFFER} unparseable data, dropping"]
73         msgs: List[Union[bytes, str]] = []
74         while True:
75             framestart = self.buffer.find(b"xx")
76             if framestart == -1:  # No frames, return whatever we have
77                 break
78             if framestart > 0:  # Should not happen, report
79                 msgs.append(
80                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
81                 )
82                 self.buffer = self.buffer[framestart:]
83             # At this point, buffer starts with a packet
84             if len(self.buffer) < 6:  # no len and proto - cannot proceed
85                 break
86             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
87             frameend = 0
88             # Length field can legitimeely be much less than the
89             # length of the packet (e.g. WiFi positioning), but
90             # it _should not_ be greater. Still sometimes it is.
91             # Luckily, not by too much: by maybe two or three bytes?
92             # Do this embarrassing hack to avoid accidental match
93             # of some binary data in the packet against '\r\n'.
94             while True:
95                 frameend = self.buffer.find(b"\r\n", frameend + 1)
96                 if frameend == -1 or frameend >= (
97                     exp_end - 3
98                 ):  # Found realistic match or none
99                     break
100             if frameend == -1:  # Incomplete frame, return what we have
101                 break
102             packet = self.buffer[2:frameend]
103             self.buffer = self.buffer[frameend + 2 :]
104             if len(packet) < 2:  # frameend comes too early
105                 msgs.append(f"Packet too short: {packet.hex()}")
106             else:
107                 msgs.append(packet)
108         return msgs
109
110     def close(self) -> bytes:
111         ret = self.buffer
112         self.buffer = b""
113         return ret
114
115
116 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
117     return b"xx" + buffer + b"\r\n"
118
119
120 ### Parser/Constructor ###
121
122
123 class DecodeError(Exception):
124     def __init__(self, e: Exception, **kwargs: Any) -> None:
125         super().__init__(e)
126         for k, v in kwargs.items():
127             setattr(self, k, v)
128
129
130 def maybe(typ: type) -> Callable[[Any], Any]:
131     return lambda x: None if x is None else typ(x)
132
133
134 def intx(x: Union[str, int]) -> int:
135     if isinstance(x, str):
136         x = int(x, 0)
137     return x
138
139
140 def boolx(x: Union[str, bool]) -> bool:
141     if isinstance(x, str):
142         if x.upper() in ("ON", "TRUE", "1"):
143             return True
144         if x.upper() in ("OFF", "FALSE", "0"):
145             return False
146         raise ValueError(str(x) + " could not be parsed as a Boolean")
147     return x
148
149
150 def hhmm(x: str) -> str:
151     """Check for the string that represents hours and minutes"""
152     if not isinstance(x, str) or len(x) != 4:
153         raise ValueError(str(x) + " is not a four-character string")
154     hh = int(x[:2])
155     mm = int(x[2:])
156     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
157         raise ValueError(str(x) + " does not contain valid hours and minutes")
158     return x
159
160
161 def hhmmhhmm(x: str) -> str:
162     """Check for the string that represents hours and minutes twice"""
163     if not isinstance(x, str) or len(x) != 8:
164         raise ValueError(str(x) + " is not an eight-character string")
165     return hhmm(x[:4]) + hhmm(x[4:])
166
167
168 def l3str(x: Union[str, List[str]]) -> List[str]:
169     if isinstance(x, str):
170         lx = x.split(",")
171     else:
172         lx = x
173     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
174         raise ValueError(str(lx) + " is not a list of three strings")
175     return lx
176
177
178 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
179     def alrmspec(sub: str) -> Tuple[int, str]:
180         if len(sub) != 7:
181             raise ValueError(sub + " does not represent day and time")
182         return (
183             {
184                 "MON": 1,
185                 "TUE": 2,
186                 "WED": 3,
187                 "THU": 4,
188                 "FRI": 5,
189                 "SAT": 6,
190                 "SUN": 7,
191             }[sub[:3].upper()],
192             sub[3:],
193         )
194
195     if isinstance(x, str):
196         lx = [alrmspec(sub) for sub in x.split(",")]
197     else:
198         lx = x
199     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
200     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
201         raise ValueError(str(lx) + " is a wrong alarms specification")
202     return [(d, hhmm(tm)) for d, tm in lx]
203
204
205 def l3int(x: Union[str, List[int]]) -> List[int]:
206     if isinstance(x, str):
207         lx = [int(el) for el in x.split(",")]
208     else:
209         lx = x
210     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
211         raise ValueError(str(lx) + " is not a list of three integers")
212     return lx
213
214
215 class Respond(Enum):
216     NON = 0  # Incoming, no response needed
217     INL = 1  # Birirectional, use `inline_response()`
218     EXT = 2  # Birirectional, use external responder
219
220
221 class GPS303Pkt(ProtoClass):
222     RESPOND = Respond.NON  # Do not send anything back by default
223     PROTO: int
224     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227     In: Type["GPS303Pkt"]
228     Out: Type["GPS303Pkt"]
229
230     if TYPE_CHECKING:
231
232         def __getattr__(self, name: str) -> Any:
233             pass
234
235         def __setattr__(self, name: str, value: Any) -> None:
236             pass
237
238     def __init__(self, *args: Any, **kwargs: Any):
239         """
240         Construct the object _either_ from (length, payload),
241         _or_ from the values of individual fields
242         """
243         assert not args or (len(args) == 2 and not kwargs)
244         if args:  # guaranteed to be two arguments at this point
245             self.length, self.payload = args
246             try:
247                 self.decode(self.length, self.payload)
248             except error as e:
249                 raise DecodeError(e, obj=self)
250         else:
251             for kw, typ, dfl in self.KWARGS:
252                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
253             if kwargs:
254                 raise ValueError(
255                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
256                 )
257
258     def __repr__(self) -> str:
259         return "{}({})".format(
260             self.__class__.__name__,
261             ", ".join(
262                 "{}={}".format(
263                     k,
264                     'bytes.fromhex("{}")'.format(v.hex())
265                     if isinstance(v, bytes)
266                     else v.__repr__(),
267                 )
268                 for k, v in self.__dict__.items()
269                 if not k.startswith("_")
270             ),
271         )
272
273     decode: Callable[["GPS303Pkt", int, bytes], None]
274
275     def in_decode(self, length: int, packet: bytes) -> None:
276         # Overridden in subclasses, otherwise do not decode payload
277         return
278
279     def out_decode(self, length: int, packet: bytes) -> None:
280         # Overridden in subclasses, otherwise do not decode payload
281         return
282
283     encode: Callable[["GPS303Pkt"], bytes]
284
285     def in_encode(self) -> bytes:
286         # Necessary to emulate terminal, which is not implemented
287         raise NotImplementedError(
288             self.__class__.__name__ + ".encode() not implemented"
289         )
290
291     def out_encode(self) -> bytes:
292         # Overridden in subclasses, otherwise make empty payload
293         return b""
294
295     @property
296     def packed(self) -> bytes:
297         payload = self.encode()
298         length = getattr(self, "length", len(payload) + 1)
299         return pack("BB", length, self.PROTO) + payload
300
301
302 class UNKNOWN(GPS303Pkt):
303     PROTO = 256  # > 255 is impossible in real packets
304
305
306 class LOGIN(GPS303Pkt):
307     PROTO = 0x01
308     RESPOND = Respond.INL
309     # Default response for ACK, can also respond with STOP_UPLOAD
310     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
311
312     def in_decode(self, length: int, payload: bytes) -> None:
313         self.imei = payload[:8].ljust(8, b"\0").hex()
314         self.ver = payload[8]
315
316     def in_encode(self) -> bytes:
317         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
318             "B", self.ver
319         )
320
321
322 class SUPERVISION(GPS303Pkt):
323     PROTO = 0x05
324     OUT_KWARGS = (("status", int, 1),)
325
326     def out_encode(self) -> bytes:
327         # 1: The device automatically answers Pickup effect
328         # 2: Automatically Answering Two-way Calls
329         # 3: Ring manually answer the two-way call
330         return pack("B", self.status)
331
332
333 class HEARTBEAT(GPS303Pkt):
334     PROTO = 0x08
335     RESPOND = Respond.INL
336
337
338 class _GPS_POSITIONING(GPS303Pkt):
339     RESPOND = Respond.INL
340
341     def in_decode(self, length: int, payload: bytes) -> None:
342         self.dtime = payload[:6]
343         if self.dtime == b"\0\0\0\0\0\0":
344             self.devtime = None
345         else:
346             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
347             self.devtime = datetime(
348                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
349             )
350         self.gps_data_length = payload[6] >> 4
351         self.gps_nb_sat = payload[6] & 0x0F
352         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
353         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
354         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
355         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
356         self.heading = flags & 0b0000001111111111  # bits 6 - last
357         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
358         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
359         self.speed = speed
360         self.flags = flags
361
362     def out_encode(self) -> bytes:
363         tup = datetime.utcnow().timetuple()
364         ttup = (tup[0] % 100,) + tup[1:6]
365         return pack("BBBBBB", *ttup)
366
367     def rectified(self) -> SimpleNamespace:  # JSON-able dict
368         return SimpleNamespace(
369             type="location",
370             devtime=str(self.devtime),
371             speed=self.speed,
372             direction=self.heading,
373             latitude=self.latitude,
374             longitude=self.longitude,
375         )
376
377
378 class GPS_POSITIONING(_GPS_POSITIONING):
379     PROTO = 0x10
380
381
382 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
383     PROTO = 0x11
384
385
386 class STATUS(GPS303Pkt):
387     PROTO = 0x13
388     RESPOND = Respond.EXT
389     IN_KWARGS = (
390         ("batt", int, 100),
391         ("ver", int, 0),
392         ("timezone", int, 0),
393         ("intvl", int, 0),
394         ("signal", maybe(int), None),
395     )
396     OUT_KWARGS = (("upload_interval", int, 25),)
397
398     def in_decode(self, length: int, payload: bytes) -> None:
399         self.batt, self.ver, self.timezone, self.intvl = unpack(
400             "BBBB", payload[:4]
401         )
402         if len(payload) > 4:
403             self.signal: Optional[int] = payload[4]
404         else:
405             self.signal = None
406
407     def in_encode(self) -> bytes:
408         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
409             b"" if self.signal is None else pack("B", self.signal)
410         )
411
412     def out_encode(self) -> bytes:  # Set interval in minutes
413         return pack("B", self.upload_interval)
414
415
416 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
417     PROTO = 0x14
418
419     def in_encode(self) -> bytes:
420         return b""
421
422
423 class RESET(GPS303Pkt):
424     # Device sends when it got reset SMS
425     # Server can send to initiate factory reset
426     PROTO = 0x15
427
428
429 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
430     PROTO = 0x16
431     OUT_KWARGS = (("number", int, 3),)
432
433     def out_encode(self) -> bytes:  # Number of whitelist entries
434         return pack("B", self.number)
435
436
437 class _WIFI_POSITIONING(GPS303Pkt):
438     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
439         # IN_KWARGS = (
440         ("dtime", bytes, b"\0\0\0\0\0\0"),
441         ("wifi_aps", list, []),
442         ("mcc", int, 0),
443         ("mnc", int, 0),
444         ("gsm_cells", list, []),
445     )
446
447     def in_decode(self, length: int, payload: bytes) -> None:
448         self.dtime = payload[:6]
449         if self.dtime == b"\0\0\0\0\0\0":
450             self.devtime = None
451         else:
452             self.devtime = datetime.strptime(
453                 self.dtime.hex(), "%y%m%d%H%M%S"
454             ).astimezone(tz=timezone.utc)
455         self.wifi_aps = []
456         for i in range(self.length):  # length has special meaning here
457             slice = payload[6 + i * 7 : 13 + i * 7]
458             self.wifi_aps.append(
459                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
460             )
461         gsm_slice = payload[6 + self.length * 7 :]
462         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
463         self.gsm_cells = []
464         for i in range(ncells):
465             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
466             locac, cellid, sigstr = unpack(
467                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
468             )
469             self.gsm_cells.append((locac, cellid, -sigstr))
470
471     def in_encode(self) -> bytes:
472         self.length = len(self.wifi_aps)
473         return b"".join(
474             [
475                 self.dtime,
476                 b"".join(
477                     [
478                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
479                         + pack("B", -sigstr)
480                         for mac, sigstr in self.wifi_aps
481                     ]
482                 ),
483                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
484                 b"".join(
485                     [
486                         pack("!HHB", locac, cellid, -sigstr)
487                         for locac, cellid, sigstr in self.gsm_cells
488                     ]
489                 ),
490             ]
491         )
492
493     def rectified(self) -> SimpleNamespace:  # JSON-able dict
494         return SimpleNamespace(
495             type="approximate_location",
496             devtime=str(self.devtime),
497             mcc=self.mcc,
498             mnc=self.mnc,
499             base_stations=self.gsm_cells,
500             wifi_aps=self.wifi_aps,
501         )
502
503
504 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
505     PROTO = 0x17
506     RESPOND = Respond.INL
507
508     def out_encode(self) -> bytes:
509         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
510
511
512 class TIME(GPS303Pkt):
513     PROTO = 0x30
514     RESPOND = Respond.INL
515
516     def out_encode(self) -> bytes:
517         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
518
519
520 class PROHIBIT_LBS(GPS303Pkt):
521     PROTO = 0x33
522     OUT_KWARGS = (("status", int, 1),)
523
524     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
525         return pack("B", self.status)
526
527
528 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
529     PROTO = 0x34
530
531     OUT_KWARGS = (
532         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
533         ("gps_interval_set", boolx, False),
534         ("gps_interval", hhmmhhmm, "00000000"),
535         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
536         ("boot_time_set", boolx, False),
537         ("boot_time", hhmm, "0000"),
538         ("shut_time_set", boolx, False),
539         ("shut_time", hhmm, "0000"),
540     )
541
542     def out_encode(self) -> bytes:
543         return (
544             pack("B", self.gps_off)
545             + pack("B", self.gps_interval_set)
546             + bytes.fromhex(self.gps_interval)
547             + pack("B", self.lbs_off)
548             + pack("B", self.boot_time_set)
549             + bytes.fromhex(self.boot_time)
550             + pack("B", self.shut_time_set)
551             + bytes.fromhex(self.shut_time)
552         )
553
554
555 class _SET_PHONE(GPS303Pkt):
556     OUT_KWARGS = (("phone", str, ""),)
557
558     def out_encode(self) -> bytes:
559         self.phone: str
560         return self.phone.encode("")
561
562
563 class REMOTE_MONITOR_PHONE(_SET_PHONE):
564     PROTO = 0x40
565
566
567 class SOS_PHONE(_SET_PHONE):
568     PROTO = 0x41
569
570
571 class DAD_PHONE(_SET_PHONE):
572     PROTO = 0x42
573
574
575 class MOM_PHONE(_SET_PHONE):
576     PROTO = 0x43
577
578
579 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
580     PROTO = 0x44
581
582
583 class GPS_OFF_PERIOD(GPS303Pkt):
584     PROTO = 0x46
585     OUT_KWARGS = (
586         ("onoff", int, 0),
587         ("fm", hhmm, "0000"),
588         ("to", hhmm, "2359"),
589     )
590
591     def out_encode(self) -> bytes:
592         return (
593             pack("B", self.onoff)
594             + bytes.fromhex(self.fm)
595             + bytes.fromhex(self.to)
596         )
597
598
599 class DND_PERIOD(GPS303Pkt):
600     PROTO = 0x47
601     OUT_KWARGS = (
602         ("onoff", int, 0),
603         ("week", int, 3),
604         ("fm1", hhmm, "0000"),
605         ("to1", hhmm, "2359"),
606         ("fm2", hhmm, "0000"),
607         ("to2", hhmm, "2359"),
608     )
609
610     def out_encode(self) -> bytes:
611         return (
612             pack("B", self.onoff)
613             + pack("B", self.week)
614             + bytes.fromhex(self.fm1)
615             + bytes.fromhex(self.to1)
616             + bytes.fromhex(self.fm2)
617             + bytes.fromhex(self.to2)
618         )
619
620
621 class RESTART_SHUTDOWN(GPS303Pkt):
622     PROTO = 0x48
623     OUT_KWARGS = (("flag", int, 0),)
624
625     def out_encode(self) -> bytes:
626         # 1 - restart
627         # 2 - shutdown
628         return pack("B", self.flag)
629
630
631 class DEVICE(GPS303Pkt):
632     PROTO = 0x49
633     OUT_KWARGS = (("flag", int, 0),)
634
635     # 0 - Stop looking for equipment
636     # 1 - Start looking for equipment
637     def out_encode(self) -> bytes:
638         return pack("B", self.flag)
639
640
641 class ALARM_CLOCK(GPS303Pkt):
642     PROTO = 0x50
643     OUT_KWARGS: Tuple[
644         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
645     ] = (
646         ("alarms", l3alarms, []),
647     )
648
649     def out_encode(self) -> bytes:
650         return b"".join(
651             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
652         )
653
654
655 class STOP_ALARM(GPS303Pkt):
656     PROTO = 0x56
657
658     def in_decode(self, length: int, payload: bytes) -> None:
659         self.flag = payload[0]
660
661
662 class SETUP(GPS303Pkt):
663     PROTO = 0x57
664     RESPOND = Respond.EXT
665     OUT_KWARGS = (
666         ("uploadintervalseconds", intx, 0x0300),
667         ("binaryswitch", intx, 0b00110001),
668         ("alarms", l3int, [0, 0, 0]),
669         ("dndtimeswitch", int, 0),
670         ("dndtimes", l3int, [0, 0, 0]),
671         ("gpstimeswitch", int, 0),
672         ("gpstimestart", int, 0),
673         ("gpstimestop", int, 0),
674         ("phonenumbers", l3str, ["", "", ""]),
675     )
676
677     def out_encode(self) -> bytes:
678         def pack3b(x: int) -> bytes:
679             return pack("!I", x)[1:]
680
681         return b"".join(
682             [
683                 pack("!H", self.uploadintervalseconds),
684                 pack("B", self.binaryswitch),
685             ]
686             + [pack3b(el) for el in self.alarms]
687             + [
688                 pack("B", self.dndtimeswitch),
689             ]
690             + [pack3b(el) for el in self.dndtimes]
691             + [
692                 pack("B", self.gpstimeswitch),
693                 pack("!H", self.gpstimestart),
694                 pack("!H", self.gpstimestop),
695             ]
696             + [b";".join([el.encode() for el in self.phonenumbers])]
697         )
698
699     def in_encode(self) -> bytes:
700         return b""
701
702
703 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
704     PROTO = 0x58
705
706
707 class RESTORE_PASSWORD(GPS303Pkt):
708     PROTO = 0x67
709
710
711 class WIFI_POSITIONING(_WIFI_POSITIONING):
712     PROTO = 0x69
713     RESPOND = Respond.EXT
714     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
715
716     def out_encode(self) -> bytes:
717         if self.latitude is None or self.longitude is None:
718             return b""
719         return "{:+#010.8g},{:+#010.8g}".format(
720             self.latitude, self.longitude
721         ).encode()
722
723     def out_decode(self, length: int, payload: bytes) -> None:
724         lat, lon = payload.decode().split(",")
725         self.latitude = float(lat)
726         self.longitude = float(lon)
727
728
729 class MANUAL_POSITIONING(GPS303Pkt):
730     PROTO = 0x80
731
732     def in_decode(self, length: int, payload: bytes) -> None:
733         self.flag = payload[0] if len(payload) > 0 else -1
734         self.reason = {
735             1: "Incorrect time",
736             2: "LBS less",
737             3: "WiFi less",
738             4: "LBS search > 3 times",
739             5: "Same LBS and WiFi data",
740             6: "LBS prohibited, WiFi absent",
741             7: "GPS spacing < 50 m",
742         }.get(self.flag, "Unknown")
743
744
745 class BATTERY_CHARGE(GPS303Pkt):
746     PROTO = 0x81
747
748
749 class CHARGER_CONNECTED(GPS303Pkt):
750     PROTO = 0x82
751
752
753 class CHARGER_DISCONNECTED(GPS303Pkt):
754     PROTO = 0x83
755
756
757 class VIBRATION_RECEIVED(GPS303Pkt):
758     PROTO = 0x94
759
760
761 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
762     PROTO = 0x98
763     RESPOND = Respond.EXT
764     OUT_KWARGS = (("interval", int, 10),)
765
766     def in_decode(self, length: int, payload: bytes) -> None:
767         self.interval = unpack("!H", payload[:2])
768
769     def out_encode(self) -> bytes:
770         return pack("!H", self.interval)
771
772
773 class SOS_ALARM(GPS303Pkt):
774     PROTO = 0x99
775
776
777 class UNKNOWN_B3(GPS303Pkt):
778     PROTO = 0xB3
779     IN_KWARGS = (("asciidata", str, ""),)
780
781     def in_decode(self, length: int, payload: bytes) -> None:
782         self.asciidata = payload.decode()
783
784
785 # Build dicts protocol number -> class and class name -> protocol number
786 CLASSES = {}
787 PROTOS = {}
788 if True:  # just to indent the code, sorry!
789     for cls in [
790         cls
791         for name, cls in globals().items()
792         if isclass(cls)
793         and issubclass(cls, GPS303Pkt)
794         and not name.startswith("_")
795     ]:
796         if hasattr(cls, "PROTO"):
797             CLASSES[cls.PROTO] = cls
798             PROTOS[cls.__name__] = cls.PROTO
799
800
801 def class_by_prefix(
802     prefix: str,
803 ) -> Union[Type[GPS303Pkt], List[str]]:
804     if prefix.startswith(PROTO_PREFIX):
805         pname = prefix[len(PROTO_PREFIX) :]
806     else:
807         raise KeyError(pname)
808     lst = [
809         (name, proto)
810         for name, proto in PROTOS.items()
811         if name.upper().startswith(prefix.upper())
812     ]
813     if len(lst) != 1:
814         return [name for name, _ in lst]
815     _, proto = lst[0]
816     return CLASSES[proto]
817
818
819 def proto_handled(proto: str) -> bool:
820     return proto.startswith(PROTO_PREFIX)
821
822
823 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
824     return PROTO_PREFIX + (
825         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
826     )
827
828
829 def proto_of_message(packet: bytes) -> str:
830     return proto_name(CLASSES.get(packet[1], UNKNOWN))
831
832
833 def imei_from_packet(packet: bytes) -> Optional[str]:
834     if packet[1] == LOGIN.PROTO:
835         msg = parse_message(packet)
836         if isinstance(msg, LOGIN):
837             return msg.imei
838     return None
839
840
841 def is_goodbye_packet(packet: bytes) -> bool:
842     return packet[1] == HIBERNATION.PROTO
843
844
845 def inline_response(packet: bytes) -> Optional[bytes]:
846     proto = packet[1]
847     if proto in CLASSES:
848         cls = CLASSES[proto]
849         if cls.RESPOND is Respond.INL:
850             return cls.Out().packed
851     return None
852
853
854 def probe_buffer(buffer: bytes) -> bool:
855     framestart = buffer.find(b"xx")
856     if framestart < 0:
857         return False
858     if len(buffer) - framestart < 6:
859         return False
860     return True
861
862
863 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
864     """From a packet (without framing bytes) derive the XXX.In object"""
865     length, proto = unpack("BB", packet[:2])
866     payload = packet[2:]
867     if proto not in CLASSES:
868         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
869             f"Proto {proto} is unknown"
870         )
871     else:
872         try:
873             if is_incoming:
874                 return CLASSES[proto].In(length, payload)
875             else:
876                 return CLASSES[proto].Out(length, payload)
877         except (DecodeError, ValueError, IndexError) as e:
878             cause = e
879     if is_incoming:
880         retobj = UNKNOWN.In(length, payload)
881     else:
882         retobj = UNKNOWN.Out(length, payload)
883     retobj.PROTO = proto  # Override class attr with object attr
884     retobj.cause = cause
885     return retobj
886
887
888 def exposed_protos() -> List[Tuple[str, bool]]:
889     return [
890         (proto_name(cls), cls.RESPOND is Respond.EXT)
891         for cls in CLASSES.values()
892         if hasattr(cls, "rectified")
893     ]