]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
rename gps303 -> loctrkd
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "Stream",
36     "class_by_prefix",
37     "inline_response",
38     "parse_message",
39     "probe_buffer",
40     "proto_by_name",
41     "proto_name",
42     "DecodeError",
43     "Respond",
44     "GPS303Pkt",
45     "UNKNOWN",
46     "LOGIN",
47     "SUPERVISION",
48     "HEARTBEAT",
49     "GPS_POSITIONING",
50     "GPS_OFFLINE_POSITIONING",
51     "STATUS",
52     "HIBERNATION",
53     "RESET",
54     "WHITELIST_TOTAL",
55     "WIFI_OFFLINE_POSITIONING",
56     "TIME",
57     "PROHIBIT_LBS",
58     "GPS_LBS_SWITCH_TIMES",
59     "REMOTE_MONITOR_PHONE",
60     "SOS_PHONE",
61     "DAD_PHONE",
62     "MOM_PHONE",
63     "STOP_UPLOAD",
64     "GPS_OFF_PERIOD",
65     "DND_PERIOD",
66     "RESTART_SHUTDOWN",
67     "DEVICE",
68     "ALARM_CLOCK",
69     "STOP_ALARM",
70     "SETUP",
71     "SYNCHRONOUS_WHITELIST",
72     "RESTORE_PASSWORD",
73     "WIFI_POSITIONING",
74     "MANUAL_POSITIONING",
75     "BATTERY_CHARGE",
76     "CHARGER_CONNECTED",
77     "CHARGER_DISCONNECTED",
78     "VIBRATION_RECEIVED",
79     "POSITION_UPLOAD_INTERVAL",
80     "SOS_ALARM",
81     "UNKNOWN_B3",
82 )
83
84 PROTO_PREFIX = "ZX"
85
86 ### Deframer ###
87
88 MAXBUFFER: int = 4096
89
90
91 class Stream:
92     def __init__(self) -> None:
93         self.buffer = b""
94
95     @staticmethod
96     def enframe(buffer: bytes) -> bytes:
97         return b"xx" + buffer + b"\r\n"
98
99     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
100         """
101         Process next segment of the stream. Return successfully deframed
102         packets as `bytes` and error messages as `str`.
103         """
104         when = time()
105         self.buffer += segment
106         if len(self.buffer) > MAXBUFFER:
107             # We are receiving junk. Let's drop it or we run out of memory.
108             self.buffer = b""
109             return [f"More than {MAXBUFFER} unparseable data, dropping"]
110         msgs: List[Union[bytes, str]] = []
111         while True:
112             framestart = self.buffer.find(b"xx")
113             if framestart == -1:  # No frames, return whatever we have
114                 break
115             if framestart > 0:  # Should not happen, report
116                 msgs.append(
117                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
118                 )
119                 self.buffer = self.buffer[framestart:]
120             # At this point, buffer starts with a packet
121             if len(self.buffer) < 6:  # no len and proto - cannot proceed
122                 break
123             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
124             frameend = 0
125             # Length field can legitimeely be much less than the
126             # length of the packet (e.g. WiFi positioning), but
127             # it _should not_ be greater. Still sometimes it is.
128             # Luckily, not by too much: by maybe two or three bytes?
129             # Do this embarrassing hack to avoid accidental match
130             # of some binary data in the packet against '\r\n'.
131             while True:
132                 frameend = self.buffer.find(b"\r\n", frameend + 1)
133                 if frameend == -1 or frameend >= (
134                     exp_end - 3
135                 ):  # Found realistic match or none
136                     break
137             if frameend == -1:  # Incomplete frame, return what we have
138                 break
139             packet = self.buffer[2:frameend]
140             self.buffer = self.buffer[frameend + 2 :]
141             if len(packet) < 2:  # frameend comes too early
142                 msgs.append(f"Packet too short: {packet.hex()}")
143             else:
144                 msgs.append(packet)
145         return msgs
146
147     def close(self) -> bytes:
148         ret = self.buffer
149         self.buffer = b""
150         return ret
151
152
153 ### Parser/Constructor ###
154
155
156 class DecodeError(Exception):
157     def __init__(self, e: Exception, **kwargs: Any) -> None:
158         super().__init__(e)
159         for k, v in kwargs.items():
160             setattr(self, k, v)
161
162
163 def maybe(typ: type) -> Callable[[Any], Any]:
164     return lambda x: None if x is None else typ(x)
165
166
167 def intx(x: Union[str, int]) -> int:
168     if isinstance(x, str):
169         x = int(x, 0)
170     return x
171
172
173 def boolx(x: Union[str, bool]) -> bool:
174     if isinstance(x, str):
175         if x.upper() in ("ON", "TRUE", "1"):
176             return True
177         if x.upper() in ("OFF", "FALSE", "0"):
178             return False
179         raise ValueError(str(x) + " could not be parsed as a Boolean")
180     return x
181
182
183 def hhmm(x: str) -> str:
184     """Check for the string that represents hours and minutes"""
185     if not isinstance(x, str) or len(x) != 4:
186         raise ValueError(str(x) + " is not a four-character string")
187     hh = int(x[:2])
188     mm = int(x[2:])
189     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
190         raise ValueError(str(x) + " does not contain valid hours and minutes")
191     return x
192
193
194 def hhmmhhmm(x: str) -> str:
195     """Check for the string that represents hours and minutes twice"""
196     if not isinstance(x, str) or len(x) != 8:
197         raise ValueError(str(x) + " is not an eight-character string")
198     return hhmm(x[:4]) + hhmm(x[4:])
199
200
201 def l3str(x: Union[str, List[str]]) -> List[str]:
202     if isinstance(x, str):
203         lx = x.split(",")
204     else:
205         lx = x
206     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
207         raise ValueError(str(lx) + " is not a list of three strings")
208     return lx
209
210
211 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
212     def alrmspec(sub: str) -> Tuple[int, str]:
213         if len(sub) != 7:
214             raise ValueError(sub + " does not represent day and time")
215         return (
216             {
217                 "MON": 1,
218                 "TUE": 2,
219                 "WED": 3,
220                 "THU": 4,
221                 "FRI": 5,
222                 "SAT": 6,
223                 "SUN": 7,
224             }[sub[:3].upper()],
225             sub[3:],
226         )
227
228     if isinstance(x, str):
229         lx = [alrmspec(sub) for sub in x.split(",")]
230     else:
231         lx = x
232     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
233     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
234         raise ValueError(str(lx) + " is a wrong alarms specification")
235     return [(d, hhmm(tm)) for d, tm in lx]
236
237
238 def l3int(x: Union[str, List[int]]) -> List[int]:
239     if isinstance(x, str):
240         lx = [int(el) for el in x.split(",")]
241     else:
242         lx = x
243     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
244         raise ValueError(str(lx) + " is not a list of three integers")
245     return lx
246
247
248 class MetaPkt(type):
249     """
250     For each class corresponding to a message, automatically create
251     two nested classes `In` and `Out` that also inherit from their
252     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
253     copied to the `In` nested class under the name `KWARGS`, and
254     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
255     to the nested class `Out`. In addition, method `encode` is
256     defined in both classes equal to `in_encode()` and `out_encode()`
257     respectively.
258     """
259
260     if TYPE_CHECKING:
261
262         def __getattr__(self, name: str) -> Any:
263             pass
264
265         def __setattr__(self, name: str, value: Any) -> None:
266             pass
267
268     def __new__(
269         cls: Type["MetaPkt"],
270         name: str,
271         bases: Tuple[type, ...],
272         attrs: Dict[str, Any],
273     ) -> "MetaPkt":
274         newcls = super().__new__(cls, name, bases, attrs)
275         newcls.In = super().__new__(
276             cls,
277             name + ".In",
278             (newcls,) + bases,
279             {
280                 "KWARGS": newcls.IN_KWARGS,
281                 "decode": newcls.in_decode,
282                 "encode": newcls.in_encode,
283             },
284         )
285         newcls.Out = super().__new__(
286             cls,
287             name + ".Out",
288             (newcls,) + bases,
289             {
290                 "KWARGS": newcls.OUT_KWARGS,
291                 "decode": newcls.out_decode,
292                 "encode": newcls.out_encode,
293             },
294         )
295         return newcls
296
297
298 class Respond(Enum):
299     NON = 0  # Incoming, no response needed
300     INL = 1  # Birirectional, use `inline_response()`
301     EXT = 2  # Birirectional, use external responder
302
303
304 class GPS303Pkt(metaclass=MetaPkt):
305     RESPOND = Respond.NON  # Do not send anything back by default
306     PROTO: int
307     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
308     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
309     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310     In: Type["GPS303Pkt"]
311     Out: Type["GPS303Pkt"]
312
313     if TYPE_CHECKING:
314
315         def __getattr__(self, name: str) -> Any:
316             pass
317
318         def __setattr__(self, name: str, value: Any) -> None:
319             pass
320
321     def __init__(self, *args: Any, **kwargs: Any):
322         """
323         Construct the object _either_ from (length, payload),
324         _or_ from the values of individual fields
325         """
326         assert not args or (len(args) == 2 and not kwargs)
327         if args:  # guaranteed to be two arguments at this point
328             self.length, self.payload = args
329             try:
330                 self.decode(self.length, self.payload)
331             except error as e:
332                 raise DecodeError(e, obj=self)
333         else:
334             for kw, typ, dfl in self.KWARGS:
335                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
336             if kwargs:
337                 raise ValueError(
338                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
339                 )
340
341     def __repr__(self) -> str:
342         return "{}({})".format(
343             self.__class__.__name__,
344             ", ".join(
345                 "{}={}".format(
346                     k,
347                     'bytes.fromhex("{}")'.format(v.hex())
348                     if isinstance(v, bytes)
349                     else v.__repr__(),
350                 )
351                 for k, v in self.__dict__.items()
352                 if not k.startswith("_")
353             ),
354         )
355
356     decode: Callable[["GPS303Pkt", int, bytes], None]
357
358     def in_decode(self, length: int, packet: bytes) -> None:
359         # Overridden in subclasses, otherwise do not decode payload
360         return
361
362     def out_decode(self, length: int, packet: bytes) -> None:
363         # Overridden in subclasses, otherwise do not decode payload
364         return
365
366     encode: Callable[["GPS303Pkt"], bytes]
367
368     def in_encode(self) -> bytes:
369         # Necessary to emulate terminal, which is not implemented
370         raise NotImplementedError(
371             self.__class__.__name__ + ".encode() not implemented"
372         )
373
374     def out_encode(self) -> bytes:
375         # Overridden in subclasses, otherwise make empty payload
376         return b""
377
378     @property
379     def packed(self) -> bytes:
380         payload = self.encode()
381         length = getattr(self, "length", len(payload) + 1)
382         return pack("BB", length, self.PROTO) + payload
383
384
385 class UNKNOWN(GPS303Pkt):
386     PROTO = 256  # > 255 is impossible in real packets
387
388
389 class LOGIN(GPS303Pkt):
390     PROTO = 0x01
391     RESPOND = Respond.INL
392     # Default response for ACK, can also respond with STOP_UPLOAD
393     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
394
395     def in_decode(self, length: int, payload: bytes) -> None:
396         self.imei = payload[:8].ljust(8, b"\0").hex()
397         self.ver = payload[8]
398
399     def in_encode(self) -> bytes:
400         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
401             "B", self.ver
402         )
403
404
405 class SUPERVISION(GPS303Pkt):
406     PROTO = 0x05
407     OUT_KWARGS = (("status", int, 1),)
408
409     def out_encode(self) -> bytes:
410         # 1: The device automatically answers Pickup effect
411         # 2: Automatically Answering Two-way Calls
412         # 3: Ring manually answer the two-way call
413         return pack("B", self.status)
414
415
416 class HEARTBEAT(GPS303Pkt):
417     PROTO = 0x08
418     RESPOND = Respond.INL
419
420
421 class _GPS_POSITIONING(GPS303Pkt):
422     RESPOND = Respond.INL
423
424     def in_decode(self, length: int, payload: bytes) -> None:
425         self.dtime = payload[:6]
426         if self.dtime == b"\0\0\0\0\0\0":
427             self.devtime = None
428         else:
429             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
430             self.devtime = datetime(
431                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
432             )
433         self.gps_data_length = payload[6] >> 4
434         self.gps_nb_sat = payload[6] & 0x0F
435         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
436         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
437         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
438         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
439         self.heading = flags & 0b0000001111111111  # bits 6 - last
440         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
441         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
442         self.speed = speed
443         self.flags = flags
444
445     def out_encode(self) -> bytes:
446         tup = datetime.utcnow().timetuple()
447         ttup = (tup[0] % 100,) + tup[1:6]
448         return pack("BBBBBB", *ttup)
449
450
451 class GPS_POSITIONING(_GPS_POSITIONING):
452     PROTO = 0x10
453
454
455 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
456     PROTO = 0x11
457
458
459 class STATUS(GPS303Pkt):
460     PROTO = 0x13
461     RESPOND = Respond.EXT
462     IN_KWARGS = (
463         ("batt", int, 100),
464         ("ver", int, 0),
465         ("timezone", int, 0),
466         ("intvl", int, 0),
467         ("signal", maybe(int), None),
468     )
469     OUT_KWARGS = (("upload_interval", int, 25),)
470
471     def in_decode(self, length: int, payload: bytes) -> None:
472         self.batt, self.ver, self.timezone, self.intvl = unpack(
473             "BBBB", payload[:4]
474         )
475         if len(payload) > 4:
476             self.signal: Optional[int] = payload[4]
477         else:
478             self.signal = None
479
480     def in_encode(self) -> bytes:
481         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
482             b"" if self.signal is None else pack("B", self.signal)
483         )
484
485     def out_encode(self) -> bytes:  # Set interval in minutes
486         return pack("B", self.upload_interval)
487
488
489 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
490     PROTO = 0x14
491
492     def in_encode(self) -> bytes:
493         return b""
494
495
496 class RESET(GPS303Pkt):
497     # Device sends when it got reset SMS
498     # Server can send to initiate factory reset
499     PROTO = 0x15
500
501
502 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
503     PROTO = 0x16
504     OUT_KWARGS = (("number", int, 3),)
505
506     def out_encode(self) -> bytes:  # Number of whitelist entries
507         return pack("B", self.number)
508
509
510 class _WIFI_POSITIONING(GPS303Pkt):
511     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
512         # IN_KWARGS = (
513         ("dtime", bytes, b"\0\0\0\0\0\0"),
514         ("wifi_aps", list, []),
515         ("mcc", int, 0),
516         ("mnc", int, 0),
517         ("gsm_cells", list, []),
518     )
519
520     def in_decode(self, length: int, payload: bytes) -> None:
521         self.dtime = payload[:6]
522         if self.dtime == b"\0\0\0\0\0\0":
523             self.devtime = None
524         else:
525             self.devtime = datetime.strptime(
526                 self.dtime.hex(), "%y%m%d%H%M%S"
527             ).astimezone(tz=timezone.utc)
528         self.wifi_aps = []
529         for i in range(self.length):  # length has special meaning here
530             slice = payload[6 + i * 7 : 13 + i * 7]
531             self.wifi_aps.append(
532                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
533             )
534         gsm_slice = payload[6 + self.length * 7 :]
535         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
536         self.gsm_cells = []
537         for i in range(ncells):
538             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
539             locac, cellid, sigstr = unpack(
540                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
541             )
542             self.gsm_cells.append((locac, cellid, -sigstr))
543
544     def in_encode(self) -> bytes:
545         self.length = len(self.wifi_aps)
546         return b"".join(
547             [
548                 self.dtime,
549                 b"".join(
550                     [
551                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
552                         + pack("B", -sigstr)
553                         for mac, sigstr in self.wifi_aps
554                     ]
555                 ),
556                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
557                 b"".join(
558                     [
559                         pack("!HHB", locac, cellid, -sigstr)
560                         for locac, cellid, sigstr in self.gsm_cells
561                     ]
562                 ),
563             ]
564         )
565
566
567 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
568     PROTO = 0x17
569     RESPOND = Respond.INL
570
571     def out_encode(self) -> bytes:
572         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
573
574
575 class TIME(GPS303Pkt):
576     PROTO = 0x30
577     RESPOND = Respond.INL
578
579     def out_encode(self) -> bytes:
580         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
581
582
583 class PROHIBIT_LBS(GPS303Pkt):
584     PROTO = 0x33
585     OUT_KWARGS = (("status", int, 1),)
586
587     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
588         return pack("B", self.status)
589
590
591 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
592     PROTO = 0x34
593
594     OUT_KWARGS = (
595         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
596         ("gps_interval_set", boolx, False),
597         ("gps_interval", hhmmhhmm, "00000000"),
598         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
599         ("boot_time_set", boolx, False),
600         ("boot_time", hhmm, "0000"),
601         ("shut_time_set", boolx, False),
602         ("shut_time", hhmm, "0000"),
603     )
604
605     def out_encode(self) -> bytes:
606         return (
607             pack("B", self.gps_off)
608             + pack("B", self.gps_interval_set)
609             + bytes.fromhex(self.gps_interval)
610             + pack("B", self.lbs_off)
611             + pack("B", self.boot_time_set)
612             + bytes.fromhex(self.boot_time)
613             + pack("B", self.shut_time_set)
614             + bytes.fromhex(self.shut_time)
615         )
616
617
618 class _SET_PHONE(GPS303Pkt):
619     OUT_KWARGS = (("phone", str, ""),)
620
621     def out_encode(self) -> bytes:
622         self.phone: str
623         return self.phone.encode("")
624
625
626 class REMOTE_MONITOR_PHONE(_SET_PHONE):
627     PROTO = 0x40
628
629
630 class SOS_PHONE(_SET_PHONE):
631     PROTO = 0x41
632
633
634 class DAD_PHONE(_SET_PHONE):
635     PROTO = 0x42
636
637
638 class MOM_PHONE(_SET_PHONE):
639     PROTO = 0x43
640
641
642 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
643     PROTO = 0x44
644
645
646 class GPS_OFF_PERIOD(GPS303Pkt):
647     PROTO = 0x46
648     OUT_KWARGS = (
649         ("onoff", int, 0),
650         ("fm", hhmm, "0000"),
651         ("to", hhmm, "2359"),
652     )
653
654     def out_encode(self) -> bytes:
655         return (
656             pack("B", self.onoff)
657             + bytes.fromhex(self.fm)
658             + bytes.fromhex(self.to)
659         )
660
661
662 class DND_PERIOD(GPS303Pkt):
663     PROTO = 0x47
664     OUT_KWARGS = (
665         ("onoff", int, 0),
666         ("week", int, 3),
667         ("fm1", hhmm, "0000"),
668         ("to1", hhmm, "2359"),
669         ("fm2", hhmm, "0000"),
670         ("to2", hhmm, "2359"),
671     )
672
673     def out_encode(self) -> bytes:
674         return (
675             pack("B", self.onoff)
676             + pack("B", self.week)
677             + bytes.fromhex(self.fm1)
678             + bytes.fromhex(self.to1)
679             + bytes.fromhex(self.fm2)
680             + bytes.fromhex(self.to2)
681         )
682
683
684 class RESTART_SHUTDOWN(GPS303Pkt):
685     PROTO = 0x48
686     OUT_KWARGS = (("flag", int, 0),)
687
688     def out_encode(self) -> bytes:
689         # 1 - restart
690         # 2 - shutdown
691         return pack("B", self.flag)
692
693
694 class DEVICE(GPS303Pkt):
695     PROTO = 0x49
696     OUT_KWARGS = (("flag", int, 0),)
697
698     # 0 - Stop looking for equipment
699     # 1 - Start looking for equipment
700     def out_encode(self) -> bytes:
701         return pack("B", self.flag)
702
703
704 class ALARM_CLOCK(GPS303Pkt):
705     PROTO = 0x50
706     OUT_KWARGS: Tuple[
707         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
708     ] = (
709         ("alarms", l3alarms, []),
710     )
711
712     def out_encode(self) -> bytes:
713         return b"".join(
714             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
715         )
716
717
718 class STOP_ALARM(GPS303Pkt):
719     PROTO = 0x56
720
721     def in_decode(self, length: int, payload: bytes) -> None:
722         self.flag = payload[0]
723
724
725 class SETUP(GPS303Pkt):
726     PROTO = 0x57
727     RESPOND = Respond.EXT
728     OUT_KWARGS = (
729         ("uploadintervalseconds", intx, 0x0300),
730         ("binaryswitch", intx, 0b00110001),
731         ("alarms", l3int, [0, 0, 0]),
732         ("dndtimeswitch", int, 0),
733         ("dndtimes", l3int, [0, 0, 0]),
734         ("gpstimeswitch", int, 0),
735         ("gpstimestart", int, 0),
736         ("gpstimestop", int, 0),
737         ("phonenumbers", l3str, ["", "", ""]),
738     )
739
740     def out_encode(self) -> bytes:
741         def pack3b(x: int) -> bytes:
742             return pack("!I", x)[1:]
743
744         return b"".join(
745             [
746                 pack("!H", self.uploadintervalseconds),
747                 pack("B", self.binaryswitch),
748             ]
749             + [pack3b(el) for el in self.alarms]
750             + [
751                 pack("B", self.dndtimeswitch),
752             ]
753             + [pack3b(el) for el in self.dndtimes]
754             + [
755                 pack("B", self.gpstimeswitch),
756                 pack("!H", self.gpstimestart),
757                 pack("!H", self.gpstimestop),
758             ]
759             + [b";".join([el.encode() for el in self.phonenumbers])]
760         )
761
762     def in_encode(self) -> bytes:
763         return b""
764
765
766 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
767     PROTO = 0x58
768
769
770 class RESTORE_PASSWORD(GPS303Pkt):
771     PROTO = 0x67
772
773
774 class WIFI_POSITIONING(_WIFI_POSITIONING):
775     PROTO = 0x69
776     RESPOND = Respond.EXT
777     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
778
779     def out_encode(self) -> bytes:
780         if self.latitude is None or self.longitude is None:
781             return b""
782         return "{:+#010.8g},{:+#010.8g}".format(
783             self.latitude, self.longitude
784         ).encode()
785
786     def out_decode(self, length: int, payload: bytes) -> None:
787         lat, lon = payload.decode().split(",")
788         self.latitude = float(lat)
789         self.longitude = float(lon)
790
791
792 class MANUAL_POSITIONING(GPS303Pkt):
793     PROTO = 0x80
794
795     def in_decode(self, length: int, payload: bytes) -> None:
796         self.flag = payload[0] if len(payload) > 0 else -1
797         self.reason = {
798             1: "Incorrect time",
799             2: "LBS less",
800             3: "WiFi less",
801             4: "LBS search > 3 times",
802             5: "Same LBS and WiFi data",
803             6: "LBS prohibited, WiFi absent",
804             7: "GPS spacing < 50 m",
805         }.get(self.flag, "Unknown")
806
807
808 class BATTERY_CHARGE(GPS303Pkt):
809     PROTO = 0x81
810
811
812 class CHARGER_CONNECTED(GPS303Pkt):
813     PROTO = 0x82
814
815
816 class CHARGER_DISCONNECTED(GPS303Pkt):
817     PROTO = 0x83
818
819
820 class VIBRATION_RECEIVED(GPS303Pkt):
821     PROTO = 0x94
822
823
824 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
825     PROTO = 0x98
826     RESPOND = Respond.EXT
827     OUT_KWARGS = (("interval", int, 10),)
828
829     def in_decode(self, length: int, payload: bytes) -> None:
830         self.interval = unpack("!H", payload[:2])
831
832     def out_encode(self) -> bytes:
833         return pack("!H", self.interval)
834
835
836 class SOS_ALARM(GPS303Pkt):
837     PROTO = 0x99
838
839
840 class UNKNOWN_B3(GPS303Pkt):
841     PROTO = 0xB3
842     IN_KWARGS = (("asciidata", str, ""),)
843
844     def in_decode(self, length: int, payload: bytes) -> None:
845         self.asciidata = payload.decode()
846
847
848 # Build dicts protocol number -> class and class name -> protocol number
849 CLASSES = {}
850 PROTOS = {}
851 if True:  # just to indent the code, sorry!
852     for cls in [
853         cls
854         for name, cls in globals().items()
855         if isclass(cls)
856         and issubclass(cls, GPS303Pkt)
857         and not name.startswith("_")
858     ]:
859         if hasattr(cls, "PROTO"):
860             CLASSES[cls.PROTO] = cls
861             PROTOS[cls.__name__] = cls.PROTO
862
863
864 def class_by_prefix(
865     prefix: str,
866 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
867     lst = [
868         (name, proto)
869         for name, proto in PROTOS.items()
870         if name.upper().startswith(prefix.upper())
871     ]
872     if len(lst) != 1:
873         return lst
874     _, proto = lst[0]
875     return CLASSES[proto]
876
877
878 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
879     return (
880         PROTO_PREFIX
881         + ":"
882         + (
883             obj.__class__.__name__
884             if isinstance(obj, GPS303Pkt)
885             else obj.__name__
886         )
887     ).ljust(16, "\0")[:16]
888
889
890 def proto_by_name(name: str) -> int:
891     return PROTOS.get(name, -1)
892
893
894 def proto_of_message(packet: bytes) -> str:
895     return proto_name(CLASSES.get(packet[1], UNKNOWN))
896
897
898 def imei_from_packet(packet: bytes) -> Optional[str]:
899     if packet[1] == LOGIN.PROTO:
900         msg = parse_message(packet)
901         if isinstance(msg, LOGIN):
902             return msg.imei
903     return None
904
905
906 def is_goodbye_packet(packet: bytes) -> bool:
907     return packet[1] == HIBERNATION.PROTO
908
909
910 def inline_response(packet: bytes) -> Optional[bytes]:
911     proto = packet[1]
912     if proto in CLASSES:
913         cls = CLASSES[proto]
914         if cls.RESPOND is Respond.INL:
915             return cls.Out().packed
916     return None
917
918
919 def probe_buffer(buffer: bytes) -> bool:
920     framestart = buffer.find(b"xx")
921     if framestart < 0:
922         return False
923     if len(buffer) - framestart < 6:
924         return False
925     return True
926
927
928 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
929     """From a packet (without framing bytes) derive the XXX.In object"""
930     length, proto = unpack("BB", packet[:2])
931     payload = packet[2:]
932     if proto not in CLASSES:
933         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
934             f"Proto {proto} is unknown"
935         )
936     else:
937         try:
938             if is_incoming:
939                 return CLASSES[proto].In(length, payload)
940             else:
941                 return CLASSES[proto].Out(length, payload)
942         except (DecodeError, ValueError, IndexError) as e:
943             cause = e
944     if is_incoming:
945         retobj = UNKNOWN.In(length, payload)
946     else:
947         retobj = UNKNOWN.Out(length, payload)
948     retobj.PROTO = proto  # Override class attr with object attr
949     retobj.cause = cause
950     return retobj