]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
WIP converting wsgateway to multiprotocols
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "Stream",
36     "class_by_prefix",
37     "enframe",
38     "exposed_protos",
39     "inline_response",
40     "proto_handled",
41     "parse_message",
42     "probe_buffer",
43     "proto_name",
44     "DecodeError",
45     "Respond",
46     "GPS303Pkt",
47     "UNKNOWN",
48     "LOGIN",
49     "SUPERVISION",
50     "HEARTBEAT",
51     "GPS_POSITIONING",
52     "GPS_OFFLINE_POSITIONING",
53     "STATUS",
54     "HIBERNATION",
55     "RESET",
56     "WHITELIST_TOTAL",
57     "WIFI_OFFLINE_POSITIONING",
58     "TIME",
59     "PROHIBIT_LBS",
60     "GPS_LBS_SWITCH_TIMES",
61     "REMOTE_MONITOR_PHONE",
62     "SOS_PHONE",
63     "DAD_PHONE",
64     "MOM_PHONE",
65     "STOP_UPLOAD",
66     "GPS_OFF_PERIOD",
67     "DND_PERIOD",
68     "RESTART_SHUTDOWN",
69     "DEVICE",
70     "ALARM_CLOCK",
71     "STOP_ALARM",
72     "SETUP",
73     "SYNCHRONOUS_WHITELIST",
74     "RESTORE_PASSWORD",
75     "WIFI_POSITIONING",
76     "MANUAL_POSITIONING",
77     "BATTERY_CHARGE",
78     "CHARGER_CONNECTED",
79     "CHARGER_DISCONNECTED",
80     "VIBRATION_RECEIVED",
81     "POSITION_UPLOAD_INTERVAL",
82     "SOS_ALARM",
83     "UNKNOWN_B3",
84 )
85
86 PROTO_PREFIX = "ZX:"
87
88 ### Deframer ###
89
90 MAXBUFFER: int = 4096
91
92
93 class Stream:
94     def __init__(self) -> None:
95         self.buffer = b""
96
97     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
98         """
99         Process next segment of the stream. Return successfully deframed
100         packets as `bytes` and error messages as `str`.
101         """
102         when = time()
103         self.buffer += segment
104         if len(self.buffer) > MAXBUFFER:
105             # We are receiving junk. Let's drop it or we run out of memory.
106             self.buffer = b""
107             return [f"More than {MAXBUFFER} unparseable data, dropping"]
108         msgs: List[Union[bytes, str]] = []
109         while True:
110             framestart = self.buffer.find(b"xx")
111             if framestart == -1:  # No frames, return whatever we have
112                 break
113             if framestart > 0:  # Should not happen, report
114                 msgs.append(
115                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
116                 )
117                 self.buffer = self.buffer[framestart:]
118             # At this point, buffer starts with a packet
119             if len(self.buffer) < 6:  # no len and proto - cannot proceed
120                 break
121             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
122             frameend = 0
123             # Length field can legitimeely be much less than the
124             # length of the packet (e.g. WiFi positioning), but
125             # it _should not_ be greater. Still sometimes it is.
126             # Luckily, not by too much: by maybe two or three bytes?
127             # Do this embarrassing hack to avoid accidental match
128             # of some binary data in the packet against '\r\n'.
129             while True:
130                 frameend = self.buffer.find(b"\r\n", frameend + 1)
131                 if frameend == -1 or frameend >= (
132                     exp_end - 3
133                 ):  # Found realistic match or none
134                     break
135             if frameend == -1:  # Incomplete frame, return what we have
136                 break
137             packet = self.buffer[2:frameend]
138             self.buffer = self.buffer[frameend + 2 :]
139             if len(packet) < 2:  # frameend comes too early
140                 msgs.append(f"Packet too short: {packet.hex()}")
141             else:
142                 msgs.append(packet)
143         return msgs
144
145     def close(self) -> bytes:
146         ret = self.buffer
147         self.buffer = b""
148         return ret
149
150
151 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
152     return b"xx" + buffer + b"\r\n"
153
154
155 ### Parser/Constructor ###
156
157
158 class DecodeError(Exception):
159     def __init__(self, e: Exception, **kwargs: Any) -> None:
160         super().__init__(e)
161         for k, v in kwargs.items():
162             setattr(self, k, v)
163
164
165 def maybe(typ: type) -> Callable[[Any], Any]:
166     return lambda x: None if x is None else typ(x)
167
168
169 def intx(x: Union[str, int]) -> int:
170     if isinstance(x, str):
171         x = int(x, 0)
172     return x
173
174
175 def boolx(x: Union[str, bool]) -> bool:
176     if isinstance(x, str):
177         if x.upper() in ("ON", "TRUE", "1"):
178             return True
179         if x.upper() in ("OFF", "FALSE", "0"):
180             return False
181         raise ValueError(str(x) + " could not be parsed as a Boolean")
182     return x
183
184
185 def hhmm(x: str) -> str:
186     """Check for the string that represents hours and minutes"""
187     if not isinstance(x, str) or len(x) != 4:
188         raise ValueError(str(x) + " is not a four-character string")
189     hh = int(x[:2])
190     mm = int(x[2:])
191     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
192         raise ValueError(str(x) + " does not contain valid hours and minutes")
193     return x
194
195
196 def hhmmhhmm(x: str) -> str:
197     """Check for the string that represents hours and minutes twice"""
198     if not isinstance(x, str) or len(x) != 8:
199         raise ValueError(str(x) + " is not an eight-character string")
200     return hhmm(x[:4]) + hhmm(x[4:])
201
202
203 def l3str(x: Union[str, List[str]]) -> List[str]:
204     if isinstance(x, str):
205         lx = x.split(",")
206     else:
207         lx = x
208     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
209         raise ValueError(str(lx) + " is not a list of three strings")
210     return lx
211
212
213 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
214     def alrmspec(sub: str) -> Tuple[int, str]:
215         if len(sub) != 7:
216             raise ValueError(sub + " does not represent day and time")
217         return (
218             {
219                 "MON": 1,
220                 "TUE": 2,
221                 "WED": 3,
222                 "THU": 4,
223                 "FRI": 5,
224                 "SAT": 6,
225                 "SUN": 7,
226             }[sub[:3].upper()],
227             sub[3:],
228         )
229
230     if isinstance(x, str):
231         lx = [alrmspec(sub) for sub in x.split(",")]
232     else:
233         lx = x
234     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
235     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
236         raise ValueError(str(lx) + " is a wrong alarms specification")
237     return [(d, hhmm(tm)) for d, tm in lx]
238
239
240 def l3int(x: Union[str, List[int]]) -> List[int]:
241     if isinstance(x, str):
242         lx = [int(el) for el in x.split(",")]
243     else:
244         lx = x
245     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
246         raise ValueError(str(lx) + " is not a list of three integers")
247     return lx
248
249
250 class MetaPkt(type):
251     """
252     For each class corresponding to a message, automatically create
253     two nested classes `In` and `Out` that also inherit from their
254     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
255     copied to the `In` nested class under the name `KWARGS`, and
256     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
257     to the nested class `Out`. In addition, method `encode` is
258     defined in both classes equal to `in_encode()` and `out_encode()`
259     respectively.
260     """
261
262     if TYPE_CHECKING:
263
264         def __getattr__(self, name: str) -> Any:
265             pass
266
267         def __setattr__(self, name: str, value: Any) -> None:
268             pass
269
270     def __new__(
271         cls: Type["MetaPkt"],
272         name: str,
273         bases: Tuple[type, ...],
274         attrs: Dict[str, Any],
275     ) -> "MetaPkt":
276         newcls = super().__new__(cls, name, bases, attrs)
277         newcls.In = super().__new__(
278             cls,
279             name + ".In",
280             (newcls,) + bases,
281             {
282                 "KWARGS": newcls.IN_KWARGS,
283                 "decode": newcls.in_decode,
284                 "encode": newcls.in_encode,
285             },
286         )
287         newcls.Out = super().__new__(
288             cls,
289             name + ".Out",
290             (newcls,) + bases,
291             {
292                 "KWARGS": newcls.OUT_KWARGS,
293                 "decode": newcls.out_decode,
294                 "encode": newcls.out_encode,
295             },
296         )
297         return newcls
298
299
300 class Respond(Enum):
301     NON = 0  # Incoming, no response needed
302     INL = 1  # Birirectional, use `inline_response()`
303     EXT = 2  # Birirectional, use external responder
304
305
306 class GPS303Pkt(metaclass=MetaPkt):
307     RESPOND = Respond.NON  # Do not send anything back by default
308     PROTO: int
309     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
311     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
312     In: Type["GPS303Pkt"]
313     Out: Type["GPS303Pkt"]
314
315     if TYPE_CHECKING:
316
317         def __getattr__(self, name: str) -> Any:
318             pass
319
320         def __setattr__(self, name: str, value: Any) -> None:
321             pass
322
323     def __init__(self, *args: Any, **kwargs: Any):
324         """
325         Construct the object _either_ from (length, payload),
326         _or_ from the values of individual fields
327         """
328         assert not args or (len(args) == 2 and not kwargs)
329         if args:  # guaranteed to be two arguments at this point
330             self.length, self.payload = args
331             try:
332                 self.decode(self.length, self.payload)
333             except error as e:
334                 raise DecodeError(e, obj=self)
335         else:
336             for kw, typ, dfl in self.KWARGS:
337                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
338             if kwargs:
339                 raise ValueError(
340                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
341                 )
342
343     def __repr__(self) -> str:
344         return "{}({})".format(
345             self.__class__.__name__,
346             ", ".join(
347                 "{}={}".format(
348                     k,
349                     'bytes.fromhex("{}")'.format(v.hex())
350                     if isinstance(v, bytes)
351                     else v.__repr__(),
352                 )
353                 for k, v in self.__dict__.items()
354                 if not k.startswith("_")
355             ),
356         )
357
358     decode: Callable[["GPS303Pkt", int, bytes], None]
359
360     def in_decode(self, length: int, packet: bytes) -> None:
361         # Overridden in subclasses, otherwise do not decode payload
362         return
363
364     def out_decode(self, length: int, packet: bytes) -> None:
365         # Overridden in subclasses, otherwise do not decode payload
366         return
367
368     encode: Callable[["GPS303Pkt"], bytes]
369
370     def in_encode(self) -> bytes:
371         # Necessary to emulate terminal, which is not implemented
372         raise NotImplementedError(
373             self.__class__.__name__ + ".encode() not implemented"
374         )
375
376     def out_encode(self) -> bytes:
377         # Overridden in subclasses, otherwise make empty payload
378         return b""
379
380     @property
381     def packed(self) -> bytes:
382         payload = self.encode()
383         length = getattr(self, "length", len(payload) + 1)
384         return pack("BB", length, self.PROTO) + payload
385
386
387 class UNKNOWN(GPS303Pkt):
388     PROTO = 256  # > 255 is impossible in real packets
389
390
391 class LOGIN(GPS303Pkt):
392     PROTO = 0x01
393     RESPOND = Respond.INL
394     # Default response for ACK, can also respond with STOP_UPLOAD
395     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
396
397     def in_decode(self, length: int, payload: bytes) -> None:
398         self.imei = payload[:8].ljust(8, b"\0").hex()
399         self.ver = payload[8]
400
401     def in_encode(self) -> bytes:
402         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
403             "B", self.ver
404         )
405
406
407 class SUPERVISION(GPS303Pkt):
408     PROTO = 0x05
409     OUT_KWARGS = (("status", int, 1),)
410
411     def out_encode(self) -> bytes:
412         # 1: The device automatically answers Pickup effect
413         # 2: Automatically Answering Two-way Calls
414         # 3: Ring manually answer the two-way call
415         return pack("B", self.status)
416
417
418 class HEARTBEAT(GPS303Pkt):
419     PROTO = 0x08
420     RESPOND = Respond.INL
421
422
423 class _GPS_POSITIONING(GPS303Pkt):
424     RESPOND = Respond.INL
425
426     def in_decode(self, length: int, payload: bytes) -> None:
427         self.dtime = payload[:6]
428         if self.dtime == b"\0\0\0\0\0\0":
429             self.devtime = None
430         else:
431             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
432             self.devtime = datetime(
433                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
434             )
435         self.gps_data_length = payload[6] >> 4
436         self.gps_nb_sat = payload[6] & 0x0F
437         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
438         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
439         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
440         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
441         self.heading = flags & 0b0000001111111111  # bits 6 - last
442         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
443         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
444         self.speed = speed
445         self.flags = flags
446
447     def out_encode(self) -> bytes:
448         tup = datetime.utcnow().timetuple()
449         ttup = (tup[0] % 100,) + tup[1:6]
450         return pack("BBBBBB", *ttup)
451
452
453 class GPS_POSITIONING(_GPS_POSITIONING):
454     PROTO = 0x10
455
456
457 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
458     PROTO = 0x11
459
460
461 class STATUS(GPS303Pkt):
462     PROTO = 0x13
463     RESPOND = Respond.EXT
464     IN_KWARGS = (
465         ("batt", int, 100),
466         ("ver", int, 0),
467         ("timezone", int, 0),
468         ("intvl", int, 0),
469         ("signal", maybe(int), None),
470     )
471     OUT_KWARGS = (("upload_interval", int, 25),)
472
473     def in_decode(self, length: int, payload: bytes) -> None:
474         self.batt, self.ver, self.timezone, self.intvl = unpack(
475             "BBBB", payload[:4]
476         )
477         if len(payload) > 4:
478             self.signal: Optional[int] = payload[4]
479         else:
480             self.signal = None
481
482     def in_encode(self) -> bytes:
483         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
484             b"" if self.signal is None else pack("B", self.signal)
485         )
486
487     def out_encode(self) -> bytes:  # Set interval in minutes
488         return pack("B", self.upload_interval)
489
490
491 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
492     PROTO = 0x14
493
494     def in_encode(self) -> bytes:
495         return b""
496
497
498 class RESET(GPS303Pkt):
499     # Device sends when it got reset SMS
500     # Server can send to initiate factory reset
501     PROTO = 0x15
502
503
504 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
505     PROTO = 0x16
506     OUT_KWARGS = (("number", int, 3),)
507
508     def out_encode(self) -> bytes:  # Number of whitelist entries
509         return pack("B", self.number)
510
511
512 class _WIFI_POSITIONING(GPS303Pkt):
513     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
514         # IN_KWARGS = (
515         ("dtime", bytes, b"\0\0\0\0\0\0"),
516         ("wifi_aps", list, []),
517         ("mcc", int, 0),
518         ("mnc", int, 0),
519         ("gsm_cells", list, []),
520     )
521
522     def in_decode(self, length: int, payload: bytes) -> None:
523         self.dtime = payload[:6]
524         if self.dtime == b"\0\0\0\0\0\0":
525             self.devtime = None
526         else:
527             self.devtime = datetime.strptime(
528                 self.dtime.hex(), "%y%m%d%H%M%S"
529             ).astimezone(tz=timezone.utc)
530         self.wifi_aps = []
531         for i in range(self.length):  # length has special meaning here
532             slice = payload[6 + i * 7 : 13 + i * 7]
533             self.wifi_aps.append(
534                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
535             )
536         gsm_slice = payload[6 + self.length * 7 :]
537         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
538         self.gsm_cells = []
539         for i in range(ncells):
540             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
541             locac, cellid, sigstr = unpack(
542                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
543             )
544             self.gsm_cells.append((locac, cellid, -sigstr))
545
546     def in_encode(self) -> bytes:
547         self.length = len(self.wifi_aps)
548         return b"".join(
549             [
550                 self.dtime,
551                 b"".join(
552                     [
553                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
554                         + pack("B", -sigstr)
555                         for mac, sigstr in self.wifi_aps
556                     ]
557                 ),
558                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
559                 b"".join(
560                     [
561                         pack("!HHB", locac, cellid, -sigstr)
562                         for locac, cellid, sigstr in self.gsm_cells
563                     ]
564                 ),
565             ]
566         )
567
568
569 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
570     PROTO = 0x17
571     RESPOND = Respond.INL
572
573     def out_encode(self) -> bytes:
574         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
575
576
577 class TIME(GPS303Pkt):
578     PROTO = 0x30
579     RESPOND = Respond.INL
580
581     def out_encode(self) -> bytes:
582         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
583
584
585 class PROHIBIT_LBS(GPS303Pkt):
586     PROTO = 0x33
587     OUT_KWARGS = (("status", int, 1),)
588
589     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
590         return pack("B", self.status)
591
592
593 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
594     PROTO = 0x34
595
596     OUT_KWARGS = (
597         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
598         ("gps_interval_set", boolx, False),
599         ("gps_interval", hhmmhhmm, "00000000"),
600         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
601         ("boot_time_set", boolx, False),
602         ("boot_time", hhmm, "0000"),
603         ("shut_time_set", boolx, False),
604         ("shut_time", hhmm, "0000"),
605     )
606
607     def out_encode(self) -> bytes:
608         return (
609             pack("B", self.gps_off)
610             + pack("B", self.gps_interval_set)
611             + bytes.fromhex(self.gps_interval)
612             + pack("B", self.lbs_off)
613             + pack("B", self.boot_time_set)
614             + bytes.fromhex(self.boot_time)
615             + pack("B", self.shut_time_set)
616             + bytes.fromhex(self.shut_time)
617         )
618
619
620 class _SET_PHONE(GPS303Pkt):
621     OUT_KWARGS = (("phone", str, ""),)
622
623     def out_encode(self) -> bytes:
624         self.phone: str
625         return self.phone.encode("")
626
627
628 class REMOTE_MONITOR_PHONE(_SET_PHONE):
629     PROTO = 0x40
630
631
632 class SOS_PHONE(_SET_PHONE):
633     PROTO = 0x41
634
635
636 class DAD_PHONE(_SET_PHONE):
637     PROTO = 0x42
638
639
640 class MOM_PHONE(_SET_PHONE):
641     PROTO = 0x43
642
643
644 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
645     PROTO = 0x44
646
647
648 class GPS_OFF_PERIOD(GPS303Pkt):
649     PROTO = 0x46
650     OUT_KWARGS = (
651         ("onoff", int, 0),
652         ("fm", hhmm, "0000"),
653         ("to", hhmm, "2359"),
654     )
655
656     def out_encode(self) -> bytes:
657         return (
658             pack("B", self.onoff)
659             + bytes.fromhex(self.fm)
660             + bytes.fromhex(self.to)
661         )
662
663
664 class DND_PERIOD(GPS303Pkt):
665     PROTO = 0x47
666     OUT_KWARGS = (
667         ("onoff", int, 0),
668         ("week", int, 3),
669         ("fm1", hhmm, "0000"),
670         ("to1", hhmm, "2359"),
671         ("fm2", hhmm, "0000"),
672         ("to2", hhmm, "2359"),
673     )
674
675     def out_encode(self) -> bytes:
676         return (
677             pack("B", self.onoff)
678             + pack("B", self.week)
679             + bytes.fromhex(self.fm1)
680             + bytes.fromhex(self.to1)
681             + bytes.fromhex(self.fm2)
682             + bytes.fromhex(self.to2)
683         )
684
685
686 class RESTART_SHUTDOWN(GPS303Pkt):
687     PROTO = 0x48
688     OUT_KWARGS = (("flag", int, 0),)
689
690     def out_encode(self) -> bytes:
691         # 1 - restart
692         # 2 - shutdown
693         return pack("B", self.flag)
694
695
696 class DEVICE(GPS303Pkt):
697     PROTO = 0x49
698     OUT_KWARGS = (("flag", int, 0),)
699
700     # 0 - Stop looking for equipment
701     # 1 - Start looking for equipment
702     def out_encode(self) -> bytes:
703         return pack("B", self.flag)
704
705
706 class ALARM_CLOCK(GPS303Pkt):
707     PROTO = 0x50
708     OUT_KWARGS: Tuple[
709         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
710     ] = (
711         ("alarms", l3alarms, []),
712     )
713
714     def out_encode(self) -> bytes:
715         return b"".join(
716             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
717         )
718
719
720 class STOP_ALARM(GPS303Pkt):
721     PROTO = 0x56
722
723     def in_decode(self, length: int, payload: bytes) -> None:
724         self.flag = payload[0]
725
726
727 class SETUP(GPS303Pkt):
728     PROTO = 0x57
729     RESPOND = Respond.EXT
730     OUT_KWARGS = (
731         ("uploadintervalseconds", intx, 0x0300),
732         ("binaryswitch", intx, 0b00110001),
733         ("alarms", l3int, [0, 0, 0]),
734         ("dndtimeswitch", int, 0),
735         ("dndtimes", l3int, [0, 0, 0]),
736         ("gpstimeswitch", int, 0),
737         ("gpstimestart", int, 0),
738         ("gpstimestop", int, 0),
739         ("phonenumbers", l3str, ["", "", ""]),
740     )
741
742     def out_encode(self) -> bytes:
743         def pack3b(x: int) -> bytes:
744             return pack("!I", x)[1:]
745
746         return b"".join(
747             [
748                 pack("!H", self.uploadintervalseconds),
749                 pack("B", self.binaryswitch),
750             ]
751             + [pack3b(el) for el in self.alarms]
752             + [
753                 pack("B", self.dndtimeswitch),
754             ]
755             + [pack3b(el) for el in self.dndtimes]
756             + [
757                 pack("B", self.gpstimeswitch),
758                 pack("!H", self.gpstimestart),
759                 pack("!H", self.gpstimestop),
760             ]
761             + [b";".join([el.encode() for el in self.phonenumbers])]
762         )
763
764     def in_encode(self) -> bytes:
765         return b""
766
767
768 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
769     PROTO = 0x58
770
771
772 class RESTORE_PASSWORD(GPS303Pkt):
773     PROTO = 0x67
774
775
776 class WIFI_POSITIONING(_WIFI_POSITIONING):
777     PROTO = 0x69
778     RESPOND = Respond.EXT
779     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
780
781     def out_encode(self) -> bytes:
782         if self.latitude is None or self.longitude is None:
783             return b""
784         return "{:+#010.8g},{:+#010.8g}".format(
785             self.latitude, self.longitude
786         ).encode()
787
788     def out_decode(self, length: int, payload: bytes) -> None:
789         lat, lon = payload.decode().split(",")
790         self.latitude = float(lat)
791         self.longitude = float(lon)
792
793
794 class MANUAL_POSITIONING(GPS303Pkt):
795     PROTO = 0x80
796
797     def in_decode(self, length: int, payload: bytes) -> None:
798         self.flag = payload[0] if len(payload) > 0 else -1
799         self.reason = {
800             1: "Incorrect time",
801             2: "LBS less",
802             3: "WiFi less",
803             4: "LBS search > 3 times",
804             5: "Same LBS and WiFi data",
805             6: "LBS prohibited, WiFi absent",
806             7: "GPS spacing < 50 m",
807         }.get(self.flag, "Unknown")
808
809
810 class BATTERY_CHARGE(GPS303Pkt):
811     PROTO = 0x81
812
813
814 class CHARGER_CONNECTED(GPS303Pkt):
815     PROTO = 0x82
816
817
818 class CHARGER_DISCONNECTED(GPS303Pkt):
819     PROTO = 0x83
820
821
822 class VIBRATION_RECEIVED(GPS303Pkt):
823     PROTO = 0x94
824
825
826 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
827     PROTO = 0x98
828     RESPOND = Respond.EXT
829     OUT_KWARGS = (("interval", int, 10),)
830
831     def in_decode(self, length: int, payload: bytes) -> None:
832         self.interval = unpack("!H", payload[:2])
833
834     def out_encode(self) -> bytes:
835         return pack("!H", self.interval)
836
837
838 class SOS_ALARM(GPS303Pkt):
839     PROTO = 0x99
840
841
842 class UNKNOWN_B3(GPS303Pkt):
843     PROTO = 0xB3
844     IN_KWARGS = (("asciidata", str, ""),)
845
846     def in_decode(self, length: int, payload: bytes) -> None:
847         self.asciidata = payload.decode()
848
849
850 # Build dicts protocol number -> class and class name -> protocol number
851 CLASSES = {}
852 PROTOS = {}
853 if True:  # just to indent the code, sorry!
854     for cls in [
855         cls
856         for name, cls in globals().items()
857         if isclass(cls)
858         and issubclass(cls, GPS303Pkt)
859         and not name.startswith("_")
860     ]:
861         if hasattr(cls, "PROTO"):
862             CLASSES[cls.PROTO] = cls
863             PROTOS[cls.__name__] = cls.PROTO
864
865
866 def class_by_prefix(
867     prefix: str,
868 ) -> Union[Type[GPS303Pkt], List[str]]:
869     if prefix.startswith(PROTO_PREFIX):
870         pname = prefix[len(PROTO_PREFIX) :]
871     else:
872         raise KeyError(pname)
873     lst = [
874         (name, proto)
875         for name, proto in PROTOS.items()
876         if name.upper().startswith(prefix.upper())
877     ]
878     if len(lst) != 1:
879         return [name for name, _ in lst]
880     _, proto = lst[0]
881     return CLASSES[proto]
882
883
884 def proto_handled(proto: str) -> bool:
885     return proto.startswith(PROTO_PREFIX)
886
887
888 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
889     return PROTO_PREFIX + (
890         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
891     )
892
893
894 def proto_of_message(packet: bytes) -> str:
895     return proto_name(CLASSES.get(packet[1], UNKNOWN))
896
897
898 def imei_from_packet(packet: bytes) -> Optional[str]:
899     if packet[1] == LOGIN.PROTO:
900         msg = parse_message(packet)
901         if isinstance(msg, LOGIN):
902             return msg.imei
903     return None
904
905
906 def is_goodbye_packet(packet: bytes) -> bool:
907     return packet[1] == HIBERNATION.PROTO
908
909
910 def inline_response(packet: bytes) -> Optional[bytes]:
911     proto = packet[1]
912     if proto in CLASSES:
913         cls = CLASSES[proto]
914         if cls.RESPOND is Respond.INL:
915             return cls.Out().packed
916     return None
917
918
919 def probe_buffer(buffer: bytes) -> bool:
920     framestart = buffer.find(b"xx")
921     if framestart < 0:
922         return False
923     if len(buffer) - framestart < 6:
924         return False
925     return True
926
927
928 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
929     """From a packet (without framing bytes) derive the XXX.In object"""
930     length, proto = unpack("BB", packet[:2])
931     payload = packet[2:]
932     if proto not in CLASSES:
933         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
934             f"Proto {proto} is unknown"
935         )
936     else:
937         try:
938             if is_incoming:
939                 return CLASSES[proto].In(length, payload)
940             else:
941                 return CLASSES[proto].Out(length, payload)
942         except (DecodeError, ValueError, IndexError) as e:
943             cause = e
944     if is_incoming:
945         retobj = UNKNOWN.In(length, payload)
946     else:
947         retobj = UNKNOWN.Out(length, payload)
948     retobj.PROTO = proto  # Override class attr with object attr
949     retobj.cause = cause
950     return retobj
951
952
953 def exposed_protos() -> List[Tuple[str, bool]]:
954     return [
955         (proto_name(GPS_POSITIONING), True),
956         (proto_name(WIFI_POSITIONING), False),
957         (proto_name(STATUS), True),
958     ]