from datetime import datetime, timezone
from enum import Enum
from inspect import isclass
-from logging import getLogger
from struct import pack, unpack
__all__ = (
"class_by_prefix",
"inline_response",
- "make_object",
"parse_message",
"proto_by_name",
"Respond",
"VIBRATION_RECEIVED",
"POSITION_UPLOAD_INTERVAL",
"SOS_ALARM",
+ "UNKNOWN_B3",
)
-log = getLogger("gps303")
-
def intx(x):
if isinstance(x, str):
cls,
name + ".In",
(newcls,) + bases,
- {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
+ {
+ "KWARGS": newcls.IN_KWARGS,
+ "decode": newcls.in_decode,
+ "encode": newcls.in_encode,
+ },
)
newcls.Out = super().__new__(
cls,
name + ".Out",
(newcls,) + bases,
- {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
+ {
+ "KWARGS": newcls.OUT_KWARGS,
+ "decode": newcls.out_decode,
+ "encode": newcls.out_encode,
+ },
)
return newcls
class GPS303Pkt(metaclass=MetaPkt):
RESPOND = Respond.NON # Do not send anything back by default
PROTO: int
- # Have these kwargs for now, TODO redo
- IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
+ IN_KWARGS = ()
OUT_KWARGS = ()
def __init__(self, *args, **kwargs):
),
)
+ def in_decode(self, length, packet):
+ return
+
+ def out_decode(self, length, packet):
+ raise NotImplementedError(
+ self.__class__.__name__ + ".decode() not implemented"
+ )
+
def in_encode(self):
raise NotImplementedError(
self.__class__.__name__ + ".encode() not implemented"
@classmethod
def from_packet(cls, length, payload):
- return cls.In(payload=payload, length=length)
+ self = cls.In()
+ self.length = length
+ self.payload = payload
+ self.decode(length, payload)
+ return self
class UNKNOWN(GPS303Pkt):
RESPOND = Respond.INL
# Default response for ACK, can also respond with STOP_UPLOAD
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.imei = payload[:-1].hex()
self.ver = unpack("B", payload[-1:])[0]
return self
class _GPS_POSITIONING(GPS303Pkt):
RESPOND = Respond.INL
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.dtime = payload[:6]
if self.dtime == b"\0\0\0\0\0\0":
self.devtime = None
else:
+ yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
self.devtime = datetime(
- *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+ 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
)
self.gps_data_length = payload[6] >> 4
self.gps_nb_sat = payload[6] & 0x0F
flip_lat = not bool(flags & 0b0000010000000000) # bit 5
self.heading = flags & 0b0000001111111111 # bits 6 - last
self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
- self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+ self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
self.speed = speed
self.flags = flags
return self
RESPOND = Respond.EXT
OUT_KWARGS = (("upload_interval", int, 25),)
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
- if len(payload) == 5:
- (
- self.batt,
- self.ver,
- self.timezone,
- self.intvl,
- self.signal,
- ) = unpack("BBBBB", payload)
- elif len(payload) == 4:
- self.batt, self.ver, self.timezone, self.intvl = unpack(
- "BBBB", payload
- )
+ def in_decode(self, length, payload):
+ self.batt, self.ver, self.timezone, self.intvl = unpack(
+ "BBBB", payload[:4]
+ )
+ if len(payload) > 4:
+ self.signal = payload[4]
+ else:
self.signal = None
return self
class _WIFI_POSITIONING(GPS303Pkt):
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.dtime = payload[:6]
if self.dtime == b"\0\0\0\0\0\0":
self.devtime = None
class STOP_ALARM(GPS303Pkt):
PROTO = 0x56
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.flag = payload[0]
return self
class MANUAL_POSITIONING(GPS303Pkt):
PROTO = 0x80
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.flag = payload[0] if len(payload) > 0 else None
self.reason = {
1: "Incorrect time",
RESPOND = Respond.EXT
OUT_KWARGS = (("interval", int, 10),)
- @classmethod
- def from_packet(cls, length, payload):
- self = super().from_packet(length, payload)
+ def in_decode(self, length, payload):
self.interval = unpack("!H", payload[:2])
return self
PROTO = 0x99
+class UNKNOWN_B3(GPS303Pkt):
+ PROTO = 0xB3
+ IN_KWARGS = (("asciidata", str, ""),)
+
+ def in_decode(self, length, payload):
+ self.asciidata = payload.decode()
+ return self
+
+
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
return None
-def make_object(length, proto, payload):
+def parse_message(packet):
+ """From a packet (without framing bytes) derive the XXX.In object"""
+ length, proto = unpack("BB", packet[:2])
+ payload = packet[2:]
if proto in CLASSES:
return CLASSES[proto].from_packet(length, payload)
else:
retobj = UNKNOWN.from_packet(length, payload)
retobj.PROTO = proto # Override class attr with object attr
return retobj
-
-
-def parse_message(packet):
- length, proto = unpack("BB", packet[:2])
- payload = packet[2:]
- adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
- if (
- proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
- and length > 1
- and len(payload) + adjust != length
- ):
- log.warning(
- "With proto %d length is %d but payload length is %d+%d",
- proto,
- length,
- len(payload),
- adjust,
- )
- return make_object(length, proto, payload)