]> www.average.org Git - loctrkd.git/blob - test/common.py
test: move `send_and_drain` to common module
[loctrkd.git] / test / common.py
1 """ Common housekeeping for tests that rely on daemons """
2
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing
5 from importlib import import_module
6 from multiprocessing import Process
7 from os import kill, unlink
8 from signal import SIGINT
9 from socket import (
10     AF_INET6,
11     MSG_DONTWAIT,
12     SOCK_DGRAM,
13     SOL_SOCKET,
14     SO_REUSEADDR,
15     socket,
16     SocketType,
17 )
18 from tempfile import mkstemp
19 from time import sleep
20 from typing import Optional
21 from unittest import TestCase
22
23
class TestWithServers(TestCase):
    """Base test case that runs server modules in child processes.

    ``setUp`` builds a configuration with a port obtained from the kernel
    and ipc URLs derived from a temporary file name, then starts one
    process per requested server. ``tearDown`` interrupts the processes,
    removes the temporary files and checks the exit codes.
    """

    def setUp(self, *args: str) -> None:
        """Prepare the configuration and start the requested servers.

        Args:
            *args: names of modules under ``gps303``; each must expose a
                ``runserver`` callable, which becomes the target of a
                child process. Presumably supplied by subclasses calling
                ``super().setUp(...)`` - verify against the test modules.
        """
        # Function-scope import: the module itself only pulls `kill` and
        # `unlink` from `os`.
        from os import close

        # Binding to port 0 makes the kernel choose an unused port. The
        # socket is closed again on leaving the block, so the port is only
        # expected (not guaranteed) to still be free for the server.
        # NOTE(review): SO_REUSEADDR is set after bind() - confirm that
        # this ordering is intended.
        with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
            sock.bind(("", 0))
            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            freeport = sock.getsockname()[1]
        # mkstemp() returns an already open descriptor together with the
        # path. Only the path is needed (as a unique base for the ipc
        # URLs), so release the descriptor instead of leaking it.
        fd, self.tmpfilebase = mkstemp()
        close(fd)
        self.conf = ConfigParser()
        self.conf["collector"] = {
            "port": str(freeport),
            "publishurl": "ipc://" + self.tmpfilebase + ".pub",
            "listenurl": "ipc://" + self.tmpfilebase + ".pul",
        }
        self.children = []
        for srvname in args:
            # Only the collector gets the extra keyword argument.
            if srvname == "collector":
                kwargs = {"handle_hibernate": False}
            else:
                kwargs = {}
            cls = import_module("gps303." + srvname, package=".")
            p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
            p.start()
            self.children.append((srvname, p))
        # Presumably gives the servers time to come up before the test
        # body starts talking to them.
        sleep(1)

    def tearDown(self) -> None:
        """Stop all servers, remove temporary files, verify exit codes.

        Every child is interrupted and joined, and the files are removed,
        before any exit code is asserted; a single failing server therefore
        cannot leave the other processes running or the files behind.

        Raises:
            AssertionError: when a server exited with a non-zero code.
        """
        sleep(1)
        exitcodes = []
        for srvname, p in self.children:
            if p.pid is not None:
                kill(p.pid, SIGINT)
            p.join()
            exitcodes.append((srvname, p.exitcode))
        # "" is the file created by mkstemp() itself; the suffixed names
        # are the paths handed to the servers as ipc URLs.
        for sfx in ("", ".pub", ".pul"):
            try:
                unlink(self.tmpfilebase + sfx)
            except FileNotFoundError:
                # Best-effort cleanup: the path may never have been
                # created, e.g. when no server was started.
                pass
        for srvname, exitcode in exitcodes:
            self.assertEqual(
                exitcode,
                0,
                srvname + " terminated with non-zero return code",
            )
62
63
def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
    """Optionally send ``buf`` on ``sock``, then discard pending input.

    Args:
        sock: socket to write to and read from.
        buf: payload to send; when ``None`` nothing is sent and only the
            read is attempted.

    A single non-blocking read of at most 4096 bytes is made and its
    result is thrown away. Having nothing to read is not an error.
    """
    if buf is not None:
        sock.send(buf)
    try:
        # MSG_DONTWAIT makes this one call non-blocking without changing
        # the socket's own blocking mode.
        sock.recv(4096, MSG_DONTWAIT)
    except BlockingIOError:
        # No data was waiting.
        return