1 """ Common housekeeping for tests that rely on daemons """
3 from configparser import ConfigParser, SectionProxy
4 from contextlib import closing, ExitStack
5 from http.server import HTTPServer, SimpleHTTPRequestHandler
6 from importlib import import_module
7 from logging import DEBUG, StreamHandler, WARNING
8 from multiprocessing import Process
9 from os import kill, unlink
10 from signal import SIGINT
23 from sys import exit, stderr
24 from random import Random
25 from tempfile import mkstemp
26 from time import sleep
27 from typing import Any, Optional
28 from unittest import TestCase
30 from loctrkd.common import init_protocols
35 class TestWithServers(TestCase):
37 self, *args: str, httpd: bool = False, verbose: bool = False
40 with ExitStack() as stack:
41 for _ in range(NUMPORTS):
42 sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
44 sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
45 freeports.append(sk.getsockname()[1])
46 _, self.tmpfilebase = mkstemp()
47 self.conf = ConfigParser()
48 self.conf["common"] = {
49 "protocols": "zx303proto,beesure",
51 self.conf["collector"] = {
52 "port": str(freeports[0]),
53 "publishurl": "ipc://" + self.tmpfilebase + ".pub",
54 "listenurl": "ipc://" + self.tmpfilebase + ".pul",
56 self.conf["storage"] = {
57 "dbfn": self.tmpfilebase + ".storage.sqlite",
60 self.conf["opencellid"] = {
61 "dbfn": self.tmpfilebase + ".opencellid.sqlite",
62 "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
64 self.conf["rectifier"] = {
65 "lookaside": "opencellid",
66 "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
68 self.conf["wsgateway"] = {
69 "port": str(freeports[1]),
71 init_protocols(self.conf)
74 if srvname == "collector":
75 kwargs = {"handle_hibernate": False}
78 cls = import_module("loctrkd." + srvname, package=".")
80 cls.log.addHandler(StreamHandler(stderr))
81 cls.log.setLevel(DEBUG)
83 cls.log.setLevel(WARNING)
84 p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
86 self.children.append((srvname, p))
88 server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
90 def run(server: HTTPServer) -> None:
92 server.serve_forever()
93 except KeyboardInterrupt:
94 # TODO: this still leaves an unclosed socket in the server
97 p = Process(target=run, args=(server,))
99 self.children.append(("httpd", p))
101 for srvname, p in self.children:
104 f"{srvname} did not start, exit code {p.exitcode}",
107 def tearDown(self) -> None:
108 for srvname, p in self.children:
109 if p.pid is not None:
118 ".opencellid.sqlite",
121 unlink(self.tmpfilebase + sfx)
124 for srvname, p in self.children:
128 f"{srvname} terminated with exit code {p.exitcode}",
132 class Fuzz(TestWithServers):
133 def setUp(self, *args: str, **kwargs: Any) -> None:
134 super().setUp("collector")
136 for fam, typ, pro, cnm, skadr in getaddrinfo(
138 self.conf.getint("collector", "port"),
142 break # Just take the first element
143 self.sock = socket(AF_INET, SOCK_STREAM)
144 self.sock.connect(skadr)
146 def tearDown(self) -> None:
147 sleep(1) # give collector some time
148 send_and_drain(self.sock, None)
150 sleep(1) # Let the server close their side
154 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
158 sock.recv(4096, MSG_DONTWAIT)
159 except BlockingIOError: