class ProtoModule:
class Stream:
- @staticmethod
- def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
- ...
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
...
def close(self) -> bytes:
...
+ @staticmethod
+ def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ ...
+
@staticmethod
def probe_buffer(buffer: bytes) -> bool:
...
return msgs
def send(self, buffer: bytes) -> None:
- assert self.stream is not None
+ assert self.stream is not None and self.pmod is not None
try:
- self.sock.send(self.stream.enframe(buffer, imei=self.imei))
+ self.sock.send(self.pmod.enframe(buffer, imei=self.imei))
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",