]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
aff3405691baa596ca36ca90e00f7067880cef3f
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from types import SimpleNamespace
23 from typing import (
24     Any,
25     Callable,
26     Dict,
27     List,
28     Optional,
29     Tuple,
30     Type,
31     TYPE_CHECKING,
32     Union,
33 )
34
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
37
38 __all__ = (
39     "Stream",
40     "class_by_prefix",
41     "enframe",
42     "exposed_protos",
43     "inline_response",
44     "proto_handled",
45     "parse_message",
46     "probe_buffer",
47     "DecodeError",
48     "Respond",
49 )
50
51 PROTO_PREFIX: str = "ZX:"
52
53 ### Deframer ###
54
55 MAXBUFFER: int = 4096
56
57
58 class Stream:
59     def __init__(self) -> None:
60         self.buffer = b""
61
62     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
63         """
64         Process next segment of the stream. Return successfully deframed
65         packets as `bytes` and error messages as `str`.
66         """
67         when = time()
68         self.buffer += segment
69         if len(self.buffer) > MAXBUFFER:
70             # We are receiving junk. Let's drop it or we run out of memory.
71             self.buffer = b""
72             return [f"More than {MAXBUFFER} unparseable data, dropping"]
73         msgs: List[Union[bytes, str]] = []
74         while True:
75             framestart = self.buffer.find(b"xx")
76             if framestart == -1:  # No frames, return whatever we have
77                 break
78             if framestart > 0:  # Should not happen, report
79                 msgs.append(
80                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
81                 )
82                 self.buffer = self.buffer[framestart:]
83             # At this point, buffer starts with a packet
84             if len(self.buffer) < 6:  # no len and proto - cannot proceed
85                 break
86             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
87             frameend = 0
88             # Length field can legitimeely be much less than the
89             # length of the packet (e.g. WiFi positioning), but
90             # it _should not_ be greater. Still sometimes it is.
91             # Luckily, not by too much: by maybe two or three bytes?
92             # Do this embarrassing hack to avoid accidental match
93             # of some binary data in the packet against '\r\n'.
94             while True:
95                 frameend = self.buffer.find(b"\r\n", frameend + 1)
96                 if frameend == -1 or frameend >= (
97                     exp_end - 3
98                 ):  # Found realistic match or none
99                     break
100             if frameend == -1:  # Incomplete frame, return what we have
101                 break
102             packet = self.buffer[2:frameend]
103             self.buffer = self.buffer[frameend + 2 :]
104             if len(packet) < 2:  # frameend comes too early
105                 msgs.append(f"Packet too short: {packet.hex()}")
106             else:
107                 msgs.append(packet)
108         return msgs
109
110     def close(self) -> bytes:
111         ret = self.buffer
112         self.buffer = b""
113         return ret
114
115
116 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
117     return b"xx" + buffer + b"\r\n"
118
119
120 ### Parser/Constructor ###
121
122
123 class DecodeError(Exception):
124     def __init__(self, e: Exception, **kwargs: Any) -> None:
125         super().__init__(e)
126         for k, v in kwargs.items():
127             setattr(self, k, v)
128
129
130 def maybe(typ: type) -> Callable[[Any], Any]:
131     return lambda x: None if x is None else typ(x)
132
133
134 def intx(x: Union[str, int]) -> int:
135     if isinstance(x, str):
136         x = int(x, 0)
137     return x
138
139
140 def boolx(x: Union[str, bool]) -> bool:
141     if isinstance(x, str):
142         if x.upper() in ("ON", "TRUE", "1"):
143             return True
144         if x.upper() in ("OFF", "FALSE", "0"):
145             return False
146         raise ValueError(str(x) + " could not be parsed as a Boolean")
147     return x
148
149
150 def hhmm(x: str) -> str:
151     """Check for the string that represents hours and minutes"""
152     if not isinstance(x, str) or len(x) != 4:
153         raise ValueError(str(x) + " is not a four-character string")
154     hh = int(x[:2])
155     mm = int(x[2:])
156     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
157         raise ValueError(str(x) + " does not contain valid hours and minutes")
158     return x
159
160
161 def hhmmhhmm(x: str) -> str:
162     """Check for the string that represents hours and minutes twice"""
163     if not isinstance(x, str) or len(x) != 8:
164         raise ValueError(str(x) + " is not an eight-character string")
165     return hhmm(x[:4]) + hhmm(x[4:])
166
167
168 def l3str(x: Union[str, List[str]]) -> List[str]:
169     if isinstance(x, str):
170         lx = x.split(",")
171     else:
172         lx = x
173     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
174         raise ValueError(str(lx) + " is not a list of three strings")
175     return lx
176
177
178 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
179     def alrmspec(sub: str) -> Tuple[int, str]:
180         if len(sub) != 7:
181             raise ValueError(sub + " does not represent day and time")
182         return (
183             {
184                 "MON": 1,
185                 "TUE": 2,
186                 "WED": 3,
187                 "THU": 4,
188                 "FRI": 5,
189                 "SAT": 6,
190                 "SUN": 7,
191             }[sub[:3].upper()],
192             sub[3:],
193         )
194
195     if isinstance(x, str):
196         lx = [alrmspec(sub) for sub in x.split(",")]
197     else:
198         lx = x
199     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
200     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
201         raise ValueError(str(lx) + " is a wrong alarms specification")
202     return [(d, hhmm(tm)) for d, tm in lx]
203
204
205 def l3int(x: Union[str, List[int]]) -> List[int]:
206     if isinstance(x, str):
207         lx = [int(el) for el in x.split(",")]
208     else:
209         lx = x
210     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
211         raise ValueError(str(lx) + " is not a list of three integers")
212     return lx
213
214
215 class Respond(Enum):
216     NON = 0  # Incoming, no response needed
217     INL = 1  # Birirectional, use `inline_response()`
218     EXT = 2  # Birirectional, use external responder
219
220
221 class GPS303Pkt(ProtoClass):
222     RESPOND = Respond.NON  # Do not send anything back by default
223     PROTO: int
224     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227     In: Type["GPS303Pkt"]
228     Out: Type["GPS303Pkt"]
229
230     if TYPE_CHECKING:
231
232         def __getattr__(self, name: str) -> Any:
233             pass
234
235         def __setattr__(self, name: str, value: Any) -> None:
236             pass
237
238     def __init__(self, *args: Any, **kwargs: Any):
239         """
240         Construct the object _either_ from (length, payload),
241         _or_ from the values of individual fields
242         """
243         assert not args or (len(args) == 2 and not kwargs)
244         if args:  # guaranteed to be two arguments at this point
245             self.length, self.payload = args
246             try:
247                 self.decode(self.length, self.payload)
248             except error as e:
249                 raise DecodeError(e, obj=self)
250         else:
251             for kw, typ, dfl in self.KWARGS:
252                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
253             if kwargs:
254                 raise ValueError(
255                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
256                 )
257
258     def __repr__(self) -> str:
259         return "{}({})".format(
260             self.__class__.__name__,
261             ", ".join(
262                 "{}={}".format(
263                     k,
264                     'bytes.fromhex("{}")'.format(v.hex())
265                     if isinstance(v, bytes)
266                     else v.__repr__(),
267                 )
268                 for k, v in self.__dict__.items()
269                 if not k.startswith("_")
270             ),
271         )
272
273     decode: Callable[["GPS303Pkt", int, bytes], None]
274
275     def in_decode(self, length: int, packet: bytes) -> None:
276         # Overridden in subclasses, otherwise do not decode payload
277         return
278
279     def out_decode(self, length: int, packet: bytes) -> None:
280         # Overridden in subclasses, otherwise do not decode payload
281         return
282
283     encode: Callable[["GPS303Pkt"], bytes]
284
285     def in_encode(self) -> bytes:
286         # Necessary to emulate terminal, which is not implemented
287         raise NotImplementedError(
288             self.__class__.__name__ + ".encode() not implemented"
289         )
290
291     def out_encode(self) -> bytes:
292         # Overridden in subclasses, otherwise make empty payload
293         return b""
294
295     @classmethod
296     def proto_name(cls) -> str:
297         """Name of the command as used externally"""
298         return (PROTO_PREFIX + cls.__name__)[:16]
299
300     @property
301     def packed(self) -> bytes:
302         payload = self.encode()
303         length = getattr(self, "length", len(payload) + 1)
304         return pack("BB", length, self.PROTO) + payload
305
306
307 class UNKNOWN(GPS303Pkt):
308     PROTO = 256  # > 255 is impossible in real packets
309
310
311 class LOGIN(GPS303Pkt):
312     PROTO = 0x01
313     RESPOND = Respond.INL
314     # Default response for ACK, can also respond with STOP_UPLOAD
315     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
316
317     def in_decode(self, length: int, payload: bytes) -> None:
318         self.imei = payload[:8].ljust(8, b"\0").hex()
319         self.ver = payload[8]
320
321     def in_encode(self) -> bytes:
322         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
323             "B", self.ver
324         )
325
326
327 class SUPERVISION(GPS303Pkt):
328     PROTO = 0x05
329     OUT_KWARGS = (("status", int, 1),)
330
331     def out_encode(self) -> bytes:
332         # 1: The device automatically answers Pickup effect
333         # 2: Automatically Answering Two-way Calls
334         # 3: Ring manually answer the two-way call
335         return pack("B", self.status)
336
337
338 class HEARTBEAT(GPS303Pkt):
339     PROTO = 0x08
340     RESPOND = Respond.INL
341
342
343 class _GPS_POSITIONING(GPS303Pkt):
344     RESPOND = Respond.INL
345
346     def in_decode(self, length: int, payload: bytes) -> None:
347         self.dtime = payload[:6]
348         if self.dtime == b"\0\0\0\0\0\0":
349             self.devtime = None
350         else:
351             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
352             self.devtime = datetime(
353                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
354             )
355         self.gps_data_length = payload[6] >> 4
356         self.gps_nb_sat = payload[6] & 0x0F
357         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
358         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
359         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
360         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
361         self.heading = flags & 0b0000001111111111  # bits 6 - last
362         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
363         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
364         self.speed = speed
365         self.flags = flags
366
367     def out_encode(self) -> bytes:
368         tup = datetime.utcnow().timetuple()
369         ttup = (tup[0] % 100,) + tup[1:6]
370         return pack("BBBBBB", *ttup)
371
372     def rectified(self) -> CoordReport:  # JSON-able dict
373         return CoordReport(
374             devtime=str(self.devtime),
375             battery_percentage=None,
376             accuracy=None,
377             altitude=None,
378             speed=self.speed,
379             direction=self.heading,
380             latitude=self.latitude,
381             longitude=self.longitude,
382         )
383
384
385 class GPS_POSITIONING(_GPS_POSITIONING):
386     PROTO = 0x10
387
388
389 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
390     PROTO = 0x11
391
392
393 class STATUS(GPS303Pkt):
394     PROTO = 0x13
395     RESPOND = Respond.EXT
396     IN_KWARGS = (
397         ("batt", int, 100),
398         ("ver", int, 0),
399         ("timezone", int, 0),
400         ("intvl", int, 0),
401         ("signal", maybe(int), None),
402     )
403     OUT_KWARGS = (("upload_interval", int, 25),)
404
405     def in_decode(self, length: int, payload: bytes) -> None:
406         self.batt, self.ver, self.timezone, self.intvl = unpack(
407             "BBBB", payload[:4]
408         )
409         if len(payload) > 4:
410             self.signal: Optional[int] = payload[4]
411         else:
412             self.signal = None
413
414     def in_encode(self) -> bytes:
415         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
416             b"" if self.signal is None else pack("B", self.signal)
417         )
418
419     def out_encode(self) -> bytes:  # Set interval in minutes
420         return pack("B", self.upload_interval)
421
422     def rectified(self) -> StatusReport:
423         return StatusReport(battery_percentage=self.batt)
424
425
426 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
427     PROTO = 0x14
428
429     def in_encode(self) -> bytes:
430         return b""
431
432
433 class RESET(GPS303Pkt):
434     # Device sends when it got reset SMS
435     # Server can send to initiate factory reset
436     PROTO = 0x15
437
438
439 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
440     PROTO = 0x16
441     OUT_KWARGS = (("number", int, 3),)
442
443     def out_encode(self) -> bytes:  # Number of whitelist entries
444         return pack("B", self.number)
445
446
447 class _WIFI_POSITIONING(GPS303Pkt):
448     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
449         # IN_KWARGS = (
450         ("dtime", bytes, b"\0\0\0\0\0\0"),
451         ("wifi_aps", list, []),
452         ("mcc", int, 0),
453         ("mnc", int, 0),
454         ("gsm_cells", list, []),
455     )
456
457     def in_decode(self, length: int, payload: bytes) -> None:
458         self.dtime = payload[:6]
459         if self.dtime == b"\0\0\0\0\0\0":
460             self.devtime = None
461         else:
462             self.devtime = datetime.strptime(
463                 self.dtime.hex(), "%y%m%d%H%M%S"
464             ).astimezone(tz=timezone.utc)
465         self.wifi_aps = []
466         for i in range(self.length):  # length has special meaning here
467             slice = payload[6 + i * 7 : 13 + i * 7]
468             self.wifi_aps.append(
469                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
470             )
471         gsm_slice = payload[6 + self.length * 7 :]
472         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
473         self.gsm_cells = []
474         for i in range(ncells):
475             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
476             locac, cellid, sigstr = unpack(
477                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
478             )
479             self.gsm_cells.append((locac, cellid, -sigstr))
480
481     def in_encode(self) -> bytes:
482         self.length = len(self.wifi_aps)
483         return b"".join(
484             [
485                 self.dtime,
486                 b"".join(
487                     [
488                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
489                         + pack("B", -sigstr)
490                         for mac, sigstr in self.wifi_aps
491                     ]
492                 ),
493                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
494                 b"".join(
495                     [
496                         pack("!HHB", locac, cellid, -sigstr)
497                         for locac, cellid, sigstr in self.gsm_cells
498                     ]
499                 ),
500             ]
501         )
502
503     def rectified(self) -> HintReport:
504         return HintReport(
505             devtime=str(self.devtime),
506             battery_percentage=None,
507             mcc=self.mcc,
508             mnc=self.mnc,
509             gsm_cells=self.gsm_cells,
510             wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
511         )
512
513
514 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
515     PROTO = 0x17
516     RESPOND = Respond.INL
517
518     def out_encode(self) -> bytes:
519         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
520
521
522 class TIME(GPS303Pkt):
523     PROTO = 0x30
524     RESPOND = Respond.INL
525
526     def out_encode(self) -> bytes:
527         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
528
529
530 class PROHIBIT_LBS(GPS303Pkt):
531     PROTO = 0x33
532     OUT_KWARGS = (("status", int, 1),)
533
534     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
535         return pack("B", self.status)
536
537
538 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
539     PROTO = 0x34
540
541     OUT_KWARGS = (
542         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
543         ("gps_interval_set", boolx, False),
544         ("gps_interval", hhmmhhmm, "00000000"),
545         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
546         ("boot_time_set", boolx, False),
547         ("boot_time", hhmm, "0000"),
548         ("shut_time_set", boolx, False),
549         ("shut_time", hhmm, "0000"),
550     )
551
552     def out_encode(self) -> bytes:
553         return (
554             pack("B", self.gps_off)
555             + pack("B", self.gps_interval_set)
556             + bytes.fromhex(self.gps_interval)
557             + pack("B", self.lbs_off)
558             + pack("B", self.boot_time_set)
559             + bytes.fromhex(self.boot_time)
560             + pack("B", self.shut_time_set)
561             + bytes.fromhex(self.shut_time)
562         )
563
564
565 class _SET_PHONE(GPS303Pkt):
566     OUT_KWARGS = (("phone", str, ""),)
567
568     def out_encode(self) -> bytes:
569         self.phone: str
570         return self.phone.encode("")
571
572
573 class REMOTE_MONITOR_PHONE(_SET_PHONE):
574     PROTO = 0x40
575
576
577 class SOS_PHONE(_SET_PHONE):
578     PROTO = 0x41
579
580
581 class DAD_PHONE(_SET_PHONE):
582     PROTO = 0x42
583
584
585 class MOM_PHONE(_SET_PHONE):
586     PROTO = 0x43
587
588
589 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
590     PROTO = 0x44
591
592
593 class GPS_OFF_PERIOD(GPS303Pkt):
594     PROTO = 0x46
595     OUT_KWARGS = (
596         ("onoff", int, 0),
597         ("fm", hhmm, "0000"),
598         ("to", hhmm, "2359"),
599     )
600
601     def out_encode(self) -> bytes:
602         return (
603             pack("B", self.onoff)
604             + bytes.fromhex(self.fm)
605             + bytes.fromhex(self.to)
606         )
607
608
609 class DND_PERIOD(GPS303Pkt):
610     PROTO = 0x47
611     OUT_KWARGS = (
612         ("onoff", int, 0),
613         ("week", int, 3),
614         ("fm1", hhmm, "0000"),
615         ("to1", hhmm, "2359"),
616         ("fm2", hhmm, "0000"),
617         ("to2", hhmm, "2359"),
618     )
619
620     def out_encode(self) -> bytes:
621         return (
622             pack("B", self.onoff)
623             + pack("B", self.week)
624             + bytes.fromhex(self.fm1)
625             + bytes.fromhex(self.to1)
626             + bytes.fromhex(self.fm2)
627             + bytes.fromhex(self.to2)
628         )
629
630
631 class RESTART_SHUTDOWN(GPS303Pkt):
632     PROTO = 0x48
633     OUT_KWARGS = (("flag", int, 0),)
634
635     def out_encode(self) -> bytes:
636         # 1 - restart
637         # 2 - shutdown
638         return pack("B", self.flag)
639
640
641 class DEVICE(GPS303Pkt):
642     PROTO = 0x49
643     OUT_KWARGS = (("flag", int, 0),)
644
645     # 0 - Stop looking for equipment
646     # 1 - Start looking for equipment
647     def out_encode(self) -> bytes:
648         return pack("B", self.flag)
649
650
651 class ALARM_CLOCK(GPS303Pkt):
652     PROTO = 0x50
653     OUT_KWARGS: Tuple[
654         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
655     ] = (
656         ("alarms", l3alarms, []),
657     )
658
659     def out_encode(self) -> bytes:
660         return b"".join(
661             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
662         )
663
664
665 class STOP_ALARM(GPS303Pkt):
666     PROTO = 0x56
667
668     def in_decode(self, length: int, payload: bytes) -> None:
669         self.flag = payload[0]
670
671
672 class SETUP(GPS303Pkt):
673     PROTO = 0x57
674     RESPOND = Respond.EXT
675     OUT_KWARGS = (
676         ("uploadintervalseconds", intx, 0x0300),
677         ("binaryswitch", intx, 0b00110001),
678         ("alarms", l3int, [0, 0, 0]),
679         ("dndtimeswitch", int, 0),
680         ("dndtimes", l3int, [0, 0, 0]),
681         ("gpstimeswitch", int, 0),
682         ("gpstimestart", int, 0),
683         ("gpstimestop", int, 0),
684         ("phonenumbers", l3str, ["", "", ""]),
685     )
686
687     def out_encode(self) -> bytes:
688         def pack3b(x: int) -> bytes:
689             return pack("!I", x)[1:]
690
691         return b"".join(
692             [
693                 pack("!H", self.uploadintervalseconds),
694                 pack("B", self.binaryswitch),
695             ]
696             + [pack3b(el) for el in self.alarms]
697             + [
698                 pack("B", self.dndtimeswitch),
699             ]
700             + [pack3b(el) for el in self.dndtimes]
701             + [
702                 pack("B", self.gpstimeswitch),
703                 pack("!H", self.gpstimestart),
704                 pack("!H", self.gpstimestop),
705             ]
706             + [b";".join([el.encode() for el in self.phonenumbers])]
707         )
708
709     def in_encode(self) -> bytes:
710         return b""
711
712
713 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
714     PROTO = 0x58
715
716
717 class RESTORE_PASSWORD(GPS303Pkt):
718     PROTO = 0x67
719
720
721 class WIFI_POSITIONING(_WIFI_POSITIONING):
722     PROTO = 0x69
723     RESPOND = Respond.EXT
724     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
725
726     def out_encode(self) -> bytes:
727         if self.latitude is None or self.longitude is None:
728             return b""
729         return "{:+#010.8g},{:+#010.8g}".format(
730             self.latitude, self.longitude
731         ).encode()
732
733     def out_decode(self, length: int, payload: bytes) -> None:
734         lat, lon = payload.decode().split(",")
735         self.latitude = float(lat)
736         self.longitude = float(lon)
737
738
739 class MANUAL_POSITIONING(GPS303Pkt):
740     PROTO = 0x80
741
742     def in_decode(self, length: int, payload: bytes) -> None:
743         self.flag = payload[0] if len(payload) > 0 else -1
744         self.reason = {
745             1: "Incorrect time",
746             2: "LBS less",
747             3: "WiFi less",
748             4: "LBS search > 3 times",
749             5: "Same LBS and WiFi data",
750             6: "LBS prohibited, WiFi absent",
751             7: "GPS spacing < 50 m",
752         }.get(self.flag, "Unknown")
753
754
755 class BATTERY_CHARGE(GPS303Pkt):
756     PROTO = 0x81
757
758
759 class CHARGER_CONNECTED(GPS303Pkt):
760     PROTO = 0x82
761
762
763 class CHARGER_DISCONNECTED(GPS303Pkt):
764     PROTO = 0x83
765
766
767 class VIBRATION_RECEIVED(GPS303Pkt):
768     PROTO = 0x94
769
770
771 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
772     PROTO = 0x98
773     RESPOND = Respond.EXT
774     OUT_KWARGS = (("interval", int, 10),)
775
776     def in_decode(self, length: int, payload: bytes) -> None:
777         self.interval = unpack("!H", payload[:2])
778
779     def out_encode(self) -> bytes:
780         return pack("!H", self.interval)
781
782
783 class SOS_ALARM(GPS303Pkt):
784     PROTO = 0x99
785
786
787 class UNKNOWN_B3(GPS303Pkt):
788     PROTO = 0xB3
789     IN_KWARGS = (("asciidata", str, ""),)
790
791     def in_decode(self, length: int, payload: bytes) -> None:
792         self.asciidata = payload.decode()
793
794
795 # Build dicts protocol number -> class and class name -> protocol number
796 CLASSES = {}
797 PROTOS = {}
798 if True:  # just to indent the code, sorry!
799     for cls in [
800         cls
801         for name, cls in globals().items()
802         if isclass(cls)
803         and issubclass(cls, GPS303Pkt)
804         and not name.startswith("_")
805     ]:
806         if hasattr(cls, "PROTO"):
807             CLASSES[cls.PROTO] = cls
808             PROTOS[cls.__name__] = cls.PROTO
809
810
811 def class_by_prefix(
812     prefix: str,
813 ) -> Union[Type[GPS303Pkt], List[str]]:
814     if prefix.startswith(PROTO_PREFIX):
815         pname = prefix[len(PROTO_PREFIX) :]
816     else:
817         raise KeyError(pname)
818     lst = [
819         (name, proto)
820         for name, proto in PROTOS.items()
821         if name.upper().startswith(prefix.upper())
822     ]
823     if len(lst) != 1:
824         return [name for name, _ in lst]
825     _, proto = lst[0]
826     return CLASSES[proto]
827
828
829 def proto_handled(proto: str) -> bool:
830     return proto.startswith(PROTO_PREFIX)
831
832
833 def proto_of_message(packet: bytes) -> str:
834     return CLASSES.get(packet[1], UNKNOWN).proto_name()
835
836
837 def imei_from_packet(packet: bytes) -> Optional[str]:
838     if packet[1] == LOGIN.PROTO:
839         msg = parse_message(packet)
840         if isinstance(msg, LOGIN):
841             return msg.imei
842     return None
843
844
845 def is_goodbye_packet(packet: bytes) -> bool:
846     return packet[1] == HIBERNATION.PROTO
847
848
849 def inline_response(packet: bytes) -> Optional[bytes]:
850     proto = packet[1]
851     if proto in CLASSES:
852         cls = CLASSES[proto]
853         if cls.RESPOND is Respond.INL:
854             return cls.Out().packed
855     return None
856
857
858 def probe_buffer(buffer: bytes) -> bool:
859     framestart = buffer.find(b"xx")
860     if framestart < 0:
861         return False
862     if len(buffer) - framestart < 6:
863         return False
864     return True
865
866
867 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
868     """From a packet (without framing bytes) derive the XXX.In object"""
869     length, proto = unpack("BB", packet[:2])
870     payload = packet[2:]
871     if proto not in CLASSES:
872         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
873             f"Proto {proto} is unknown"
874         )
875     else:
876         try:
877             if is_incoming:
878                 return CLASSES[proto].In(length, payload)
879             else:
880                 return CLASSES[proto].Out(length, payload)
881         except (DecodeError, ValueError, IndexError) as e:
882             cause = e
883     if is_incoming:
884         retobj = UNKNOWN.In(length, payload)
885     else:
886         retobj = UNKNOWN.Out(length, payload)
887     retobj.PROTO = proto  # Override class attr with object attr
888     retobj.cause = cause
889     return retobj
890
891
892 def exposed_protos() -> List[Tuple[str, bool]]:
893     return [
894         (cls.proto_name(), cls.RESPOND is Respond.EXT)
895         for cls in CLASSES.values()
896         if hasattr(cls, "rectified")
897     ]