class ProtoModule:
class Stream:
- @staticmethod
- def enframe(buffer: bytes) -> bytes:
- ...
-
def recv(self, segment: bytes) -> List[Union[bytes, str]]:
...
def close(self) -> bytes:
...
+ @staticmethod
+ def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+ ...
+
@staticmethod
def probe_buffer(buffer: bytes) -> bool:
...
def proto_of_message(packet: bytes) -> str:
...
- @staticmethod
- def proto_by_name(name: str) -> int:
- ...
-
pmods: List[ProtoModule] = []
return msgs
def send(self, buffer: bytes) -> None:
- assert self.stream is not None
+ assert self.stream is not None and self.pmod is not None
try:
- self.sock.send(self.stream.enframe(buffer))
+ self.sock.send(self.pmod.enframe(buffer, imei=self.imei))
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",