]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
protocols: make "interface" module
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 from .protomodule import ProtoClass
35
36 __all__ = (
37     "Stream",
38     "class_by_prefix",
39     "enframe",
40     "exposed_protos",
41     "inline_response",
42     "proto_handled",
43     "parse_message",
44     "probe_buffer",
45     "proto_name",
46     "DecodeError",
47     "Respond",
48     "GPS303Pkt",
49     "UNKNOWN",
50     "LOGIN",
51     "SUPERVISION",
52     "HEARTBEAT",
53     "GPS_POSITIONING",
54     "GPS_OFFLINE_POSITIONING",
55     "STATUS",
56     "HIBERNATION",
57     "RESET",
58     "WHITELIST_TOTAL",
59     "WIFI_OFFLINE_POSITIONING",
60     "TIME",
61     "PROHIBIT_LBS",
62     "GPS_LBS_SWITCH_TIMES",
63     "REMOTE_MONITOR_PHONE",
64     "SOS_PHONE",
65     "DAD_PHONE",
66     "MOM_PHONE",
67     "STOP_UPLOAD",
68     "GPS_OFF_PERIOD",
69     "DND_PERIOD",
70     "RESTART_SHUTDOWN",
71     "DEVICE",
72     "ALARM_CLOCK",
73     "STOP_ALARM",
74     "SETUP",
75     "SYNCHRONOUS_WHITELIST",
76     "RESTORE_PASSWORD",
77     "WIFI_POSITIONING",
78     "MANUAL_POSITIONING",
79     "BATTERY_CHARGE",
80     "CHARGER_CONNECTED",
81     "CHARGER_DISCONNECTED",
82     "VIBRATION_RECEIVED",
83     "POSITION_UPLOAD_INTERVAL",
84     "SOS_ALARM",
85     "UNKNOWN_B3",
86 )
87
88 PROTO_PREFIX = "ZX:"
89
90 ### Deframer ###
91
92 MAXBUFFER: int = 4096
93
94
95 class Stream:
96     def __init__(self) -> None:
97         self.buffer = b""
98
99     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
100         """
101         Process next segment of the stream. Return successfully deframed
102         packets as `bytes` and error messages as `str`.
103         """
104         when = time()
105         self.buffer += segment
106         if len(self.buffer) > MAXBUFFER:
107             # We are receiving junk. Let's drop it or we run out of memory.
108             self.buffer = b""
109             return [f"More than {MAXBUFFER} unparseable data, dropping"]
110         msgs: List[Union[bytes, str]] = []
111         while True:
112             framestart = self.buffer.find(b"xx")
113             if framestart == -1:  # No frames, return whatever we have
114                 break
115             if framestart > 0:  # Should not happen, report
116                 msgs.append(
117                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
118                 )
119                 self.buffer = self.buffer[framestart:]
120             # At this point, buffer starts with a packet
121             if len(self.buffer) < 6:  # no len and proto - cannot proceed
122                 break
123             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
124             frameend = 0
125             # Length field can legitimeely be much less than the
126             # length of the packet (e.g. WiFi positioning), but
127             # it _should not_ be greater. Still sometimes it is.
128             # Luckily, not by too much: by maybe two or three bytes?
129             # Do this embarrassing hack to avoid accidental match
130             # of some binary data in the packet against '\r\n'.
131             while True:
132                 frameend = self.buffer.find(b"\r\n", frameend + 1)
133                 if frameend == -1 or frameend >= (
134                     exp_end - 3
135                 ):  # Found realistic match or none
136                     break
137             if frameend == -1:  # Incomplete frame, return what we have
138                 break
139             packet = self.buffer[2:frameend]
140             self.buffer = self.buffer[frameend + 2 :]
141             if len(packet) < 2:  # frameend comes too early
142                 msgs.append(f"Packet too short: {packet.hex()}")
143             else:
144                 msgs.append(packet)
145         return msgs
146
147     def close(self) -> bytes:
148         ret = self.buffer
149         self.buffer = b""
150         return ret
151
152
153 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
154     return b"xx" + buffer + b"\r\n"
155
156
157 ### Parser/Constructor ###
158
159
160 class DecodeError(Exception):
161     def __init__(self, e: Exception, **kwargs: Any) -> None:
162         super().__init__(e)
163         for k, v in kwargs.items():
164             setattr(self, k, v)
165
166
167 def maybe(typ: type) -> Callable[[Any], Any]:
168     return lambda x: None if x is None else typ(x)
169
170
171 def intx(x: Union[str, int]) -> int:
172     if isinstance(x, str):
173         x = int(x, 0)
174     return x
175
176
177 def boolx(x: Union[str, bool]) -> bool:
178     if isinstance(x, str):
179         if x.upper() in ("ON", "TRUE", "1"):
180             return True
181         if x.upper() in ("OFF", "FALSE", "0"):
182             return False
183         raise ValueError(str(x) + " could not be parsed as a Boolean")
184     return x
185
186
187 def hhmm(x: str) -> str:
188     """Check for the string that represents hours and minutes"""
189     if not isinstance(x, str) or len(x) != 4:
190         raise ValueError(str(x) + " is not a four-character string")
191     hh = int(x[:2])
192     mm = int(x[2:])
193     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
194         raise ValueError(str(x) + " does not contain valid hours and minutes")
195     return x
196
197
198 def hhmmhhmm(x: str) -> str:
199     """Check for the string that represents hours and minutes twice"""
200     if not isinstance(x, str) or len(x) != 8:
201         raise ValueError(str(x) + " is not an eight-character string")
202     return hhmm(x[:4]) + hhmm(x[4:])
203
204
205 def l3str(x: Union[str, List[str]]) -> List[str]:
206     if isinstance(x, str):
207         lx = x.split(",")
208     else:
209         lx = x
210     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
211         raise ValueError(str(lx) + " is not a list of three strings")
212     return lx
213
214
215 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
216     def alrmspec(sub: str) -> Tuple[int, str]:
217         if len(sub) != 7:
218             raise ValueError(sub + " does not represent day and time")
219         return (
220             {
221                 "MON": 1,
222                 "TUE": 2,
223                 "WED": 3,
224                 "THU": 4,
225                 "FRI": 5,
226                 "SAT": 6,
227                 "SUN": 7,
228             }[sub[:3].upper()],
229             sub[3:],
230         )
231
232     if isinstance(x, str):
233         lx = [alrmspec(sub) for sub in x.split(",")]
234     else:
235         lx = x
236     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
237     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
238         raise ValueError(str(lx) + " is a wrong alarms specification")
239     return [(d, hhmm(tm)) for d, tm in lx]
240
241
242 def l3int(x: Union[str, List[int]]) -> List[int]:
243     if isinstance(x, str):
244         lx = [int(el) for el in x.split(",")]
245     else:
246         lx = x
247     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
248         raise ValueError(str(lx) + " is not a list of three integers")
249     return lx
250
251
252 class Respond(Enum):
253     NON = 0  # Incoming, no response needed
254     INL = 1  # Birirectional, use `inline_response()`
255     EXT = 2  # Birirectional, use external responder
256
257
258 class GPS303Pkt(ProtoClass):
259     RESPOND = Respond.NON  # Do not send anything back by default
260     PROTO: int
261     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
262     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
263     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
264     In: Type["GPS303Pkt"]
265     Out: Type["GPS303Pkt"]
266
267     if TYPE_CHECKING:
268
269         def __getattr__(self, name: str) -> Any:
270             pass
271
272         def __setattr__(self, name: str, value: Any) -> None:
273             pass
274
275     def __init__(self, *args: Any, **kwargs: Any):
276         """
277         Construct the object _either_ from (length, payload),
278         _or_ from the values of individual fields
279         """
280         assert not args or (len(args) == 2 and not kwargs)
281         if args:  # guaranteed to be two arguments at this point
282             self.length, self.payload = args
283             try:
284                 self.decode(self.length, self.payload)
285             except error as e:
286                 raise DecodeError(e, obj=self)
287         else:
288             for kw, typ, dfl in self.KWARGS:
289                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
290             if kwargs:
291                 raise ValueError(
292                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
293                 )
294
295     def __repr__(self) -> str:
296         return "{}({})".format(
297             self.__class__.__name__,
298             ", ".join(
299                 "{}={}".format(
300                     k,
301                     'bytes.fromhex("{}")'.format(v.hex())
302                     if isinstance(v, bytes)
303                     else v.__repr__(),
304                 )
305                 for k, v in self.__dict__.items()
306                 if not k.startswith("_")
307             ),
308         )
309
310     decode: Callable[["GPS303Pkt", int, bytes], None]
311
312     def in_decode(self, length: int, packet: bytes) -> None:
313         # Overridden in subclasses, otherwise do not decode payload
314         return
315
316     def out_decode(self, length: int, packet: bytes) -> None:
317         # Overridden in subclasses, otherwise do not decode payload
318         return
319
320     encode: Callable[["GPS303Pkt"], bytes]
321
322     def in_encode(self) -> bytes:
323         # Necessary to emulate terminal, which is not implemented
324         raise NotImplementedError(
325             self.__class__.__name__ + ".encode() not implemented"
326         )
327
328     def out_encode(self) -> bytes:
329         # Overridden in subclasses, otherwise make empty payload
330         return b""
331
332     @property
333     def packed(self) -> bytes:
334         payload = self.encode()
335         length = getattr(self, "length", len(payload) + 1)
336         return pack("BB", length, self.PROTO) + payload
337
338
339 class UNKNOWN(GPS303Pkt):
340     PROTO = 256  # > 255 is impossible in real packets
341
342
343 class LOGIN(GPS303Pkt):
344     PROTO = 0x01
345     RESPOND = Respond.INL
346     # Default response for ACK, can also respond with STOP_UPLOAD
347     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
348
349     def in_decode(self, length: int, payload: bytes) -> None:
350         self.imei = payload[:8].ljust(8, b"\0").hex()
351         self.ver = payload[8]
352
353     def in_encode(self) -> bytes:
354         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
355             "B", self.ver
356         )
357
358
359 class SUPERVISION(GPS303Pkt):
360     PROTO = 0x05
361     OUT_KWARGS = (("status", int, 1),)
362
363     def out_encode(self) -> bytes:
364         # 1: The device automatically answers Pickup effect
365         # 2: Automatically Answering Two-way Calls
366         # 3: Ring manually answer the two-way call
367         return pack("B", self.status)
368
369
370 class HEARTBEAT(GPS303Pkt):
371     PROTO = 0x08
372     RESPOND = Respond.INL
373
374
375 class _GPS_POSITIONING(GPS303Pkt):
376     RESPOND = Respond.INL
377
378     def in_decode(self, length: int, payload: bytes) -> None:
379         self.dtime = payload[:6]
380         if self.dtime == b"\0\0\0\0\0\0":
381             self.devtime = None
382         else:
383             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
384             self.devtime = datetime(
385                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
386             )
387         self.gps_data_length = payload[6] >> 4
388         self.gps_nb_sat = payload[6] & 0x0F
389         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
390         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
391         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
392         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
393         self.heading = flags & 0b0000001111111111  # bits 6 - last
394         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
395         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
396         self.speed = speed
397         self.flags = flags
398
399     def out_encode(self) -> bytes:
400         tup = datetime.utcnow().timetuple()
401         ttup = (tup[0] % 100,) + tup[1:6]
402         return pack("BBBBBB", *ttup)
403
404
405 class GPS_POSITIONING(_GPS_POSITIONING):
406     PROTO = 0x10
407
408
409 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
410     PROTO = 0x11
411
412
413 class STATUS(GPS303Pkt):
414     PROTO = 0x13
415     RESPOND = Respond.EXT
416     IN_KWARGS = (
417         ("batt", int, 100),
418         ("ver", int, 0),
419         ("timezone", int, 0),
420         ("intvl", int, 0),
421         ("signal", maybe(int), None),
422     )
423     OUT_KWARGS = (("upload_interval", int, 25),)
424
425     def in_decode(self, length: int, payload: bytes) -> None:
426         self.batt, self.ver, self.timezone, self.intvl = unpack(
427             "BBBB", payload[:4]
428         )
429         if len(payload) > 4:
430             self.signal: Optional[int] = payload[4]
431         else:
432             self.signal = None
433
434     def in_encode(self) -> bytes:
435         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
436             b"" if self.signal is None else pack("B", self.signal)
437         )
438
439     def out_encode(self) -> bytes:  # Set interval in minutes
440         return pack("B", self.upload_interval)
441
442
443 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
444     PROTO = 0x14
445
446     def in_encode(self) -> bytes:
447         return b""
448
449
450 class RESET(GPS303Pkt):
451     # Device sends when it got reset SMS
452     # Server can send to initiate factory reset
453     PROTO = 0x15
454
455
456 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
457     PROTO = 0x16
458     OUT_KWARGS = (("number", int, 3),)
459
460     def out_encode(self) -> bytes:  # Number of whitelist entries
461         return pack("B", self.number)
462
463
464 class _WIFI_POSITIONING(GPS303Pkt):
465     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
466         # IN_KWARGS = (
467         ("dtime", bytes, b"\0\0\0\0\0\0"),
468         ("wifi_aps", list, []),
469         ("mcc", int, 0),
470         ("mnc", int, 0),
471         ("gsm_cells", list, []),
472     )
473
474     def in_decode(self, length: int, payload: bytes) -> None:
475         self.dtime = payload[:6]
476         if self.dtime == b"\0\0\0\0\0\0":
477             self.devtime = None
478         else:
479             self.devtime = datetime.strptime(
480                 self.dtime.hex(), "%y%m%d%H%M%S"
481             ).astimezone(tz=timezone.utc)
482         self.wifi_aps = []
483         for i in range(self.length):  # length has special meaning here
484             slice = payload[6 + i * 7 : 13 + i * 7]
485             self.wifi_aps.append(
486                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
487             )
488         gsm_slice = payload[6 + self.length * 7 :]
489         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
490         self.gsm_cells = []
491         for i in range(ncells):
492             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
493             locac, cellid, sigstr = unpack(
494                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
495             )
496             self.gsm_cells.append((locac, cellid, -sigstr))
497
498     def in_encode(self) -> bytes:
499         self.length = len(self.wifi_aps)
500         return b"".join(
501             [
502                 self.dtime,
503                 b"".join(
504                     [
505                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
506                         + pack("B", -sigstr)
507                         for mac, sigstr in self.wifi_aps
508                     ]
509                 ),
510                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
511                 b"".join(
512                     [
513                         pack("!HHB", locac, cellid, -sigstr)
514                         for locac, cellid, sigstr in self.gsm_cells
515                     ]
516                 ),
517             ]
518         )
519
520
521 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
522     PROTO = 0x17
523     RESPOND = Respond.INL
524
525     def out_encode(self) -> bytes:
526         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
527
528
529 class TIME(GPS303Pkt):
530     PROTO = 0x30
531     RESPOND = Respond.INL
532
533     def out_encode(self) -> bytes:
534         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
535
536
537 class PROHIBIT_LBS(GPS303Pkt):
538     PROTO = 0x33
539     OUT_KWARGS = (("status", int, 1),)
540
541     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
542         return pack("B", self.status)
543
544
545 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
546     PROTO = 0x34
547
548     OUT_KWARGS = (
549         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
550         ("gps_interval_set", boolx, False),
551         ("gps_interval", hhmmhhmm, "00000000"),
552         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
553         ("boot_time_set", boolx, False),
554         ("boot_time", hhmm, "0000"),
555         ("shut_time_set", boolx, False),
556         ("shut_time", hhmm, "0000"),
557     )
558
559     def out_encode(self) -> bytes:
560         return (
561             pack("B", self.gps_off)
562             + pack("B", self.gps_interval_set)
563             + bytes.fromhex(self.gps_interval)
564             + pack("B", self.lbs_off)
565             + pack("B", self.boot_time_set)
566             + bytes.fromhex(self.boot_time)
567             + pack("B", self.shut_time_set)
568             + bytes.fromhex(self.shut_time)
569         )
570
571
572 class _SET_PHONE(GPS303Pkt):
573     OUT_KWARGS = (("phone", str, ""),)
574
575     def out_encode(self) -> bytes:
576         self.phone: str
577         return self.phone.encode("")
578
579
580 class REMOTE_MONITOR_PHONE(_SET_PHONE):
581     PROTO = 0x40
582
583
584 class SOS_PHONE(_SET_PHONE):
585     PROTO = 0x41
586
587
588 class DAD_PHONE(_SET_PHONE):
589     PROTO = 0x42
590
591
592 class MOM_PHONE(_SET_PHONE):
593     PROTO = 0x43
594
595
596 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
597     PROTO = 0x44
598
599
600 class GPS_OFF_PERIOD(GPS303Pkt):
601     PROTO = 0x46
602     OUT_KWARGS = (
603         ("onoff", int, 0),
604         ("fm", hhmm, "0000"),
605         ("to", hhmm, "2359"),
606     )
607
608     def out_encode(self) -> bytes:
609         return (
610             pack("B", self.onoff)
611             + bytes.fromhex(self.fm)
612             + bytes.fromhex(self.to)
613         )
614
615
616 class DND_PERIOD(GPS303Pkt):
617     PROTO = 0x47
618     OUT_KWARGS = (
619         ("onoff", int, 0),
620         ("week", int, 3),
621         ("fm1", hhmm, "0000"),
622         ("to1", hhmm, "2359"),
623         ("fm2", hhmm, "0000"),
624         ("to2", hhmm, "2359"),
625     )
626
627     def out_encode(self) -> bytes:
628         return (
629             pack("B", self.onoff)
630             + pack("B", self.week)
631             + bytes.fromhex(self.fm1)
632             + bytes.fromhex(self.to1)
633             + bytes.fromhex(self.fm2)
634             + bytes.fromhex(self.to2)
635         )
636
637
638 class RESTART_SHUTDOWN(GPS303Pkt):
639     PROTO = 0x48
640     OUT_KWARGS = (("flag", int, 0),)
641
642     def out_encode(self) -> bytes:
643         # 1 - restart
644         # 2 - shutdown
645         return pack("B", self.flag)
646
647
648 class DEVICE(GPS303Pkt):
649     PROTO = 0x49
650     OUT_KWARGS = (("flag", int, 0),)
651
652     # 0 - Stop looking for equipment
653     # 1 - Start looking for equipment
654     def out_encode(self) -> bytes:
655         return pack("B", self.flag)
656
657
658 class ALARM_CLOCK(GPS303Pkt):
659     PROTO = 0x50
660     OUT_KWARGS: Tuple[
661         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
662     ] = (
663         ("alarms", l3alarms, []),
664     )
665
666     def out_encode(self) -> bytes:
667         return b"".join(
668             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
669         )
670
671
672 class STOP_ALARM(GPS303Pkt):
673     PROTO = 0x56
674
675     def in_decode(self, length: int, payload: bytes) -> None:
676         self.flag = payload[0]
677
678
679 class SETUP(GPS303Pkt):
680     PROTO = 0x57
681     RESPOND = Respond.EXT
682     OUT_KWARGS = (
683         ("uploadintervalseconds", intx, 0x0300),
684         ("binaryswitch", intx, 0b00110001),
685         ("alarms", l3int, [0, 0, 0]),
686         ("dndtimeswitch", int, 0),
687         ("dndtimes", l3int, [0, 0, 0]),
688         ("gpstimeswitch", int, 0),
689         ("gpstimestart", int, 0),
690         ("gpstimestop", int, 0),
691         ("phonenumbers", l3str, ["", "", ""]),
692     )
693
694     def out_encode(self) -> bytes:
695         def pack3b(x: int) -> bytes:
696             return pack("!I", x)[1:]
697
698         return b"".join(
699             [
700                 pack("!H", self.uploadintervalseconds),
701                 pack("B", self.binaryswitch),
702             ]
703             + [pack3b(el) for el in self.alarms]
704             + [
705                 pack("B", self.dndtimeswitch),
706             ]
707             + [pack3b(el) for el in self.dndtimes]
708             + [
709                 pack("B", self.gpstimeswitch),
710                 pack("!H", self.gpstimestart),
711                 pack("!H", self.gpstimestop),
712             ]
713             + [b";".join([el.encode() for el in self.phonenumbers])]
714         )
715
716     def in_encode(self) -> bytes:
717         return b""
718
719
720 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
721     PROTO = 0x58
722
723
724 class RESTORE_PASSWORD(GPS303Pkt):
725     PROTO = 0x67
726
727
728 class WIFI_POSITIONING(_WIFI_POSITIONING):
729     PROTO = 0x69
730     RESPOND = Respond.EXT
731     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
732
733     def out_encode(self) -> bytes:
734         if self.latitude is None or self.longitude is None:
735             return b""
736         return "{:+#010.8g},{:+#010.8g}".format(
737             self.latitude, self.longitude
738         ).encode()
739
740     def out_decode(self, length: int, payload: bytes) -> None:
741         lat, lon = payload.decode().split(",")
742         self.latitude = float(lat)
743         self.longitude = float(lon)
744
745
746 class MANUAL_POSITIONING(GPS303Pkt):
747     PROTO = 0x80
748
749     def in_decode(self, length: int, payload: bytes) -> None:
750         self.flag = payload[0] if len(payload) > 0 else -1
751         self.reason = {
752             1: "Incorrect time",
753             2: "LBS less",
754             3: "WiFi less",
755             4: "LBS search > 3 times",
756             5: "Same LBS and WiFi data",
757             6: "LBS prohibited, WiFi absent",
758             7: "GPS spacing < 50 m",
759         }.get(self.flag, "Unknown")
760
761
762 class BATTERY_CHARGE(GPS303Pkt):
763     PROTO = 0x81
764
765
766 class CHARGER_CONNECTED(GPS303Pkt):
767     PROTO = 0x82
768
769
770 class CHARGER_DISCONNECTED(GPS303Pkt):
771     PROTO = 0x83
772
773
774 class VIBRATION_RECEIVED(GPS303Pkt):
775     PROTO = 0x94
776
777
778 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
779     PROTO = 0x98
780     RESPOND = Respond.EXT
781     OUT_KWARGS = (("interval", int, 10),)
782
783     def in_decode(self, length: int, payload: bytes) -> None:
784         self.interval = unpack("!H", payload[:2])
785
786     def out_encode(self) -> bytes:
787         return pack("!H", self.interval)
788
789
790 class SOS_ALARM(GPS303Pkt):
791     PROTO = 0x99
792
793
794 class UNKNOWN_B3(GPS303Pkt):
795     PROTO = 0xB3
796     IN_KWARGS = (("asciidata", str, ""),)
797
798     def in_decode(self, length: int, payload: bytes) -> None:
799         self.asciidata = payload.decode()
800
801
802 # Build dicts protocol number -> class and class name -> protocol number
803 CLASSES = {}
804 PROTOS = {}
805 if True:  # just to indent the code, sorry!
806     for cls in [
807         cls
808         for name, cls in globals().items()
809         if isclass(cls)
810         and issubclass(cls, GPS303Pkt)
811         and not name.startswith("_")
812     ]:
813         if hasattr(cls, "PROTO"):
814             CLASSES[cls.PROTO] = cls
815             PROTOS[cls.__name__] = cls.PROTO
816
817
818 def class_by_prefix(
819     prefix: str,
820 ) -> Union[Type[GPS303Pkt], List[str]]:
821     if prefix.startswith(PROTO_PREFIX):
822         pname = prefix[len(PROTO_PREFIX) :]
823     else:
824         raise KeyError(pname)
825     lst = [
826         (name, proto)
827         for name, proto in PROTOS.items()
828         if name.upper().startswith(prefix.upper())
829     ]
830     if len(lst) != 1:
831         return [name for name, _ in lst]
832     _, proto = lst[0]
833     return CLASSES[proto]
834
835
836 def proto_handled(proto: str) -> bool:
837     return proto.startswith(PROTO_PREFIX)
838
839
840 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
841     return PROTO_PREFIX + (
842         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
843     )
844
845
846 def proto_of_message(packet: bytes) -> str:
847     return proto_name(CLASSES.get(packet[1], UNKNOWN))
848
849
850 def imei_from_packet(packet: bytes) -> Optional[str]:
851     if packet[1] == LOGIN.PROTO:
852         msg = parse_message(packet)
853         if isinstance(msg, LOGIN):
854             return msg.imei
855     return None
856
857
858 def is_goodbye_packet(packet: bytes) -> bool:
859     return packet[1] == HIBERNATION.PROTO
860
861
862 def inline_response(packet: bytes) -> Optional[bytes]:
863     proto = packet[1]
864     if proto in CLASSES:
865         cls = CLASSES[proto]
866         if cls.RESPOND is Respond.INL:
867             return cls.Out().packed
868     return None
869
870
871 def probe_buffer(buffer: bytes) -> bool:
872     framestart = buffer.find(b"xx")
873     if framestart < 0:
874         return False
875     if len(buffer) - framestart < 6:
876         return False
877     return True
878
879
880 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
881     """From a packet (without framing bytes) derive the XXX.In object"""
882     length, proto = unpack("BB", packet[:2])
883     payload = packet[2:]
884     if proto not in CLASSES:
885         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
886             f"Proto {proto} is unknown"
887         )
888     else:
889         try:
890             if is_incoming:
891                 return CLASSES[proto].In(length, payload)
892             else:
893                 return CLASSES[proto].Out(length, payload)
894         except (DecodeError, ValueError, IndexError) as e:
895             cause = e
896     if is_incoming:
897         retobj = UNKNOWN.In(length, payload)
898     else:
899         retobj = UNKNOWN.Out(length, payload)
900     retobj.PROTO = proto  # Override class attr with object attr
901     retobj.cause = cause
902     return retobj
903
904
905 def exposed_protos() -> List[Tuple[str, bool]]:
906     return [
907         (proto_name(GPS_POSITIONING), True),
908         (proto_name(WIFI_POSITIONING), False),
909         (proto_name(STATUS), True),
910     ]