X-Git-Url: http://www.average.org/gitweb/?p=loctrkd.git;a=blobdiff_plain;f=gps303%2Fzmsg.py;h=b6faa7025a1df310422e6e51603878d9c01254b1;hp=2d497c0cd5a98997bb3aa89c101a39fb29213c54;hb=c3bc6d5bbdc0d0bf10e338c6e3bad1a519d5afa0;hpb=d6d5b6c95b677d98777959040bac808e4c5499c7 diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 2d497c0..b6faa70 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -7,7 +7,7 @@ from typing import Any, cast, Optional, Tuple, Type, Union __all__ = "Bcast", "Resp", "topic" -def pack_peer( +def pack_peer( # 18 bytes peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] ) -> bytes: if peeraddr is None: @@ -93,9 +93,9 @@ class _Zmsg: def topic( - proto: int, is_incoming: bool = True, imei: Optional[str] = None + proto: str, is_incoming: bool = True, imei: Optional[str] = None ) -> bytes: - return pack("BB", is_incoming, proto) + ( + return pack("B16s", is_incoming, proto.encode()) + ( b"" if imei is None else pack("16s", imei.encode()) ) @@ -105,7 +105,7 @@ class Bcast(_Zmsg): KWARGS = ( ("is_incoming", True), - ("proto", 256), + ("proto", "UNKNOWN"), ("imei", None), ("when", None), ("peeraddr", None), @@ -116,31 +116,28 @@ class Bcast(_Zmsg): def packed(self) -> bytes: return ( pack( - "BB16s", + "!B16s16sd", int(self.is_incoming), - self.proto, + self.proto[:16].ljust(16, "\0").encode(), b"0000000000000000" if self.imei is None else self.imei.encode(), - ) - + ( - b"\0\0\0\0\0\0\0\0" - if self.when is None - else pack("!d", self.when) + 0 if self.when is None else self.when, ) + pack_peer(self.peeraddr) + self.packet ) def decode(self, buffer: bytes) -> None: - self.is_incoming = bool(buffer[0]) - self.proto = buffer[1] - self.imei: Optional[str] = buffer[2:18].decode() - if self.imei == "0000000000000000": - self.imei = None - self.when = unpack("!d", buffer[18:26])[0] - self.peeraddr = unpack_peer(buffer[26:44]) - self.packet = buffer[44:] + is_incoming, proto, imei, when = unpack("!B16s16sd", buffer[:41]) + self.is_incoming = bool(is_incoming) + self.proto = proto.decode() + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + self.when = when + self.peeraddr = unpack_peer(buffer[41:59]) + self.packet = buffer[59:] class Resp(_Zmsg): @@ -152,20 +149,20 @@ class Resp(_Zmsg): def packed(self) -> bytes: return ( pack( - "16s", + "!16sd", "0000000000000000" if self.imei is None else self.imei.encode(), - ) - + ( - b"\0\0\0\0\0\0\0\0" - if self.when is None - else pack("!d", self.when) + 0 if self.when is None else self.when, ) + self.packet ) def decode(self, buffer: bytes) -> None: - self.imei = buffer[:16].decode() - self.when = unpack("!d", buffer[16:24])[0] + imei, when = unpack("!16sd", buffer[:24]) + self.imei = ( + None if imei == b"0000000000000000" else imei.decode().strip("\0") + ) + + self.when = when self.packet = buffer[24:]