]> www.average.org Git - loctrkd.git/blob - loctrkd/beesure.py
e535b76d1c56420c72b77433ee8078f7a8a5d61e
[loctrkd.git] / loctrkd / beesure.py
1 """
2 Implementation of the protocol "beesure" used by some watch-trackers
3 https://www.4p-touch.com/beesure-gps-setracker-server-protocol.html
4 """
5
6 from datetime import datetime, timezone
7 from enum import Enum
8 from inspect import isclass
9 import re
10 from struct import error, pack, unpack
11 from time import time
12 from typing import (
13     Any,
14     Callable,
15     Dict,
16     List,
17     Optional,
18     Tuple,
19     Type,
20     TYPE_CHECKING,
21     Union,
22 )
23
24 __all__ = (
25     "Stream",
26     "class_by_prefix",
27     "enframe",
28     "inline_response",
29     "proto_handled",
30     "parse_message",
31     "probe_buffer",
32     "proto_name",
33     "DecodeError",
34     "Respond",
35     "LK",
36 )
37
38 PROTO_PREFIX = "BS:"
39
40 ### Deframer ###
41
42 MAXBUFFER: int = 65557  # Theoretical max buffer 65536 + 21
43 RE = re.compile(b"\[(\w\w)\*(\d{10})\*([0-9a-fA-F]{4})\*")
44
45
46 def _framestart(buffer: bytes) -> Tuple[int, str, str, int]:
47     """
48     Find the start of the frame in the buffer.
49     If found, return (offset, vendorId, imei, datalen) tuple.
50     If not found, set -1 as the value of `offset`
51     """
52     mo = RE.search(buffer)
53     return (
54         (
55             mo.start(),
56             mo.group(1).decode(),
57             mo.group(2).decode(),
58             int(mo.group(3), 16),
59         )
60         if mo
61         else (-1, "", "", 0)
62     )
63
64
65 class Stream:
66     def __init__(self) -> None:
67         self.buffer = b""
68         self.imei: Optional[str] = None
69         self.datalen: int = 0
70
71     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
72         """
73         Process next segment of the stream. Return successfully deframed
74         packets as `bytes` and error messages as `str`.
75         """
76         when = time()
77         self.buffer += segment
78         if len(self.buffer) > MAXBUFFER:
79             # We are receiving junk. Let's drop it or we run out of memory.
80             self.buffer = b""
81             return [f"More than {MAXBUFFER} unparseable data, dropping"]
82         msgs: List[Union[bytes, str]] = []
83         while True:
84             if not self.datalen:  # we have not seen packet start yet
85                 toskip, _, imei, datalen = _framestart(self.buffer)
86                 if toskip < 0:  # No frames, continue reading
87                     break
88                 if toskip > 0:  # Should not happen, report
89                     msgs.append(
90                         f"Skipping {toskip} bytes of undecodable data"
91                         f' "{self.buffer[:toskip][:64]=!r}"'
92                     )
93                     self.buffer = self.buffer[toskip:]
94                     # From this point, buffer starts with a packet header
95                 if self.imei is None:
96                     self.imei = imei
97                 if self.imei != imei:
98                     msgs.append(
99                         f"Packet's imei {imei} mismatches"
100                         f" previous value {self.imei}, old value kept"
101                     )
102                 self.datalen = datalen
103             if len(self.buffer) < self.datalen + 21:  # Incomplete packet
104                 break
105             # At least one complete packet is present in the buffer
106             if chr(self.buffer[self.datalen + 20]) == "]":
107                 msgs.append(self.buffer[: self.datalen + 21])
108             else:
109                 msgs.append(
110                     f"Packet does not end with ']'"
111                     f" at {self.datalen+20}: {self.buffer=!r}"
112                 )
113             self.buffer = self.buffer[self.datalen + 21 :]
114             self.datalen = 0
115         return msgs
116
117     def close(self) -> bytes:
118         ret = self.buffer
119         self.buffer = b""
120         self.imei = None
121         self.datalen = 0
122         return ret
123
124
125 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
126     assert imei is not None and len(imei) == 10
127     off, vid, _, dlen = _framestart(buffer)
128     assert off == 0
129     return f"[{vid:2s}*{imei:10s}*{dlen:04X}*".encode() + buffer[20:]
130
131
132 ### Parser/Constructor ###
133
134
135 class DecodeError(Exception):
136     def __init__(self, e: Exception, **kwargs: Any) -> None:
137         super().__init__(e)
138         for k, v in kwargs.items():
139             setattr(self, k, v)
140
141
142 def maybe(typ: type) -> Callable[[Any], Any]:
143     return lambda x: None if x is None else typ(x)
144
145
146 def intx(x: Union[str, int]) -> int:
147     if isinstance(x, str):
148         x = int(x, 0)
149     return x
150
151
152 def boolx(x: Union[str, bool]) -> bool:
153     if isinstance(x, str):
154         if x.upper() in ("ON", "TRUE", "1"):
155             return True
156         if x.upper() in ("OFF", "FALSE", "0"):
157             return False
158         raise ValueError(str(x) + " could not be parsed as a Boolean")
159     return x
160
161
162 class MetaPkt(type):
163     """
164     For each class corresponding to a message, automatically create
165     two nested classes `In` and `Out` that also inherit from their
166     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
167     copied to the `In` nested class under the name `KWARGS`, and
168     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
169     to the nested class `Out`. In addition, method `encode` is
170     defined in both classes equal to `in_encode()` and `out_encode()`
171     respectively.
172     """
173
174     if TYPE_CHECKING:
175
176         def __getattr__(self, name: str) -> Any:
177             pass
178
179         def __setattr__(self, name: str, value: Any) -> None:
180             pass
181
182     def __new__(
183         cls: Type["MetaPkt"],
184         name: str,
185         bases: Tuple[type, ...],
186         attrs: Dict[str, Any],
187     ) -> "MetaPkt":
188         newcls = super().__new__(cls, name, bases, attrs)
189         newcls.In = super().__new__(
190             cls,
191             name + ".In",
192             (newcls,) + bases,
193             {
194                 "KWARGS": newcls.IN_KWARGS,
195                 "decode": newcls.in_decode,
196                 "encode": newcls.in_encode,
197             },
198         )
199         newcls.Out = super().__new__(
200             cls,
201             name + ".Out",
202             (newcls,) + bases,
203             {
204                 "KWARGS": newcls.OUT_KWARGS,
205                 "decode": newcls.out_decode,
206                 "encode": newcls.out_encode,
207             },
208         )
209         return newcls
210
211
212 class Respond(Enum):
213     NON = 0  # Incoming, no response needed
214     INL = 1  # Birirectional, use `inline_response()`
215     EXT = 2  # Birirectional, use external responder
216
217
218 class BeeSurePkt(metaclass=MetaPkt):
219     RESPOND = Respond.NON  # Do not send anything back by default
220     PROTO: str
221     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
222     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
223     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
224     In: Type["BeeSurePkt"]
225     Out: Type["BeeSurePkt"]
226
227     if TYPE_CHECKING:
228
229         def __getattr__(self, name: str) -> Any:
230             pass
231
232         def __setattr__(self, name: str, value: Any) -> None:
233             pass
234
235     def __init__(self, *args: Any, **kwargs: Any):
236         """
237         Construct the object _either_ from (length, payload),
238         _or_ from the values of individual fields
239         """
240         assert not args or (len(args) == 4 and not kwargs)
241         if args:  # guaranteed to be two arguments at this point
242             self.vendor, self.imei, self.datalength, self.payload = args
243             try:
244                 self.decode(*self.payload)
245             except error as e:
246                 raise DecodeError(e, obj=self)
247         else:
248             for kw, typ, dfl in self.KWARGS:
249                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
250             if kwargs:
251                 raise ValueError(
252                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
253                 )
254
255     def __repr__(self) -> str:
256         return "{}({})".format(
257             self.__class__.__name__,
258             ", ".join(
259                 "{}={}".format(
260                     k,
261                     'bytes.fromhex("{}")'.format(v.hex())
262                     if isinstance(v, bytes)
263                     else v.__repr__(),
264                 )
265                 for k, v in self.__dict__.items()
266                 if not k.startswith("_")
267             ),
268         )
269
270     def decode(self, *args: str) -> None:
271         ...
272
273     def in_decode(self, *args: str) -> None:
274         # Overridden in subclasses, otherwise do not decode payload
275         return
276
277     def out_decode(self, *args: str) -> None:
278         # Overridden in subclasses, otherwise do not decode payload
279         return
280
281     def encode(self) -> str:
282         ...
283
284     def in_encode(self) -> str:
285         # Necessary to emulate terminal, which is not implemented
286         raise NotImplementedError(
287             self.__class__.__name__ + ".encode() not implemented"
288         )
289
290     def out_encode(self) -> str:
291         # Overridden in subclasses, otherwise command verb only
292         return self.PROTO
293
294     @property
295     def packed(self) -> bytes:
296         buffer = self.encode().encode()
297         return f"[LT*0000000000*{len(buffer):04X}*".encode() + buffer + b"]"
298
299
300 class UNKNOWN(BeeSurePkt):
301     PROTO = "UNKNOWN"
302
303
304 class LK(BeeSurePkt):
305     PROTO = "LK"
306     RESPOND = Respond.INL
307
308     def in_decode(self, *args: str) -> None:
309         numargs = len(args)
310         if numargs > 1:
311             self.step = args[1]
312         if numargs > 2:
313             self.tumbling_number = args[2]
314         if numargs > 3:
315             self.battery_percentage = args[3]
316
317     def in_encode(self) -> str:
318         return "LK"
319
320
321 class CONFIG(BeeSurePkt):
322     PROTO = "CONFIG"
323
324
325 class ICCID(BeeSurePkt):
326     PROTO = "ICCID"
327
328
329 class UD(BeeSurePkt):
330     PROTO = "UD"
331
332     def in_decode(self, *args: str) -> None:
333         (
334             _,
335             self.date,
336             self.time,
337             self.gps_valid,
338             self.lat,
339             self.nors,
340             self.lon,
341             self.eorw,
342             self.speed,
343             self.direction,
344             self.altitude,
345             self.num_of_sats,
346             self.gsm_strength_percentage,
347             self.battery_percentage,
348             self.pedometer,
349             self.tubmling_times,
350             self.device_status,
351         ) = args[:17]
352         rest_args = args[17:]
353         self.base_stations_number = int(rest_args[0])
354         # ???, mcc, net, (area, cell, strength)*
355         self.base_stations = rest_args[1 : 4 + 3 * self.base_stations_number]
356         rest_args = rest_args[3 + 3 * self.base_stations_number + 1 :]
357         self.wifi_ap_number = int(rest_args[0])
358         # (SSID, MAC, strength)*
359         self.wifi_ap = rest_args[1 : 1 + 3 * self.wifi_ap_number]
360         self.positioning_accuracy = rest_args[-1]
361
362
363 class UD2(BeeSurePkt):
364     PROTO = "UD2"
365
366
367 class TKQ(BeeSurePkt):
368     PROTO = "TKQ"
369     RESPOND = Respond.INL
370
371
372 class TKQ2(BeeSurePkt):
373     PROTO = "TKQ2"
374     RESPOND = Respond.INL
375
376
377 class AL(BeeSurePkt):
378     PROTO = "AL"
379     RESPOND = Respond.INL
380
381
382 # Build dicts protocol number -> class and class name -> protocol number
383 CLASSES = {}
384 PROTOS = {}
385 if True:  # just to indent the code, sorry!
386     for cls in [
387         cls
388         for name, cls in globals().items()
389         if isclass(cls)
390         and issubclass(cls, BeeSurePkt)
391         and not name.startswith("_")
392     ]:
393         if hasattr(cls, "PROTO"):
394             CLASSES[cls.PROTO] = cls
395             PROTOS[cls.__name__] = cls.PROTO
396
397
398 def class_by_prefix(
399     prefix: str,
400 ) -> Union[Type[BeeSurePkt], List[Tuple[str, str]]]:
401     lst = [
402         (name, proto)
403         for name, proto in PROTOS.items()
404         if name.upper().startswith(prefix.upper())
405     ]
406     if len(lst) != 1:
407         return lst
408     _, proto = lst[0]
409     return CLASSES[proto]
410
411
412 def proto_handled(proto: str) -> bool:
413     return proto.startswith(PROTO_PREFIX)
414
415
416 def proto_name(obj: Union[MetaPkt, BeeSurePkt]) -> str:
417     return PROTO_PREFIX + (
418         obj.__class__.__name__ if isinstance(obj, BeeSurePkt) else obj.__name__
419     )
420
421
422 def proto_of_message(packet: bytes) -> str:
423     return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
424
425
426 def imei_from_packet(packet: bytes) -> Optional[str]:
427     toskip, _, imei, _ = _framestart(packet)
428     if toskip == 0 and imei != "":
429         return imei
430     return None
431
432
433 def is_goodbye_packet(packet: bytes) -> bool:
434     return False
435
436
437 def inline_response(packet: bytes) -> Optional[bytes]:
438     proto = packet[20:-1].split(b",")[0].decode()
439     if proto in CLASSES:
440         cls = CLASSES[proto]
441         if cls.RESPOND is Respond.INL:
442             return cls.Out().packed
443     return None
444
445
446 def probe_buffer(buffer: bytes) -> bool:
447     return bool(RE.search(buffer))
448
449
450 def parse_message(packet: bytes, is_incoming: bool = True) -> BeeSurePkt:
451     """From a packet (without framing bytes) derive the XXX.In object"""
452     toskip, vendor, imei, datalength = _framestart(packet)
453     payload = packet[20:-1].decode().split(",")
454     proto = payload[0] if len(payload) > 0 else ""
455     if proto not in CLASSES:
456         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
457             f"Proto {proto} is unknown"
458         )
459     else:
460         try:
461             if is_incoming:
462                 return CLASSES[proto].In(vendor, imei, datalength, payload)
463             else:
464                 return CLASSES[proto].Out(vendor, imei, datalength, payload)
465         except (DecodeError, ValueError, IndexError) as e:
466             cause = e
467     if is_incoming:
468         retobj = UNKNOWN.In(vendor, imei, datalength, payload)
469     else:
470         retobj = UNKNOWN.Out(vendor, imei, datalength, payload)
471     retobj.PROTO = proto  # Override class attr with object attr
472     retobj.cause = cause
473     return retobj