From 5e1e7a4d37a1e149d5e899dada7b55a863cd8e64 Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Mon, 9 May 2022 00:46:12 +0200 Subject: [PATCH] WIP retoure messaging --- gps303/backlog.py | 18 +++++------------- gps303/collector.py | 11 ++++++++++- gps303/evstore.py | 5 +++-- gps303/gps303proto.py | 26 +++++++++++++++++--------- gps303/lookaside.py | 24 ++++++++---------------- gps303/qry.py | 3 +++ gps303/storage.py | 4 +++- gps303/termconfig.py | 6 +++--- gps303/watch.py | 5 +++-- gps303/wsgateway.py | 40 +++++++++++++++++++++++++++++++++------- gps303/zmsg.py | 39 ++++++++++++++++++++++++++++----------- 11 files changed, 116 insertions(+), 65 deletions(-) diff --git a/gps303/backlog.py b/gps303/backlog.py index 39dfcbc..1322286 100644 --- a/gps303/backlog.py +++ b/gps303/backlog.py @@ -3,24 +3,16 @@ from .opencellid import qry_cell from .evstore import initdb, fetch from .gps303proto import GPS_POSITIONING, WIFI_POSITIONING, parse_message -from .zmsg import LocEvt -OCDB = None -def blinit(evdb, ocdb): - global OCDB - OCDB = ocdb +def blinit(evdb): initdb(evdb) + def backlog(imei, backlog): result = [] - for packet in fetch(imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog): + for packet in fetch( + imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog + ): msg = parse_message(packet) - if isinstance(msg, GPS_POSITIONING): - result.append(LocEvt(devtime=msg.devtime, lon=msg.longitude, - lat=msg.latitude, is_gps=True, imei=imei)) - elif isinstance(msg, WIFI_POSITIONING): - lat, lon = qry_cell(OCDB, msg.mcc, msg.gsm_cells) - result.append(LocEvt(devtime=msg.devtime, lon=lon, - lat=lat, is_gps=False, imei=imei)) return reversed(result) diff --git a/gps303/collector.py b/gps303/collector.py index 9f305e5..e63da32 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -166,7 +166,16 @@ def runserver(conf): while True: try: msg = zpull.recv(zmq.NOBLOCK) - tosend.append(Resp(msg)) + zmsg = Resp(msg) + zpub.send( + Bcast( + is_incoming=False, + proto=proto_of_message(zmsg.packet), + imei=zmsg.imei, + packet=zmsg.packet, + ).packed + ) + tosend.append(zmsg) except zmq.Again: break elif sk == tcpfd: diff --git a/gps303/evstore.py b/gps303/evstore.py index ec463c7..7617362 100644 --- a/gps303/evstore.py +++ b/gps303/evstore.py @@ -31,6 +31,7 @@ def stow(**kwargs): parms = { k: kwargs[k] if k in kwargs else v for k, v in ( + ("is_incoming", True), ("peeraddr", None), ("when", 0.0), ("imei", None), @@ -41,9 +42,9 @@ def stow(**kwargs): assert len(kwargs) <= len(parms) DB.execute( """insert or ignore into events - (tstamp, imei, peeraddr, proto, packet) + (tstamp, imei, peeraddr, proto, packet, is_incoming) values - (:when, :imei, :peeraddr, :proto, :packet) + (:when, :imei, :peeraddr, :proto, :packet, :is_incoming) """, parms, ) diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 136f86b..8f4ffcc 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -186,10 +186,8 @@ class GPS303Pkt(metaclass=MetaPkt): return def out_decode(self, length, packet): - # Necessary to emulate terminal, which is not implemented - raise NotImplementedError( - self.__class__.__name__ + ".decode() not implemented" - ) + # Overridden in subclasses, otherwise do not decode payload + return def in_encode(self): # Necessary to emulate terminal, which is not implemented @@ -536,12 +534,19 @@ class RESTORE_PASSWORD(GPS303Pkt): class WIFI_POSITIONING(_WIFI_POSITIONING): PROTO = 0x69 RESPOND = Respond.EXT - OUT_KWARGS = (("lat", float, None), ("lon", float, None)) + OUT_KWARGS = (("latitude", float, None), ("longitude", float, None)) def out_encode(self): - if self.lat is None or self.lon is None: + if self.latitude is None or self.longitude is None: return b"" - return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode() + return "{:+#010.8g},{:+#010.8g}".format( + self.latitude, self.longitude + ).encode() + + def out_decode(self, length, payload): + lat, lon = payload.decode().split(",") + self.latitude = float(lat) + self.longitude = float(lon) class MANUAL_POSITIONING(GPS303Pkt): @@ -648,12 +653,15 @@ def inline_response(packet): return None -def parse_message(packet): +def parse_message(packet, is_incoming=True): """From a packet (without framing bytes) derive the XXX.In object""" length, proto = unpack("BB", packet[:2]) payload = packet[2:] if proto in CLASSES: - return CLASSES[proto].In(length, payload) + if is_incoming: + return CLASSES[proto].In(length, payload) + else: + return CLASSES[proto].Out(length, payload) else: retobj = UNKNOWN.In(length, payload) retobj.PROTO = proto # Override class attr with object attr diff --git a/gps303/lookaside.py b/gps303/lookaside.py index ecd5dfa..dd5a449 100644 --- a/gps303/lookaside.py +++ b/gps303/lookaside.py @@ -7,9 +7,9 @@ from struct import pack import zmq from . import common -from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING +from .gps303proto import parse_message, WIFI_POSITIONING from .opencellid import qry_cell -from .zmsg import Bcast, Resp +from .zmsg import Bcast, Resp, topic log = getLogger("gps303/lookaside") @@ -18,8 +18,8 @@ def runserver(conf): zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) zsub.connect(conf.get("collector", "publishurl")) - topic = pack("B", proto_by_name("WIFI_POSITIONING")) - zsub.setsockopt(zmq.SUBSCRIBE, topic) + tosub = topic(WIFI_POSITIONING.PROTO) + zsub.setsockopt(zmq.SUBSCRIBE, tosub) zpush = zctx.socket(zmq.PUSH) zpush.connect(conf.get("collector", "listenurl")) @@ -34,21 +34,13 @@ def runserver(conf): datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), msg, ) - if not isinstance(msg, WIFI_POSITIONING): - log.error( - "IMEI %s from %s at %s: %s", - zmsg.imei, - zmsg.peeraddr, - datetime.fromtimestamp(zmsg.when).astimezone( - tz=timezone.utc - ), - msg, - ) - continue lat, lon = qry_cell( conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells ) - resp = Resp(imei=zmsg.imei, packet=msg.Out(lat=lat, lon=lon).packed) + resp = Resp( + imei=zmsg.imei, + packet=msg.Out(latitude=lat, longitude=lon).packed, + ) log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp) zpush.send(resp.packed) diff --git a/gps303/qry.py b/gps303/qry.py index 17228c6..4aeb33c 100644 --- a/gps303/qry.py +++ b/gps303/qry.py @@ -21,6 +21,9 @@ c.execute( ) for tstamp, imei, peeraddr, proto, packet in c: + if len(packet) > packet[0] + 1: + print("proto", packet[1] , "datalen", len(packet), + "msg.length", packet[0], file=sys.stderr) msg = parse_message(packet) print( datetime.fromtimestamp(tstamp) diff --git a/gps303/storage.py b/gps303/storage.py index 38ffaae..5368efc 100644 --- a/gps303/storage.py +++ b/gps303/storage.py @@ -25,7 +25,8 @@ def runserver(conf): while True: zmsg = Bcast(zsub.recv()) log.debug( - "IMEI %s from %s at %s: %s", + "%s IMEI %s from %s at %s: %s", + "I" if zmsg.is_incoming else "O", zmsg.imei, zmsg.peeraddr, datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), @@ -37,6 +38,7 @@ def runserver(conf): else: peeraddr = None stow( + is_incoming=zmsg.is_incoming, peeraddr=peeraddr, when=zmsg.when, imei=zmsg.imei, diff --git a/gps303/termconfig.py b/gps303/termconfig.py index 771f988..f481a84 100644 --- a/gps303/termconfig.py +++ b/gps303/termconfig.py @@ -7,7 +7,7 @@ import zmq from . import common from .gps303proto import * -from .zmsg import Bcast, Resp +from .zmsg import Bcast, Resp, topic log = getLogger("gps303/termconfig") @@ -22,8 +22,8 @@ def runserver(conf): "SETUP", "POSITION_UPLOAD_INTERVAL", ): - topic = pack("B", proto_by_name(protoname)) - zsub.setsockopt(zmq.SUBSCRIBE, topic) + tosub = topic(proto_by_name(protoname)) + zsub.setsockopt(zmq.SUBSCRIBE, tosub) zpush = zctx.socket(zmq.PUSH) zpush.connect(conf.get("collector", "listenurl")) diff --git a/gps303/watch.py b/gps303/watch.py index ed51ad1..2e5b412 100644 --- a/gps303/watch.py +++ b/gps303/watch.py @@ -5,6 +5,7 @@ from logging import getLogger import zmq from . import common +from .gps303proto import parse_message from .zmsg import Bcast log = getLogger("gps303/watch") @@ -19,8 +20,8 @@ def runserver(conf): try: while True: zmsg = Bcast(zsub.recv()) - msg = parse_message(zmsg.packet) - print(zmsg.imei, msg) + msg = parse_message(zmsg.packet, zmsg.is_incoming) + print("I" if zmsg.is_incoming else "O", zmsg.imei, msg) except KeyboardInterrupt: pass diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 8a5af7f..80926fb 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -18,7 +18,12 @@ import zmq from . import common from .backlog import blinit, backlog -from .zmsg import LocEvt +from .gps303proto import ( + GPS_POSITIONING, + WIFI_POSITIONING, + parse_message, +) +from .zmsg import Bcast, topic log = getLogger("gps303/wsgateway") htmlfile = None @@ -157,7 +162,12 @@ class Client: return msgs def wants(self, imei): - log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno()) + log.debug( + "wants %s? set is %s on fd %d", + imei, + self.imeis, + self.sock.fileno(), + ) return True # TODO: check subscriptions def send(self, message): @@ -224,11 +234,11 @@ class Clients: def runserver(conf): global htmlfile - blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn")) + blinit(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("lookaside", "publishurl")) + zsub.connect(conf.get("collector", "publishurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -245,9 +255,23 @@ def runserver(conf): while True: neededsubs = clients.subs() for imei in neededsubs - activesubs: - zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True), + ) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False), + ) for imei in activesubs - neededsubs: - zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True), + ) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False), + ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) tosend = [] @@ -259,8 +283,10 @@ def runserver(conf): if sk is zsub: while True: try: - zmsg = LocEvt(zsub.recv(zmq.NOBLOCK)) + zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) + msg = parse_message(zmsg.packet) tosend.append(zmsg) + log.debug("Got %s", zmsg) except zmq.Again: break elif sk == tcpfd: diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 2110113..8179616 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -3,12 +3,16 @@ import ipaddress as ip from struct import pack, unpack -__all__ = "Bcast", "Resp" +__all__ = "Bcast", "Resp", "topic" def pack_peer(peeraddr): try: - saddr, port, _x, _y = peeraddr + if peeraddr is None: + saddr = "::" + port = 0 + else: + saddr, port, _x, _y = peeraddr addr = ip.ip_address(saddr) except ValueError: saddr, port = peeraddr @@ -75,10 +79,17 @@ class _Zmsg: ) +def topic(proto, is_incoming=True, imei=None): + return ( + pack("BB", is_incoming, proto) + b"" if imei is None else imei.encode() + ) + + class Bcast(_Zmsg): """Zmq message to broadcast what was received from the terminal""" KWARGS = ( + ("is_incoming", True), ("proto", 256), ("imei", None), ("when", None), @@ -89,7 +100,7 @@ class Bcast(_Zmsg): @property def packed(self): return ( - pack("B", self.proto) + pack("BB", int(self.is_incoming), self.proto) + ("0000000000000000" if self.imei is None else self.imei).encode() + ( b"\0\0\0\0\0\0\0\0" @@ -101,26 +112,32 @@ class Bcast(_Zmsg): ) def decode(self, buffer): - self.proto = buffer[0] - self.imei = buffer[1:17].decode() + self.is_incoming = bool(buffer[0]) + self.proto = buffer[1] + self.imei = buffer[2:18].decode() if self.imei == "0000000000000000": self.imei = None - self.when = unpack("!d", buffer[17:25])[0] - self.peeraddr = unpack_peer(buffer[25:43]) - self.packet = buffer[43:] + self.when = unpack("!d", buffer[18:26])[0] + self.peeraddr = unpack_peer(buffer[26:44]) + self.packet = buffer[44:] class Resp(_Zmsg): """Zmq message received from a third party to send to the terminal""" - KWARGS = (("imei", None), ("packet", b"")) + KWARGS = (("imei", None), ("when", None), ("packet", b"")) @property def packed(self): return ( "0000000000000000" if self.imei is None else self.imei.encode() - ) + self.packet + ) + ( + b"\0\0\0\0\0\0\0\0" + if self.when is None + else pack("!d", self.when) + ) + self.packet def decode(self, buffer): self.imei = buffer[:16].decode() - self.packet = buffer[16:] + self.when = unpack("!d", buffer[16:24])[0] + self.packet = buffer[24:] -- 2.39.2