]> www.average.org Git - loctrkd.git/blob - test/common.py
rename gps303 -> loctrkd
[loctrkd.git] / test / common.py
1 """ Common housekeeping for tests that rely on daemons """
2
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
11 from socket import (
12     AF_INET6,
13     MSG_DONTWAIT,
14     SOCK_DGRAM,
15     SOL_SOCKET,
16     SO_REUSEADDR,
17     socket,
18     SocketType,
19 )
20 from sys import exit, stderr
21 from tempfile import mkstemp
22 from time import sleep
23 from typing import Optional
24 from unittest import TestCase
25
26 NUMPORTS = 3
27
28
29 class TestWithServers(TestCase):
30     def setUp(
31         self, *args: str, httpd: bool = False, verbose: bool = False
32     ) -> None:
33         freeports = []
34         with ExitStack() as stack:
35             for _ in range(NUMPORTS):
36                 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
37                 sk.bind(("", 0))
38                 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
39                 freeports.append(sk.getsockname()[1])
40         _, self.tmpfilebase = mkstemp()
41         self.conf = ConfigParser()
42         self.conf["collector"] = {
43             "port": str(freeports[0]),
44             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
45             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
46             "protocols": "zx303proto",
47         }
48         self.conf["storage"] = {
49             "dbfn": self.tmpfilebase + ".storage.sqlite",
50         }
51         self.conf["opencellid"] = {
52             "dbfn": self.tmpfilebase + ".opencellid.sqlite",
53             "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
54         }
55         self.conf["lookaside"] = {
56             "backend": "opencellid",
57         }
58         self.conf["wsgateway"] = {
59             "port": str(freeports[1]),
60         }
61         self.children = []
62         for srvname in args:
63             if srvname == "collector":
64                 kwargs = {"handle_hibernate": False}
65             else:
66                 kwargs = {}
67             cls = import_module("loctrkd." + srvname, package=".")
68             if verbose:
69                 cls.log.addHandler(StreamHandler(stderr))
70                 cls.log.setLevel(DEBUG)
71             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
72             p.start()
73             self.children.append((srvname, p))
74         if httpd:
75             server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
76
77             def run(server: HTTPServer) -> None:
78                 try:
79                     server.serve_forever()
80                 except KeyboardInterrupt:
81                     # TODO: this still leaves unclosed socket in the server
82                     server.shutdown()
83
84             p = Process(target=run, args=(server,))
85             p.start()
86             self.children.append(("httpd", p))
87         sleep(1)
88
89     def tearDown(self) -> None:
90         for srvname, p in self.children:
91             if p.pid is not None:
92                 kill(p.pid, SIGINT)
93             p.join()
94             self.assertEqual(
95                 p.exitcode,
96                 0,
97                 f"{srvname} terminated with return code {p.exitcode}",
98             )
99         for sfx in (
100             "",
101             ".pub",
102             ".pul",
103             ".storage.sqlite",
104             ".opencellid.sqlite",
105         ):
106             try:
107                 unlink(self.tmpfilebase + sfx)
108             except OSError:
109                 pass
110
111
112 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
113     if buf is not None:
114         sock.send(buf)
115     try:
116         sock.recv(4096, MSG_DONTWAIT)
117     except BlockingIOError:
118         pass