2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
42 "GPS_LBS_SWITCH_TIMES",
43 "REMOTE_MONITOR_PHONE",
55 "SYNCHRONOUS_WHITELIST",
61 "CHARGER_DISCONNECTED",
63 "POSITION_UPLOAD_INTERVAL",
70 if isinstance(x, str):
76 """Check for the string that represents hours and minutes"""
77 if not isinstance(x, str) or len(x) != 4:
78 raise ValueError(str(x) + " is not a four-character string")
81 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82 raise ValueError(str(x) + " does not contain valid hours and minutes")
87 if isinstance(x, str):
89 if len(x) != 3 or not all(isinstance(el, str) for el in x):
90 raise ValueError(str(x) + " is not a list of three strings")
95 if isinstance(x, str):
97 x = [int(el) for el in x]
98 if len(x) != 3 or not all(isinstance(el, int) for el in x):
99 raise ValueError(str(x) + " is not a list of three integers")
105 For each class corresponding to a message, automatically create
106 two nested classes `In` and `Out` that also inherit from their
107 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108 copied to the `In` nested class under the name `KWARGS`, and
109 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110 to the nested class `Out`. In addition, method `encode` is
111 defined in both classes equal to `in_encode()` and `out_encode()`
115 def __new__(cls, name, bases, attrs):
116 newcls = super().__new__(cls, name, bases, attrs)
117 newcls.In = super().__new__(
121 {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
123 newcls.Out = super().__new__(
127 {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
133 NON = 0 # Incoming, no response needed
134 INL = 1 # Bidirectional, use `inline_response()`
135 EXT = 2 # Bidirectional, use external responder
138 class GPS303Pkt(metaclass=MetaPkt):
139 RESPOND = Respond.NON # Do not send anything back by default
141 # Have these kwargs for now, TODO redo
142 IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
145 def __init__(self, *args, **kwargs):
146 assert len(args) == 0
147 for kw, typ, dfl in self.KWARGS:
148 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
150 print("KWARGS", self.KWARGS)
151 print("kwargs", kwargs)
153 self.__class__.__name__ + " stray kwargs " + str(kwargs)
157 return "{}({})".format(
158 self.__class__.__name__,
162 'bytes.fromhex("{}")'.format(v.hex())
163 if isinstance(v, bytes)
166 for k, v in self.__dict__.items()
167 if not k.startswith("_")
172 raise NotImplementedError(
173 self.__class__.__name__ + ".encode() not implemented"
176 def out_encode(self):
181 payload = self.encode()
182 length = len(payload) + 1
183 return pack("BB", length, self.PROTO) + payload
def from_packet(cls, length, payload):
    """Create the incoming-direction (`In`) object for a received packet.

    `length` and `payload` are handed to the `In` constructor as
    keyword arguments; subclasses extend this to decode `payload`.
    """
    make_incoming = cls.In
    return make_incoming(length=length, payload=payload)
190 class UNKNOWN(GPS303Pkt):
191 PROTO = 256 # > 255 is impossible in real packets
194 class LOGIN(GPS303Pkt):
196 RESPOND = Respond.INL
197 # Default response for ACK, can also respond with STOP_UPLOAD
200 def from_packet(cls, length, payload):
201 self = super().from_packet(length, payload)
202 self.imei = payload[:-1].hex()
203 self.ver = unpack("B", payload[-1:])[0]
207 class SUPERVISION(GPS303Pkt):
209 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the outgoing payload: `status` as a single byte.

    Meaning of `status`:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
218 class HEARTBEAT(GPS303Pkt):
220 RESPOND = Respond.INL
223 class _GPS_POSITIONING(GPS303Pkt):
224 RESPOND = Respond.INL
def from_packet(cls, length, payload):
    """Parse an incoming GPS positioning packet into an `In` object.

    Fields read from `payload`:
      bytes 0-5   device date/time, one byte per field: year (two
                  digits), month, day, hour, minute, second
      byte 6      high nibble: GPS data length; low nibble: number
                  of satellites
      bytes 7-17  latitude, longitude (32-bit unsigned each), speed
                  (one byte) and a 16-bit flags/heading word, all in
                  network byte order

    `devtime` is None when the timestamp bytes are all zero.
    """
    self = super().from_packet(length, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
        # The year travels as two digits (`out_encode()` sends
        # `year % 100`); passed to datetime() unchanged it would give
        # a first-century date.
        # NOTE(review): assumes all dates fall in 2000-2099 -- confirm
        self.devtime = datetime(
            2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
        )
    self.gps_data_length = payload[6] >> 4
    self.gps_nb_sat = payload[6] & 0x0F
    lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
    self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
    flip_lon = bool(flags & 0b0000100000000000)  # bit 4
    flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
    self.heading = flags & 0b0000001111111111  # bits 6 - last
    # Raw integers are scaled by 1 / (30000 * 60), presumably yielding
    # degrees (TODO confirm); the flag bits only select the sign, so
    # the multiplier must be exactly -1 or 1 for both coordinates.
    self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
    self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
    # Keep the remaining parsed values instead of discarding them
    self.speed = speed
    self.flags = flags
    return self
def out_encode(self):
    """Encode the reply payload: current UTC time as six single bytes.

    The bytes are, in order: year modulo 100, month, day, hour,
    minute, second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC carries the same field values.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
255 class GPS_POSITIONING(_GPS_POSITIONING):
259 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
263 class STATUS(GPS303Pkt):
265 RESPOND = Respond.EXT
266 OUT_KWARGS = (("upload_interval", int, 25),)
269 def from_packet(cls, length, payload):
270 self = super().from_packet(length, payload)
271 if len(payload) == 5:
278 ) = unpack("BBBBB", payload)
279 elif len(payload) == 4:
280 self.batt, self.ver, self.timezone, self.intvl = unpack(
def out_encode(self):
    """Encode the outgoing payload: `upload_interval` (minutes) as one byte."""
    interval_minutes = self.upload_interval
    return pack("B", interval_minutes)
290 class HIBERNATION(GPS303Pkt): # Server can send to put the device to sleep
292 RESPOND = Respond.INL
295 class RESET(GPS303Pkt):
296 # Device sends when it got reset SMS
297 # Server can send to initiate factory reset
301 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
303 OUT_KWARGS = (("number", int, 3),)
def out_encode(self):
    """Encode the outgoing payload: number of whitelist entries as one byte."""
    # `number` is an instance attribute populated from OUT_KWARGS; the
    # bare name would raise NameError at call time.
    return pack("B", self.number)
309 class _WIFI_POSITIONING(GPS303Pkt):
def from_packet(cls, length, payload):
    """Parse an incoming WiFi/LBS positioning packet into an `In` object.

    Fields read from `payload`:
      bytes 0-5   device date/time in packed decimal (YYMMDDhhmmss)
      then        `length` records of 7 bytes each: 6 bytes of MAC
                  address and one byte of signal strength
      then        one byte cell count, 16-bit MCC, one byte MNC,
                  followed by 5-byte cell records: 16-bit location
                  area code, 16-bit cell id, one byte signal strength

    Signal strength bytes are stored negated. `devtime` is None when
    the timestamp bytes are all zero.
    """
    self = super().from_packet(length, payload)
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # strptime() yields a naive datetime; label it as UTC rather
        # than letting astimezone() reinterpret it as host-local time.
        # This matches the GPS packets, whose timestamp is tagged UTC.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        ap_record = payload[6 + i * 7 : 13 + i * 7]
        mac = ":".join([format(b, "02X") for b in ap_record[:6]])
        self.wifi_aps.append((mac, -ap_record[6]))
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
    return self
338 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
340 RESPOND = Respond.INL
def out_encode(self):
    """Encode the reply payload: current UTC time in packed decimal.

    Six bytes whose hex digits read YYMMDDhhmmss.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; formatting
    # an aware UTC "now" produces the same digits.
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
346 class TIME(GPS303Pkt):
348 RESPOND = Respond.INL
def out_encode(self):
    """Encode the reply payload: current UTC time as seven bytes.

    A 16-bit big-endian year followed by one byte each for month,
    day, hour, minute and second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC carries the same field values.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
354 class PROHIBIT_LBS(GPS303Pkt):
356 OUT_KWARGS = (("status", int, 1),)
def out_encode(self):
    """Encode the server-sent payload: `status` as one byte (0-off, 1-on)."""
    switch = self.status
    return pack("B", switch)
362 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
365 # Data is in packed decimal
367 # 00/01 - Don't set / Set upload period
368 # HHMMHHMM - Upload period
370 # 00/01 - Don't set / Set time of boot
371 # HHMM - Time of boot
372 # 00/01 - Don't set / Set time of shutdown
373 # HHMM - Time of shutdown
374 def out_encode(self):
378 class _SET_PHONE(GPS303Pkt):
379 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self):
    """Encode the outgoing payload: the `phone` string as UTF-8 bytes."""
    number = self.phone
    return bytes(number, "utf-8")
385 class REMOTE_MONITOR_PHONE(_SET_PHONE):
389 class SOS_PHONE(_SET_PHONE):
393 class DAD_PHONE(_SET_PHONE):
397 class MOM_PHONE(_SET_PHONE):
401 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
405 class GPS_OFF_PERIOD(GPS303Pkt):
409 ("fm", hhmm, "0000"),
410 ("to", hhmm, "2359"),
413 def out_encode(self):
415 pack("B", self.onoff)
416 + bytes.fromhex(self.fm)
417 + bytes.fromhex(self.to)
421 class DND_PERIOD(GPS303Pkt):
426 ("fm1", hhmm, "0000"),
427 ("to1", hhmm, "2359"),
428 ("fm2", hhmm, "0000"),
429 ("to2", hhmm, "2359"),
def out_encode(self):
    """Encode the outgoing do-not-disturb settings.

    Layout: one byte `onoff`, one byte `week`, then the four "HHMM"
    strings `fm1`, `to1`, `fm2`, `to2`, each as two packed-decimal
    bytes.
    """
    return b"".join(
        (
            pack("B", self.onoff),
            pack("B", self.week),
            bytes.fromhex(self.fm1),
            bytes.fromhex(self.to1),
            bytes.fromhex(self.fm2),
            bytes.fromhex(self.to2),
        )
    )


# The method used to be misspelled, so it was never picked up as the
# encoder; keep the old name as an alias for any existing callers.
out_endode = out_encode
443 class RESTART_SHUTDOWN(GPS303Pkt):
445 OUT_KWARGS = (("flag", int, 0),)
447 def out_encode(self):
450 return pack("B", self.flag)
453 class DEVICE(GPS303Pkt):
455 OUT_KWARGS = (("flag", int, 0),)
457 # 0 - Stop looking for equipment
458 # 1 - Start looking for equipment
def out_encode(self):
    """Encode the outgoing payload: `flag` as a single byte."""
    flag_byte = pack("B", self.flag)
    return flag_byte
463 class ALARM_CLOCK(GPS303Pkt):
def out_encode(self):
    """Encode the outgoing alarm list.

    Each alarm is three bytes: one byte for the day value followed by
    the "HHMM" time as two packed-decimal bytes. The alarm list is
    currently fixed to three zeroed entries.
    """
    # TODO implement parsing kwargs
    alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
    # Use struct.pack for the day byte; `cls` is not a packing function
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
    )
474 class STOP_ALARM(GPS303Pkt):
478 def from_packet(cls, length, payload):
479 self = super().from_packet(length, payload)
480 self.flag = payload[0]
484 class SETUP(GPS303Pkt):
486 RESPOND = Respond.EXT
488 ("uploadintervalseconds", intx, 0x0300),
489 ("binaryswitch", intx, 0b00110001),
490 ("alarms", l3int, [0, 0, 0]),
491 ("dndtimeswitch", int, 0),
492 ("dndtimes", l3int, [0, 0, 0]),
493 ("gpstimeswitch", int, 0),
494 ("gpstimestart", int, 0),
495 ("gpstimestop", int, 0),
496 ("phonenumbers", l3str, ["", "", ""]),
499 def out_encode(self):
501 return pack("!I", x)[1:]
505 pack("!H", self.uploadintervalseconds),
506 pack("B", self.binaryswitch),
508 + [pack3b(el) for el in self.alarms]
510 pack("B", self.dndtimeswitch),
512 + [pack3b(el) for el in self.dndtimes]
514 pack("B", self.gpstimeswitch),
515 pack("!H", self.gpstimestart),
516 pack("!H", self.gpstimestop),
518 + [b";".join([el.encode() for el in self.phonenumbers])]
522 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
526 class RESTORE_PASSWORD(GPS303Pkt):
530 class WIFI_POSITIONING(_WIFI_POSITIONING):
532 RESPOND = Respond.EXT
533 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
535 def out_encode(self):
536 if self.lat is None or self.lon is None:
538 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
541 class MANUAL_POSITIONING(GPS303Pkt):
545 def from_packet(cls, length, payload):
546 self = super().from_packet(length, payload)
547 self.flag = payload[0] if len(payload) > 0 else None
552 4: "LBS search > 3 times",
553 5: "Same LBS and WiFi data",
554 6: "LBS prohibited, WiFi absent",
555 7: "GPS spacing < 50 m",
556 }.get(self.flag, "Unknown")
560 class BATTERY_CHARGE(GPS303Pkt):
564 class CHARGER_CONNECTED(GPS303Pkt):
568 class CHARGER_DISCONNECTED(GPS303Pkt):
572 class VIBRATION_RECEIVED(GPS303Pkt):
576 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
578 RESPOND = Respond.EXT
579 OUT_KWARGS = (("interval", int, 10),)
def from_packet(cls, length, payload):
    """Parse an incoming packet: `interval` is a 16-bit big-endian integer.

    The value is read from the first two bytes of `payload`.
    """
    self = super().from_packet(length, payload)
    # unpack() always returns a tuple; store the integer itself so the
    # attribute has the same type as the `interval` outgoing kwarg.
    (self.interval,) = unpack("!H", payload[:2])
    return self
def out_encode(self):
    """Encode the outgoing payload: `interval` as a 16-bit big-endian integer."""
    # `interval` is an instance attribute populated from OUT_KWARGS; the
    # bare name would raise NameError at call time.
    return pack("!H", self.interval)
591 class SOS_ALARM(GPS303Pkt):
595 class UNKNOWN_B3(GPS303Pkt):
597 IN_KWARGS = (("asciidata", str, ""),)
600 def from_packet(cls, length, payload):
601 self = super().from_packet(length, payload)
602 self.asciidata = payload.decode()
606 # Build dicts protocol number -> class and class name -> protocol number
609 if True: # just to indent the code, sorry!
612 for name, cls in globals().items()
614 and issubclass(cls, GPS303Pkt)
615 and not name.startswith("_")
617 if hasattr(cls, "PROTO"):
618 CLASSES[cls.PROTO] = cls
619 PROTOS[cls.__name__] = cls.PROTO
622 def class_by_prefix(prefix):
625 for name, proto in PROTOS.items()
626 if name.upper().startswith(prefix.upper())
631 return CLASSES[proto]
def proto_by_name(name):
    """Return the protocol number registered under `name`, or -1 if none."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of `packet`."""
    (proto,) = unpack("B", packet[1:2])
    return proto
642 def inline_response(packet):
643 proto = proto_of_message(packet)
646 if cls.RESPOND is Respond.INL:
647 return cls.Out().packed
651 def parse_message(packet):
652 """ From a packet (without framing bytes) derive the XXX.In object """
653 length, proto = unpack("BB", packet[:2])
656 return CLASSES[proto].from_packet(length, payload)
658 retobj = UNKNOWN.from_packet(length, payload)
659 retobj.PROTO = proto # Override class attr with object attr