]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
function `proto_handled()` in proto modules
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 __all__ = (
35     "Stream",
36     "class_by_prefix",
37     "inline_response",
38     "proto_handled",
39     "parse_message",
40     "probe_buffer",
41     "proto_by_name",
42     "proto_name",
43     "DecodeError",
44     "Respond",
45     "GPS303Pkt",
46     "UNKNOWN",
47     "LOGIN",
48     "SUPERVISION",
49     "HEARTBEAT",
50     "GPS_POSITIONING",
51     "GPS_OFFLINE_POSITIONING",
52     "STATUS",
53     "HIBERNATION",
54     "RESET",
55     "WHITELIST_TOTAL",
56     "WIFI_OFFLINE_POSITIONING",
57     "TIME",
58     "PROHIBIT_LBS",
59     "GPS_LBS_SWITCH_TIMES",
60     "REMOTE_MONITOR_PHONE",
61     "SOS_PHONE",
62     "DAD_PHONE",
63     "MOM_PHONE",
64     "STOP_UPLOAD",
65     "GPS_OFF_PERIOD",
66     "DND_PERIOD",
67     "RESTART_SHUTDOWN",
68     "DEVICE",
69     "ALARM_CLOCK",
70     "STOP_ALARM",
71     "SETUP",
72     "SYNCHRONOUS_WHITELIST",
73     "RESTORE_PASSWORD",
74     "WIFI_POSITIONING",
75     "MANUAL_POSITIONING",
76     "BATTERY_CHARGE",
77     "CHARGER_CONNECTED",
78     "CHARGER_DISCONNECTED",
79     "VIBRATION_RECEIVED",
80     "POSITION_UPLOAD_INTERVAL",
81     "SOS_ALARM",
82     "UNKNOWN_B3",
83 )
84
85 PROTO_PREFIX = "ZX:"
86
87 ### Deframer ###
88
89 MAXBUFFER: int = 4096
90
91
92 class Stream:
93     def __init__(self) -> None:
94         self.buffer = b""
95
96     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
97         """
98         Process next segment of the stream. Return successfully deframed
99         packets as `bytes` and error messages as `str`.
100         """
101         when = time()
102         self.buffer += segment
103         if len(self.buffer) > MAXBUFFER:
104             # We are receiving junk. Let's drop it or we run out of memory.
105             self.buffer = b""
106             return [f"More than {MAXBUFFER} unparseable data, dropping"]
107         msgs: List[Union[bytes, str]] = []
108         while True:
109             framestart = self.buffer.find(b"xx")
110             if framestart == -1:  # No frames, return whatever we have
111                 break
112             if framestart > 0:  # Should not happen, report
113                 msgs.append(
114                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
115                 )
116                 self.buffer = self.buffer[framestart:]
117             # At this point, buffer starts with a packet
118             if len(self.buffer) < 6:  # no len and proto - cannot proceed
119                 break
120             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
121             frameend = 0
122             # Length field can legitimeely be much less than the
123             # length of the packet (e.g. WiFi positioning), but
124             # it _should not_ be greater. Still sometimes it is.
125             # Luckily, not by too much: by maybe two or three bytes?
126             # Do this embarrassing hack to avoid accidental match
127             # of some binary data in the packet against '\r\n'.
128             while True:
129                 frameend = self.buffer.find(b"\r\n", frameend + 1)
130                 if frameend == -1 or frameend >= (
131                     exp_end - 3
132                 ):  # Found realistic match or none
133                     break
134             if frameend == -1:  # Incomplete frame, return what we have
135                 break
136             packet = self.buffer[2:frameend]
137             self.buffer = self.buffer[frameend + 2 :]
138             if len(packet) < 2:  # frameend comes too early
139                 msgs.append(f"Packet too short: {packet.hex()}")
140             else:
141                 msgs.append(packet)
142         return msgs
143
144     def close(self) -> bytes:
145         ret = self.buffer
146         self.buffer = b""
147         return ret
148
149
150 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
151     return b"xx" + buffer + b"\r\n"
152
153
154 ### Parser/Constructor ###
155
156
157 class DecodeError(Exception):
158     def __init__(self, e: Exception, **kwargs: Any) -> None:
159         super().__init__(e)
160         for k, v in kwargs.items():
161             setattr(self, k, v)
162
163
164 def maybe(typ: type) -> Callable[[Any], Any]:
165     return lambda x: None if x is None else typ(x)
166
167
168 def intx(x: Union[str, int]) -> int:
169     if isinstance(x, str):
170         x = int(x, 0)
171     return x
172
173
174 def boolx(x: Union[str, bool]) -> bool:
175     if isinstance(x, str):
176         if x.upper() in ("ON", "TRUE", "1"):
177             return True
178         if x.upper() in ("OFF", "FALSE", "0"):
179             return False
180         raise ValueError(str(x) + " could not be parsed as a Boolean")
181     return x
182
183
184 def hhmm(x: str) -> str:
185     """Check for the string that represents hours and minutes"""
186     if not isinstance(x, str) or len(x) != 4:
187         raise ValueError(str(x) + " is not a four-character string")
188     hh = int(x[:2])
189     mm = int(x[2:])
190     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
191         raise ValueError(str(x) + " does not contain valid hours and minutes")
192     return x
193
194
195 def hhmmhhmm(x: str) -> str:
196     """Check for the string that represents hours and minutes twice"""
197     if not isinstance(x, str) or len(x) != 8:
198         raise ValueError(str(x) + " is not an eight-character string")
199     return hhmm(x[:4]) + hhmm(x[4:])
200
201
202 def l3str(x: Union[str, List[str]]) -> List[str]:
203     if isinstance(x, str):
204         lx = x.split(",")
205     else:
206         lx = x
207     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
208         raise ValueError(str(lx) + " is not a list of three strings")
209     return lx
210
211
212 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
213     def alrmspec(sub: str) -> Tuple[int, str]:
214         if len(sub) != 7:
215             raise ValueError(sub + " does not represent day and time")
216         return (
217             {
218                 "MON": 1,
219                 "TUE": 2,
220                 "WED": 3,
221                 "THU": 4,
222                 "FRI": 5,
223                 "SAT": 6,
224                 "SUN": 7,
225             }[sub[:3].upper()],
226             sub[3:],
227         )
228
229     if isinstance(x, str):
230         lx = [alrmspec(sub) for sub in x.split(",")]
231     else:
232         lx = x
233     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
234     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
235         raise ValueError(str(lx) + " is a wrong alarms specification")
236     return [(d, hhmm(tm)) for d, tm in lx]
237
238
239 def l3int(x: Union[str, List[int]]) -> List[int]:
240     if isinstance(x, str):
241         lx = [int(el) for el in x.split(",")]
242     else:
243         lx = x
244     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
245         raise ValueError(str(lx) + " is not a list of three integers")
246     return lx
247
248
249 class MetaPkt(type):
250     """
251     For each class corresponding to a message, automatically create
252     two nested classes `In` and `Out` that also inherit from their
253     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
254     copied to the `In` nested class under the name `KWARGS`, and
255     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
256     to the nested class `Out`. In addition, method `encode` is
257     defined in both classes equal to `in_encode()` and `out_encode()`
258     respectively.
259     """
260
261     if TYPE_CHECKING:
262
263         def __getattr__(self, name: str) -> Any:
264             pass
265
266         def __setattr__(self, name: str, value: Any) -> None:
267             pass
268
269     def __new__(
270         cls: Type["MetaPkt"],
271         name: str,
272         bases: Tuple[type, ...],
273         attrs: Dict[str, Any],
274     ) -> "MetaPkt":
275         newcls = super().__new__(cls, name, bases, attrs)
276         newcls.In = super().__new__(
277             cls,
278             name + ".In",
279             (newcls,) + bases,
280             {
281                 "KWARGS": newcls.IN_KWARGS,
282                 "decode": newcls.in_decode,
283                 "encode": newcls.in_encode,
284             },
285         )
286         newcls.Out = super().__new__(
287             cls,
288             name + ".Out",
289             (newcls,) + bases,
290             {
291                 "KWARGS": newcls.OUT_KWARGS,
292                 "decode": newcls.out_decode,
293                 "encode": newcls.out_encode,
294             },
295         )
296         return newcls
297
298
299 class Respond(Enum):
300     NON = 0  # Incoming, no response needed
301     INL = 1  # Birirectional, use `inline_response()`
302     EXT = 2  # Birirectional, use external responder
303
304
305 class GPS303Pkt(metaclass=MetaPkt):
306     RESPOND = Respond.NON  # Do not send anything back by default
307     PROTO: int
308     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
309     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
311     In: Type["GPS303Pkt"]
312     Out: Type["GPS303Pkt"]
313
314     if TYPE_CHECKING:
315
316         def __getattr__(self, name: str) -> Any:
317             pass
318
319         def __setattr__(self, name: str, value: Any) -> None:
320             pass
321
322     def __init__(self, *args: Any, **kwargs: Any):
323         """
324         Construct the object _either_ from (length, payload),
325         _or_ from the values of individual fields
326         """
327         assert not args or (len(args) == 2 and not kwargs)
328         if args:  # guaranteed to be two arguments at this point
329             self.length, self.payload = args
330             try:
331                 self.decode(self.length, self.payload)
332             except error as e:
333                 raise DecodeError(e, obj=self)
334         else:
335             for kw, typ, dfl in self.KWARGS:
336                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
337             if kwargs:
338                 raise ValueError(
339                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
340                 )
341
342     def __repr__(self) -> str:
343         return "{}({})".format(
344             self.__class__.__name__,
345             ", ".join(
346                 "{}={}".format(
347                     k,
348                     'bytes.fromhex("{}")'.format(v.hex())
349                     if isinstance(v, bytes)
350                     else v.__repr__(),
351                 )
352                 for k, v in self.__dict__.items()
353                 if not k.startswith("_")
354             ),
355         )
356
357     decode: Callable[["GPS303Pkt", int, bytes], None]
358
359     def in_decode(self, length: int, packet: bytes) -> None:
360         # Overridden in subclasses, otherwise do not decode payload
361         return
362
363     def out_decode(self, length: int, packet: bytes) -> None:
364         # Overridden in subclasses, otherwise do not decode payload
365         return
366
367     encode: Callable[["GPS303Pkt"], bytes]
368
369     def in_encode(self) -> bytes:
370         # Necessary to emulate terminal, which is not implemented
371         raise NotImplementedError(
372             self.__class__.__name__ + ".encode() not implemented"
373         )
374
375     def out_encode(self) -> bytes:
376         # Overridden in subclasses, otherwise make empty payload
377         return b""
378
379     @property
380     def packed(self) -> bytes:
381         payload = self.encode()
382         length = getattr(self, "length", len(payload) + 1)
383         return pack("BB", length, self.PROTO) + payload
384
385
386 class UNKNOWN(GPS303Pkt):
387     PROTO = 256  # > 255 is impossible in real packets
388
389
390 class LOGIN(GPS303Pkt):
391     PROTO = 0x01
392     RESPOND = Respond.INL
393     # Default response for ACK, can also respond with STOP_UPLOAD
394     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
395
396     def in_decode(self, length: int, payload: bytes) -> None:
397         self.imei = payload[:8].ljust(8, b"\0").hex()
398         self.ver = payload[8]
399
400     def in_encode(self) -> bytes:
401         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
402             "B", self.ver
403         )
404
405
406 class SUPERVISION(GPS303Pkt):
407     PROTO = 0x05
408     OUT_KWARGS = (("status", int, 1),)
409
410     def out_encode(self) -> bytes:
411         # 1: The device automatically answers Pickup effect
412         # 2: Automatically Answering Two-way Calls
413         # 3: Ring manually answer the two-way call
414         return pack("B", self.status)
415
416
417 class HEARTBEAT(GPS303Pkt):
418     PROTO = 0x08
419     RESPOND = Respond.INL
420
421
422 class _GPS_POSITIONING(GPS303Pkt):
423     RESPOND = Respond.INL
424
425     def in_decode(self, length: int, payload: bytes) -> None:
426         self.dtime = payload[:6]
427         if self.dtime == b"\0\0\0\0\0\0":
428             self.devtime = None
429         else:
430             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
431             self.devtime = datetime(
432                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
433             )
434         self.gps_data_length = payload[6] >> 4
435         self.gps_nb_sat = payload[6] & 0x0F
436         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
437         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
438         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
439         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
440         self.heading = flags & 0b0000001111111111  # bits 6 - last
441         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
442         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
443         self.speed = speed
444         self.flags = flags
445
446     def out_encode(self) -> bytes:
447         tup = datetime.utcnow().timetuple()
448         ttup = (tup[0] % 100,) + tup[1:6]
449         return pack("BBBBBB", *ttup)
450
451
452 class GPS_POSITIONING(_GPS_POSITIONING):
453     PROTO = 0x10
454
455
456 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
457     PROTO = 0x11
458
459
460 class STATUS(GPS303Pkt):
461     PROTO = 0x13
462     RESPOND = Respond.EXT
463     IN_KWARGS = (
464         ("batt", int, 100),
465         ("ver", int, 0),
466         ("timezone", int, 0),
467         ("intvl", int, 0),
468         ("signal", maybe(int), None),
469     )
470     OUT_KWARGS = (("upload_interval", int, 25),)
471
472     def in_decode(self, length: int, payload: bytes) -> None:
473         self.batt, self.ver, self.timezone, self.intvl = unpack(
474             "BBBB", payload[:4]
475         )
476         if len(payload) > 4:
477             self.signal: Optional[int] = payload[4]
478         else:
479             self.signal = None
480
481     def in_encode(self) -> bytes:
482         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
483             b"" if self.signal is None else pack("B", self.signal)
484         )
485
486     def out_encode(self) -> bytes:  # Set interval in minutes
487         return pack("B", self.upload_interval)
488
489
490 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
491     PROTO = 0x14
492
493     def in_encode(self) -> bytes:
494         return b""
495
496
497 class RESET(GPS303Pkt):
498     # Device sends when it got reset SMS
499     # Server can send to initiate factory reset
500     PROTO = 0x15
501
502
503 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
504     PROTO = 0x16
505     OUT_KWARGS = (("number", int, 3),)
506
507     def out_encode(self) -> bytes:  # Number of whitelist entries
508         return pack("B", self.number)
509
510
511 class _WIFI_POSITIONING(GPS303Pkt):
512     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
513         # IN_KWARGS = (
514         ("dtime", bytes, b"\0\0\0\0\0\0"),
515         ("wifi_aps", list, []),
516         ("mcc", int, 0),
517         ("mnc", int, 0),
518         ("gsm_cells", list, []),
519     )
520
521     def in_decode(self, length: int, payload: bytes) -> None:
522         self.dtime = payload[:6]
523         if self.dtime == b"\0\0\0\0\0\0":
524             self.devtime = None
525         else:
526             self.devtime = datetime.strptime(
527                 self.dtime.hex(), "%y%m%d%H%M%S"
528             ).astimezone(tz=timezone.utc)
529         self.wifi_aps = []
530         for i in range(self.length):  # length has special meaning here
531             slice = payload[6 + i * 7 : 13 + i * 7]
532             self.wifi_aps.append(
533                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
534             )
535         gsm_slice = payload[6 + self.length * 7 :]
536         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
537         self.gsm_cells = []
538         for i in range(ncells):
539             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
540             locac, cellid, sigstr = unpack(
541                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
542             )
543             self.gsm_cells.append((locac, cellid, -sigstr))
544
545     def in_encode(self) -> bytes:
546         self.length = len(self.wifi_aps)
547         return b"".join(
548             [
549                 self.dtime,
550                 b"".join(
551                     [
552                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
553                         + pack("B", -sigstr)
554                         for mac, sigstr in self.wifi_aps
555                     ]
556                 ),
557                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
558                 b"".join(
559                     [
560                         pack("!HHB", locac, cellid, -sigstr)
561                         for locac, cellid, sigstr in self.gsm_cells
562                     ]
563                 ),
564             ]
565         )
566
567
568 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
569     PROTO = 0x17
570     RESPOND = Respond.INL
571
572     def out_encode(self) -> bytes:
573         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
574
575
576 class TIME(GPS303Pkt):
577     PROTO = 0x30
578     RESPOND = Respond.INL
579
580     def out_encode(self) -> bytes:
581         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
582
583
584 class PROHIBIT_LBS(GPS303Pkt):
585     PROTO = 0x33
586     OUT_KWARGS = (("status", int, 1),)
587
588     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
589         return pack("B", self.status)
590
591
592 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
593     PROTO = 0x34
594
595     OUT_KWARGS = (
596         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
597         ("gps_interval_set", boolx, False),
598         ("gps_interval", hhmmhhmm, "00000000"),
599         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
600         ("boot_time_set", boolx, False),
601         ("boot_time", hhmm, "0000"),
602         ("shut_time_set", boolx, False),
603         ("shut_time", hhmm, "0000"),
604     )
605
606     def out_encode(self) -> bytes:
607         return (
608             pack("B", self.gps_off)
609             + pack("B", self.gps_interval_set)
610             + bytes.fromhex(self.gps_interval)
611             + pack("B", self.lbs_off)
612             + pack("B", self.boot_time_set)
613             + bytes.fromhex(self.boot_time)
614             + pack("B", self.shut_time_set)
615             + bytes.fromhex(self.shut_time)
616         )
617
618
619 class _SET_PHONE(GPS303Pkt):
620     OUT_KWARGS = (("phone", str, ""),)
621
622     def out_encode(self) -> bytes:
623         self.phone: str
624         return self.phone.encode("")
625
626
627 class REMOTE_MONITOR_PHONE(_SET_PHONE):
628     PROTO = 0x40
629
630
631 class SOS_PHONE(_SET_PHONE):
632     PROTO = 0x41
633
634
635 class DAD_PHONE(_SET_PHONE):
636     PROTO = 0x42
637
638
639 class MOM_PHONE(_SET_PHONE):
640     PROTO = 0x43
641
642
643 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
644     PROTO = 0x44
645
646
647 class GPS_OFF_PERIOD(GPS303Pkt):
648     PROTO = 0x46
649     OUT_KWARGS = (
650         ("onoff", int, 0),
651         ("fm", hhmm, "0000"),
652         ("to", hhmm, "2359"),
653     )
654
655     def out_encode(self) -> bytes:
656         return (
657             pack("B", self.onoff)
658             + bytes.fromhex(self.fm)
659             + bytes.fromhex(self.to)
660         )
661
662
663 class DND_PERIOD(GPS303Pkt):
664     PROTO = 0x47
665     OUT_KWARGS = (
666         ("onoff", int, 0),
667         ("week", int, 3),
668         ("fm1", hhmm, "0000"),
669         ("to1", hhmm, "2359"),
670         ("fm2", hhmm, "0000"),
671         ("to2", hhmm, "2359"),
672     )
673
674     def out_encode(self) -> bytes:
675         return (
676             pack("B", self.onoff)
677             + pack("B", self.week)
678             + bytes.fromhex(self.fm1)
679             + bytes.fromhex(self.to1)
680             + bytes.fromhex(self.fm2)
681             + bytes.fromhex(self.to2)
682         )
683
684
685 class RESTART_SHUTDOWN(GPS303Pkt):
686     PROTO = 0x48
687     OUT_KWARGS = (("flag", int, 0),)
688
689     def out_encode(self) -> bytes:
690         # 1 - restart
691         # 2 - shutdown
692         return pack("B", self.flag)
693
694
695 class DEVICE(GPS303Pkt):
696     PROTO = 0x49
697     OUT_KWARGS = (("flag", int, 0),)
698
699     # 0 - Stop looking for equipment
700     # 1 - Start looking for equipment
701     def out_encode(self) -> bytes:
702         return pack("B", self.flag)
703
704
705 class ALARM_CLOCK(GPS303Pkt):
706     PROTO = 0x50
707     OUT_KWARGS: Tuple[
708         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
709     ] = (
710         ("alarms", l3alarms, []),
711     )
712
713     def out_encode(self) -> bytes:
714         return b"".join(
715             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
716         )
717
718
719 class STOP_ALARM(GPS303Pkt):
720     PROTO = 0x56
721
722     def in_decode(self, length: int, payload: bytes) -> None:
723         self.flag = payload[0]
724
725
726 class SETUP(GPS303Pkt):
727     PROTO = 0x57
728     RESPOND = Respond.EXT
729     OUT_KWARGS = (
730         ("uploadintervalseconds", intx, 0x0300),
731         ("binaryswitch", intx, 0b00110001),
732         ("alarms", l3int, [0, 0, 0]),
733         ("dndtimeswitch", int, 0),
734         ("dndtimes", l3int, [0, 0, 0]),
735         ("gpstimeswitch", int, 0),
736         ("gpstimestart", int, 0),
737         ("gpstimestop", int, 0),
738         ("phonenumbers", l3str, ["", "", ""]),
739     )
740
741     def out_encode(self) -> bytes:
742         def pack3b(x: int) -> bytes:
743             return pack("!I", x)[1:]
744
745         return b"".join(
746             [
747                 pack("!H", self.uploadintervalseconds),
748                 pack("B", self.binaryswitch),
749             ]
750             + [pack3b(el) for el in self.alarms]
751             + [
752                 pack("B", self.dndtimeswitch),
753             ]
754             + [pack3b(el) for el in self.dndtimes]
755             + [
756                 pack("B", self.gpstimeswitch),
757                 pack("!H", self.gpstimestart),
758                 pack("!H", self.gpstimestop),
759             ]
760             + [b";".join([el.encode() for el in self.phonenumbers])]
761         )
762
763     def in_encode(self) -> bytes:
764         return b""
765
766
767 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
768     PROTO = 0x58
769
770
771 class RESTORE_PASSWORD(GPS303Pkt):
772     PROTO = 0x67
773
774
775 class WIFI_POSITIONING(_WIFI_POSITIONING):
776     PROTO = 0x69
777     RESPOND = Respond.EXT
778     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
779
780     def out_encode(self) -> bytes:
781         if self.latitude is None or self.longitude is None:
782             return b""
783         return "{:+#010.8g},{:+#010.8g}".format(
784             self.latitude, self.longitude
785         ).encode()
786
787     def out_decode(self, length: int, payload: bytes) -> None:
788         lat, lon = payload.decode().split(",")
789         self.latitude = float(lat)
790         self.longitude = float(lon)
791
792
793 class MANUAL_POSITIONING(GPS303Pkt):
794     PROTO = 0x80
795
796     def in_decode(self, length: int, payload: bytes) -> None:
797         self.flag = payload[0] if len(payload) > 0 else -1
798         self.reason = {
799             1: "Incorrect time",
800             2: "LBS less",
801             3: "WiFi less",
802             4: "LBS search > 3 times",
803             5: "Same LBS and WiFi data",
804             6: "LBS prohibited, WiFi absent",
805             7: "GPS spacing < 50 m",
806         }.get(self.flag, "Unknown")
807
808
809 class BATTERY_CHARGE(GPS303Pkt):
810     PROTO = 0x81
811
812
813 class CHARGER_CONNECTED(GPS303Pkt):
814     PROTO = 0x82
815
816
817 class CHARGER_DISCONNECTED(GPS303Pkt):
818     PROTO = 0x83
819
820
821 class VIBRATION_RECEIVED(GPS303Pkt):
822     PROTO = 0x94
823
824
825 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
826     PROTO = 0x98
827     RESPOND = Respond.EXT
828     OUT_KWARGS = (("interval", int, 10),)
829
830     def in_decode(self, length: int, payload: bytes) -> None:
831         self.interval = unpack("!H", payload[:2])
832
833     def out_encode(self) -> bytes:
834         return pack("!H", self.interval)
835
836
837 class SOS_ALARM(GPS303Pkt):
838     PROTO = 0x99
839
840
841 class UNKNOWN_B3(GPS303Pkt):
842     PROTO = 0xB3
843     IN_KWARGS = (("asciidata", str, ""),)
844
845     def in_decode(self, length: int, payload: bytes) -> None:
846         self.asciidata = payload.decode()
847
848
849 # Build dicts protocol number -> class and class name -> protocol number
850 CLASSES = {}
851 PROTOS = {}
852 if True:  # just to indent the code, sorry!
853     for cls in [
854         cls
855         for name, cls in globals().items()
856         if isclass(cls)
857         and issubclass(cls, GPS303Pkt)
858         and not name.startswith("_")
859     ]:
860         if hasattr(cls, "PROTO"):
861             CLASSES[cls.PROTO] = cls
862             PROTOS[cls.__name__] = cls.PROTO
863
864
865 def class_by_prefix(
866     prefix: str,
867 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
868     lst = [
869         (name, proto)
870         for name, proto in PROTOS.items()
871         if name.upper().startswith(prefix.upper())
872     ]
873     if len(lst) != 1:
874         return lst
875     _, proto = lst[0]
876     return CLASSES[proto]
877
878
879 def proto_handled(proto: str) -> bool:
880     return proto.startswith(PROTO_PREFIX)
881
882
883 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
884     return PROTO_PREFIX + (
885         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
886     )
887
888
889 def proto_by_name(name: str) -> int:
890     return PROTOS.get(name, -1)
891
892
893 def proto_of_message(packet: bytes) -> str:
894     return proto_name(CLASSES.get(packet[1], UNKNOWN))
895
896
897 def imei_from_packet(packet: bytes) -> Optional[str]:
898     if packet[1] == LOGIN.PROTO:
899         msg = parse_message(packet)
900         if isinstance(msg, LOGIN):
901             return msg.imei
902     return None
903
904
905 def is_goodbye_packet(packet: bytes) -> bool:
906     return packet[1] == HIBERNATION.PROTO
907
908
909 def inline_response(packet: bytes) -> Optional[bytes]:
910     proto = packet[1]
911     if proto in CLASSES:
912         cls = CLASSES[proto]
913         if cls.RESPOND is Respond.INL:
914             return cls.Out().packed
915     return None
916
917
918 def probe_buffer(buffer: bytes) -> bool:
919     framestart = buffer.find(b"xx")
920     if framestart < 0:
921         return False
922     if len(buffer) - framestart < 6:
923         return False
924     return True
925
926
927 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
928     """From a packet (without framing bytes) derive the XXX.In object"""
929     length, proto = unpack("BB", packet[:2])
930     payload = packet[2:]
931     if proto not in CLASSES:
932         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
933             f"Proto {proto} is unknown"
934         )
935     else:
936         try:
937             if is_incoming:
938                 return CLASSES[proto].In(length, payload)
939             else:
940                 return CLASSES[proto].Out(length, payload)
941         except (DecodeError, ValueError, IndexError) as e:
942             cause = e
943     if is_incoming:
944         retobj = UNKNOWN.In(length, payload)
945     else:
946         retobj = UNKNOWN.Out(length, payload)
947     retobj.PROTO = proto  # Override class attr with object attr
948     retobj.cause = cause
949     return retobj