]> www.average.org Git - loctrkd.git/blob - test/common.py
test: adjust sleeps and drains when daemons run
[loctrkd.git] / test / common.py
1 """ Common housekeeping for tests that rely on daemons """
2
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing
5 from importlib import import_module
6 from multiprocessing import Process
7 from os import kill, unlink
8 from signal import SIGINT
9 from socket import (
10     AF_INET6,
11     MSG_DONTWAIT,
12     SOCK_DGRAM,
13     SOL_SOCKET,
14     SO_REUSEADDR,
15     socket,
16     SocketType,
17 )
18 from tempfile import mkstemp
19 from time import sleep
20 from typing import Optional
21 from unittest import TestCase
22
23
24 class TestWithServers(TestCase):
25     def setUp(self, *args: str) -> None:
26         with closing(socket(AF_INET6, SOCK_DGRAM)) as sock1, closing(
27             socket(AF_INET6, SOCK_DGRAM)
28         ) as sock2:
29             freeports = []
30             for sock in sock1, sock2:
31                 sock.bind(("", 0))
32                 sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
33                 freeports.append(sock.getsockname()[1])
34         _, self.tmpfilebase = mkstemp()
35         self.conf = ConfigParser()
36         self.conf["collector"] = {
37             "port": str(freeports[0]),
38             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
39             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
40         }
41         self.conf["storage"] = {
42             "dbfn": self.tmpfilebase + ".storage.sqlite",
43         }
44         self.conf["opencellid"] = {
45             "dbfn": self.tmpfilebase + ".opencellid.sqlite",
46         }
47         self.conf["lookaside"] = {
48             "backend": "opencellid",
49         }
50         self.conf["wsgateway"] = {
51             "port": str(freeports[1]),
52         }
53         self.children = []
54         for srvname in args:
55             if srvname == "collector":
56                 kwargs = {"handle_hibernate": False}
57             else:
58                 kwargs = {}
59             cls = import_module("gps303." + srvname, package=".")
60             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
61             p.start()
62             self.children.append((srvname, p))
63         sleep(1)
64
65     def tearDown(self) -> None:
66         for srvname, p in self.children:
67             if p.pid is not None:
68                 kill(p.pid, SIGINT)
69             p.join()
70             self.assertEqual(
71                 p.exitcode,
72                 0,
73                 srvname + " terminated with non-zero return code",
74             )
75         for sfx in (
76             "",
77             ".pub",
78             ".pul",
79             ".storage.sqlite",
80             ".opencellid.sqlite",
81         ):
82             try:
83                 unlink(self.tmpfilebase + sfx)
84             except OSError:
85                 pass
86
87
88 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
89     if buf is not None:
90         sock.send(buf)
91     try:
92         sock.recv(4096, MSG_DONTWAIT)
93     except BlockingIOError:
94         pass