1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing
5 from importlib import import_module
6 from multiprocessing import Process
7 from os import kill, unlink
8 from signal import SIGINT
18 from tempfile import mkstemp
19 from time import sleep
20 from typing import Optional
21 from unittest import TestCase
# Base TestCase for tests that need server processes: setUp() builds a
# ConfigParser with a "collector" section and creates one multiprocessing
# Process per server name; tearDown() walks the recorded processes and removes
# the ".pub" / ".pul" files derived from the temporary file name.
# NOTE(review): this excerpt has elided lines (the embedded line numbers jump),
# so statements such as the loop header over server names, the process start
# and the exit-code check are not visible here - confirm against the full file.
24 class TestWithServers(TestCase):
# setUp(*args): each positional argument is used as a server name; the module
# "gps303.<name>" is imported and its `runserver` attribute becomes the target
# of a Process that receives self.conf as its only positional argument.
25 def setUp(self, *args: str) -> None:
# Temporary IPv6 datagram socket, closed when the `with` block exits
# (contextlib.closing); it is used only to learn a port number.
26 with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
28 sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# Element [1] of the socket's own address is the port number.
# NOTE(review): presumably the socket was bound on an elided line so that the
# OS picks a free port - confirm.
29 freeport = sock.getsockname()[1]
# mkstemp() returns (fd, path); only the path is kept, as a base name.
# NOTE(review): the descriptor is bound to `_` and no visible line closes it -
# check for a file-descriptor leak.
30 _, self.tmpfilebase = mkstemp()
31 self.conf = ConfigParser()
# "collector" section: the port found above plus two ipc:// URLs built from the
# temporary path with ".pub" and ".pul" suffixes.
32 self.conf["collector"] = {
33 "port": str(freeport),
34 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
35 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
# NOTE(review): the closing brace, the initialisation of self.children and the
# loop that binds `srvname` are not visible; presumably it iterates over
# `args` - confirm.
# Only the "collector" server is given handle_hibernate=False; the value of
# `kwargs` for other servers is set on an elided line.
39 if srvname == "collector":
40 kwargs = {"handle_hibernate": False}
# The module name is absolute ("gps303.<srvname>"); per the importlib docs the
# `package` argument only matters for relative names, so it has no effect here.
43 cls = import_module("gps303." + srvname, package=".")
44 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
# Remember (name, process) pairs so that tearDown() can iterate over them.
46 self.children.append((srvname, p))
# tearDown(): visits every (name, process) pair recorded by setUp() and then
# deletes the two suffixed files named in the "collector" configuration.
49 def tearDown(self) -> None:
51 for srvname, p in self.children:
# NOTE(review): this string is the trailing argument of a call whose start is
# elided; presumably an assertion on the process exit code - confirm.
58 srvname + " terminated with non-zero return code",
# Remove "<tmpfilebase>.pub" and "<tmpfilebase>.pul".
# NOTE(review): no visible line unlinks self.tmpfilebase itself (the file that
# mkstemp() created in setUp) - check whether it is left behind.
60 for sfx in (".pub", ".pul"):
61 unlink(self.tmpfilebase + sfx)
64 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
68 sock.recv(4096, MSG_DONTWAIT)
69 except BlockingIOError: