]> www.average.org Git - loctrkd.git/blob - test/common.py
7a863106309e61072511641a7a975f5e344abbdf
[loctrkd.git] / test / common.py
1 """ Common housekeeping for tests that rely on daemons """
2
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
11 from socket import (
12     AF_INET6,
13     MSG_DONTWAIT,
14     SOCK_DGRAM,
15     SOL_SOCKET,
16     SO_REUSEADDR,
17     socket,
18     SocketType,
19 )
20 from sys import exit, stderr
21 from tempfile import mkstemp
22 from time import sleep
23 from typing import Optional
24 from unittest import TestCase
25
26 from loctrkd.common import init_protocols
27
28 NUMPORTS = 3
29
30
31 class TestWithServers(TestCase):
32     def setUp(
33         self, *args: str, httpd: bool = False, verbose: bool = False
34     ) -> None:
35         freeports = []
36         with ExitStack() as stack:
37             for _ in range(NUMPORTS):
38                 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
39                 sk.bind(("", 0))
40                 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
41                 freeports.append(sk.getsockname()[1])
42         _, self.tmpfilebase = mkstemp()
43         self.conf = ConfigParser()
44         self.conf["common"] = {
45             "protocols": "zx303proto",
46         }
47         self.conf["collector"] = {
48             "port": str(freeports[0]),
49             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
50             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
51         }
52         self.conf["storage"] = {
53             "dbfn": self.tmpfilebase + ".storage.sqlite",
54             "events": "yes",
55         }
56         self.conf["opencellid"] = {
57             "dbfn": self.tmpfilebase + ".opencellid.sqlite",
58             "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
59         }
60         self.conf["rectifier"] = {
61             "lookaside": "opencellid",
62             "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
63         }
64         self.conf["wsgateway"] = {
65             "port": str(freeports[1]),
66         }
67         init_protocols(self.conf)
68         self.children = []
69         for srvname in args:
70             if srvname == "collector":
71                 kwargs = {"handle_hibernate": False}
72             else:
73                 kwargs = {}
74             cls = import_module("loctrkd." + srvname, package=".")
75             if verbose:
76                 cls.log.addHandler(StreamHandler(stderr))
77                 cls.log.setLevel(DEBUG)
78             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
79             p.start()
80             self.children.append((srvname, p))
81         if httpd:
82             server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
83
84             def run(server: HTTPServer) -> None:
85                 try:
86                     server.serve_forever()
87                 except KeyboardInterrupt:
88                     # TODO: this still leaves unclosed socket in the server
89                     server.shutdown()
90
91             p = Process(target=run, args=(server,))
92             p.start()
93             self.children.append(("httpd", p))
94         sleep(1)
95
96     def tearDown(self) -> None:
97         for srvname, p in self.children:
98             if p.pid is not None:
99                 kill(p.pid, SIGINT)
100             p.join()
101             self.assertEqual(
102                 p.exitcode,
103                 0,
104                 f"{srvname} terminated with return code {p.exitcode}",
105             )
106         for sfx in (
107             "",
108             ".pub",
109             ".rect.pub",
110             ".pul",
111             ".storage.sqlite",
112             ".opencellid.sqlite",
113         ):
114             try:
115                 unlink(self.tmpfilebase + sfx)
116             except OSError:
117                 pass
118
119
120 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
121     if buf is not None:
122         sock.send(buf)
123     try:
124         sock.recv(4096, MSG_DONTWAIT)
125     except BlockingIOError:
126         pass