]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
protocols: introduce method `rectified()`
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from typing import (
23     Any,
24     Callable,
25     Dict,
26     List,
27     Optional,
28     Tuple,
29     Type,
30     TYPE_CHECKING,
31     Union,
32 )
33
34 from .protomodule import ProtoClass
35
36 __all__ = (
37     "Stream",
38     "class_by_prefix",
39     "enframe",
40     "exposed_protos",
41     "inline_response",
42     "proto_handled",
43     "parse_message",
44     "probe_buffer",
45     "proto_name",
46     "DecodeError",
47     "Respond",
48 )
49
50 PROTO_PREFIX = "ZX:"
51
52 ### Deframer ###
53
54 MAXBUFFER: int = 4096
55
56
57 class Stream:
58     def __init__(self) -> None:
59         self.buffer = b""
60
61     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
62         """
63         Process next segment of the stream. Return successfully deframed
64         packets as `bytes` and error messages as `str`.
65         """
66         when = time()
67         self.buffer += segment
68         if len(self.buffer) > MAXBUFFER:
69             # We are receiving junk. Let's drop it or we run out of memory.
70             self.buffer = b""
71             return [f"More than {MAXBUFFER} unparseable data, dropping"]
72         msgs: List[Union[bytes, str]] = []
73         while True:
74             framestart = self.buffer.find(b"xx")
75             if framestart == -1:  # No frames, return whatever we have
76                 break
77             if framestart > 0:  # Should not happen, report
78                 msgs.append(
79                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
80                 )
81                 self.buffer = self.buffer[framestart:]
82             # At this point, buffer starts with a packet
83             if len(self.buffer) < 6:  # no len and proto - cannot proceed
84                 break
85             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
86             frameend = 0
87             # Length field can legitimeely be much less than the
88             # length of the packet (e.g. WiFi positioning), but
89             # it _should not_ be greater. Still sometimes it is.
90             # Luckily, not by too much: by maybe two or three bytes?
91             # Do this embarrassing hack to avoid accidental match
92             # of some binary data in the packet against '\r\n'.
93             while True:
94                 frameend = self.buffer.find(b"\r\n", frameend + 1)
95                 if frameend == -1 or frameend >= (
96                     exp_end - 3
97                 ):  # Found realistic match or none
98                     break
99             if frameend == -1:  # Incomplete frame, return what we have
100                 break
101             packet = self.buffer[2:frameend]
102             self.buffer = self.buffer[frameend + 2 :]
103             if len(packet) < 2:  # frameend comes too early
104                 msgs.append(f"Packet too short: {packet.hex()}")
105             else:
106                 msgs.append(packet)
107         return msgs
108
109     def close(self) -> bytes:
110         ret = self.buffer
111         self.buffer = b""
112         return ret
113
114
115 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
116     return b"xx" + buffer + b"\r\n"
117
118
119 ### Parser/Constructor ###
120
121
122 class DecodeError(Exception):
123     def __init__(self, e: Exception, **kwargs: Any) -> None:
124         super().__init__(e)
125         for k, v in kwargs.items():
126             setattr(self, k, v)
127
128
129 def maybe(typ: type) -> Callable[[Any], Any]:
130     return lambda x: None if x is None else typ(x)
131
132
133 def intx(x: Union[str, int]) -> int:
134     if isinstance(x, str):
135         x = int(x, 0)
136     return x
137
138
139 def boolx(x: Union[str, bool]) -> bool:
140     if isinstance(x, str):
141         if x.upper() in ("ON", "TRUE", "1"):
142             return True
143         if x.upper() in ("OFF", "FALSE", "0"):
144             return False
145         raise ValueError(str(x) + " could not be parsed as a Boolean")
146     return x
147
148
149 def hhmm(x: str) -> str:
150     """Check for the string that represents hours and minutes"""
151     if not isinstance(x, str) or len(x) != 4:
152         raise ValueError(str(x) + " is not a four-character string")
153     hh = int(x[:2])
154     mm = int(x[2:])
155     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
156         raise ValueError(str(x) + " does not contain valid hours and minutes")
157     return x
158
159
160 def hhmmhhmm(x: str) -> str:
161     """Check for the string that represents hours and minutes twice"""
162     if not isinstance(x, str) or len(x) != 8:
163         raise ValueError(str(x) + " is not an eight-character string")
164     return hhmm(x[:4]) + hhmm(x[4:])
165
166
167 def l3str(x: Union[str, List[str]]) -> List[str]:
168     if isinstance(x, str):
169         lx = x.split(",")
170     else:
171         lx = x
172     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
173         raise ValueError(str(lx) + " is not a list of three strings")
174     return lx
175
176
177 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
178     def alrmspec(sub: str) -> Tuple[int, str]:
179         if len(sub) != 7:
180             raise ValueError(sub + " does not represent day and time")
181         return (
182             {
183                 "MON": 1,
184                 "TUE": 2,
185                 "WED": 3,
186                 "THU": 4,
187                 "FRI": 5,
188                 "SAT": 6,
189                 "SUN": 7,
190             }[sub[:3].upper()],
191             sub[3:],
192         )
193
194     if isinstance(x, str):
195         lx = [alrmspec(sub) for sub in x.split(",")]
196     else:
197         lx = x
198     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
199     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
200         raise ValueError(str(lx) + " is a wrong alarms specification")
201     return [(d, hhmm(tm)) for d, tm in lx]
202
203
204 def l3int(x: Union[str, List[int]]) -> List[int]:
205     if isinstance(x, str):
206         lx = [int(el) for el in x.split(",")]
207     else:
208         lx = x
209     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
210         raise ValueError(str(lx) + " is not a list of three integers")
211     return lx
212
213
214 class Respond(Enum):
215     NON = 0  # Incoming, no response needed
216     INL = 1  # Birirectional, use `inline_response()`
217     EXT = 2  # Birirectional, use external responder
218
219
220 class GPS303Pkt(ProtoClass):
221     RESPOND = Respond.NON  # Do not send anything back by default
222     PROTO: int
223     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
224     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226     In: Type["GPS303Pkt"]
227     Out: Type["GPS303Pkt"]
228
229     if TYPE_CHECKING:
230
231         def __getattr__(self, name: str) -> Any:
232             pass
233
234         def __setattr__(self, name: str, value: Any) -> None:
235             pass
236
237     def __init__(self, *args: Any, **kwargs: Any):
238         """
239         Construct the object _either_ from (length, payload),
240         _or_ from the values of individual fields
241         """
242         assert not args or (len(args) == 2 and not kwargs)
243         if args:  # guaranteed to be two arguments at this point
244             self.length, self.payload = args
245             try:
246                 self.decode(self.length, self.payload)
247             except error as e:
248                 raise DecodeError(e, obj=self)
249         else:
250             for kw, typ, dfl in self.KWARGS:
251                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
252             if kwargs:
253                 raise ValueError(
254                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
255                 )
256
257     def __repr__(self) -> str:
258         return "{}({})".format(
259             self.__class__.__name__,
260             ", ".join(
261                 "{}={}".format(
262                     k,
263                     'bytes.fromhex("{}")'.format(v.hex())
264                     if isinstance(v, bytes)
265                     else v.__repr__(),
266                 )
267                 for k, v in self.__dict__.items()
268                 if not k.startswith("_")
269             ),
270         )
271
272     decode: Callable[["GPS303Pkt", int, bytes], None]
273
274     def in_decode(self, length: int, packet: bytes) -> None:
275         # Overridden in subclasses, otherwise do not decode payload
276         return
277
278     def out_decode(self, length: int, packet: bytes) -> None:
279         # Overridden in subclasses, otherwise do not decode payload
280         return
281
282     encode: Callable[["GPS303Pkt"], bytes]
283
284     def in_encode(self) -> bytes:
285         # Necessary to emulate terminal, which is not implemented
286         raise NotImplementedError(
287             self.__class__.__name__ + ".encode() not implemented"
288         )
289
290     def out_encode(self) -> bytes:
291         # Overridden in subclasses, otherwise make empty payload
292         return b""
293
294     @property
295     def packed(self) -> bytes:
296         payload = self.encode()
297         length = getattr(self, "length", len(payload) + 1)
298         return pack("BB", length, self.PROTO) + payload
299
300
301 class UNKNOWN(GPS303Pkt):
302     PROTO = 256  # > 255 is impossible in real packets
303
304
305 class LOGIN(GPS303Pkt):
306     PROTO = 0x01
307     RESPOND = Respond.INL
308     # Default response for ACK, can also respond with STOP_UPLOAD
309     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
310
311     def in_decode(self, length: int, payload: bytes) -> None:
312         self.imei = payload[:8].ljust(8, b"\0").hex()
313         self.ver = payload[8]
314
315     def in_encode(self) -> bytes:
316         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
317             "B", self.ver
318         )
319
320
321 class SUPERVISION(GPS303Pkt):
322     PROTO = 0x05
323     OUT_KWARGS = (("status", int, 1),)
324
325     def out_encode(self) -> bytes:
326         # 1: The device automatically answers Pickup effect
327         # 2: Automatically Answering Two-way Calls
328         # 3: Ring manually answer the two-way call
329         return pack("B", self.status)
330
331
332 class HEARTBEAT(GPS303Pkt):
333     PROTO = 0x08
334     RESPOND = Respond.INL
335
336
337 class _GPS_POSITIONING(GPS303Pkt):
338     RESPOND = Respond.INL
339
340     def in_decode(self, length: int, payload: bytes) -> None:
341         self.dtime = payload[:6]
342         if self.dtime == b"\0\0\0\0\0\0":
343             self.devtime = None
344         else:
345             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
346             self.devtime = datetime(
347                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
348             )
349         self.gps_data_length = payload[6] >> 4
350         self.gps_nb_sat = payload[6] & 0x0F
351         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
352         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
353         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
354         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
355         self.heading = flags & 0b0000001111111111  # bits 6 - last
356         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
357         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
358         self.speed = speed
359         self.flags = flags
360
361     def out_encode(self) -> bytes:
362         tup = datetime.utcnow().timetuple()
363         ttup = (tup[0] % 100,) + tup[1:6]
364         return pack("BBBBBB", *ttup)
365
366     def rectified(self) -> Dict[str, Any]:  # JSON-able dict
367         return {
368             "type": "location",
369             "devtime": str(self.devtime),
370             "speed": self.speed,
371             "direction": self.heading,
372             "latitude": self.latitude,
373             "longitude": self.longitude,
374         }
375
376
377 class GPS_POSITIONING(_GPS_POSITIONING):
378     PROTO = 0x10
379
380
381 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
382     PROTO = 0x11
383
384
385 class STATUS(GPS303Pkt):
386     PROTO = 0x13
387     RESPOND = Respond.EXT
388     IN_KWARGS = (
389         ("batt", int, 100),
390         ("ver", int, 0),
391         ("timezone", int, 0),
392         ("intvl", int, 0),
393         ("signal", maybe(int), None),
394     )
395     OUT_KWARGS = (("upload_interval", int, 25),)
396
397     def in_decode(self, length: int, payload: bytes) -> None:
398         self.batt, self.ver, self.timezone, self.intvl = unpack(
399             "BBBB", payload[:4]
400         )
401         if len(payload) > 4:
402             self.signal: Optional[int] = payload[4]
403         else:
404             self.signal = None
405
406     def in_encode(self) -> bytes:
407         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
408             b"" if self.signal is None else pack("B", self.signal)
409         )
410
411     def out_encode(self) -> bytes:  # Set interval in minutes
412         return pack("B", self.upload_interval)
413
414
415 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
416     PROTO = 0x14
417
418     def in_encode(self) -> bytes:
419         return b""
420
421
422 class RESET(GPS303Pkt):
423     # Device sends when it got reset SMS
424     # Server can send to initiate factory reset
425     PROTO = 0x15
426
427
428 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
429     PROTO = 0x16
430     OUT_KWARGS = (("number", int, 3),)
431
432     def out_encode(self) -> bytes:  # Number of whitelist entries
433         return pack("B", self.number)
434
435
436 class _WIFI_POSITIONING(GPS303Pkt):
437     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
438         # IN_KWARGS = (
439         ("dtime", bytes, b"\0\0\0\0\0\0"),
440         ("wifi_aps", list, []),
441         ("mcc", int, 0),
442         ("mnc", int, 0),
443         ("gsm_cells", list, []),
444     )
445
446     def in_decode(self, length: int, payload: bytes) -> None:
447         self.dtime = payload[:6]
448         if self.dtime == b"\0\0\0\0\0\0":
449             self.devtime = None
450         else:
451             self.devtime = datetime.strptime(
452                 self.dtime.hex(), "%y%m%d%H%M%S"
453             ).astimezone(tz=timezone.utc)
454         self.wifi_aps = []
455         for i in range(self.length):  # length has special meaning here
456             slice = payload[6 + i * 7 : 13 + i * 7]
457             self.wifi_aps.append(
458                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
459             )
460         gsm_slice = payload[6 + self.length * 7 :]
461         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
462         self.gsm_cells = []
463         for i in range(ncells):
464             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
465             locac, cellid, sigstr = unpack(
466                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
467             )
468             self.gsm_cells.append((locac, cellid, -sigstr))
469
470     def in_encode(self) -> bytes:
471         self.length = len(self.wifi_aps)
472         return b"".join(
473             [
474                 self.dtime,
475                 b"".join(
476                     [
477                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
478                         + pack("B", -sigstr)
479                         for mac, sigstr in self.wifi_aps
480                     ]
481                 ),
482                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
483                 b"".join(
484                     [
485                         pack("!HHB", locac, cellid, -sigstr)
486                         for locac, cellid, sigstr in self.gsm_cells
487                     ]
488                 ),
489             ]
490         )
491
492     def rectified(self) -> Dict[str, Any]:  # JSON-able dict
493         return {
494             "type": "approximate_location",
495             "devtime": str(self.devtime),
496             "mcc": self.mcc,
497             "mnc": self.mnc,
498             "base_stations": self.gsm_cells,
499             "wifi_aps": self.wifi_aps,
500         }
501
502
503 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
504     PROTO = 0x17
505     RESPOND = Respond.INL
506
507     def out_encode(self) -> bytes:
508         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
509
510
511 class TIME(GPS303Pkt):
512     PROTO = 0x30
513     RESPOND = Respond.INL
514
515     def out_encode(self) -> bytes:
516         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
517
518
519 class PROHIBIT_LBS(GPS303Pkt):
520     PROTO = 0x33
521     OUT_KWARGS = (("status", int, 1),)
522
523     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
524         return pack("B", self.status)
525
526
527 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
528     PROTO = 0x34
529
530     OUT_KWARGS = (
531         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
532         ("gps_interval_set", boolx, False),
533         ("gps_interval", hhmmhhmm, "00000000"),
534         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
535         ("boot_time_set", boolx, False),
536         ("boot_time", hhmm, "0000"),
537         ("shut_time_set", boolx, False),
538         ("shut_time", hhmm, "0000"),
539     )
540
541     def out_encode(self) -> bytes:
542         return (
543             pack("B", self.gps_off)
544             + pack("B", self.gps_interval_set)
545             + bytes.fromhex(self.gps_interval)
546             + pack("B", self.lbs_off)
547             + pack("B", self.boot_time_set)
548             + bytes.fromhex(self.boot_time)
549             + pack("B", self.shut_time_set)
550             + bytes.fromhex(self.shut_time)
551         )
552
553
554 class _SET_PHONE(GPS303Pkt):
555     OUT_KWARGS = (("phone", str, ""),)
556
557     def out_encode(self) -> bytes:
558         self.phone: str
559         return self.phone.encode("")
560
561
562 class REMOTE_MONITOR_PHONE(_SET_PHONE):
563     PROTO = 0x40
564
565
566 class SOS_PHONE(_SET_PHONE):
567     PROTO = 0x41
568
569
570 class DAD_PHONE(_SET_PHONE):
571     PROTO = 0x42
572
573
574 class MOM_PHONE(_SET_PHONE):
575     PROTO = 0x43
576
577
578 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
579     PROTO = 0x44
580
581
582 class GPS_OFF_PERIOD(GPS303Pkt):
583     PROTO = 0x46
584     OUT_KWARGS = (
585         ("onoff", int, 0),
586         ("fm", hhmm, "0000"),
587         ("to", hhmm, "2359"),
588     )
589
590     def out_encode(self) -> bytes:
591         return (
592             pack("B", self.onoff)
593             + bytes.fromhex(self.fm)
594             + bytes.fromhex(self.to)
595         )
596
597
598 class DND_PERIOD(GPS303Pkt):
599     PROTO = 0x47
600     OUT_KWARGS = (
601         ("onoff", int, 0),
602         ("week", int, 3),
603         ("fm1", hhmm, "0000"),
604         ("to1", hhmm, "2359"),
605         ("fm2", hhmm, "0000"),
606         ("to2", hhmm, "2359"),
607     )
608
609     def out_encode(self) -> bytes:
610         return (
611             pack("B", self.onoff)
612             + pack("B", self.week)
613             + bytes.fromhex(self.fm1)
614             + bytes.fromhex(self.to1)
615             + bytes.fromhex(self.fm2)
616             + bytes.fromhex(self.to2)
617         )
618
619
620 class RESTART_SHUTDOWN(GPS303Pkt):
621     PROTO = 0x48
622     OUT_KWARGS = (("flag", int, 0),)
623
624     def out_encode(self) -> bytes:
625         # 1 - restart
626         # 2 - shutdown
627         return pack("B", self.flag)
628
629
630 class DEVICE(GPS303Pkt):
631     PROTO = 0x49
632     OUT_KWARGS = (("flag", int, 0),)
633
634     # 0 - Stop looking for equipment
635     # 1 - Start looking for equipment
636     def out_encode(self) -> bytes:
637         return pack("B", self.flag)
638
639
640 class ALARM_CLOCK(GPS303Pkt):
641     PROTO = 0x50
642     OUT_KWARGS: Tuple[
643         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
644     ] = (
645         ("alarms", l3alarms, []),
646     )
647
648     def out_encode(self) -> bytes:
649         return b"".join(
650             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
651         )
652
653
654 class STOP_ALARM(GPS303Pkt):
655     PROTO = 0x56
656
657     def in_decode(self, length: int, payload: bytes) -> None:
658         self.flag = payload[0]
659
660
661 class SETUP(GPS303Pkt):
662     PROTO = 0x57
663     RESPOND = Respond.EXT
664     OUT_KWARGS = (
665         ("uploadintervalseconds", intx, 0x0300),
666         ("binaryswitch", intx, 0b00110001),
667         ("alarms", l3int, [0, 0, 0]),
668         ("dndtimeswitch", int, 0),
669         ("dndtimes", l3int, [0, 0, 0]),
670         ("gpstimeswitch", int, 0),
671         ("gpstimestart", int, 0),
672         ("gpstimestop", int, 0),
673         ("phonenumbers", l3str, ["", "", ""]),
674     )
675
676     def out_encode(self) -> bytes:
677         def pack3b(x: int) -> bytes:
678             return pack("!I", x)[1:]
679
680         return b"".join(
681             [
682                 pack("!H", self.uploadintervalseconds),
683                 pack("B", self.binaryswitch),
684             ]
685             + [pack3b(el) for el in self.alarms]
686             + [
687                 pack("B", self.dndtimeswitch),
688             ]
689             + [pack3b(el) for el in self.dndtimes]
690             + [
691                 pack("B", self.gpstimeswitch),
692                 pack("!H", self.gpstimestart),
693                 pack("!H", self.gpstimestop),
694             ]
695             + [b";".join([el.encode() for el in self.phonenumbers])]
696         )
697
698     def in_encode(self) -> bytes:
699         return b""
700
701
702 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
703     PROTO = 0x58
704
705
706 class RESTORE_PASSWORD(GPS303Pkt):
707     PROTO = 0x67
708
709
710 class WIFI_POSITIONING(_WIFI_POSITIONING):
711     PROTO = 0x69
712     RESPOND = Respond.EXT
713     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
714
715     def out_encode(self) -> bytes:
716         if self.latitude is None or self.longitude is None:
717             return b""
718         return "{:+#010.8g},{:+#010.8g}".format(
719             self.latitude, self.longitude
720         ).encode()
721
722     def out_decode(self, length: int, payload: bytes) -> None:
723         lat, lon = payload.decode().split(",")
724         self.latitude = float(lat)
725         self.longitude = float(lon)
726
727
728 class MANUAL_POSITIONING(GPS303Pkt):
729     PROTO = 0x80
730
731     def in_decode(self, length: int, payload: bytes) -> None:
732         self.flag = payload[0] if len(payload) > 0 else -1
733         self.reason = {
734             1: "Incorrect time",
735             2: "LBS less",
736             3: "WiFi less",
737             4: "LBS search > 3 times",
738             5: "Same LBS and WiFi data",
739             6: "LBS prohibited, WiFi absent",
740             7: "GPS spacing < 50 m",
741         }.get(self.flag, "Unknown")
742
743
744 class BATTERY_CHARGE(GPS303Pkt):
745     PROTO = 0x81
746
747
748 class CHARGER_CONNECTED(GPS303Pkt):
749     PROTO = 0x82
750
751
752 class CHARGER_DISCONNECTED(GPS303Pkt):
753     PROTO = 0x83
754
755
756 class VIBRATION_RECEIVED(GPS303Pkt):
757     PROTO = 0x94
758
759
760 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
761     PROTO = 0x98
762     RESPOND = Respond.EXT
763     OUT_KWARGS = (("interval", int, 10),)
764
765     def in_decode(self, length: int, payload: bytes) -> None:
766         self.interval = unpack("!H", payload[:2])
767
768     def out_encode(self) -> bytes:
769         return pack("!H", self.interval)
770
771
772 class SOS_ALARM(GPS303Pkt):
773     PROTO = 0x99
774
775
776 class UNKNOWN_B3(GPS303Pkt):
777     PROTO = 0xB3
778     IN_KWARGS = (("asciidata", str, ""),)
779
780     def in_decode(self, length: int, payload: bytes) -> None:
781         self.asciidata = payload.decode()
782
783
784 # Build dicts protocol number -> class and class name -> protocol number
785 CLASSES = {}
786 PROTOS = {}
787 if True:  # just to indent the code, sorry!
788     for cls in [
789         cls
790         for name, cls in globals().items()
791         if isclass(cls)
792         and issubclass(cls, GPS303Pkt)
793         and not name.startswith("_")
794     ]:
795         if hasattr(cls, "PROTO"):
796             CLASSES[cls.PROTO] = cls
797             PROTOS[cls.__name__] = cls.PROTO
798
799
800 def class_by_prefix(
801     prefix: str,
802 ) -> Union[Type[GPS303Pkt], List[str]]:
803     if prefix.startswith(PROTO_PREFIX):
804         pname = prefix[len(PROTO_PREFIX) :]
805     else:
806         raise KeyError(pname)
807     lst = [
808         (name, proto)
809         for name, proto in PROTOS.items()
810         if name.upper().startswith(prefix.upper())
811     ]
812     if len(lst) != 1:
813         return [name for name, _ in lst]
814     _, proto = lst[0]
815     return CLASSES[proto]
816
817
818 def proto_handled(proto: str) -> bool:
819     return proto.startswith(PROTO_PREFIX)
820
821
822 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
823     return PROTO_PREFIX + (
824         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
825     )
826
827
828 def proto_of_message(packet: bytes) -> str:
829     return proto_name(CLASSES.get(packet[1], UNKNOWN))
830
831
832 def imei_from_packet(packet: bytes) -> Optional[str]:
833     if packet[1] == LOGIN.PROTO:
834         msg = parse_message(packet)
835         if isinstance(msg, LOGIN):
836             return msg.imei
837     return None
838
839
840 def is_goodbye_packet(packet: bytes) -> bool:
841     return packet[1] == HIBERNATION.PROTO
842
843
844 def inline_response(packet: bytes) -> Optional[bytes]:
845     proto = packet[1]
846     if proto in CLASSES:
847         cls = CLASSES[proto]
848         if cls.RESPOND is Respond.INL:
849             return cls.Out().packed
850     return None
851
852
853 def probe_buffer(buffer: bytes) -> bool:
854     framestart = buffer.find(b"xx")
855     if framestart < 0:
856         return False
857     if len(buffer) - framestart < 6:
858         return False
859     return True
860
861
862 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
863     """From a packet (without framing bytes) derive the XXX.In object"""
864     length, proto = unpack("BB", packet[:2])
865     payload = packet[2:]
866     if proto not in CLASSES:
867         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
868             f"Proto {proto} is unknown"
869         )
870     else:
871         try:
872             if is_incoming:
873                 return CLASSES[proto].In(length, payload)
874             else:
875                 return CLASSES[proto].Out(length, payload)
876         except (DecodeError, ValueError, IndexError) as e:
877             cause = e
878     if is_incoming:
879         retobj = UNKNOWN.In(length, payload)
880     else:
881         retobj = UNKNOWN.Out(length, payload)
882     retobj.PROTO = proto  # Override class attr with object attr
883     retobj.cause = cause
884     return retobj
885
886
887 def exposed_protos() -> List[Tuple[str, bool]]:
888     return [
889         (proto_name(cls), cls.RESPOND is Respond.EXT)
890         for cls in CLASSES.values()
891         if hasattr(cls, "rectified")
892     ]