class TestWithServers(TestCase):
def setUp(self, *args: str) -> None:
- with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
- sock.bind(("", 0))
- sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
- freeport = sock.getsockname()[1]
+ with closing(socket(AF_INET6, SOCK_DGRAM)) as sock1, closing(
+ socket(AF_INET6, SOCK_DGRAM)
+ ) as sock2:
+ freeports = []
+ for sock in sock1, sock2:
+ sock.bind(("", 0))
+ sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ freeports.append(sock.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
self.conf["collector"] = {
- "port": str(freeport),
+ "port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
"listenurl": "ipc://" + self.tmpfilebase + ".pul",
}
+ self.conf["storage"] = {
+ "dbfn": self.tmpfilebase + ".storage.sqlite",
+ }
+ self.conf["opencellid"] = {
+ "dbfn": self.tmpfilebase + ".opencellid.sqlite",
+ }
+ self.conf["lookaside"] = {
+ "backend": "opencellid",
+ }
+ self.conf["wsgateway"] = {
+ "port": str(freeports[1]),
+ }
self.children = []
for srvname in args:
if srvname == "collector":
0,
srvname + " terminated with non-zero return code",
)
- for sfx in (".pub", ".pul"):
- unlink(self.tmpfilebase + sfx)
+ for sfx in (
+ "",
+ ".pub",
+ ".pul",
+ ".storage.sqlite",
+ ".opencellid.sqlite",
+ ):
+ try:
+ unlink(self.tmpfilebase + sfx)
+ except OSError:
+ pass
def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None: