from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
-from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from socket import (
+ AF_INET6,
+ MSG_DONTWAIT,
+ SOCK_DGRAM,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ socket,
+ SocketType,
+)
from tempfile import mkstemp
from time import sleep
+from typing import Optional
from unittest import TestCase
if p.pid is not None:
kill(p.pid, SIGINT)
p.join()
- print(srvname, "terminated with return code", p.exitcode)
+ self.assertEqual(
+ p.exitcode,
+ 0,
+ srvname + " terminated with non-zero return code",
+ )
for sfx in (".pub", ".pul"):
unlink(self.tmpfilebase + sfx)
+
+
+def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
+ if buf is not None:
+ sock.send(buf)
+ try:
+ sock.recv(4096, MSG_DONTWAIT)
+ except BlockingIOError:
+ pass