]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
move stream parser/deframer to the protocol module
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "GPS303Conn",
36     "StreamError",
37     "class_by_prefix",
38     "inline_response",
39     "parse_message",
40     "proto_by_name",
41     "DecodeError",
42     "Respond",
43     "GPS303Pkt",
44     "UNKNOWN",
45     "LOGIN",
46     "SUPERVISION",
47     "HEARTBEAT",
48     "GPS_POSITIONING",
49     "GPS_OFFLINE_POSITIONING",
50     "STATUS",
51     "HIBERNATION",
52     "RESET",
53     "WHITELIST_TOTAL",
54     "WIFI_OFFLINE_POSITIONING",
55     "TIME",
56     "PROHIBIT_LBS",
57     "GPS_LBS_SWITCH_TIMES",
58     "REMOTE_MONITOR_PHONE",
59     "SOS_PHONE",
60     "DAD_PHONE",
61     "MOM_PHONE",
62     "STOP_UPLOAD",
63     "GPS_OFF_PERIOD",
64     "DND_PERIOD",
65     "RESTART_SHUTDOWN",
66     "DEVICE",
67     "ALARM_CLOCK",
68     "STOP_ALARM",
69     "SETUP",
70     "SYNCHRONOUS_WHITELIST",
71     "RESTORE_PASSWORD",
72     "WIFI_POSITIONING",
73     "MANUAL_POSITIONING",
74     "BATTERY_CHARGE",
75     "CHARGER_CONNECTED",
76     "CHARGER_DISCONNECTED",
77     "VIBRATION_RECEIVED",
78     "POSITION_UPLOAD_INTERVAL",
79     "SOS_ALARM",
80     "UNKNOWN_B3",
81 )
82
83 ### Deframer ###
84
85 MAXBUFFER: int = 4096
86
87
88 class StreamError(Exception):
89     pass
90
91
92 class GPS303Conn:
93     def __init__(self) -> None:
94         self.buffer = b""
95
96     @staticmethod
97     def enframe(buffer: bytes) -> bytes:
98         return b"xx" + buffer + b"\r\n"
99
100     def recv(self, segment: bytes) -> List[bytes]:
101         when = time()
102         self.buffer += segment
103         if len(self.buffer) > MAXBUFFER:
104             # We are receiving junk. Let's drop it or we run out of memory.
105             self.buffer = b""
106             raise StreamError(
107                 f"More than {MAXBUFFER} unparseable data, dropping"
108             )
109         msgs = []
110         while True:
111             framestart = self.buffer.find(b"xx")
112             if framestart == -1:  # No frames, return whatever we have
113                 break
114             if framestart > 0:  # Should not happen, report
115                 self.buffer = self.buffer[framestart:]
116                 raise StreamError(
117                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
118                 )
119             # At this point, buffer starts with a packet
120             if len(self.buffer) < 6:  # no len and proto - cannot proceed
121                 break
122             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
123             frameend = 0
124             # Length field can legitimeely be much less than the
125             # length of the packet (e.g. WiFi positioning), but
126             # it _should not_ be greater. Still sometimes it is.
127             # Luckily, not by too much: by maybe two or three bytes?
128             # Do this embarrassing hack to avoid accidental match
129             # of some binary data in the packet against '\r\n'.
130             while True:
131                 frameend = self.buffer.find(b"\r\n", frameend + 1)
132                 if frameend == -1 or frameend >= (
133                     exp_end - 3
134                 ):  # Found realistic match or none
135                     break
136             if frameend == -1:  # Incomplete frame, return what we have
137                 break
138             packet = self.buffer[2:frameend]
139             self.buffer = self.buffer[frameend + 2 :]
140             if len(packet) < 2:  # frameend comes too early
141                 raise StreamError(f"Packet too short: {packet.hex()}")
142             msgs.append(packet)
143         return msgs
144
145     def close(self) -> bytes:
146         ret = self.buffer
147         self.buffer = b""
148         return ret
149
150
151 ### Parser/Constructor ###
152
153
154 class DecodeError(Exception):
155     def __init__(self, e: Exception, **kwargs: Any) -> None:
156         super().__init__(e)
157         for k, v in kwargs.items():
158             setattr(self, k, v)
159
160
161 def maybe(typ: type) -> Callable[[Any], Any]:
162     return lambda x: None if x is None else typ(x)
163
164
165 def intx(x: Union[str, int]) -> int:
166     if isinstance(x, str):
167         x = int(x, 0)
168     return x
169
170
171 def boolx(x: Union[str, bool]) -> bool:
172     if isinstance(x, str):
173         if x.upper() in ("ON", "TRUE", "1"):
174             return True
175         if x.upper() in ("OFF", "FALSE", "0"):
176             return False
177         raise ValueError(str(x) + " could not be parsed as a Boolean")
178     return x
179
180
181 def hhmm(x: str) -> str:
182     """Check for the string that represents hours and minutes"""
183     if not isinstance(x, str) or len(x) != 4:
184         raise ValueError(str(x) + " is not a four-character string")
185     hh = int(x[:2])
186     mm = int(x[2:])
187     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
188         raise ValueError(str(x) + " does not contain valid hours and minutes")
189     return x
190
191
192 def hhmmhhmm(x: str) -> str:
193     """Check for the string that represents hours and minutes twice"""
194     if not isinstance(x, str) or len(x) != 8:
195         raise ValueError(str(x) + " is not an eight-character string")
196     return hhmm(x[:4]) + hhmm(x[4:])
197
198
199 def l3str(x: Union[str, List[str]]) -> List[str]:
200     if isinstance(x, str):
201         lx = x.split(",")
202     else:
203         lx = x
204     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
205         raise ValueError(str(lx) + " is not a list of three strings")
206     return lx
207
208
209 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
210     def alrmspec(sub: str) -> Tuple[int, str]:
211         if len(sub) != 7:
212             raise ValueError(sub + " does not represent day and time")
213         return (
214             {
215                 "MON": 1,
216                 "TUE": 2,
217                 "WED": 3,
218                 "THU": 4,
219                 "FRI": 5,
220                 "SAT": 6,
221                 "SUN": 7,
222             }[sub[:3].upper()],
223             sub[3:],
224         )
225
226     if isinstance(x, str):
227         lx = [alrmspec(sub) for sub in x.split(",")]
228     else:
229         lx = x
230     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
231     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
232         raise ValueError(str(lx) + " is a wrong alarms specification")
233     return [(d, hhmm(tm)) for d, tm in lx]
234
235
236 def l3int(x: Union[str, List[int]]) -> List[int]:
237     if isinstance(x, str):
238         lx = [int(el) for el in x.split(",")]
239     else:
240         lx = x
241     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
242         raise ValueError(str(lx) + " is not a list of three integers")
243     return lx
244
245
246 class MetaPkt(type):
247     """
248     For each class corresponding to a message, automatically create
249     two nested classes `In` and `Out` that also inherit from their
250     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
251     copied to the `In` nested class under the name `KWARGS`, and
252     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
253     to the nested class `Out`. In addition, method `encode` is
254     defined in both classes equal to `in_encode()` and `out_encode()`
255     respectively.
256     """
257
258     if TYPE_CHECKING:
259
260         def __getattr__(self, name: str) -> Any:
261             pass
262
263         def __setattr__(self, name: str, value: Any) -> None:
264             pass
265
266     def __new__(
267         cls: Type["MetaPkt"],
268         name: str,
269         bases: Tuple[type, ...],
270         attrs: Dict[str, Any],
271     ) -> "MetaPkt":
272         newcls = super().__new__(cls, name, bases, attrs)
273         newcls.In = super().__new__(
274             cls,
275             name + ".In",
276             (newcls,) + bases,
277             {
278                 "KWARGS": newcls.IN_KWARGS,
279                 "decode": newcls.in_decode,
280                 "encode": newcls.in_encode,
281             },
282         )
283         newcls.Out = super().__new__(
284             cls,
285             name + ".Out",
286             (newcls,) + bases,
287             {
288                 "KWARGS": newcls.OUT_KWARGS,
289                 "decode": newcls.out_decode,
290                 "encode": newcls.out_encode,
291             },
292         )
293         return newcls
294
295
296 class Respond(Enum):
297     NON = 0  # Incoming, no response needed
298     INL = 1  # Birirectional, use `inline_response()`
299     EXT = 2  # Birirectional, use external responder
300
301
302 class GPS303Pkt(metaclass=MetaPkt):
303     RESPOND = Respond.NON  # Do not send anything back by default
304     PROTO: int
305     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
307     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
308     In: Type["GPS303Pkt"]
309     Out: Type["GPS303Pkt"]
310
311     if TYPE_CHECKING:
312
313         def __getattr__(self, name: str) -> Any:
314             pass
315
316         def __setattr__(self, name: str, value: Any) -> None:
317             pass
318
319     def __init__(self, *args: Any, **kwargs: Any):
320         """
321         Construct the object _either_ from (length, payload),
322         _or_ from the values of individual fields
323         """
324         assert not args or (len(args) == 2 and not kwargs)
325         if args:  # guaranteed to be two arguments at this point
326             self.length, self.payload = args
327             try:
328                 self.decode(self.length, self.payload)
329             except error as e:
330                 raise DecodeError(e, obj=self)
331         else:
332             for kw, typ, dfl in self.KWARGS:
333                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
334             if kwargs:
335                 raise ValueError(
336                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
337                 )
338
339     def __repr__(self) -> str:
340         return "{}({})".format(
341             self.__class__.__name__,
342             ", ".join(
343                 "{}={}".format(
344                     k,
345                     'bytes.fromhex("{}")'.format(v.hex())
346                     if isinstance(v, bytes)
347                     else v.__repr__(),
348                 )
349                 for k, v in self.__dict__.items()
350                 if not k.startswith("_")
351             ),
352         )
353
354     decode: Callable[["GPS303Pkt", int, bytes], None]
355
356     def in_decode(self, length: int, packet: bytes) -> None:
357         # Overridden in subclasses, otherwise do not decode payload
358         return
359
360     def out_decode(self, length: int, packet: bytes) -> None:
361         # Overridden in subclasses, otherwise do not decode payload
362         return
363
364     encode: Callable[["GPS303Pkt"], bytes]
365
366     def in_encode(self) -> bytes:
367         # Necessary to emulate terminal, which is not implemented
368         raise NotImplementedError(
369             self.__class__.__name__ + ".encode() not implemented"
370         )
371
372     def out_encode(self) -> bytes:
373         # Overridden in subclasses, otherwise make empty payload
374         return b""
375
376     @property
377     def packed(self) -> bytes:
378         payload = self.encode()
379         length = getattr(self, "length", len(payload) + 1)
380         return pack("BB", length, self.PROTO) + payload
381
382
383 class UNKNOWN(GPS303Pkt):
384     PROTO = 256  # > 255 is impossible in real packets
385
386
387 class LOGIN(GPS303Pkt):
388     PROTO = 0x01
389     RESPOND = Respond.INL
390     # Default response for ACK, can also respond with STOP_UPLOAD
391     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
392
393     def in_decode(self, length: int, payload: bytes) -> None:
394         self.imei = payload[:8].ljust(8, b"\0").hex()
395         self.ver = payload[8]
396
397     def in_encode(self) -> bytes:
398         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
399             "B", self.ver
400         )
401
402
403 class SUPERVISION(GPS303Pkt):
404     PROTO = 0x05
405     OUT_KWARGS = (("status", int, 1),)
406
407     def out_encode(self) -> bytes:
408         # 1: The device automatically answers Pickup effect
409         # 2: Automatically Answering Two-way Calls
410         # 3: Ring manually answer the two-way call
411         return pack("B", self.status)
412
413
414 class HEARTBEAT(GPS303Pkt):
415     PROTO = 0x08
416     RESPOND = Respond.INL
417
418
419 class _GPS_POSITIONING(GPS303Pkt):
420     RESPOND = Respond.INL
421
422     def in_decode(self, length: int, payload: bytes) -> None:
423         self.dtime = payload[:6]
424         if self.dtime == b"\0\0\0\0\0\0":
425             self.devtime = None
426         else:
427             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
428             self.devtime = datetime(
429                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
430             )
431         self.gps_data_length = payload[6] >> 4
432         self.gps_nb_sat = payload[6] & 0x0F
433         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
434         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
435         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
436         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
437         self.heading = flags & 0b0000001111111111  # bits 6 - last
438         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
439         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
440         self.speed = speed
441         self.flags = flags
442
443     def out_encode(self) -> bytes:
444         tup = datetime.utcnow().timetuple()
445         ttup = (tup[0] % 100,) + tup[1:6]
446         return pack("BBBBBB", *ttup)
447
448
449 class GPS_POSITIONING(_GPS_POSITIONING):
450     PROTO = 0x10
451
452
453 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
454     PROTO = 0x11
455
456
457 class STATUS(GPS303Pkt):
458     PROTO = 0x13
459     RESPOND = Respond.EXT
460     IN_KWARGS = (
461         ("batt", int, 100),
462         ("ver", int, 0),
463         ("timezone", int, 0),
464         ("intvl", int, 0),
465         ("signal", maybe(int), None),
466     )
467     OUT_KWARGS = (("upload_interval", int, 25),)
468
469     def in_decode(self, length: int, payload: bytes) -> None:
470         self.batt, self.ver, self.timezone, self.intvl = unpack(
471             "BBBB", payload[:4]
472         )
473         if len(payload) > 4:
474             self.signal: Optional[int] = payload[4]
475         else:
476             self.signal = None
477
478     def in_encode(self) -> bytes:
479         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
480             b"" if self.signal is None else pack("B", self.signal)
481         )
482
483     def out_encode(self) -> bytes:  # Set interval in minutes
484         return pack("B", self.upload_interval)
485
486
487 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
488     PROTO = 0x14
489
490     def in_encode(self) -> bytes:
491         return b""
492
493
494 class RESET(GPS303Pkt):
495     # Device sends when it got reset SMS
496     # Server can send to initiate factory reset
497     PROTO = 0x15
498
499
500 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
501     PROTO = 0x16
502     OUT_KWARGS = (("number", int, 3),)
503
504     def out_encode(self) -> bytes:  # Number of whitelist entries
505         return pack("B", self.number)
506
507
508 class _WIFI_POSITIONING(GPS303Pkt):
509     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
510         # IN_KWARGS = (
511         ("dtime", bytes, b"\0\0\0\0\0\0"),
512         ("wifi_aps", list, []),
513         ("mcc", int, 0),
514         ("mnc", int, 0),
515         ("gsm_cells", list, []),
516     )
517
518     def in_decode(self, length: int, payload: bytes) -> None:
519         self.dtime = payload[:6]
520         if self.dtime == b"\0\0\0\0\0\0":
521             self.devtime = None
522         else:
523             self.devtime = datetime.strptime(
524                 self.dtime.hex(), "%y%m%d%H%M%S"
525             ).astimezone(tz=timezone.utc)
526         self.wifi_aps = []
527         for i in range(self.length):  # length has special meaning here
528             slice = payload[6 + i * 7 : 13 + i * 7]
529             self.wifi_aps.append(
530                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
531             )
532         gsm_slice = payload[6 + self.length * 7 :]
533         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
534         self.gsm_cells = []
535         for i in range(ncells):
536             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
537             locac, cellid, sigstr = unpack(
538                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
539             )
540             self.gsm_cells.append((locac, cellid, -sigstr))
541
542     def in_encode(self) -> bytes:
543         self.length = len(self.wifi_aps)
544         return b"".join(
545             [
546                 self.dtime,
547                 b"".join(
548                     [
549                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
550                         + pack("B", -sigstr)
551                         for mac, sigstr in self.wifi_aps
552                     ]
553                 ),
554                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
555                 b"".join(
556                     [
557                         pack("!HHB", locac, cellid, -sigstr)
558                         for locac, cellid, sigstr in self.gsm_cells
559                     ]
560                 ),
561             ]
562         )
563
564
565 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
566     PROTO = 0x17
567     RESPOND = Respond.INL
568
569     def out_encode(self) -> bytes:
570         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
571
572
573 class TIME(GPS303Pkt):
574     PROTO = 0x30
575     RESPOND = Respond.INL
576
577     def out_encode(self) -> bytes:
578         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
579
580
581 class PROHIBIT_LBS(GPS303Pkt):
582     PROTO = 0x33
583     OUT_KWARGS = (("status", int, 1),)
584
585     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
586         return pack("B", self.status)
587
588
589 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
590     PROTO = 0x34
591
592     OUT_KWARGS = (
593         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
594         ("gps_interval_set", boolx, False),
595         ("gps_interval", hhmmhhmm, "00000000"),
596         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
597         ("boot_time_set", boolx, False),
598         ("boot_time", hhmm, "0000"),
599         ("shut_time_set", boolx, False),
600         ("shut_time", hhmm, "0000"),
601     )
602
603     def out_encode(self) -> bytes:
604         return (
605             pack("B", self.gps_off)
606             + pack("B", self.gps_interval_set)
607             + bytes.fromhex(self.gps_interval)
608             + pack("B", self.lbs_off)
609             + pack("B", self.boot_time_set)
610             + bytes.fromhex(self.boot_time)
611             + pack("B", self.shut_time_set)
612             + bytes.fromhex(self.shut_time)
613         )
614
615
616 class _SET_PHONE(GPS303Pkt):
617     OUT_KWARGS = (("phone", str, ""),)
618
619     def out_encode(self) -> bytes:
620         self.phone: str
621         return self.phone.encode("")
622
623
624 class REMOTE_MONITOR_PHONE(_SET_PHONE):
625     PROTO = 0x40
626
627
628 class SOS_PHONE(_SET_PHONE):
629     PROTO = 0x41
630
631
632 class DAD_PHONE(_SET_PHONE):
633     PROTO = 0x42
634
635
636 class MOM_PHONE(_SET_PHONE):
637     PROTO = 0x43
638
639
640 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
641     PROTO = 0x44
642
643
644 class GPS_OFF_PERIOD(GPS303Pkt):
645     PROTO = 0x46
646     OUT_KWARGS = (
647         ("onoff", int, 0),
648         ("fm", hhmm, "0000"),
649         ("to", hhmm, "2359"),
650     )
651
652     def out_encode(self) -> bytes:
653         return (
654             pack("B", self.onoff)
655             + bytes.fromhex(self.fm)
656             + bytes.fromhex(self.to)
657         )
658
659
660 class DND_PERIOD(GPS303Pkt):
661     PROTO = 0x47
662     OUT_KWARGS = (
663         ("onoff", int, 0),
664         ("week", int, 3),
665         ("fm1", hhmm, "0000"),
666         ("to1", hhmm, "2359"),
667         ("fm2", hhmm, "0000"),
668         ("to2", hhmm, "2359"),
669     )
670
671     def out_encode(self) -> bytes:
672         return (
673             pack("B", self.onoff)
674             + pack("B", self.week)
675             + bytes.fromhex(self.fm1)
676             + bytes.fromhex(self.to1)
677             + bytes.fromhex(self.fm2)
678             + bytes.fromhex(self.to2)
679         )
680
681
682 class RESTART_SHUTDOWN(GPS303Pkt):
683     PROTO = 0x48
684     OUT_KWARGS = (("flag", int, 0),)
685
686     def out_encode(self) -> bytes:
687         # 1 - restart
688         # 2 - shutdown
689         return pack("B", self.flag)
690
691
692 class DEVICE(GPS303Pkt):
693     PROTO = 0x49
694     OUT_KWARGS = (("flag", int, 0),)
695
696     # 0 - Stop looking for equipment
697     # 1 - Start looking for equipment
698     def out_encode(self) -> bytes:
699         return pack("B", self.flag)
700
701
702 class ALARM_CLOCK(GPS303Pkt):
703     PROTO = 0x50
704     OUT_KWARGS: Tuple[
705         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
706     ] = (
707         ("alarms", l3alarms, []),
708     )
709
710     def out_encode(self) -> bytes:
711         return b"".join(
712             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
713         )
714
715
716 class STOP_ALARM(GPS303Pkt):
717     PROTO = 0x56
718
719     def in_decode(self, length: int, payload: bytes) -> None:
720         self.flag = payload[0]
721
722
723 class SETUP(GPS303Pkt):
724     PROTO = 0x57
725     RESPOND = Respond.EXT
726     OUT_KWARGS = (
727         ("uploadintervalseconds", intx, 0x0300),
728         ("binaryswitch", intx, 0b00110001),
729         ("alarms", l3int, [0, 0, 0]),
730         ("dndtimeswitch", int, 0),
731         ("dndtimes", l3int, [0, 0, 0]),
732         ("gpstimeswitch", int, 0),
733         ("gpstimestart", int, 0),
734         ("gpstimestop", int, 0),
735         ("phonenumbers", l3str, ["", "", ""]),
736     )
737
738     def out_encode(self) -> bytes:
739         def pack3b(x: int) -> bytes:
740             return pack("!I", x)[1:]
741
742         return b"".join(
743             [
744                 pack("!H", self.uploadintervalseconds),
745                 pack("B", self.binaryswitch),
746             ]
747             + [pack3b(el) for el in self.alarms]
748             + [
749                 pack("B", self.dndtimeswitch),
750             ]
751             + [pack3b(el) for el in self.dndtimes]
752             + [
753                 pack("B", self.gpstimeswitch),
754                 pack("!H", self.gpstimestart),
755                 pack("!H", self.gpstimestop),
756             ]
757             + [b";".join([el.encode() for el in self.phonenumbers])]
758         )
759
760     def in_encode(self) -> bytes:
761         return b""
762
763
764 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
765     PROTO = 0x58
766
767
768 class RESTORE_PASSWORD(GPS303Pkt):
769     PROTO = 0x67
770
771
772 class WIFI_POSITIONING(_WIFI_POSITIONING):
773     PROTO = 0x69
774     RESPOND = Respond.EXT
775     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
776
777     def out_encode(self) -> bytes:
778         if self.latitude is None or self.longitude is None:
779             return b""
780         return "{:+#010.8g},{:+#010.8g}".format(
781             self.latitude, self.longitude
782         ).encode()
783
784     def out_decode(self, length: int, payload: bytes) -> None:
785         lat, lon = payload.decode().split(",")
786         self.latitude = float(lat)
787         self.longitude = float(lon)
788
789
790 class MANUAL_POSITIONING(GPS303Pkt):
791     PROTO = 0x80
792
793     def in_decode(self, length: int, payload: bytes) -> None:
794         self.flag = payload[0] if len(payload) > 0 else -1
795         self.reason = {
796             1: "Incorrect time",
797             2: "LBS less",
798             3: "WiFi less",
799             4: "LBS search > 3 times",
800             5: "Same LBS and WiFi data",
801             6: "LBS prohibited, WiFi absent",
802             7: "GPS spacing < 50 m",
803         }.get(self.flag, "Unknown")
804
805
806 class BATTERY_CHARGE(GPS303Pkt):
807     PROTO = 0x81
808
809
810 class CHARGER_CONNECTED(GPS303Pkt):
811     PROTO = 0x82
812
813
814 class CHARGER_DISCONNECTED(GPS303Pkt):
815     PROTO = 0x83
816
817
818 class VIBRATION_RECEIVED(GPS303Pkt):
819     PROTO = 0x94
820
821
822 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
823     PROTO = 0x98
824     RESPOND = Respond.EXT
825     OUT_KWARGS = (("interval", int, 10),)
826
827     def in_decode(self, length: int, payload: bytes) -> None:
828         self.interval = unpack("!H", payload[:2])
829
830     def out_encode(self) -> bytes:
831         return pack("!H", self.interval)
832
833
834 class SOS_ALARM(GPS303Pkt):
835     PROTO = 0x99
836
837
838 class UNKNOWN_B3(GPS303Pkt):
839     PROTO = 0xB3
840     IN_KWARGS = (("asciidata", str, ""),)
841
842     def in_decode(self, length: int, payload: bytes) -> None:
843         self.asciidata = payload.decode()
844
845
846 # Build dicts protocol number -> class and class name -> protocol number
847 CLASSES = {}
848 PROTOS = {}
849 if True:  # just to indent the code, sorry!
850     for cls in [
851         cls
852         for name, cls in globals().items()
853         if isclass(cls)
854         and issubclass(cls, GPS303Pkt)
855         and not name.startswith("_")
856     ]:
857         if hasattr(cls, "PROTO"):
858             CLASSES[cls.PROTO] = cls
859             PROTOS[cls.__name__] = cls.PROTO
860
861
862 def class_by_prefix(
863     prefix: str,
864 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
865     lst = [
866         (name, proto)
867         for name, proto in PROTOS.items()
868         if name.upper().startswith(prefix.upper())
869     ]
870     if len(lst) != 1:
871         return lst
872     _, proto = lst[0]
873     return CLASSES[proto]
874
875
876 def proto_by_name(name: str) -> int:
877     return PROTOS.get(name, -1)
878
879
880 def proto_of_message(packet: bytes) -> int:
881     return packet[1]
882
883
884 def inline_response(packet: bytes) -> Optional[bytes]:
885     proto = proto_of_message(packet)
886     if proto in CLASSES:
887         cls = CLASSES[proto]
888         if cls.RESPOND is Respond.INL:
889             return cls.Out().packed
890     return None
891
892
893 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
894     """From a packet (without framing bytes) derive the XXX.In object"""
895     length, proto = unpack("BB", packet[:2])
896     payload = packet[2:]
897     if proto not in CLASSES:
898         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
899             f"Proto {proto} is unknown"
900         )
901     else:
902         try:
903             if is_incoming:
904                 return CLASSES[proto].In(length, payload)
905             else:
906                 return CLASSES[proto].Out(length, payload)
907         except (DecodeError, ValueError, IndexError) as e:
908             cause = e
909     if is_incoming:
910         retobj = UNKNOWN.In(length, payload)
911     else:
912         retobj = UNKNOWN.Out(length, payload)
913     retobj.PROTO = proto  # Override class attr with object attr
914     retobj.cause = cause
915     return retobj