__all__ = (
"Stream",
"class_by_prefix",
+ "enframe",
"inline_response",
"parse_message",
"probe_buffer",
self.imei: Optional[str] = None
self.datalen: int = 0
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- assert imei is not None and len(imei) == 10
- return f"[LT*{imei:10s}*{len(buffer):04X}*".encode() + buffer + b"]"
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
"""
Process next segment of the stream. Return successfully deframed
)
self.buffer = self.buffer[toskip:]
# From this point, buffer starts with a packet header
- if self.imei is not None and self.imei != imei:
+ if self.imei is None:
+ self.imei = imei
+ if self.imei != imei:
msgs.append(
f"Packet's imei {imei} mismatches"
- f" previous value {self.imei}"
+ f" previous value {self.imei}, old value kept"
)
- self.imei = imei
self.datalen = datalen
if len(self.buffer) < self.datalen + 21: # Incomplete packet
break
return ret
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ assert imei is not None and len(imei) == 10
+ off, vid, _, dlen = _framestart(buffer)
+ assert off == 0
+ return f"[{vid:2s}*{imei:10s}*{dlen:04X}*".encode() + buffer[20:]
+
+
### Parser/Constructor ###
@property
def packed(self) -> bytes:
- return self.encode().encode() # first is object's, second str's
+ buffer = self.encode().encode()
+ return f"[LT*0000000000*{len(buffer):04X}*".encode() + buffer + b"]"
class UNKNOWN(BeeSurePkt):
def proto_of_message(packet: bytes) -> str:
- return PROTO_PREFIX + packet.split(b",")[0].decode()
+ return PROTO_PREFIX + packet[20:-1].split(b",")[0].decode()
def imei_from_packet(packet: bytes) -> Optional[str]: