2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
36 "GPS_OFFLINE_POSITIONING",
41 "WIFI_OFFLINE_POSITIONING",
46 "SYNCHRONOUS_WHITELIST",
52 "CHARGER_DISCONNECTED",
54 "POSITION_UPLOAD_INTERVAL",
57 log = getLogger("gps303")
62 For each class corresponding to a message, automatically create
63 two nested classes `In` and `Out` that also inherit from their
64 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
65 copied to the `In` nested class under the name `KWARGS`, and
66 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
67 to the nested class `Out`. In addition, method `encode` is
68 defined in both classes equal to `in_encode()` and `out_encode()`
72 def __new__(cls, name, bases, attrs):
73 newcls = super().__new__(cls, name, bases, attrs)
74 newcls.In = super().__new__(
78 {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
80 newcls.Out = super().__new__(
84 {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
90 NON = 0 # Incoming, no response needed
91 INL = 1 # Birirectional, use `inline_response()`
92 EXT = 2 # Birirectional, use external responder
95 class GPS303Pkt(metaclass=MetaPkt):
96 RESPOND = Respond.NON # Do not send anything back by default
98 # Have these kwargs for now, TODO redo
99 IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
102 def __init__(self, *args, **kwargs):
103 assert len(args) == 0
104 for kw, typ, dfl in self.KWARGS:
105 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
107 print("KWARGS", self.KWARGS)
108 print("kwargs", kwargs)
110 self.__class__.__name__ + " stray kwargs " + str(kwargs)
114 return "{}({})".format(
115 self.__class__.__name__,
119 'bytes.fromhex("{}")'.format(v.hex())
120 if isinstance(v, bytes)
123 for k, v in self.__dict__.items()
124 if not k.startswith("_")
129 raise NotImplementedError(
130 self.__class__.__name__ + ".encode() not implemented"
133 def out_encode(self):
138 payload = self.encode()
139 length = len(payload) + 1
140 return pack("BB", length, self.PROTO) + payload
143 def from_packet(cls, length, payload):
144 return cls.In(payload=payload, length=length)
147 class UNKNOWN(GPS303Pkt):
148 PROTO = 256 # > 255 is impossible in real packets
151 class LOGIN(GPS303Pkt):
153 RESPOND = Respond.INL
154 # Default response for ACK, can also respond with STOP_UPLOAD
157 def from_packet(cls, length, payload):
158 self = super().from_packet(length, payload)
159 self.imei = payload[:-1].hex()
160 self.ver = unpack("B", payload[-1:])[0]
164 class SUPERVISION(GPS303Pkt):
166 OUT_KWARGS = (("status", int, 1),)
168 def out_encode(self):
169 # 1: The device automatically answers Pickup effect
170 # 2: Automatically Answering Two-way Calls
171 # 3: Ring manually answer the two-way call
172 return pack("B", self.status)
175 class HEARTBEAT(GPS303Pkt):
177 RESPOND = Respond.INL
180 class _GPS_POSITIONING(GPS303Pkt):
181 RESPOND = Respond.INL
184 def from_packet(cls, length, payload):
185 self = super().from_packet(length, payload)
186 self.dtime = payload[:6]
187 if self.dtime == b"\0\0\0\0\0\0":
190 self.devtime = datetime(
191 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
193 self.gps_data_length = payload[6] >> 4
194 self.gps_nb_sat = payload[6] & 0x0F
195 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
196 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
197 flip_lon = bool(flags & 0b0000100000000000) # bit 4
198 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
199 self.heading = flags & 0b0000001111111111 # bits 6 - last
200 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
201 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
206 def out_encode(self):
207 tup = datetime.utcnow().timetuple()
208 ttup = (tup[0] % 100,) + tup[1:6]
209 return pack("BBBBBB", *ttup)
212 class GPS_POSITIONING(_GPS_POSITIONING):
216 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
220 class STATUS(GPS303Pkt):
222 RESPOND = Respond.EXT
223 OUT_KWARGS = (("upload_interval", int, 25),)
226 def from_packet(cls, length, payload):
227 self = super().from_packet(length, payload)
228 if len(payload) == 5:
235 ) = unpack("BBBBB", payload)
236 elif len(payload) == 4:
237 self.batt, self.ver, self.timezone, self.intvl = unpack(
243 def out_encode(self): # Set interval in minutes
244 return cls.make_packet(pack("B", self.upload_interval))
247 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
249 RESPOND = Respond.INL
252 class RESET(GPS303Pkt):
253 # Device sends when it got reset SMS
254 # Server can send to initiate factory reset
258 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
260 OUT_KWARGS = (("number", int, 3),)
262 def out_encode(self): # Number of whitelist entries
263 return pack("B", number)
266 class _WIFI_POSITIONING(GPS303Pkt):
268 def from_packet(cls, length, payload):
269 self = super().from_packet(length, payload)
270 self.dtime = payload[:6]
271 if self.dtime == b"\0\0\0\0\0\0":
274 self.devtime = datetime.strptime(
275 self.dtime.hex(), "%y%m%d%H%M%S"
276 ).astimezone(tz=timezone.utc)
278 for i in range(self.length): # length has special meaning here
279 slice = payload[6 + i * 7 : 13 + i * 7]
280 self.wifi_aps.append(
281 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
283 gsm_slice = payload[6 + self.length * 7 :]
284 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
286 for i in range(ncells):
287 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
288 locac, cellid, sigstr = unpack(
289 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
291 self.gsm_cells.append((locac, cellid, -sigstr))
295 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
297 RESPOND = Respond.INL
299 def out_encode(self):
300 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
303 class TIME(GPS303Pkt):
305 RESPOND = Respond.INL
307 def out_encode(self):
308 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
311 class PROHIBIT_LBS(GPS303Pkt):
313 OUT_KWARGS = (("status", int, 1),)
315 def out_encode(self): # Server sent, 0-off, 1-on
316 return pack("B", self.status)
319 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
322 # Data is in packed decimal
324 # 00/01 - Don't set / Set upload period
325 # HHMMHHMM - Upload period
327 # 00/01 - Don't set / Set time of boot
328 # HHMM - Time of boot
329 # 00/01 - Don't set / Set time of shutdown
330 # HHMM - Time of shutdown
331 def out_encode(self):
335 class _SET_PHONE(GPS303Pkt):
336 OUT_KWARGS = (("phone", str, ""),)
338 def out_encode(self):
339 return self.phone.encode()
342 class REMOTE_MONITOR_PHONE(_SET_PHONE):
346 class SOS_PHONE(_SET_PHONE):
350 class DAD_PHONE(_SET_PHONE):
354 class MOM_PHONE(_SET_PHONE):
358 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
362 class GPS_OFF_PERIOD(GPS303Pkt):
364 OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
366 def out_encode(self):
368 pack("B", self.onoff)
369 + bytes.fromhex(self.fm)
370 + bytes.fromhex(self.to)
374 class DND_PERIOD(GPS303Pkt):
379 ("fm1", str, "0000"),
380 ("to1", str, "2359"),
381 ("fm2", str, "0000"),
382 ("to2", str, "2359"),
385 def out_endode(self):
387 pack("B", self.onoff)
388 + pack("B", self.week)
389 + bytes.fromhex(self.fm1)
390 + bytes.fromhex(self.to1)
391 + bytes.fromhex(self.fm2)
392 + bytes.fromhex(self.to2)
396 class RESTART_SHUTDOWN(GPS303Pkt):
398 OUT_KWARGS = (("flag", int, 2),)
400 def out_encode(self):
403 return pack("B", self.flag)
406 class DEVICE(GPS303Pkt):
408 OUT_KWARGS = (("flag", int, 0),)
410 # 0 - Stop looking for equipment
411 # 1 - Start looking for equipment
412 def out_encode(self):
413 return pack("B", self.flag)
416 class ALARM_CLOCK(GPS303Pkt):
419 def out_encode(self):
420 # TODO implement parsing kwargs
421 alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
423 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
427 class STOP_ALARM(GPS303Pkt):
431 def from_packet(cls, length, payload):
432 self = super().from_packet(length, payload)
433 self.flag = payload[0]
437 class SETUP(GPS303Pkt):
439 RESPOND = Respond.EXT
440 OUT_KWARGS = ( # TODO handle properly
441 ("uploadintervalseconds", int, 0x0300),
442 ("binaryswitch", int, 0b00110001),
443 ("alarms", lambda x: x, [0, 0, 0]),
444 ("dndtimeswitch", int, 0),
445 ("dndtimes", lambda x: x, [0, 0, 0]),
446 ("gpstimeswitch", int, 0),
447 ("gpstimestart", int, 0),
448 ("gpstimestop", int, 0),
449 ("phonenumbers", lambda x: x, ["", "", ""]),
452 def out_encode(self):
454 return pack("!I", x)[1:]
458 pack("!H", self.uploadintervalseconds),
459 pack("B", self.binaryswitch),
461 + [pack3b(el) for el in self.alarms]
463 pack("B", self.dndtimeswitch),
465 + [pack3b(el) for el in self.dndtimes]
467 pack("B", self.gpstimeswitch),
468 pack("!H", self.gpstimestart),
469 pack("!H", self.gpstimestop),
471 + [b";".join([el.encode() for el in self.phonenumbers])]
475 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
479 class RESTORE_PASSWORD(GPS303Pkt):
483 class WIFI_POSITIONING(_WIFI_POSITIONING):
485 RESPOND = Respond.EXT
486 OUT_KWARGS = (("lat", float, None), ("lon", float, None))
488 def out_encode(self):
489 if self.lat is None or self.lon is None:
491 return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
494 class MANUAL_POSITIONING(GPS303Pkt):
498 def from_packet(cls, length, payload):
499 self = super().from_packet(length, payload)
500 self.flag = payload[0] if len(payload) > 0 else None
505 4: "LBS search > 3 times",
506 5: "Same LBS and WiFi data",
507 6: "LBS prohibited, WiFi absent",
508 7: "GPS spacing < 50 m",
509 }.get(self.flag, "Unknown")
513 class BATTERY_CHARGE(GPS303Pkt):
517 class CHARGER_CONNECTED(GPS303Pkt):
521 class CHARGER_DISCONNECTED(GPS303Pkt):
525 class VIBRATION_RECEIVED(GPS303Pkt):
529 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
531 RESPOND = Respond.EXT
532 OUT_KWARGS = (("interval", int, 10),)
535 def from_packet(cls, length, payload):
536 self = super().from_packet(length, payload)
537 self.interval = unpack("!H", payload[:2])
540 def out_encode(self):
541 return pack("!H", interval)
544 class SOS_ALARM(GPS303Pkt):
548 # Build dicts protocol number -> class and class name -> protocol number
551 if True: # just to indent the code, sorry!
554 for name, cls in globals().items()
556 and issubclass(cls, GPS303Pkt)
557 and not name.startswith("_")
559 if hasattr(cls, "PROTO"):
560 CLASSES[cls.PROTO] = cls
561 PROTOS[cls.__name__] = cls.PROTO
564 def class_by_prefix(prefix):
567 for name, proto in PROTOS.items()
568 if name.upper().startswith(prefix.upper())
573 return CLASSES[proto]
576 def proto_by_name(name):
577 return PROTOS.get(name, -1)
580 def proto_of_message(packet):
581 return unpack("B", packet[1:2])[0]
584 def inline_response(packet):
585 proto = proto_of_message(packet)
588 if cls.RESPOND is Respond.INL:
589 return cls.Out().packed
593 def make_object(length, proto, payload):
595 return CLASSES[proto].from_packet(length, payload)
597 retobj = UNKNOWN.from_packet(length, payload)
598 retobj.PROTO = proto # Override class attr with object attr
602 def parse_message(packet):
603 length, proto = unpack("BB", packet[:2])
605 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
607 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
609 and len(payload) + adjust != length
612 "With proto %d length is %d but payload length is %d+%d",
618 return make_object(length, proto, payload)