From 8be4295a5027349ebbf5242d131c5a942181f7a6 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Mon, 18 Apr 2022 20:41:46 +0200 Subject: [PATCH] WIP to reorganise to microservices --- gps303/__main__.py | 4 +- gps303/collector.py | 66 ++++++++++++----- gps303/gps303proto.py | 164 +++++++++++++++++++++++------------------- gps303/qry.py | 30 ++++++++ 4 files changed, 173 insertions(+), 91 deletions(-) create mode 100644 gps303/qry.py diff --git a/gps303/__main__.py b/gps303/__main__.py index d3d6d1e..51e462c 100755 --- a/gps303/__main__.py +++ b/gps303/__main__.py @@ -69,7 +69,7 @@ if __name__.endswith("__main__"): packet = clntsock.recv(4096) when = time() if packet: - msg = handle_packet(packet, clntaddr, when) + msg = handle_packet(packet) log.debug("%s from %s fd %d", msg, clntaddr, fd) if isinstance(msg, LOGIN): imei = msg.imei @@ -79,7 +79,7 @@ if __name__.endswith("__main__"): when, imei, msg.length, - msg.proto, + msg.PROTO, msg.payload, ) kwargs = prepare_response(conf, msg) diff --git a/gps303/collector.py b/gps303/collector.py index 28c636d..7ffa752 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -38,26 +38,53 @@ class Client: self.imei = None def close(self): + log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() self.buffer = b"" self.imei = None def recv(self): - segment = self.sock.recv(4096) - if not segment: + """ Read from the socket and parse complete messages """ + try: + segment = self.sock.recv(4096) + except OSError: + log.warning("Reading from fd %d (IMEI %s): %s", + self.sock.fileno(), self.imei, e) + return None + if not segment: # Terminal has closed connection + log.info("EOF reading from fd %d (IMEI %s)", + self.sock.fileno(), self.imei) return None when = time() self.buffer += segment - # implement framing properly - msg = handle_packet(packet, self.addr, when) - self.buffer = self.buffer[len(packet):] - if isinstance(msg, LOGIN): - self.imei = msg.imei - return msg + msgs = [] + while True: + framestart = self.buffer.find(b"xx") + if framestart == -1: # No frames, return whatever we have + break + if framestart > 0: # Should not happen, report + log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)", + self.buffer[:framestart].hex(), self.sock.fileno(), self.imei) + self.buffer = self.buffer[framestart:] + # At this point, buffer starts with a packet + frameend = self.buffer.find(b"\r\n", 4) + if frameend == -1: # Incomplete frame, return what we have + break + msg = parse_message(self.buffer[:frameend]) + self.buffer = self.buffer[frameend+2:] + if isinstance(msg, LOGIN): + self.imei = msg.imei + log.info("LOGIN from fd %d: IMEI %s", + self.sock.fileno(), self.imei) + msgs.append(msg) + return msgs def send(self, buffer): - self.sock.send(buffer) - + try: + self.sock.send(b"xx" + buffer + b"\r\n") + except OSError as e: + log.error("Sending to fd %d (IMEI %s): %s", + self.sock.fileno, self.imei, e) class Clients: def __init__(self): @@ -71,6 +98,7 @@ class Clients: def stop(self, fd): clnt = by_fd[fd] + log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei) clnt.close() if clnt.imei: del self.by_imei[clnt.imei] @@ -78,14 +106,17 @@ class Clients: def recv(self, fd): clnt = by_fd[fd] - msg = clnt.recv() - if isinstance(msg, LOGIN): - self.by_imei[clnt.imei] = clnt - return clnt.imei, msg + msgs = clnt.recv() + result = [] + for msg in msgs: + if isinstance(msg, LOGIN): + self.by_imei[clnt.imei] = clnt + result.append(clnt.imei, msg) + return result - def response(self, zmsg): - if zmsg.imei in self.by_imei: - clnt = self.by_imei[zmsg.imei].send(zmsg.payload) + def response(self, resp): + if resp.imei in self.by_imei: + self.by_imei[resp.imei].send(resp.payload) def runserver(opts, conf): @@ -124,6 +155,7 @@ def runserver(opts, conf): imei, msg = clients.recv(sk) zpub.send(Bcast(imei, msg).as_bytes) if msg is None or isinstance(msg, HIBERNATION): + log.debug("HIBERNATION from fd %d", sk) tostop.append(sk) # poll queue consumed, make changes now for fd in tostop: diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 4789b03..e74a6b7 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -23,7 +23,10 @@ __all__ = ( "handle_packet", "make_object", "make_response", + "parse_message", + "proto_by_name", "set_config", + "GPS303Pkt", "UNKNOWN", "LOGIN", "SUPERVISION", @@ -53,7 +56,7 @@ __all__ = ( log = getLogger("gps303") -class _GT06pkt: +class GPS303Pkt: PROTO: int CONFIG = None @@ -78,8 +81,11 @@ class _GT06pkt: ) @classmethod - def from_packet(cls, length, proto, payload): - return cls(proto=proto, payload=payload, length=length) + def from_packet(cls, length, payload): + return cls(payload=payload, length=length) + + def to_packet(self): + return pack("BB", self.length, self.PROTO) + self.payload def response(self, *args): if len(args) == 0: @@ -89,19 +95,19 @@ class _GT06pkt: length = len(payload) + 1 if length > 6: length -= 6 - return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n" + return pack("BB", length, self.PROTO) + payload -class UNKNOWN(_GT06pkt): - pass +class UNKNOWN(GPS303Pkt): + PROTO = 256 # > 255 is impossible in real packets -class LOGIN(_GT06pkt): +class LOGIN(GPS303Pkt): PROTO = 0x01 @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) + def from_packet(cls, length, payload): + self = super().from_packet(length, payload) self.imei = payload[:-1].hex() self.ver = unpack("B", payload[-1:])[0] return self @@ -110,7 +116,7 @@ class LOGIN(_GT06pkt): return super().response(b"") -class SUPERVISION(_GT06pkt): # Server sends supervision number status +class SUPERVISION(GPS303Pkt): # Server sends supervision number status PROTO = 0x05 def response(self, supnum=0): @@ -120,14 +126,14 @@ class SUPERVISION(_GT06pkt): # Server sends supervision number status return super().response(b"") -class HEARTBEAT(_GT06pkt): +class HEARTBEAT(GPS303Pkt): PROTO = 0x08 -class _GPS_POSITIONING(_GT06pkt): +class _GPS_POSITIONING(GPS303Pkt): @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) + def from_packet(cls, length, payload): + self = super().from_packet(length, payload) self.dtime = payload[:6] if self.dtime == b"\0\0\0\0\0\0": self.devtime = None @@ -160,12 +166,12 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING): PROTO = 0x11 -class STATUS(_GT06pkt): +class STATUS(GPS303Pkt): PROTO = 0x13 @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) + def from_packet(cls, length, payload): + self = super().from_packet(length, payload) if len(payload) == 5: ( self.batt, @@ -185,28 +191,28 @@ class STATUS(_GT06pkt): return super().response(pack("B", upload_interval)) -class HIBERNATION(_GT06pkt): +class HIBERNATION(GPS303Pkt): PROTO = 0x14 -class RESET(_GT06pkt): # Device sends when it got reset SMS +class RESET(GPS303Pkt): # Device sends when it got reset SMS PROTO = 0x15 def response(self): # Server can send to initiate factory reset return super().response(b"") -class WHITELIST_TOTAL(_GT06pkt): # Server sends to initiage sync (0x58) +class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58) PROTO = 0x16 def response(self, number=3): # Number of whitelist entries return super().response(pack("B", number)) -class _WIFI_POSITIONING(_GT06pkt): +class _WIFI_POSITIONING(GPS303Pkt): @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) + def from_packet(cls, length, payload): + self = super().from_packet(length, payload) self.dtime = payload[:6] if self.dtime == b"\0\0\0\0\0\0": self.devtime = None @@ -239,7 +245,7 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING): return super().response(self.dtime) -class TIME(_GT06pkt): +class TIME(GPS303Pkt): PROTO = 0x30 def response(self): @@ -247,29 +253,29 @@ class TIME(_GT06pkt): return super().response(payload) -class PROHIBIT_LBS(_GT06pkt): +class PROHIBIT_LBS(GPS303Pkt): PROTO = 0x33 def response(self, status=1): # Server sent, 0-off, 1-on return super().response(pack("B", status)) -class MOM_PHONE(_GT06pkt): +class MOM_PHONE(GPS303Pkt): PROTO = 0x43 -class STOP_UPLOAD(_GT06pkt): # Server response to LOGIN to thwart the device +class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device PROTO = 0x44 def response(self): return super().response(b"") -class STOP_ALARM(_GT06pkt): +class STOP_ALARM(GPS303Pkt): PROTO = 0x56 -class SETUP(_GT06pkt): +class SETUP(GPS303Pkt): PROTO = 0x57 def response( @@ -307,11 +313,11 @@ class SETUP(_GT06pkt): return super().response(payload) -class SYNCHRONOUS_WHITELIST(_GT06pkt): +class SYNCHRONOUS_WHITELIST(GPS303Pkt): PROTO = 0x58 -class RESTORE_PASSWORD(_GT06pkt): +class RESTORE_PASSWORD(GPS303Pkt): PROTO = 0x67 @@ -328,32 +334,32 @@ class WIFI_POSITIONING(_WIFI_POSITIONING): return super().response(payload) -class MANUAL_POSITIONING(_GT06pkt): +class MANUAL_POSITIONING(GPS303Pkt): PROTO = 0x80 -class BATTERY_CHARGE(_GT06pkt): +class BATTERY_CHARGE(GPS303Pkt): PROTO = 0x81 -class CHARGER_CONNECTED(_GT06pkt): +class CHARGER_CONNECTED(GPS303Pkt): PROTO = 0x82 -class CHARGER_DISCONNECTED(_GT06pkt): +class CHARGER_DISCONNECTED(GPS303Pkt): PROTO = 0x83 -class VIBRATION_RECEIVED(_GT06pkt): +class VIBRATION_RECEIVED(GPS303Pkt): PROTO = 0x94 -class POSITION_UPLOAD_INTERVAL(_GT06pkt): +class POSITION_UPLOAD_INTERVAL(GPS303Pkt): PROTO = 0x98 @classmethod - def from_packet(cls, length, proto, payload): - self = super().from_packet(length, proto, payload) + def from_packet(cls, length, payload): + self = super().from_packet(length, payload) self.interval = unpack("!H", payload[:2]) return self @@ -361,61 +367,75 @@ class POSITION_UPLOAD_INTERVAL(_GT06pkt): return super().response(pack("!H", self.interval)) -class SOS_ALARM(_GT06pkt): +class SOS_ALARM(GPS303Pkt): PROTO = 0x99 -# Build a dict protocol number -> class +# Build dicts protocol number -> class and class name -> protocol number CLASSES = {} +PROTOS = {} if True: # just to indent the code, sorry! for cls in [ cls for name, cls in globals().items() if isclass(cls) - and issubclass(cls, _GT06pkt) + and issubclass(cls, GPS303Pkt) and not name.startswith("_") ]: if hasattr(cls, "PROTO"): CLASSES[cls.PROTO] = cls + PROTOS[cls.__name__] = cls.PROTO + + +def proto_by_name(name): + return PROTOS.get(name, -1) + + +def proto_of_message(packet): + return unpack("B", packet[1:2]) def make_object(length, proto, payload): if proto in CLASSES: - return CLASSES[proto].from_packet(length, proto, payload) + return CLASSES[proto].from_packet(length, payload) else: - return UNKNOWN.from_packet(length, proto, payload) + retobj = UNKNOWN.from_packet(length, payload) + retobj.PROTO = proto # Override class attr with object attr + return retobj + + +def parse_message(packet): + print("packet w/o frame", packet.hex()) + length, proto = unpack("BB", packet[:2]) + payload = packet[2:] + adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case + if ( + proto + not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) + and length > 1 + and len(payload) + adjust != length + ): + log.warning( + "With proto %d length is %d but payload length is %d+%d", + proto, + length, + len(payload), + adjust, + ) + return make_object(length, proto, payload) -def handle_packet(packet, addr, when): - if len(packet) < 6: - return UNKNOWN.from_packet(0, 0, packet) - else: - xx, length, proto = unpack("!2sBB", packet[:4]) - crlf = packet[-2:] - payload = packet[4:-2] - adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case - if ( - proto - not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO) - and length > 1 - and len(payload) + adjust != length - ): - log.warning( - "With proto %d length is %d but payload length is %d+%d", - proto, - length, - len(payload), - adjust, - ) - if xx != b"xx" or crlf != b"\r\n": - return UNKNOWN.from_packet(length, proto, packet) # full packet - else: - return make_object(length, proto, payload) +def handle_packet(packet): # DEPRECATED + print("packet in frame", packet.hex()) + if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n": + return UNKNOWN.from_packet(len(packet), packet) + return parse_message(packet[2:-2]) -def make_response(msg, **kwargs): - return msg.response(**kwargs) +def make_response(msg, **kwargs): # DEPRECATED + inframe = msg.response(**kwargs) + return None if inframe is None else b"xx" + inframe + b"\r\n" def set_config(config): # Note that we are setting _class_ attribute - _GT06pkt.CONFIG = config + GPS303Pkt.CONFIG = config diff --git a/gps303/qry.py b/gps303/qry.py new file mode 100644 index 0000000..76df8f2 --- /dev/null +++ b/gps303/qry.py @@ -0,0 +1,30 @@ +from datetime import datetime, timezone +from sqlite3 import connect +import sys + +from .gps303proto import * + +db = connect(sys.argv[1]) +c = db.cursor() +if len(sys.argv) > 2: + proto = proto_by_name(sys.argv[2]) + if proto < 0: + raise ValueError("No protocol with name " + sys.argv[2]) + selector = " where proto = :proto" +else: + proto = -1 + selector = "" + +c.execute( + "select timestamp, imei, clntaddr, length, proto, payload from events" + + selector, {"proto": proto} +) + +for timestamp, imei, clntaddr, length, proto, payload in c: + msg = make_object(length, proto, payload) + print( + datetime.fromtimestamp(timestamp) + .astimezone(tz=timezone.utc) + .isoformat(), + msg, + ) -- 2.43.0