]> www.average.org Git - loctrkd.git/blob - loctrkd/zx303proto.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / zx303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from time import time
22 from types import SimpleNamespace
23 from typing import (
24     Any,
25     Callable,
26     Dict,
27     List,
28     Optional,
29     Tuple,
30     Type,
31     TYPE_CHECKING,
32     Union,
33 )
34
35 from .common import CoordReport, HintReport, StatusReport
36 from .protomodule import ProtoClass
37
38 __all__ = (
39     "Stream",
40     "class_by_prefix",
41     "enframe",
42     "exposed_protos",
43     "inline_response",
44     "proto_handled",
45     "parse_message",
46     "probe_buffer",
47     "DecodeError",
48     "Respond",
49 )
50
51 MODNAME = __name__.split(".")[-1]
52 PROTO_PREFIX: str = "ZX:"
53
54 ### Deframer ###
55
56 MAXBUFFER: int = 4096
57
58
59 class Stream:
60     def __init__(self) -> None:
61         self.buffer = b""
62
63     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
64         """
65         Process next segment of the stream. Return successfully deframed
66         packets as `bytes` and error messages as `str`.
67         """
68         when = time()
69         self.buffer += segment
70         if len(self.buffer) > MAXBUFFER:
71             # We are receiving junk. Let's drop it or we run out of memory.
72             self.buffer = b""
73             return [f"More than {MAXBUFFER} unparseable data, dropping"]
74         msgs: List[Union[bytes, str]] = []
75         while True:
76             framestart = self.buffer.find(b"xx")
77             if framestart == -1:  # No frames, return whatever we have
78                 break
79             if framestart > 0:  # Should not happen, report
80                 msgs.append(
81                     f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
82                 )
83                 self.buffer = self.buffer[framestart:]
84             # At this point, buffer starts with a packet
85             if len(self.buffer) < 6:  # no len and proto - cannot proceed
86                 break
87             exp_end = self.buffer[2] + 3  # Expect '\r\n' here
88             frameend = 0
89             # Length field can legitimeely be much less than the
90             # length of the packet (e.g. WiFi positioning), but
91             # it _should not_ be greater. Still sometimes it is.
92             # Luckily, not by too much: by maybe two or three bytes?
93             # Do this embarrassing hack to avoid accidental match
94             # of some binary data in the packet against '\r\n'.
95             while True:
96                 frameend = self.buffer.find(b"\r\n", frameend + 1)
97                 if frameend == -1 or frameend >= (
98                     exp_end - 3
99                 ):  # Found realistic match or none
100                     break
101             if frameend == -1:  # Incomplete frame, return what we have
102                 break
103             packet = self.buffer[2:frameend]
104             self.buffer = self.buffer[frameend + 2 :]
105             if len(packet) < 2:  # frameend comes too early
106                 msgs.append(f"Packet too short: {packet.hex()}")
107             else:
108                 msgs.append(packet)
109         return msgs
110
111     def close(self) -> bytes:
112         ret = self.buffer
113         self.buffer = b""
114         return ret
115
116
117 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
118     return b"xx" + buffer + b"\r\n"
119
120
121 ### Parser/Constructor ###
122
123
124 class DecodeError(Exception):
125     def __init__(self, e: Exception, **kwargs: Any) -> None:
126         super().__init__(e)
127         for k, v in kwargs.items():
128             setattr(self, k, v)
129
130
131 def maybe(typ: type) -> Callable[[Any], Any]:
132     return lambda x: None if x is None else typ(x)
133
134
135 def intx(x: Union[str, int]) -> int:
136     if isinstance(x, str):
137         x = int(x, 0)
138     return x
139
140
141 def boolx(x: Union[str, bool]) -> bool:
142     if isinstance(x, str):
143         if x.upper() in ("ON", "TRUE", "1"):
144             return True
145         if x.upper() in ("OFF", "FALSE", "0"):
146             return False
147         raise ValueError(str(x) + " could not be parsed as a Boolean")
148     return x
149
150
151 def hhmm(x: str) -> str:
152     """Check for the string that represents hours and minutes"""
153     if not isinstance(x, str) or len(x) != 4:
154         raise ValueError(str(x) + " is not a four-character string")
155     hh = int(x[:2])
156     mm = int(x[2:])
157     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
158         raise ValueError(str(x) + " does not contain valid hours and minutes")
159     return x
160
161
162 def hhmmhhmm(x: str) -> str:
163     """Check for the string that represents hours and minutes twice"""
164     if not isinstance(x, str) or len(x) != 8:
165         raise ValueError(str(x) + " is not an eight-character string")
166     return hhmm(x[:4]) + hhmm(x[4:])
167
168
169 def l3str(x: Union[str, List[str]]) -> List[str]:
170     if isinstance(x, str):
171         lx = x.split(",")
172     else:
173         lx = x
174     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
175         raise ValueError(str(lx) + " is not a list of three strings")
176     return lx
177
178
179 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
180     def alrmspec(sub: str) -> Tuple[int, str]:
181         if len(sub) != 7:
182             raise ValueError(sub + " does not represent day and time")
183         return (
184             {
185                 "MON": 1,
186                 "TUE": 2,
187                 "WED": 3,
188                 "THU": 4,
189                 "FRI": 5,
190                 "SAT": 6,
191                 "SUN": 7,
192             }[sub[:3].upper()],
193             sub[3:],
194         )
195
196     if isinstance(x, str):
197         lx = [alrmspec(sub) for sub in x.split(",")]
198     else:
199         lx = x
200     lx.extend([(0, "0000") for _ in range(3 - len(lx))])
201     if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
202         raise ValueError(str(lx) + " is a wrong alarms specification")
203     return [(d, hhmm(tm)) for d, tm in lx]
204
205
206 def l3int(x: Union[str, List[int]]) -> List[int]:
207     if isinstance(x, str):
208         lx = [int(el) for el in x.split(",")]
209     else:
210         lx = x
211     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
212         raise ValueError(str(lx) + " is not a list of three integers")
213     return lx
214
215
216 class Respond(Enum):
217     NON = 0  # Incoming, no response needed
218     INL = 1  # Birirectional, use `inline_response()`
219     EXT = 2  # Birirectional, use external responder
220
221
222 class GPS303Pkt(ProtoClass):
223     RESPOND = Respond.NON  # Do not send anything back by default
224     PROTO: int
225     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
228     In: Type["GPS303Pkt"]
229     Out: Type["GPS303Pkt"]
230
231     if TYPE_CHECKING:
232
233         def __getattr__(self, name: str) -> Any:
234             pass
235
236         def __setattr__(self, name: str, value: Any) -> None:
237             pass
238
239     def __init__(self, *args: Any, **kwargs: Any):
240         """
241         Construct the object _either_ from (length, payload),
242         _or_ from the values of individual fields
243         """
244         assert not args or (len(args) == 2 and not kwargs)
245         if args:  # guaranteed to be two arguments at this point
246             self.length, self.payload = args
247             try:
248                 self.decode(self.length, self.payload)
249             except error as e:
250                 raise DecodeError(e, obj=self)
251         else:
252             for kw, typ, dfl in self.KWARGS:
253                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
254             if kwargs:
255                 raise ValueError(
256                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
257                 )
258
259     def __repr__(self) -> str:
260         return "{}({})".format(
261             self.__class__.__name__,
262             ", ".join(
263                 "{}={}".format(
264                     k,
265                     'bytes.fromhex("{}")'.format(v.hex())
266                     if isinstance(v, bytes)
267                     else v.__repr__(),
268                 )
269                 for k, v in self.__dict__.items()
270                 if not k.startswith("_")
271             ),
272         )
273
274     decode: Callable[["GPS303Pkt", int, bytes], None]
275
276     def in_decode(self, length: int, packet: bytes) -> None:
277         # Overridden in subclasses, otherwise do not decode payload
278         return
279
280     def out_decode(self, length: int, packet: bytes) -> None:
281         # Overridden in subclasses, otherwise do not decode payload
282         return
283
284     encode: Callable[["GPS303Pkt"], bytes]
285
286     def in_encode(self) -> bytes:
287         # Necessary to emulate terminal, which is not implemented
288         raise NotImplementedError(
289             self.__class__.__name__ + ".encode() not implemented"
290         )
291
292     def out_encode(self) -> bytes:
293         # Overridden in subclasses, otherwise make empty payload
294         return b""
295
296     @classmethod
297     def proto_name(cls) -> str:
298         """Name of the command as used externally"""
299         return (PROTO_PREFIX + cls.__name__)[:16]
300
301     @property
302     def packed(self) -> bytes:
303         payload = self.encode()
304         length = getattr(self, "length", len(payload) + 1)
305         return pack("BB", length, self.PROTO) + payload
306
307
308 class UNKNOWN(GPS303Pkt):
309     PROTO = 256  # > 255 is impossible in real packets
310
311
312 class LOGIN(GPS303Pkt):
313     PROTO = 0x01
314     RESPOND = Respond.INL
315     # Default response for ACK, can also respond with STOP_UPLOAD
316     IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
317
318     def in_decode(self, length: int, payload: bytes) -> None:
319         self.imei = payload[:8].ljust(8, b"\0").hex()
320         self.ver = payload[8]
321
322     def in_encode(self) -> bytes:
323         return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
324             "B", self.ver
325         )
326
327
328 class SUPERVISION(GPS303Pkt):
329     PROTO = 0x05
330     OUT_KWARGS = (("status", int, 1),)
331
332     def out_encode(self) -> bytes:
333         # 1: The device automatically answers Pickup effect
334         # 2: Automatically Answering Two-way Calls
335         # 3: Ring manually answer the two-way call
336         return pack("B", self.status)
337
338
339 class HEARTBEAT(GPS303Pkt):
340     PROTO = 0x08
341     RESPOND = Respond.INL
342
343
344 class _GPS_POSITIONING(GPS303Pkt):
345     RESPOND = Respond.INL
346
347     def in_decode(self, length: int, payload: bytes) -> None:
348         self.dtime = payload[:6]
349         if self.dtime == b"\0\0\0\0\0\0":
350             self.devtime = None
351         else:
352             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
353             self.devtime = datetime(
354                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
355             )
356         self.gps_data_length = payload[6] >> 4
357         self.gps_nb_sat = payload[6] & 0x0F
358         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
359         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
360         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
361         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
362         self.heading = flags & 0b0000001111111111  # bits 6 - last
363         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
364         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
365         self.speed = speed
366         self.flags = flags
367
368     def out_encode(self) -> bytes:
369         tup = datetime.utcnow().timetuple()
370         ttup = (tup[0] % 100,) + tup[1:6]
371         return pack("BBBBBB", *ttup)
372
373     def rectified(self) -> Tuple[str, CoordReport]:  # JSON-able dict
374         return MODNAME, CoordReport(
375             devtime=str(self.devtime),
376             battery_percentage=None,
377             accuracy=None,
378             altitude=None,
379             speed=self.speed,
380             direction=self.heading,
381             latitude=self.latitude,
382             longitude=self.longitude,
383         )
384
385
386 class GPS_POSITIONING(_GPS_POSITIONING):
387     PROTO = 0x10
388
389
390 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
391     PROTO = 0x11
392
393
394 class STATUS(GPS303Pkt):
395     PROTO = 0x13
396     RESPOND = Respond.EXT
397     IN_KWARGS = (
398         ("batt", int, 100),
399         ("ver", int, 0),
400         ("timezone", int, 0),
401         ("intvl", int, 0),
402         ("signal", maybe(int), None),
403     )
404     OUT_KWARGS = (("upload_interval", int, 25),)
405
406     def in_decode(self, length: int, payload: bytes) -> None:
407         self.batt, self.ver, self.timezone, self.intvl = unpack(
408             "BBBB", payload[:4]
409         )
410         if len(payload) > 4:
411             self.signal: Optional[int] = payload[4]
412         else:
413             self.signal = None
414
415     def in_encode(self) -> bytes:
416         return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
417             b"" if self.signal is None else pack("B", self.signal)
418         )
419
420     def out_encode(self) -> bytes:  # Set interval in minutes
421         return pack("B", self.upload_interval)
422
423     def rectified(self) -> Tuple[str, StatusReport]:
424         return MODNAME, StatusReport(battery_percentage=self.batt)
425
426
427 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
428     PROTO = 0x14
429
430     def in_encode(self) -> bytes:
431         return b""
432
433
434 class RESET(GPS303Pkt):
435     # Device sends when it got reset SMS
436     # Server can send to initiate factory reset
437     PROTO = 0x15
438
439
440 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
441     PROTO = 0x16
442     OUT_KWARGS = (("number", int, 3),)
443
444     def out_encode(self) -> bytes:  # Number of whitelist entries
445         return pack("B", self.number)
446
447
448 class _WIFI_POSITIONING(GPS303Pkt):
449     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
450         # IN_KWARGS = (
451         ("dtime", bytes, b"\0\0\0\0\0\0"),
452         ("wifi_aps", list, []),
453         ("mcc", int, 0),
454         ("mnc", int, 0),
455         ("gsm_cells", list, []),
456     )
457
458     def in_decode(self, length: int, payload: bytes) -> None:
459         self.dtime = payload[:6]
460         if self.dtime == b"\0\0\0\0\0\0":
461             self.devtime = None
462         else:
463             self.devtime = datetime.strptime(
464                 self.dtime.hex(), "%y%m%d%H%M%S"
465             ).astimezone(tz=timezone.utc)
466         self.wifi_aps = []
467         for i in range(self.length):  # length has special meaning here
468             slice = payload[6 + i * 7 : 13 + i * 7]
469             self.wifi_aps.append(
470                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
471             )
472         gsm_slice = payload[6 + self.length * 7 :]
473         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
474         self.gsm_cells = []
475         for i in range(ncells):
476             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
477             locac, cellid, sigstr = unpack(
478                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
479             )
480             self.gsm_cells.append((locac, cellid, -sigstr))
481
482     def in_encode(self) -> bytes:
483         self.length = len(self.wifi_aps)
484         return b"".join(
485             [
486                 self.dtime,
487                 b"".join(
488                     [
489                         bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
490                         + pack("B", -sigstr)
491                         for mac, sigstr in self.wifi_aps
492                     ]
493                 ),
494                 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
495                 b"".join(
496                     [
497                         pack("!HHB", locac, cellid, -sigstr)
498                         for locac, cellid, sigstr in self.gsm_cells
499                     ]
500                 ),
501             ]
502         )
503
504     def rectified(self) -> Tuple[str, HintReport]:
505         return MODNAME, HintReport(
506             devtime=str(self.devtime),
507             battery_percentage=None,
508             mcc=self.mcc,
509             mnc=self.mnc,
510             gsm_cells=self.gsm_cells,
511             wifi_aps=[("<UNKNOWN>", mac, sig) for mac, sig in self.wifi_aps],
512         )
513
514
515 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
516     PROTO = 0x17
517     RESPOND = Respond.INL
518
519     def out_encode(self) -> bytes:
520         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
521
522
523 class TIME(GPS303Pkt):
524     PROTO = 0x30
525     RESPOND = Respond.INL
526
527     def out_encode(self) -> bytes:
528         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
529
530
531 class PROHIBIT_LBS(GPS303Pkt):
532     PROTO = 0x33
533     OUT_KWARGS = (("status", int, 1),)
534
535     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
536         return pack("B", self.status)
537
538
539 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
540     PROTO = 0x34
541
542     OUT_KWARGS = (
543         ("gps_off", boolx, False),  # Clarify the meaning of 0/1
544         ("gps_interval_set", boolx, False),
545         ("gps_interval", hhmmhhmm, "00000000"),
546         ("lbs_off", boolx, False),  # Clarify the meaning of 0/1
547         ("boot_time_set", boolx, False),
548         ("boot_time", hhmm, "0000"),
549         ("shut_time_set", boolx, False),
550         ("shut_time", hhmm, "0000"),
551     )
552
553     def out_encode(self) -> bytes:
554         return (
555             pack("B", self.gps_off)
556             + pack("B", self.gps_interval_set)
557             + bytes.fromhex(self.gps_interval)
558             + pack("B", self.lbs_off)
559             + pack("B", self.boot_time_set)
560             + bytes.fromhex(self.boot_time)
561             + pack("B", self.shut_time_set)
562             + bytes.fromhex(self.shut_time)
563         )
564
565
566 class _SET_PHONE(GPS303Pkt):
567     OUT_KWARGS = (("phone", str, ""),)
568
569     def out_encode(self) -> bytes:
570         self.phone: str
571         return self.phone.encode("")
572
573
574 class REMOTE_MONITOR_PHONE(_SET_PHONE):
575     PROTO = 0x40
576
577
578 class SOS_PHONE(_SET_PHONE):
579     PROTO = 0x41
580
581
582 class DAD_PHONE(_SET_PHONE):
583     PROTO = 0x42
584
585
586 class MOM_PHONE(_SET_PHONE):
587     PROTO = 0x43
588
589
590 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
591     PROTO = 0x44
592
593
594 class GPS_OFF_PERIOD(GPS303Pkt):
595     PROTO = 0x46
596     OUT_KWARGS = (
597         ("onoff", int, 0),
598         ("fm", hhmm, "0000"),
599         ("to", hhmm, "2359"),
600     )
601
602     def out_encode(self) -> bytes:
603         return (
604             pack("B", self.onoff)
605             + bytes.fromhex(self.fm)
606             + bytes.fromhex(self.to)
607         )
608
609
610 class DND_PERIOD(GPS303Pkt):
611     PROTO = 0x47
612     OUT_KWARGS = (
613         ("onoff", int, 0),
614         ("week", int, 3),
615         ("fm1", hhmm, "0000"),
616         ("to1", hhmm, "2359"),
617         ("fm2", hhmm, "0000"),
618         ("to2", hhmm, "2359"),
619     )
620
621     def out_encode(self) -> bytes:
622         return (
623             pack("B", self.onoff)
624             + pack("B", self.week)
625             + bytes.fromhex(self.fm1)
626             + bytes.fromhex(self.to1)
627             + bytes.fromhex(self.fm2)
628             + bytes.fromhex(self.to2)
629         )
630
631
632 class RESTART_SHUTDOWN(GPS303Pkt):
633     PROTO = 0x48
634     OUT_KWARGS = (("flag", int, 0),)
635
636     def out_encode(self) -> bytes:
637         # 1 - restart
638         # 2 - shutdown
639         return pack("B", self.flag)
640
641
642 class DEVICE(GPS303Pkt):
643     PROTO = 0x49
644     OUT_KWARGS = (("flag", int, 0),)
645
646     # 0 - Stop looking for equipment
647     # 1 - Start looking for equipment
648     def out_encode(self) -> bytes:
649         return pack("B", self.flag)
650
651
652 class ALARM_CLOCK(GPS303Pkt):
653     PROTO = 0x50
654     OUT_KWARGS: Tuple[
655         Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
656     ] = (
657         ("alarms", l3alarms, []),
658     )
659
660     def out_encode(self) -> bytes:
661         return b"".join(
662             pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
663         )
664
665
666 class STOP_ALARM(GPS303Pkt):
667     PROTO = 0x56
668
669     def in_decode(self, length: int, payload: bytes) -> None:
670         self.flag = payload[0]
671
672
673 class SETUP(GPS303Pkt):
674     PROTO = 0x57
675     RESPOND = Respond.EXT
676     OUT_KWARGS = (
677         ("uploadintervalseconds", intx, 0x0300),
678         ("binaryswitch", intx, 0b00110001),
679         ("alarms", l3int, [0, 0, 0]),
680         ("dndtimeswitch", int, 0),
681         ("dndtimes", l3int, [0, 0, 0]),
682         ("gpstimeswitch", int, 0),
683         ("gpstimestart", int, 0),
684         ("gpstimestop", int, 0),
685         ("phonenumbers", l3str, ["", "", ""]),
686     )
687
688     def out_encode(self) -> bytes:
689         def pack3b(x: int) -> bytes:
690             return pack("!I", x)[1:]
691
692         return b"".join(
693             [
694                 pack("!H", self.uploadintervalseconds),
695                 pack("B", self.binaryswitch),
696             ]
697             + [pack3b(el) for el in self.alarms]
698             + [
699                 pack("B", self.dndtimeswitch),
700             ]
701             + [pack3b(el) for el in self.dndtimes]
702             + [
703                 pack("B", self.gpstimeswitch),
704                 pack("!H", self.gpstimestart),
705                 pack("!H", self.gpstimestop),
706             ]
707             + [b";".join([el.encode() for el in self.phonenumbers])]
708         )
709
710     def in_encode(self) -> bytes:
711         return b""
712
713
714 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
715     PROTO = 0x58
716
717
718 class RESTORE_PASSWORD(GPS303Pkt):
719     PROTO = 0x67
720
721
722 class WIFI_POSITIONING(_WIFI_POSITIONING):
723     PROTO = 0x69
724     RESPOND = Respond.EXT
725     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
726
727     def out_encode(self) -> bytes:
728         if self.latitude is None or self.longitude is None:
729             return b""
730         return "{:+#010.8g},{:+#010.8g}".format(
731             self.latitude, self.longitude
732         ).encode()
733
734     def out_decode(self, length: int, payload: bytes) -> None:
735         lat, lon = payload.decode().split(",")
736         self.latitude = float(lat)
737         self.longitude = float(lon)
738
739
740 class MANUAL_POSITIONING(GPS303Pkt):
741     PROTO = 0x80
742
743     def in_decode(self, length: int, payload: bytes) -> None:
744         self.flag = payload[0] if len(payload) > 0 else -1
745         self.reason = {
746             1: "Incorrect time",
747             2: "LBS less",
748             3: "WiFi less",
749             4: "LBS search > 3 times",
750             5: "Same LBS and WiFi data",
751             6: "LBS prohibited, WiFi absent",
752             7: "GPS spacing < 50 m",
753         }.get(self.flag, "Unknown")
754
755
756 class BATTERY_CHARGE(GPS303Pkt):
757     PROTO = 0x81
758
759
760 class CHARGER_CONNECTED(GPS303Pkt):
761     PROTO = 0x82
762
763
764 class CHARGER_DISCONNECTED(GPS303Pkt):
765     PROTO = 0x83
766
767
768 class VIBRATION_RECEIVED(GPS303Pkt):
769     PROTO = 0x94
770
771
772 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
773     PROTO = 0x98
774     RESPOND = Respond.EXT
775     OUT_KWARGS = (("interval", int, 10),)
776
777     def in_decode(self, length: int, payload: bytes) -> None:
778         self.interval = unpack("!H", payload[:2])
779
780     def out_encode(self) -> bytes:
781         return pack("!H", self.interval)
782
783
784 class SOS_ALARM(GPS303Pkt):
785     PROTO = 0x99
786
787
788 class UNKNOWN_B3(GPS303Pkt):
789     PROTO = 0xB3
790     IN_KWARGS = (("asciidata", str, ""),)
791
792     def in_decode(self, length: int, payload: bytes) -> None:
793         self.asciidata = payload.decode()
794
795
796 # Build dicts protocol number -> class and class name -> protocol number
797 CLASSES = {}
798 PROTOS = {}
799 if True:  # just to indent the code, sorry!
800     for cls in [
801         cls
802         for name, cls in globals().items()
803         if isclass(cls)
804         and issubclass(cls, GPS303Pkt)
805         and not name.startswith("_")
806     ]:
807         if hasattr(cls, "PROTO"):
808             CLASSES[cls.PROTO] = cls
809             PROTOS[cls.__name__] = cls.PROTO
810
811
812 def class_by_prefix(
813     prefix: str,
814 ) -> Union[Type[GPS303Pkt], List[str]]:
815     if prefix.startswith(PROTO_PREFIX):
816         pname = prefix[len(PROTO_PREFIX) :]
817     else:
818         raise KeyError(pname)
819     lst = [
820         (name, proto)
821         for name, proto in PROTOS.items()
822         if name.upper().startswith(prefix.upper())
823     ]
824     if len(lst) != 1:
825         return [name for name, _ in lst]
826     _, proto = lst[0]
827     return CLASSES[proto]
828
829
830 def proto_handled(proto: str) -> bool:
831     return proto.startswith(PROTO_PREFIX)
832
833
834 def proto_of_message(packet: bytes) -> str:
835     return CLASSES.get(packet[1], UNKNOWN).proto_name()
836
837
838 def imei_from_packet(packet: bytes) -> Optional[str]:
839     if packet[1] == LOGIN.PROTO:
840         msg = parse_message(packet)
841         if isinstance(msg, LOGIN):
842             return msg.imei
843     return None
844
845
846 def is_goodbye_packet(packet: bytes) -> bool:
847     return packet[1] == HIBERNATION.PROTO
848
849
850 def inline_response(packet: bytes) -> Optional[bytes]:
851     proto = packet[1]
852     if proto in CLASSES:
853         cls = CLASSES[proto]
854         if cls.RESPOND is Respond.INL:
855             return cls.Out().packed
856     return None
857
858
859 def probe_buffer(buffer: bytes) -> bool:
860     framestart = buffer.find(b"xx")
861     if framestart < 0:
862         return False
863     if len(buffer) - framestart < 6:
864         return False
865     return True
866
867
868 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
869     """From a packet (without framing bytes) derive the XXX.In object"""
870     length, proto = unpack("BB", packet[:2])
871     payload = packet[2:]
872     if proto not in CLASSES:
873         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
874             f"Proto {proto} is unknown"
875         )
876     else:
877         try:
878             if is_incoming:
879                 return CLASSES[proto].In(length, payload)
880             else:
881                 return CLASSES[proto].Out(length, payload)
882         except (DecodeError, ValueError, IndexError) as e:
883             cause = e
884     if is_incoming:
885         retobj = UNKNOWN.In(length, payload)
886     else:
887         retobj = UNKNOWN.Out(length, payload)
888     retobj.PROTO = proto  # Override class attr with object attr
889     retobj.cause = cause
890     return retobj
891
892
893 def exposed_protos() -> List[Tuple[str, bool]]:
894     return [
895         (cls.proto_name(), cls.RESPOND is Respond.EXT)
896         for cls in CLASSES.values()
897         if hasattr(cls, "rectified")
898     ]
899
900
901 def make_response(cmd: str, imei: str, **kwargs: Any) -> Optional[GPS303Pkt]:
902     if cmd == "poweroff":
903         return HIBERNATION.Out()
904     return None