""" Zeromq messages """
-from datetime import datetime, timezone
import ipaddress as ip
from struct import pack, unpack
-__all__ = "Bcast", "LocEvt", "Resp"
+__all__ = "Bcast", "Resp"
def pack_peer(peeraddr):
),
)
+ def __eq__(self, other):
+ return all(
+ [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+ )
+
def decode(self, buffer):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `decode()` method"
)
@property
def packed(self):
- raise RuntimeError(
- self.__class__.__name__ + "must implement `encode()` method"
+ raise NotImplementedError(
+ self.__class__.__name__ + "must implement `packed()` property"
)
def decode(self, buffer):
self.imei = buffer[:16].decode()
self.packet = buffer[16:]
-
-
-class LocEvt(_Zmsg):
- """Zmq message with original or approximated location from lookaside"""
-
- KWARGS = (
- ("imei", "0000000000000000"),
- ("devtime", datetime(1970, 1, 1, tzinfo=timezone.utc)),
- ("lat", 0.0),
- ("lon", 0.0),
- ("is_gps", True),
- )
-
- @property
- def packed(self):
- return self.imei.encode() + pack(
- "!dddB",
- self.devtime.replace(tzinfo=timezone.utc).timestamp(),
- self.lat,
- self.lon,
- int(self.is_gps),
- )
-
- def decode(self, buffer):
- self.imei = buffer[:16].decode()
- when, self.lat, self.lon, is_gps = unpack("!dddB", buffer[16:])
- self.devtime = datetime.fromtimestamp(when).astimezone(tz=timezone.utc)
- self.is_gps = bool(is_gps)