]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
collector: get rid of more protocol specifics
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "GPS303Conn",
36     "class_by_prefix",
37     "inline_response",
38     "parse_message",
39     "proto_by_name",
40     "DecodeError",
41     "Respond",
42     "GPS303Pkt",
43     "UNKNOWN",
44     "LOGIN",
45     "SUPERVISION",
46     "HEARTBEAT",
47     "GPS_POSITIONING",
48     "GPS_OFFLINE_POSITIONING",
49     "STATUS",
50     "HIBERNATION",
51     "RESET",
52     "WHITELIST_TOTAL",
53     "WIFI_OFFLINE_POSITIONING",
54     "TIME",
55     "PROHIBIT_LBS",
56     "GPS_LBS_SWITCH_TIMES",
57     "REMOTE_MONITOR_PHONE",
58     "SOS_PHONE",
59     "DAD_PHONE",
60     "MOM_PHONE",
61     "STOP_UPLOAD",
62     "GPS_OFF_PERIOD",
63     "DND_PERIOD",
64     "RESTART_SHUTDOWN",
65     "DEVICE",
66     "ALARM_CLOCK",
67     "STOP_ALARM",
68     "SETUP",
69     "SYNCHRONOUS_WHITELIST",
70     "RESTORE_PASSWORD",
71     "WIFI_POSITIONING",
72     "MANUAL_POSITIONING",
73     "BATTERY_CHARGE",
74     "CHARGER_CONNECTED",
75     "CHARGER_DISCONNECTED",
76     "VIBRATION_RECEIVED",
77     "POSITION_UPLOAD_INTERVAL",
78     "SOS_ALARM",
79     "UNKNOWN_B3",
80 )
81
82 ### Deframer ###
83
84 MAXBUFFER: int = 4096
85
86
87 class GPS303Conn:
88     def __init__(self) -> None:
89         self.buffer = b""
90
91     @staticmethod
92     def enframe(buffer: bytes) -> bytes:
93         return b"xx" + buffer + b"\r\n"
94
95     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
96         """
97         Process next segment of the stream. Return successfully deframed
98         packets as `bytes` and error messages as `str`.
99         """
100         when = time()
101         self.buffer += segment
102         if len(self.buffer) > MAXBUFFER:
103             # We are receiving junk. Let's drop it or we run out of memory.
104             self.buffer = b""
105             return [f"More than {MAXBUFFER} unparseable data, dropping"]
106         msgs: List[Union[bytes, str]] = []
107         while True:
108             framestart = self.buffer.find(b"xx")
109             if framestart == -1:  # No frames, return whatever we have
110                 break
111             if framestart > 0:  # Should not happen, report
112                 msgs.append(
113                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
114                 )
115                 self.buffer = self.buffer[framestart:]
116             # At this point, buffer starts with a packet
117             if len(self.buffer) < 6:  # no len and proto - cannot proceed
118                 break
119             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
120             frameend = 0
121             # Length field can legitimeely be much less than the
122             # length of the packet (e.g. WiFi positioning), but
123             # it _should not_ be greater. Still sometimes it is.
124             # Luckily, not by too much: by maybe two or three bytes?
125             # Do this embarrassing hack to avoid accidental match
126             # of some binary data in the packet against '\r\n'.
127             while True:
128                 frameend = self.buffer.find(b"\r\n", frameend + 1)
129                 if frameend == -1 or frameend >= (
130                     exp_end - 3
131                 ):  # Found realistic match or none
132                     break
133             if frameend == -1:  # Incomplete frame, return what we have
134                 break
135             packet = self.buffer[2:frameend]
136             self.buffer = self.buffer[frameend + 2 :]
137             if len(packet) < 2:  # frameend comes too early
138                 msgs.append(f"Packet too short: {packet.hex()}")
139             else:
140                 msgs.append(packet)
141         return msgs
142
143     def close(self) -> bytes:
144         ret = self.buffer
145         self.buffer = b""
146         return ret
147
148
149 ### Parser/Constructor ###
150
151
152 class DecodeError(Exception):
153     def __init__(self, e: Exception, **kwargs: Any) -> None:
154         super().__init__(e)
155         for k, v in kwargs.items():
156             setattr(self, k, v)
157
158
159 def maybe(typ: type) -> Callable[[Any], Any]:
160     return lambda x: None if x is None else typ(x)
161
162
163 def intx(x: Union[str, int]) -> int:
164     if isinstance(x, str):
165         x = int(x, 0)
166     return x
167
168
169 def boolx(x: Union[str, bool]) -> bool:
170     if isinstance(x, str):
171         if x.upper() in ("ON", "TRUE", "1"):
172             return True
173         if x.upper() in ("OFF", "FALSE", "0"):
174             return False
175         raise ValueError(str(x) + " could not be parsed as a Boolean")
176     return x
177
178
179 def hhmm(x: str) -> str:
180     """Check for the string that represents hours and minutes"""
181     if not isinstance(x, str) or len(x) != 4:
182         raise ValueError(str(x) + " is not a four-character string")
183     hh = int(x[:2])
184     mm = int(x[2:])
185     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
186         raise ValueError(str(x) + " does not contain valid hours and minutes")
187     return x
188
189
190 def hhmmhhmm(x: str) -> str:
191     """Check for the string that represents hours and minutes twice"""
192     if not isinstance(x, str) or len(x) != 8:
193         raise ValueError(str(x) + " is not an eight-character string")
194     return hhmm(x[:4]) + hhmm(x[4:])
195
196
197 def l3str(x: Union[str, List[str]]) -> List[str]:
198     if isinstance(x, str):
199         lx = x.split(",")
200     else:
201         lx = x
202     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
203         raise ValueError(str(lx) + " is not a list of three strings")
204     return lx
205
206
207 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
208     def alrmspec(sub: str) -> Tuple[int, str]:
209         if len(sub) != 7:
210             raise ValueError(sub + " does not represent day and time")
211         return (
212             {
213                 "MON": 1,
214                 "TUE": 2,
215                 "WED": 3,
216                 "THU": 4,
217                 "FRI": 5,
218                 "SAT": 6,
219                 "SUN": 7,
220             }[sub[:3].upper()],
221             sub[3:],
222         )
223
224     if isinstance(x, str):
225         lx = [alrmspec(sub) for sub in x.split(",")]
226     else:
227         lx = x
228     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
229     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
230         raise ValueError(str(lx) + " is a wrong alarms specification")
231     return [(d, hhmm(tm)) for d, tm in lx]
232
233
234 def l3int(x: Union[str, List[int]]) -> List[int]:
235     if isinstance(x, str):
236         lx = [int(el) for el in x.split(",")]
237     else:
238         lx = x
239     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
240         raise ValueError(str(lx) + " is not a list of three integers")
241     return lx
242
243
244 class MetaPkt(type):
245     """
246     For each class corresponding to a message, automatically create
247     two nested classes `In` and `Out` that also inherit from their
248     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
249     copied to the `In` nested class under the name `KWARGS`, and
250     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
251     to the nested class `Out`. In addition, method `encode` is
252     defined in both classes equal to `in_encode()` and `out_encode()`
253     respectively.
254     """
255
256     if TYPE_CHECKING:
257
258         def __getattr__(self, name: str) -> Any:
259             pass
260
261         def __setattr__(self, name: str, value: Any) -> None:
262             pass
263
264     def __new__(
265         cls: Type["MetaPkt"],
266         name: str,
267         bases: Tuple[type, ...],
268         attrs: Dict[str, Any],
269     ) -> "MetaPkt":
270         newcls = super().__new__(cls, name, bases, attrs)
271         newcls.In = super().__new__(
272             cls,
273             name + ".In",
274             (newcls,) + bases,
275             {
276                 "KWARGS": newcls.IN_KWARGS,
277                 "decode": newcls.in_decode,
278                 "encode": newcls.in_encode,
279             },
280         )
281         newcls.Out = super().__new__(
282             cls,
283             name + ".Out",
284             (newcls,) + bases,
285             {
286                 "KWARGS": newcls.OUT_KWARGS,
287                 "decode": newcls.out_decode,
288                 "encode": newcls.out_encode,
289             },
290         )
291         return newcls
292
293
294 class Respond(Enum):
295     NON = 0  # Incoming, no response needed
296     INL = 1  # Birirectional, use `inline_response()`
297     EXT = 2  # Birirectional, use external responder
298
299
300 class GPS303Pkt(metaclass=MetaPkt):
301     RESPOND = Respond.NON  # Do not send anything back by default
302     PROTO: int
303     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
304     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
305     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306     In: Type["GPS303Pkt"]
307     Out: Type["GPS303Pkt"]
308
309     if TYPE_CHECKING:
310
311         def __getattr__(self, name: str) -> Any:
312             pass
313
314         def __setattr__(self, name: str, value: Any) -> None:
315             pass
316
317     def __init__(self, *args: Any, **kwargs: Any):
318         """
319         Construct the object _either_ from (length, payload),
320         _or_ from the values of individual fields
321         """
322         assert not args or (len(args) == 2 and not kwargs)
323         if args:  # guaranteed to be two arguments at this point
324             self.length, self.payload = args
325             try:
326                 self.decode(self.length, self.payload)
327             except error as e:
328                 raise DecodeError(e, obj=self)
329         else:
330             for kw, typ, dfl in self.KWARGS:
331                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
332             if kwargs:
333                 raise ValueError(
334                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
335                 )
336
337     def __repr__(self) -> str:
338         return "{}({})".format(
339             self.__class__.__name__,
340             ", ".join(
341                 "{}={}".format(
342                     k,
343                     'bytes.fromhex("{}")'.format(v.hex())
344                     if isinstance(v, bytes)
345                     else v.__repr__(),
346                 )
347                 for k, v in self.__dict__.items()
348                 if not k.startswith("_")
349             ),
350         )
351
352     decode: Callable[["GPS303Pkt", int, bytes], None]
353
354     def in_decode(self, length: int, packet: bytes) -> None:
355         # Overridden in subclasses, otherwise do not decode payload
356         return
357
358     def out_decode(self, length: int, packet: bytes) -> None:
359         # Overridden in subclasses, otherwise do not decode payload
360         return
361
362     encode: Callable[["GPS303Pkt"], bytes]
363
364     def in_encode(self) -> bytes:
365         # Necessary to emulate terminal, which is not implemented
366         raise NotImplementedError(
367             self.__class__.__name__ + ".encode() not implemented"
368         )
369
370     def out_encode(self) -> bytes:
371         # Overridden in subclasses, otherwise make empty payload
372         return b""
373
374     @property
375     def packed(self) -> bytes:
376         payload = self.encode()
377         length = getattr(self, "length", len(payload) + 1)
378         return pack("BB", length, self.PROTO) + payload
379
380
381 class UNKNOWN(GPS303Pkt):
382     PROTO = 256  # > 255 is impossible in real packets
383
384
385 class LOGIN(GPS303Pkt):
386     PROTO = 0x01
387     RESPOND = Respond.INL
388     # Default response for ACK, can also respond with STOP_UPLOAD
389     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
390
391     def in_decode(self, length: int, payload: bytes) -> None:
392         self.imei = payload[:8].ljust(8, b"\0").hex()
393         self.ver = payload[8]
394
395     def in_encode(self) -> bytes:
396         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
397             "B", self.ver
398         )
399
400
401 class SUPERVISION(GPS303Pkt):
402     PROTO = 0x05
403     OUT_KWARGS = (("status", int, 1),)
404
405     def out_encode(self) -> bytes:
406         # 1: The device automatically answers Pickup effect
407         # 2: Automatically Answering Two-way Calls
408         # 3: Ring manually answer the two-way call
409         return pack("B", self.status)
410
411
412 class HEARTBEAT(GPS303Pkt):
413     PROTO = 0x08
414     RESPOND = Respond.INL
415
416
417 class _GPS_POSITIONING(GPS303Pkt):
418     RESPOND = Respond.INL
419
420     def in_decode(self, length: int, payload: bytes) -> None:
421         self.dtime = payload[:6]
422         if self.dtime == b"\0\0\0\0\0\0":
423             self.devtime = None
424         else:
425             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
426             self.devtime = datetime(
427                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
428             )
429         self.gps_data_length = payload[6] >> 4
430         self.gps_nb_sat = payload[6] & 0x0F
431         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
432         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
433         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
434         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
435         self.heading = flags & 0b0000001111111111  # bits 6 - last
436         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
437         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
438         self.speed = speed
439         self.flags = flags
440
441     def out_encode(self) -> bytes:
442         tup = datetime.utcnow().timetuple()
443         ttup = (tup[0] % 100,) + tup[1:6]
444         return pack("BBBBBB", *ttup)
445
446
447 class GPS_POSITIONING(_GPS_POSITIONING):
448     PROTO = 0x10
449
450
451 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
452     PROTO = 0x11
453
454
455 class STATUS(GPS303Pkt):
456     PROTO = 0x13
457     RESPOND = Respond.EXT
458     IN_KWARGS = (
459         ("batt", int, 100),
460         ("ver", int, 0),
461         ("timezone", int, 0),
462         ("intvl", int, 0),
463         ("signal", maybe(int), None),
464     )
465     OUT_KWARGS = (("upload_interval", int, 25),)
466
467     def in_decode(self, length: int, payload: bytes) -> None:
468         self.batt, self.ver, self.timezone, self.intvl = unpack(
469             "BBBB", payload[:4]
470         )
471         if len(payload) > 4:
472             self.signal: Optional[int] = payload[4]
473         else:
474             self.signal = None
475
476     def in_encode(self) -> bytes:
477         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
478             b"" if self.signal is None else pack("B", self.signal)
479         )
480
481     def out_encode(self) -> bytes:  # Set interval in minutes
482         return pack("B", self.upload_interval)
483
484
485 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
486     PROTO = 0x14
487
488     def in_encode(self) -> bytes:
489         return b""
490
491
492 class RESET(GPS303Pkt):
493     # Device sends when it got reset SMS
494     # Server can send to initiate factory reset
495     PROTO = 0x15
496
497
498 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
499     PROTO = 0x16
500     OUT_KWARGS = (("number", int, 3),)
501
502     def out_encode(self) -> bytes:  # Number of whitelist entries
503         return pack("B", self.number)
504
505
506 class _WIFI_POSITIONING(GPS303Pkt):
507     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
508         # IN_KWARGS = (
509         ("dtime", bytes, b"\0\0\0\0\0\0"),
510         ("wifi_aps", list, []),
511         ("mcc", int, 0),
512         ("mnc", int, 0),
513         ("gsm_cells", list, []),
514     )
515
516     def in_decode(self, length: int, payload: bytes) -> None:
517         self.dtime = payload[:6]
518         if self.dtime == b"\0\0\0\0\0\0":
519             self.devtime = None
520         else:
521             self.devtime = datetime.strptime(
522                 self.dtime.hex(), "%y%m%d%H%M%S"
523             ).astimezone(tz=timezone.utc)
524         self.wifi_aps = []
525         for i in range(self.length):  # length has special meaning here
526             slice = payload[6 + i * 7 : 13 + i * 7]
527             self.wifi_aps.append(
528                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
529             )
530         gsm_slice = payload[6 + self.length * 7 :]
531         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
532         self.gsm_cells = []
533         for i in range(ncells):
534             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
535             locac, cellid, sigstr = unpack(
536                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
537             )
538             self.gsm_cells.append((locac, cellid, -sigstr))
539
540     def in_encode(self) -> bytes:
541         self.length = len(self.wifi_aps)
542         return b"".join(
543             [
544                 self.dtime,
545                 b"".join(
546                     [
547                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
548                         + pack("B", -sigstr)
549                         for mac, sigstr in self.wifi_aps
550                     ]
551                 ),
552                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
553                 b"".join(
554                     [
555                         pack("!HHB", locac, cellid, -sigstr)
556                         for locac, cellid, sigstr in self.gsm_cells
557                     ]
558                 ),
559             ]
560         )
561
562
563 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
564     PROTO = 0x17
565     RESPOND = Respond.INL
566
567     def out_encode(self) -> bytes:
568         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
569
570
571 class TIME(GPS303Pkt):
572     PROTO = 0x30
573     RESPOND = Respond.INL
574
575     def out_encode(self) -> bytes:
576         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
577
578
579 class PROHIBIT_LBS(GPS303Pkt):
580     PROTO = 0x33
581     OUT_KWARGS = (("status", int, 1),)
582
583     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
584         return pack("B", self.status)
585
586
587 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
588     PROTO = 0x34
589
590     OUT_KWARGS = (
591         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
592         ("gps_interval_set", boolx, False),
593         ("gps_interval", hhmmhhmm, "00000000"),
594         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
595         ("boot_time_set", boolx, False),
596         ("boot_time", hhmm, "0000"),
597         ("shut_time_set", boolx, False),
598         ("shut_time", hhmm, "0000"),
599     )
600
601     def out_encode(self) -> bytes:
602         return (
603             pack("B", self.gps_off)
604             + pack("B", self.gps_interval_set)
605             + bytes.fromhex(self.gps_interval)
606             + pack("B", self.lbs_off)
607             + pack("B", self.boot_time_set)
608             + bytes.fromhex(self.boot_time)
609             + pack("B", self.shut_time_set)
610             + bytes.fromhex(self.shut_time)
611         )
612
613
614 class _SET_PHONE(GPS303Pkt):
615     OUT_KWARGS = (("phone", str, ""),)
616
617     def out_encode(self) -> bytes:
618         self.phone: str
619         return self.phone.encode("")
620
621
622 class REMOTE_MONITOR_PHONE(_SET_PHONE):
623     PROTO = 0x40
624
625
626 class SOS_PHONE(_SET_PHONE):
627     PROTO = 0x41
628
629
630 class DAD_PHONE(_SET_PHONE):
631     PROTO = 0x42
632
633
634 class MOM_PHONE(_SET_PHONE):
635     PROTO = 0x43
636
637
638 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
639     PROTO = 0x44
640
641
642 class GPS_OFF_PERIOD(GPS303Pkt):
643     PROTO = 0x46
644     OUT_KWARGS = (
645         ("onoff", int, 0),
646         ("fm", hhmm, "0000"),
647         ("to", hhmm, "2359"),
648     )
649
650     def out_encode(self) -> bytes:
651         return (
652             pack("B", self.onoff)
653             + bytes.fromhex(self.fm)
654             + bytes.fromhex(self.to)
655         )
656
657
658 class DND_PERIOD(GPS303Pkt):
659     PROTO = 0x47
660     OUT_KWARGS = (
661         ("onoff", int, 0),
662         ("week", int, 3),
663         ("fm1", hhmm, "0000"),
664         ("to1", hhmm, "2359"),
665         ("fm2", hhmm, "0000"),
666         ("to2", hhmm, "2359"),
667     )
668
669     def out_encode(self) -> bytes:
670         return (
671             pack("B", self.onoff)
672             + pack("B", self.week)
673             + bytes.fromhex(self.fm1)
674             + bytes.fromhex(self.to1)
675             + bytes.fromhex(self.fm2)
676             + bytes.fromhex(self.to2)
677         )
678
679
680 class RESTART_SHUTDOWN(GPS303Pkt):
681     PROTO = 0x48
682     OUT_KWARGS = (("flag", int, 0),)
683
684     def out_encode(self) -> bytes:
685         # 1 - restart
686         # 2 - shutdown
687         return pack("B", self.flag)
688
689
690 class DEVICE(GPS303Pkt):
691     PROTO = 0x49
692     OUT_KWARGS = (("flag", int, 0),)
693
694     # 0 - Stop looking for equipment
695     # 1 - Start looking for equipment
696     def out_encode(self) -> bytes:
697         return pack("B", self.flag)
698
699
700 class ALARM_CLOCK(GPS303Pkt):
701     PROTO = 0x50
702     OUT_KWARGS: Tuple[
703         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
704     ] = (
705         ("alarms", l3alarms, []),
706     )
707
708     def out_encode(self) -> bytes:
709         return b"".join(
710             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
711         )
712
713
714 class STOP_ALARM(GPS303Pkt):
715     PROTO = 0x56
716
717     def in_decode(self, length: int, payload: bytes) -> None:
718         self.flag = payload[0]
719
720
721 class SETUP(GPS303Pkt):
722     PROTO = 0x57
723     RESPOND = Respond.EXT
724     OUT_KWARGS = (
725         ("uploadintervalseconds", intx, 0x0300),
726         ("binaryswitch", intx, 0b00110001),
727         ("alarms", l3int, [0, 0, 0]),
728         ("dndtimeswitch", int, 0),
729         ("dndtimes", l3int, [0, 0, 0]),
730         ("gpstimeswitch", int, 0),
731         ("gpstimestart", int, 0),
732         ("gpstimestop", int, 0),
733         ("phonenumbers", l3str, ["", "", ""]),
734     )
735
736     def out_encode(self) -> bytes:
737         def pack3b(x: int) -> bytes:
738             return pack("!I", x)[1:]
739
740         return b"".join(
741             [
742                 pack("!H", self.uploadintervalseconds),
743                 pack("B", self.binaryswitch),
744             ]
745             + [pack3b(el) for el in self.alarms]
746             + [
747                 pack("B", self.dndtimeswitch),
748             ]
749             + [pack3b(el) for el in self.dndtimes]
750             + [
751                 pack("B", self.gpstimeswitch),
752                 pack("!H", self.gpstimestart),
753                 pack("!H", self.gpstimestop),
754             ]
755             + [b";".join([el.encode() for el in self.phonenumbers])]
756         )
757
758     def in_encode(self) -> bytes:
759         return b""
760
761
762 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
763     PROTO = 0x58
764
765
766 class RESTORE_PASSWORD(GPS303Pkt):
767     PROTO = 0x67
768
769
770 class WIFI_POSITIONING(_WIFI_POSITIONING):
771     PROTO = 0x69
772     RESPOND = Respond.EXT
773     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
774
775     def out_encode(self) -> bytes:
776         if self.latitude is None or self.longitude is None:
777             return b""
778         return "{:+#010.8g},{:+#010.8g}".format(
779             self.latitude, self.longitude
780         ).encode()
781
782     def out_decode(self, length: int, payload: bytes) -> None:
783         lat, lon = payload.decode().split(",")
784         self.latitude = float(lat)
785         self.longitude = float(lon)
786
787
788 class MANUAL_POSITIONING(GPS303Pkt):
789     PROTO = 0x80
790
791     def in_decode(self, length: int, payload: bytes) -> None:
792         self.flag = payload[0] if len(payload) > 0 else -1
793         self.reason = {
794             1: "Incorrect time",
795             2: "LBS less",
796             3: "WiFi less",
797             4: "LBS search > 3 times",
798             5: "Same LBS and WiFi data",
799             6: "LBS prohibited, WiFi absent",
800             7: "GPS spacing < 50 m",
801         }.get(self.flag, "Unknown")
802
803
804 class BATTERY_CHARGE(GPS303Pkt):
805     PROTO = 0x81
806
807
808 class CHARGER_CONNECTED(GPS303Pkt):
809     PROTO = 0x82
810
811
812 class CHARGER_DISCONNECTED(GPS303Pkt):
813     PROTO = 0x83
814
815
816 class VIBRATION_RECEIVED(GPS303Pkt):
817     PROTO = 0x94
818
819
820 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
821     PROTO = 0x98
822     RESPOND = Respond.EXT
823     OUT_KWARGS = (("interval", int, 10),)
824
825     def in_decode(self, length: int, payload: bytes) -> None:
826         self.interval = unpack("!H", payload[:2])
827
828     def out_encode(self) -> bytes:
829         return pack("!H", self.interval)
830
831
832 class SOS_ALARM(GPS303Pkt):
833     PROTO = 0x99
834
835
836 class UNKNOWN_B3(GPS303Pkt):
837     PROTO = 0xB3
838     IN_KWARGS = (("asciidata", str, ""),)
839
840     def in_decode(self, length: int, payload: bytes) -> None:
841         self.asciidata = payload.decode()
842
843
844 # Build dicts protocol number -> class and class name -> protocol number
845 CLASSES = {}
846 PROTOS = {}
847 if True:  # just to indent the code, sorry!
848     for cls in [
849         cls
850         for name, cls in globals().items()
851         if isclass(cls)
852         and issubclass(cls, GPS303Pkt)
853         and not name.startswith("_")
854     ]:
855         if hasattr(cls, "PROTO"):
856             CLASSES[cls.PROTO] = cls
857             PROTOS[cls.__name__] = cls.PROTO
858
859
860 def class_by_prefix(
861     prefix: str,
862 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
863     lst = [
864         (name, proto)
865         for name, proto in PROTOS.items()
866         if name.upper().startswith(prefix.upper())
867     ]
868     if len(lst) != 1:
869         return lst
870     _, proto = lst[0]
871     return CLASSES[proto]
872
873
874 def proto_by_name(name: str) -> int:
875     return PROTOS.get(name, -1)
876
877
878 def proto_of_message(packet: bytes) -> int:
879     return packet[1]
880
881
882 def imei_from_packet(packet: bytes) -> Optional[str]:
883     if proto_of_message(packet) == LOGIN.PROTO:
884         msg = parse_message(packet)
885         if isinstance(msg, LOGIN):
886             return msg.imei
887     return None
888
889
890 def is_goodbye_packet(packet: bytes) -> bool:
891     return proto_of_message(packet) == HIBERNATION.PROTO
892
893
894 def inline_response(packet: bytes) -> Optional[bytes]:
895     proto = proto_of_message(packet)
896     if proto in CLASSES:
897         cls = CLASSES[proto]
898         if cls.RESPOND is Respond.INL:
899             return cls.Out().packed
900     return None
901
902
903 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
904     """From a packet (without framing bytes) derive the XXX.In object"""
905     length, proto = unpack("BB", packet[:2])
906     payload = packet[2:]
907     if proto not in CLASSES:
908         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
909             f"Proto {proto} is unknown"
910         )
911     else:
912         try:
913             if is_incoming:
914                 return CLASSES[proto].In(length, payload)
915             else:
916                 return CLASSES[proto].Out(length, payload)
917         except (DecodeError, ValueError, IndexError) as e:
918             cause = e
919     if is_incoming:
920         retobj = UNKNOWN.In(length, payload)
921     else:
922         retobj = UNKNOWN.Out(length, payload)
923     retobj.PROTO = proto  # Override class attr with object attr
924     retobj.cause = cause
925     return retobj