]> www.average.org Git - loctrkd.git/blob - loctrkd/beesure.py
cleanup framing/deframing of beesure
[loctrkd.git] / loctrkd / beesure.py
1 """
2 Implementation of the protocol "beesure" used by some watch-trackers
3 https://www.4p-touch.com/beesure-gps-setracker-server-protocol.html
4 """
5
6 from datetime import datetime, timezone
7 from enum import Enum
8 from inspect import isclass
9 import re
10 from struct import error, pack, unpack
11 from time import time
12 from typing import (
13     Any,
14     Callable,
15     Dict,
16     List,
17     Optional,
18     Tuple,
19     Type,
20     TYPE_CHECKING,
21     Union,
22 )
23
24 __all__ = (
25     "Stream",
26     "class_by_prefix",
27     "enframe",
28     "inline_response",
29     "parse_message",
30     "probe_buffer",
31     "proto_by_name",
32     "proto_name",
33     "DecodeError",
34     "Respond",
35     "LK",
36 )
37
38 PROTO_PREFIX = "BS:"
39
40 ### Deframer ###
41
42 MAXBUFFER: int = 65557  # Theoretical max buffer 65536 + 21
43 RE = re.compile(b"\[(\w\w)\*(\d{10})\*([0-9a-fA-F]{4})\*")
44
45
46 def _framestart(buffer: bytes) -> Tuple[int, str, str, int]:
47     """
48     Find the start of the frame in the buffer.
49     If found, return (offset, vendorId, imei, datalen) tuple.
50     If not found, set -1 as the value of `offset`
51     """
52     mo = RE.search(buffer)
53     return (
54         (
55             mo.start(),
56             mo.group(1).decode(),
57             mo.group(2).decode(),
58             int(mo.group(3), 16),
59         )
60         if mo
61         else (-1, "", "", 0)
62     )
63
64
65 class Stream:
66     def __init__(self) -> None:
67         self.buffer = b""
68         self.imei: Optional[str] = None
69         self.datalen: int = 0
70
71     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
72         """
73         Process next segment of the stream. Return successfully deframed
74         packets as `bytes` and error messages as `str`.
75         """
76         when = time()
77         self.buffer += segment
78         if len(self.buffer) > MAXBUFFER:
79             # We are receiving junk. Let's drop it or we run out of memory.
80             self.buffer = b""
81             return [f"More than {MAXBUFFER} unparseable data, dropping"]
82         msgs: List[Union[bytes, str]] = []
83         while True:
84             if not self.datalen:  # we have not seen packet start yet
85                 toskip, _, imei, datalen = _framestart(self.buffer)
86                 if toskip < 0:  # No frames, continue reading
87                     break
88                 if toskip > 0:  # Should not happen, report
89                     msgs.append(
90                         f"Skipping {toskip} bytes of undecodable data"
91                         f' "{self.buffer[:toskip][:64]=!r}"'
92                     )
93                     self.buffer = self.buffer[toskip:]
94                     # From this point, buffer starts with a packet header
95                 if self.imei is None:
96                     self.imei = imei
97                 if self.imei != imei:
98                     msgs.append(
99                         f"Packet's imei {imei} mismatches"
100                         f" previous value {self.imei}, old value kept"
101                     )
102                 self.datalen = datalen
103             if len(self.buffer) < self.datalen + 21:  # Incomplete packet
104                 break
105             # At least one complete packet is present in the buffer
106             if chr(self.buffer[self.datalen + 20]) == "]":
107                 msgs.append(self.buffer[: self.datalen + 21])
108             else:
109                 msgs.append(
110                     f"Packet does not end with ']'"
111                     f" at {self.datalen+20}: {self.buffer=!r}"
112                 )
113             self.buffer = self.buffer[self.datalen + 21 :]
114             self.datalen = 0
115         return msgs
116
117     def close(self) -> bytes:
118         ret = self.buffer
119         self.buffer = b""
120         self.imei = None
121         self.datalen = 0
122         return ret
123
124
125 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
126     assert imei is not None and len(imei) == 10
127     off, vid, _, dlen = _framestart(buffer)
128     assert off == 0
129     return f"[{vid:2s}*{imei:10s}*{dlen:04X}*".encode() + buffer[20:]
130
131
132 ### Parser/Constructor ###
133
134
135 class DecodeError(Exception):
136     def __init__(self, e: Exception, **kwargs: Any) -> None:
137         super().__init__(e)
138         for k, v in kwargs.items():
139             setattr(self, k, v)
140
141
142 def maybe(typ: type) -> Callable[[Any], Any]:
143     return lambda x: None if x is None else typ(x)
144
145
146 def intx(x: Union[str, int]) -> int:
147     if isinstance(x, str):
148         x = int(x, 0)
149     return x
150
151
152 def boolx(x: Union[str, bool]) -> bool:
153     if isinstance(x, str):
154         if x.upper() in ("ON", "TRUE", "1"):
155             return True
156         if x.upper() in ("OFF", "FALSE", "0"):
157             return False
158         raise ValueError(str(x) + " could not be parsed as a Boolean")
159     return x
160
161
162 class MetaPkt(type):
163     """
164     For each class corresponding to a message, automatically create
165     two nested classes `In` and `Out` that also inherit from their
166     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
167     copied to the `In` nested class under the name `KWARGS`, and
168     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
169     to the nested class `Out`. In addition, method `encode` is
170     defined in both classes equal to `in_encode()` and `out_encode()`
171     respectively.
172     """
173
174     if TYPE_CHECKING:
175
176         def __getattr__(self, name: str) -> Any:
177             pass
178
179         def __setattr__(self, name: str, value: Any) -> None:
180             pass
181
182     def __new__(
183         cls: Type["MetaPkt"],
184         name: str,
185         bases: Tuple[type, ...],
186         attrs: Dict[str, Any],
187     ) -> "MetaPkt":
188         newcls = super().__new__(cls, name, bases, attrs)
189         newcls.In = super().__new__(
190             cls,
191             name + ".In",
192             (newcls,) + bases,
193             {
194                 "KWARGS": newcls.IN_KWARGS,
195                 "decode": newcls.in_decode,
196                 "encode": newcls.in_encode,
197             },
198         )
199         newcls.Out = super().__new__(
200             cls,
201             name + ".Out",
202             (newcls,) + bases,
203             {
204                 "KWARGS": newcls.OUT_KWARGS,
205                 "decode": newcls.out_decode,
206                 "encode": newcls.out_encode,
207             },
208         )
209         return newcls
210
211
212 class Respond(Enum):
213     NON = 0  # Incoming, no response needed
214     INL = 1  # Birirectional, use `inline_response()`
215     EXT = 2  # Birirectional, use external responder
216
217
218 class BeeSurePkt(metaclass=MetaPkt):
219     RESPOND = Respond.NON  # Do not send anything back by default
220     PROTO: str
221     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
222     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
223     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
224     In: Type["BeeSurePkt"]
225     Out: Type["BeeSurePkt"]
226
227     if TYPE_CHECKING:
228
229         def __getattr__(self, name: str) -> Any:
230             pass
231
232         def __setattr__(self, name: str, value: Any) -> None:
233             pass
234
235     def __init__(self, *args: Any, **kwargs: Any):
236         """
237         Construct the object _either_ from (length, payload),
238         _or_ from the values of individual fields
239         """
240         assert not args or (len(args) == 4 and not kwargs)
241         if args:  # guaranteed to be two arguments at this point
242             self.vendor, self.imei, self.datalength, self.payload = args
243             try:
244                 self.decode(*self.payload)
245             except error as e:
246                 raise DecodeError(e, obj=self)
247         else:
248             for kw, typ, dfl in self.KWARGS:
249                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
250             if kwargs:
251                 raise ValueError(
252                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
253                 )
254
255     def __repr__(self) -> str:
256         return "{}({})".format(
257             self.__class__.__name__,
258             ", ".join(
259                 "{}={}".format(
260                     k,
261                     'bytes.fromhex("{}")'.format(v.hex())
262                     if isinstance(v, bytes)
263                     else v.__repr__(),
264                 )
265                 for k, v in self.__dict__.items()
266                 if not k.startswith("_")
267             ),
268         )
269
270     def decode(self, *args: str) -> None:
271         ...
272
273     def in_decode(self, *args: str) -> None:
274         # Overridden in subclasses, otherwise do not decode payload
275         return
276
277     def out_decode(self, *args: str) -> None:
278         # Overridden in subclasses, otherwise do not decode payload
279         return
280
281     def encode(self) -> str:
282         ...
283
284     def in_encode(self) -> str:
285         # Necessary to emulate terminal, which is not implemented
286         raise NotImplementedError(
287             self.__class__.__name__ + ".encode() not implemented"
288         )
289
290     def out_encode(self) -> str:
291         # Overridden in subclasses, otherwise command verb only
292         return self.PROTO
293
294     @property
295     def packed(self) -> bytes:
296         buffer = self.encode().encode()
297         return f"[LT*0000000000*{len(buffer):04X}*".encode() + buffer + b"]"
298
299
300 class UNKNOWN(BeeSurePkt):
301     PROTO = "UNKNOWN"
302
303
304 class LK(BeeSurePkt):
305     PROTO = "LK"
306     RESPOND = Respond.INL
307
308     def in_decode(self, *args: str) -> None:
309         numargs = len(args)
310         if numargs > 1:
311             self.step = args[1]
312         if numargs > 2:
313             self.tumbling_number = args[2]
314         if numargs > 3:
315             self.battery_percentage = args[3]
316
317     def in_encode(self) -> str:
318         return "LK"
319
320
321 class CONFIG(BeeSurePkt):
322     PROTO = "CONFIG"
323     RESPOND = Respond.INL
324
325
326 class UD(BeeSurePkt):
327     PROTO = "UD"
328
329     def in_decode(self, *args: str) -> None:
330         (
331             _,
332             self.date,
333             self.time,
334             self.gps_valid,
335             self.lat,
336             self.nors,
337             self.lon,
338             self.eorw,
339             self.speed,
340             self.direction,
341             self.altitude,
342             self.num_of_sats,
343             self.gsm_strength_percentage,
344             self.battery_percentage,
345             self.pedometer,
346             self.tubmling_times,
347             self.device_status,
348         ) = args[:17]
349         rest_args = args[17:]
350         self.base_stations_number = int(rest_args[0])
351         self.base_stations = rest_args[1 : 4 + 3 * self.base_stations_number]
352         rest_args = rest_args[3 + 3 * self.base_stations_number + 1 :]
353         self.wifi_ap_number = int(rest_args[0])
354         self.wifi_ap = rest_args[1 : self.wifi_ap_number]
355         # rest_args = rest_args[self_wifi_ap_number+1:]
356         self.positioning_accuracy = rest_args[-1]
357
358
359 class UD2(BeeSurePkt):
360     PROTO = "UD2"
361
362
363 class TKQ(BeeSurePkt):
364     PROTO = "TKQ"
365     RESPOND = Respond.INL
366
367
368 class TKQ2(BeeSurePkt):
369     PROTO = "TKQ2"
370     RESPOND = Respond.INL
371
372
373 class AL(BeeSurePkt):
374     PROTO = "AL"
375     RESPOND = Respond.INL
376
377
378 # Build dicts protocol number -> class and class name -> protocol number
379 CLASSES = {}
380 PROTOS = {}
381 if True:  # just to indent the code, sorry!
382     for cls in [
383         cls
384         for name, cls in globals().items()
385         if isclass(cls)
386         and issubclass(cls, BeeSurePkt)
387         and not name.startswith("_")
388     ]:
389         if hasattr(cls, "PROTO"):
390             CLASSES[cls.PROTO] = cls
391             PROTOS[cls.__name__] = cls.PROTO
392
393
394 def class_by_prefix(
395     prefix: str,
396 ) -> Union[Type[BeeSurePkt], List[Tuple[str, str]]]:
397     lst = [
398         (name, proto)
399         for name, proto in PROTOS.items()
400         if name.upper().startswith(prefix.upper())
401     ]
402     if len(lst) != 1:
403         return lst
404     _, proto = lst[0]
405     return CLASSES[proto]
406
407
408 def proto_name(obj: Union[MetaPkt, BeeSurePkt]) -> str:
409     return PROTO_PREFIX + (
410         obj.__class__.__name__ if isinstance(obj, BeeSurePkt) else obj.__name__
411     )
412
413
414 def proto_by_name(name: str) -> str:
415     return PROTO_PREFIX + PROTOS.get(name, "UNKNOWN")
416
417
418 def proto_of_message(packet: bytes) -> str:
419     return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
420
421
422 def imei_from_packet(packet: bytes) -> Optional[str]:
423     toskip, _, imei, _ = _framestart(packet)
424     if toskip == 0 and imei != "":
425         return imei
426     return None
427
428
429 def is_goodbye_packet(packet: bytes) -> bool:
430     return False
431
432
433 def inline_response(packet: bytes) -> Optional[bytes]:
434     proto = packet[20:-1].split(b",")[0].decode()
435     if proto in CLASSES:
436         cls = CLASSES[proto]
437         if cls.RESPOND is Respond.INL:
438             return cls.Out().packed
439     return None
440
441
442 def probe_buffer(buffer: bytes) -> bool:
443     return bool(RE.search(buffer))
444
445
446 def parse_message(packet: bytes, is_incoming: bool = True) -> BeeSurePkt:
447     """From a packet (without framing bytes) derive the XXX.In object"""
448     toskip, vendor, imei, datalength = _framestart(packet)
449     payload = packet[20:-1].decode().split(",")
450     proto = payload[0] if len(payload) > 0 else ""
451     if proto not in CLASSES:
452         cause: Union[DecodeError, ValueError, IndexError] = ValueError(
453             f"Proto {proto} is unknown"
454         )
455     else:
456         try:
457             if is_incoming:
458                 return CLASSES[proto].In(vendor, imei, datalength, payload)
459             else:
460                 return CLASSES[proto].Out(vendor, imei, datalength, payload)
461         except (DecodeError, ValueError, IndexError) as e:
462             cause = e
463     if is_incoming:
464         retobj = UNKNOWN.In(vendor, imei, datalength, payload)
465     else:
466         retobj = UNKNOWN.Out(vendor, imei, datalength, payload)
467     retobj.PROTO = proto  # Override class attr with object attr
468     retobj.cause = cause
469     return retobj