]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Initial multiprotocol support
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "Stream",
36     "class_by_prefix",
37     "inline_response",
38     "parse_message",
39     "probe_buffer",
40     "proto_by_name",
41     "DecodeError",
42     "Respond",
43     "GPS303Pkt",
44     "UNKNOWN",
45     "LOGIN",
46     "SUPERVISION",
47     "HEARTBEAT",
48     "GPS_POSITIONING",
49     "GPS_OFFLINE_POSITIONING",
50     "STATUS",
51     "HIBERNATION",
52     "RESET",
53     "WHITELIST_TOTAL",
54     "WIFI_OFFLINE_POSITIONING",
55     "TIME",
56     "PROHIBIT_LBS",
57     "GPS_LBS_SWITCH_TIMES",
58     "REMOTE_MONITOR_PHONE",
59     "SOS_PHONE",
60     "DAD_PHONE",
61     "MOM_PHONE",
62     "STOP_UPLOAD",
63     "GPS_OFF_PERIOD",
64     "DND_PERIOD",
65     "RESTART_SHUTDOWN",
66     "DEVICE",
67     "ALARM_CLOCK",
68     "STOP_ALARM",
69     "SETUP",
70     "SYNCHRONOUS_WHITELIST",
71     "RESTORE_PASSWORD",
72     "WIFI_POSITIONING",
73     "MANUAL_POSITIONING",
74     "BATTERY_CHARGE",
75     "CHARGER_CONNECTED",
76     "CHARGER_DISCONNECTED",
77     "VIBRATION_RECEIVED",
78     "POSITION_UPLOAD_INTERVAL",
79     "SOS_ALARM",
80     "UNKNOWN_B3",
81 )
82
83 ### Deframer ###
84
85 MAXBUFFER: int = 4096
86
87
88 class Stream:
89     def __init__(self) -> None:
90         self.buffer = b""
91
92     @staticmethod
93     def enframe(buffer: bytes) -> bytes:
94         return b"xx" + buffer + b"\r\n"
95
96     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
97         """
98         Process next segment of the stream. Return successfully deframed
99         packets as `bytes` and error messages as `str`.
100         """
101         when = time()
102         self.buffer += segment
103         if len(self.buffer) > MAXBUFFER:
104             # We are receiving junk. Let's drop it or we run out of memory.
105             self.buffer = b""
106             return [f"More than {MAXBUFFER} unparseable data, dropping"]
107         msgs: List[Union[bytes, str]] = []
108         while True:
109             framestart = self.buffer.find(b"xx")
110             if framestart == -1:  # No frames, return whatever we have
111                 break
112             if framestart > 0:  # Should not happen, report
113                 msgs.append(
114                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
115                 )
116                 self.buffer = self.buffer[framestart:]
117             # At this point, buffer starts with a packet
118             if len(self.buffer) < 6:  # no len and proto - cannot proceed
119                 break
120             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
121             frameend = 0
122             # Length field can legitimeely be much less than the
123             # length of the packet (e.g. WiFi positioning), but
124             # it _should not_ be greater. Still sometimes it is.
125             # Luckily, not by too much: by maybe two or three bytes?
126             # Do this embarrassing hack to avoid accidental match
127             # of some binary data in the packet against '\r\n'.
128             while True:
129                 frameend = self.buffer.find(b"\r\n", frameend + 1)
130                 if frameend == -1 or frameend >= (
131                     exp_end - 3
132                 ):  # Found realistic match or none
133                     break
134             if frameend == -1:  # Incomplete frame, return what we have
135                 break
136             packet = self.buffer[2:frameend]
137             self.buffer = self.buffer[frameend + 2 :]
138             if len(packet) < 2:  # frameend comes too early
139                 msgs.append(f"Packet too short: {packet.hex()}")
140             else:
141                 msgs.append(packet)
142         return msgs
143
144     def close(self) -> bytes:
145         ret = self.buffer
146         self.buffer = b""
147         return ret
148
149
150 ### Parser/Constructor ###
151
152
153 class DecodeError(Exception):
154     def __init__(self, e: Exception, **kwargs: Any) -> None:
155         super().__init__(e)
156         for k, v in kwargs.items():
157             setattr(self, k, v)
158
159
160 def maybe(typ: type) -> Callable[[Any], Any]:
161     return lambda x: None if x is None else typ(x)
162
163
164 def intx(x: Union[str, int]) -> int:
165     if isinstance(x, str):
166         x = int(x, 0)
167     return x
168
169
170 def boolx(x: Union[str, bool]) -> bool:
171     if isinstance(x, str):
172         if x.upper() in ("ON", "TRUE", "1"):
173             return True
174         if x.upper() in ("OFF", "FALSE", "0"):
175             return False
176         raise ValueError(str(x) + " could not be parsed as a Boolean")
177     return x
178
179
180 def hhmm(x: str) -> str:
181     """Check for the string that represents hours and minutes"""
182     if not isinstance(x, str) or len(x) != 4:
183         raise ValueError(str(x) + " is not a four-character string")
184     hh = int(x[:2])
185     mm = int(x[2:])
186     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
187         raise ValueError(str(x) + " does not contain valid hours and minutes")
188     return x
189
190
191 def hhmmhhmm(x: str) -> str:
192     """Check for the string that represents hours and minutes twice"""
193     if not isinstance(x, str) or len(x) != 8:
194         raise ValueError(str(x) + " is not an eight-character string")
195     return hhmm(x[:4]) + hhmm(x[4:])
196
197
198 def l3str(x: Union[str, List[str]]) -> List[str]:
199     if isinstance(x, str):
200         lx = x.split(",")
201     else:
202         lx = x
203     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
204         raise ValueError(str(lx) + " is not a list of three strings")
205     return lx
206
207
208 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
209     def alrmspec(sub: str) -> Tuple[int, str]:
210         if len(sub) != 7:
211             raise ValueError(sub + " does not represent day and time")
212         return (
213             {
214                 "MON": 1,
215                 "TUE": 2,
216                 "WED": 3,
217                 "THU": 4,
218                 "FRI": 5,
219                 "SAT": 6,
220                 "SUN": 7,
221             }[sub[:3].upper()],
222             sub[3:],
223         )
224
225     if isinstance(x, str):
226         lx = [alrmspec(sub) for sub in x.split(",")]
227     else:
228         lx = x
229     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
230     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
231         raise ValueError(str(lx) + " is a wrong alarms specification")
232     return [(d, hhmm(tm)) for d, tm in lx]
233
234
235 def l3int(x: Union[str, List[int]]) -> List[int]:
236     if isinstance(x, str):
237         lx = [int(el) for el in x.split(",")]
238     else:
239         lx = x
240     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
241         raise ValueError(str(lx) + " is not a list of three integers")
242     return lx
243
244
245 class MetaPkt(type):
246     """
247     For each class corresponding to a message, automatically create
248     two nested classes `In` and `Out` that also inherit from their
249     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
250     copied to the `In` nested class under the name `KWARGS`, and
251     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
252     to the nested class `Out`. In addition, method `encode` is
253     defined in both classes equal to `in_encode()` and `out_encode()`
254     respectively.
255     """
256
257     if TYPE_CHECKING:
258
259         def __getattr__(self, name: str) -> Any:
260             pass
261
262         def __setattr__(self, name: str, value: Any) -> None:
263             pass
264
265     def __new__(
266         cls: Type["MetaPkt"],
267         name: str,
268         bases: Tuple[type, ...],
269         attrs: Dict[str, Any],
270     ) -> "MetaPkt":
271         newcls = super().__new__(cls, name, bases, attrs)
272         newcls.In = super().__new__(
273             cls,
274             name + ".In",
275             (newcls,) + bases,
276             {
277                 "KWARGS": newcls.IN_KWARGS,
278                 "decode": newcls.in_decode,
279                 "encode": newcls.in_encode,
280             },
281         )
282         newcls.Out = super().__new__(
283             cls,
284             name + ".Out",
285             (newcls,) + bases,
286             {
287                 "KWARGS": newcls.OUT_KWARGS,
288                 "decode": newcls.out_decode,
289                 "encode": newcls.out_encode,
290             },
291         )
292         return newcls
293
294
295 class Respond(Enum):
296     NON = 0  # Incoming, no response needed
297     INL = 1  # Birirectional, use `inline_response()`
298     EXT = 2  # Birirectional, use external responder
299
300
301 class GPS303Pkt(metaclass=MetaPkt):
302     RESPOND = Respond.NON  # Do not send anything back by default
303     PROTO: int
304     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
305     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
307     In: Type["GPS303Pkt"]
308     Out: Type["GPS303Pkt"]
309
310     if TYPE_CHECKING:
311
312         def __getattr__(self, name: str) -> Any:
313             pass
314
315         def __setattr__(self, name: str, value: Any) -> None:
316             pass
317
318     def __init__(self, *args: Any, **kwargs: Any):
319         """
320         Construct the object _either_ from (length, payload),
321         _or_ from the values of individual fields
322         """
323         assert not args or (len(args) == 2 and not kwargs)
324         if args:  # guaranteed to be two arguments at this point
325             self.length, self.payload = args
326             try:
327                 self.decode(self.length, self.payload)
328             except error as e:
329                 raise DecodeError(e, obj=self)
330         else:
331             for kw, typ, dfl in self.KWARGS:
332                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
333             if kwargs:
334                 raise ValueError(
335                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
336                 )
337
338     def __repr__(self) -> str:
339         return "{}({})".format(
340             self.__class__.__name__,
341             ", ".join(
342                 "{}={}".format(
343                     k,
344                     'bytes.fromhex("{}")'.format(v.hex())
345                     if isinstance(v, bytes)
346                     else v.__repr__(),
347                 )
348                 for k, v in self.__dict__.items()
349                 if not k.startswith("_")
350             ),
351         )
352
353     decode: Callable[["GPS303Pkt", int, bytes], None]
354
355     def in_decode(self, length: int, packet: bytes) -> None:
356         # Overridden in subclasses, otherwise do not decode payload
357         return
358
359     def out_decode(self, length: int, packet: bytes) -> None:
360         # Overridden in subclasses, otherwise do not decode payload
361         return
362
363     encode: Callable[["GPS303Pkt"], bytes]
364
365     def in_encode(self) -> bytes:
366         # Necessary to emulate terminal, which is not implemented
367         raise NotImplementedError(
368             self.__class__.__name__ + ".encode() not implemented"
369         )
370
371     def out_encode(self) -> bytes:
372         # Overridden in subclasses, otherwise make empty payload
373         return b""
374
375     @property
376     def packed(self) -> bytes:
377         payload = self.encode()
378         length = getattr(self, "length", len(payload) + 1)
379         return pack("BB", length, self.PROTO) + payload
380
381
382 class UNKNOWN(GPS303Pkt):
383     PROTO = 256  # > 255 is impossible in real packets
384
385
386 class LOGIN(GPS303Pkt):
387     PROTO = 0x01
388     RESPOND = Respond.INL
389     # Default response for ACK, can also respond with STOP_UPLOAD
390     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
391
392     def in_decode(self, length: int, payload: bytes) -> None:
393         self.imei = payload[:8].ljust(8, b"\0").hex()
394         self.ver = payload[8]
395
396     def in_encode(self) -> bytes:
397         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
398             "B", self.ver
399         )
400
401
402 class SUPERVISION(GPS303Pkt):
403     PROTO = 0x05
404     OUT_KWARGS = (("status", int, 1),)
405
406     def out_encode(self) -> bytes:
407         # 1: The device automatically answers Pickup effect
408         # 2: Automatically Answering Two-way Calls
409         # 3: Ring manually answer the two-way call
410         return pack("B", self.status)
411
412
413 class HEARTBEAT(GPS303Pkt):
414     PROTO = 0x08
415     RESPOND = Respond.INL
416
417
418 class _GPS_POSITIONING(GPS303Pkt):
419     RESPOND = Respond.INL
420
421     def in_decode(self, length: int, payload: bytes) -> None:
422         self.dtime = payload[:6]
423         if self.dtime == b"\0\0\0\0\0\0":
424             self.devtime = None
425         else:
426             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
427             self.devtime = datetime(
428                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
429             )
430         self.gps_data_length = payload[6] >> 4
431         self.gps_nb_sat = payload[6] & 0x0F
432         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
433         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
434         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
435         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
436         self.heading = flags & 0b0000001111111111  # bits 6 - last
437         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
438         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
439         self.speed = speed
440         self.flags = flags
441
442     def out_encode(self) -> bytes:
443         tup = datetime.utcnow().timetuple()
444         ttup = (tup[0] % 100,) + tup[1:6]
445         return pack("BBBBBB", *ttup)
446
447
448 class GPS_POSITIONING(_GPS_POSITIONING):
449     PROTO = 0x10
450
451
452 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
453     PROTO = 0x11
454
455
456 class STATUS(GPS303Pkt):
457     PROTO = 0x13
458     RESPOND = Respond.EXT
459     IN_KWARGS = (
460         ("batt", int, 100),
461         ("ver", int, 0),
462         ("timezone", int, 0),
463         ("intvl", int, 0),
464         ("signal", maybe(int), None),
465     )
466     OUT_KWARGS = (("upload_interval", int, 25),)
467
468     def in_decode(self, length: int, payload: bytes) -> None:
469         self.batt, self.ver, self.timezone, self.intvl = unpack(
470             "BBBB", payload[:4]
471         )
472         if len(payload) > 4:
473             self.signal: Optional[int] = payload[4]
474         else:
475             self.signal = None
476
477     def in_encode(self) -> bytes:
478         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
479             b"" if self.signal is None else pack("B", self.signal)
480         )
481
482     def out_encode(self) -> bytes:  # Set interval in minutes
483         return pack("B", self.upload_interval)
484
485
486 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
487     PROTO = 0x14
488
489     def in_encode(self) -> bytes:
490         return b""
491
492
493 class RESET(GPS303Pkt):
494     # Device sends when it got reset SMS
495     # Server can send to initiate factory reset
496     PROTO = 0x15
497
498
499 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
500     PROTO = 0x16
501     OUT_KWARGS = (("number", int, 3),)
502
503     def out_encode(self) -> bytes:  # Number of whitelist entries
504         return pack("B", self.number)
505
506
507 class _WIFI_POSITIONING(GPS303Pkt):
508     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
509         # IN_KWARGS = (
510         ("dtime", bytes, b"\0\0\0\0\0\0"),
511         ("wifi_aps", list, []),
512         ("mcc", int, 0),
513         ("mnc", int, 0),
514         ("gsm_cells", list, []),
515     )
516
517     def in_decode(self, length: int, payload: bytes) -> None:
518         self.dtime = payload[:6]
519         if self.dtime == b"\0\0\0\0\0\0":
520             self.devtime = None
521         else:
522             self.devtime = datetime.strptime(
523                 self.dtime.hex(), "%y%m%d%H%M%S"
524             ).astimezone(tz=timezone.utc)
525         self.wifi_aps = []
526         for i in range(self.length):  # length has special meaning here
527             slice = payload[6 + i * 7 : 13 + i * 7]
528             self.wifi_aps.append(
529                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
530             )
531         gsm_slice = payload[6 + self.length * 7 :]
532         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
533         self.gsm_cells = []
534         for i in range(ncells):
535             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
536             locac, cellid, sigstr = unpack(
537                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
538             )
539             self.gsm_cells.append((locac, cellid, -sigstr))
540
541     def in_encode(self) -> bytes:
542         self.length = len(self.wifi_aps)
543         return b"".join(
544             [
545                 self.dtime,
546                 b"".join(
547                     [
548                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
549                         + pack("B", -sigstr)
550                         for mac, sigstr in self.wifi_aps
551                     ]
552                 ),
553                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
554                 b"".join(
555                     [
556                         pack("!HHB", locac, cellid, -sigstr)
557                         for locac, cellid, sigstr in self.gsm_cells
558                     ]
559                 ),
560             ]
561         )
562
563
564 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
565     PROTO = 0x17
566     RESPOND = Respond.INL
567
568     def out_encode(self) -> bytes:
569         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
570
571
572 class TIME(GPS303Pkt):
573     PROTO = 0x30
574     RESPOND = Respond.INL
575
576     def out_encode(self) -> bytes:
577         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
578
579
580 class PROHIBIT_LBS(GPS303Pkt):
581     PROTO = 0x33
582     OUT_KWARGS = (("status", int, 1),)
583
584     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
585         return pack("B", self.status)
586
587
588 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
589     PROTO = 0x34
590
591     OUT_KWARGS = (
592         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
593         ("gps_interval_set", boolx, False),
594         ("gps_interval", hhmmhhmm, "00000000"),
595         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
596         ("boot_time_set", boolx, False),
597         ("boot_time", hhmm, "0000"),
598         ("shut_time_set", boolx, False),
599         ("shut_time", hhmm, "0000"),
600     )
601
602     def out_encode(self) -> bytes:
603         return (
604             pack("B", self.gps_off)
605             + pack("B", self.gps_interval_set)
606             + bytes.fromhex(self.gps_interval)
607             + pack("B", self.lbs_off)
608             + pack("B", self.boot_time_set)
609             + bytes.fromhex(self.boot_time)
610             + pack("B", self.shut_time_set)
611             + bytes.fromhex(self.shut_time)
612         )
613
614
615 class _SET_PHONE(GPS303Pkt):
616     OUT_KWARGS = (("phone", str, ""),)
617
618     def out_encode(self) -> bytes:
619         self.phone: str
620         return self.phone.encode("")
621
622
623 class REMOTE_MONITOR_PHONE(_SET_PHONE):
624     PROTO = 0x40
625
626
627 class SOS_PHONE(_SET_PHONE):
628     PROTO = 0x41
629
630
631 class DAD_PHONE(_SET_PHONE):
632     PROTO = 0x42
633
634
635 class MOM_PHONE(_SET_PHONE):
636     PROTO = 0x43
637
638
639 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
640     PROTO = 0x44
641
642
643 class GPS_OFF_PERIOD(GPS303Pkt):
644     PROTO = 0x46
645     OUT_KWARGS = (
646         ("onoff", int, 0),
647         ("fm", hhmm, "0000"),
648         ("to", hhmm, "2359"),
649     )
650
651     def out_encode(self) -> bytes:
652         return (
653             pack("B", self.onoff)
654             + bytes.fromhex(self.fm)
655             + bytes.fromhex(self.to)
656         )
657
658
659 class DND_PERIOD(GPS303Pkt):
660     PROTO = 0x47
661     OUT_KWARGS = (
662         ("onoff", int, 0),
663         ("week", int, 3),
664         ("fm1", hhmm, "0000"),
665         ("to1", hhmm, "2359"),
666         ("fm2", hhmm, "0000"),
667         ("to2", hhmm, "2359"),
668     )
669
670     def out_encode(self) -> bytes:
671         return (
672             pack("B", self.onoff)
673             + pack("B", self.week)
674             + bytes.fromhex(self.fm1)
675             + bytes.fromhex(self.to1)
676             + bytes.fromhex(self.fm2)
677             + bytes.fromhex(self.to2)
678         )
679
680
681 class RESTART_SHUTDOWN(GPS303Pkt):
682     PROTO = 0x48
683     OUT_KWARGS = (("flag", int, 0),)
684
685     def out_encode(self) -> bytes:
686         # 1 - restart
687         # 2 - shutdown
688         return pack("B", self.flag)
689
690
691 class DEVICE(GPS303Pkt):
692     PROTO = 0x49
693     OUT_KWARGS = (("flag", int, 0),)
694
695     # 0 - Stop looking for equipment
696     # 1 - Start looking for equipment
697     def out_encode(self) -> bytes:
698         return pack("B", self.flag)
699
700
701 class ALARM_CLOCK(GPS303Pkt):
702     PROTO = 0x50
703     OUT_KWARGS: Tuple[
704         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
705     ] = (
706         ("alarms", l3alarms, []),
707     )
708
709     def out_encode(self) -> bytes:
710         return b"".join(
711             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
712         )
713
714
715 class STOP_ALARM(GPS303Pkt):
716     PROTO = 0x56
717
718     def in_decode(self, length: int, payload: bytes) -> None:
719         self.flag = payload[0]
720
721
722 class SETUP(GPS303Pkt):
723     PROTO = 0x57
724     RESPOND = Respond.EXT
725     OUT_KWARGS = (
726         ("uploadintervalseconds", intx, 0x0300),
727         ("binaryswitch", intx, 0b00110001),
728         ("alarms", l3int, [0, 0, 0]),
729         ("dndtimeswitch", int, 0),
730         ("dndtimes", l3int, [0, 0, 0]),
731         ("gpstimeswitch", int, 0),
732         ("gpstimestart", int, 0),
733         ("gpstimestop", int, 0),
734         ("phonenumbers", l3str, ["", "", ""]),
735     )
736
737     def out_encode(self) -> bytes:
738         def pack3b(x: int) -> bytes:
739             return pack("!I", x)[1:]
740
741         return b"".join(
742             [
743                 pack("!H", self.uploadintervalseconds),
744                 pack("B", self.binaryswitch),
745             ]
746             + [pack3b(el) for el in self.alarms]
747             + [
748                 pack("B", self.dndtimeswitch),
749             ]
750             + [pack3b(el) for el in self.dndtimes]
751             + [
752                 pack("B", self.gpstimeswitch),
753                 pack("!H", self.gpstimestart),
754                 pack("!H", self.gpstimestop),
755             ]
756             + [b";".join([el.encode() for el in self.phonenumbers])]
757         )
758
759     def in_encode(self) -> bytes:
760         return b""
761
762
763 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
764     PROTO = 0x58
765
766
767 class RESTORE_PASSWORD(GPS303Pkt):
768     PROTO = 0x67
769
770
771 class WIFI_POSITIONING(_WIFI_POSITIONING):
772     PROTO = 0x69
773     RESPOND = Respond.EXT
774     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
775
776     def out_encode(self) -> bytes:
777         if self.latitude is None or self.longitude is None:
778             return b""
779         return "{:+#010.8g},{:+#010.8g}".format(
780             self.latitude, self.longitude
781         ).encode()
782
783     def out_decode(self, length: int, payload: bytes) -> None:
784         lat, lon = payload.decode().split(",")
785         self.latitude = float(lat)
786         self.longitude = float(lon)
787
788
789 class MANUAL_POSITIONING(GPS303Pkt):
790     PROTO = 0x80
791
792     def in_decode(self, length: int, payload: bytes) -> None:
793         self.flag = payload[0] if len(payload) > 0 else -1
794         self.reason = {
795             1: "Incorrect time",
796             2: "LBS less",
797             3: "WiFi less",
798             4: "LBS search > 3 times",
799             5: "Same LBS and WiFi data",
800             6: "LBS prohibited, WiFi absent",
801             7: "GPS spacing < 50 m",
802         }.get(self.flag, "Unknown")
803
804
805 class BATTERY_CHARGE(GPS303Pkt):
806     PROTO = 0x81
807
808
809 class CHARGER_CONNECTED(GPS303Pkt):
810     PROTO = 0x82
811
812
813 class CHARGER_DISCONNECTED(GPS303Pkt):
814     PROTO = 0x83
815
816
817 class VIBRATION_RECEIVED(GPS303Pkt):
818     PROTO = 0x94
819
820
821 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
822     PROTO = 0x98
823     RESPOND = Respond.EXT
824     OUT_KWARGS = (("interval", int, 10),)
825
826     def in_decode(self, length: int, payload: bytes) -> None:
827         self.interval = unpack("!H", payload[:2])
828
829     def out_encode(self) -> bytes:
830         return pack("!H", self.interval)
831
832
833 class SOS_ALARM(GPS303Pkt):
834     PROTO = 0x99
835
836
837 class UNKNOWN_B3(GPS303Pkt):
838     PROTO = 0xB3
839     IN_KWARGS = (("asciidata", str, ""),)
840
841     def in_decode(self, length: int, payload: bytes) -> None:
842         self.asciidata = payload.decode()
843
844
845 # Build dicts protocol number -> class and class name -> protocol number
846 CLASSES = {}
847 PROTOS = {}
848 if True:  # just to indent the code, sorry!
849     for cls in [
850         cls
851         for name, cls in globals().items()
852         if isclass(cls)
853         and issubclass(cls, GPS303Pkt)
854         and not name.startswith("_")
855     ]:
856         if hasattr(cls, "PROTO"):
857             CLASSES[cls.PROTO] = cls
858             PROTOS[cls.__name__] = cls.PROTO
859
860
861 def class_by_prefix(
862     prefix: str,
863 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
864     lst = [
865         (name, proto)
866         for name, proto in PROTOS.items()
867         if name.upper().startswith(prefix.upper())
868     ]
869     if len(lst) != 1:
870         return lst
871     _, proto = lst[0]
872     return CLASSES[proto]
873
874
875 def proto_by_name(name: str) -> int:
876     return PROTOS.get(name, -1)
877
878
879 def proto_of_message(packet: bytes) -> int:
880     return packet[1]
881
882
883 def imei_from_packet(packet: bytes) -> Optional[str]:
884     if proto_of_message(packet) == LOGIN.PROTO:
885         msg = parse_message(packet)
886         if isinstance(msg, LOGIN):
887             return msg.imei
888     return None
889
890
891 def is_goodbye_packet(packet: bytes) -> bool:
892     return proto_of_message(packet) == HIBERNATION.PROTO
893
894
895 def inline_response(packet: bytes) -> Optional[bytes]:
896     proto = proto_of_message(packet)
897     if proto in CLASSES:
898         cls = CLASSES[proto]
899         if cls.RESPOND is Respond.INL:
900             return cls.Out().packed
901     return None
902
903
904 def probe_buffer(buffer: bytes) -> bool:
905     framestart = buffer.find(b"xx")
906     if framestart < 0:
907         return False
908     if len(buffer) - framestart < 6:
909         return False
910     return True
911
912
913 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
914     """From a packet (without framing bytes) derive the XXX.In object"""
915     length, proto = unpack("BB", packet[:2])
916     payload = packet[2:]
917     if proto not in CLASSES:
918         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
919             f"Proto {proto} is unknown"
920         )
921     else:
922         try:
923             if is_incoming:
924                 return CLASSES[proto].In(length, payload)
925             else:
926                 return CLASSES[proto].Out(length, payload)
927         except (DecodeError, ValueError, IndexError) as e:
928             cause = e
929     if is_incoming:
930         retobj = UNKNOWN.In(length, payload)
931     else:
932         retobj = UNKNOWN.Out(length, payload)
933     retobj.PROTO = proto  # Override class attr with object attr
934     retobj.cause = cause
935     return retobj