]> www.average.org Git - loctrkd.git/blobdiff - test/common.py
Update changelog for 2.00 release
[loctrkd.git] / test / common.py
index dccecfc1bb9283c88740602c85c72cacb79306f9..17446a55f807311aab059fb6beabbbd9bb0fb80e 100644 (file)
 """ Common housekeeping for tests that rely on daemons """
 
 from configparser import ConfigParser, SectionProxy
 """ Common housekeeping for tests that rely on daemons """
 
 from configparser import ConfigParser, SectionProxy
-from contextlib import closing
+from contextlib import closing, ExitStack
+from http.server import HTTPServer, SimpleHTTPRequestHandler
 from importlib import import_module
 from importlib import import_module
+from logging import DEBUG, StreamHandler, WARNING
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
-from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from socket import (
+    AF_INET,
+    AF_INET6,
+    getaddrinfo,
+    MSG_DONTWAIT,
+    SOCK_DGRAM,
+    SOCK_STREAM,
+    SOL_SOCKET,
+    SO_REUSEADDR,
+    socket,
+    SocketType,
+)
+from sys import exit, stderr
+from random import Random
 from tempfile import mkstemp
 from time import sleep
 from tempfile import mkstemp
 from time import sleep
+from typing import Any, Optional
 from unittest import TestCase
 
 from unittest import TestCase
 
+from loctrkd.common import init_protocols
+
+NUMPORTS = 3
+
 
 class TestWithServers(TestCase):
 
 class TestWithServers(TestCase):
-    def setUp(self, *args: str) -> None:
-        with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
-            sock.bind(("", 0))
-            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
-            freeport = sock.getsockname()[1]
+    def setUp(
+        self, *args: str, httpd: bool = False, verbose: bool = False
+    ) -> None:
+        freeports = []
+        with ExitStack() as stack:
+            for _ in range(NUMPORTS):
+                sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
+                sk.bind(("", 0))
+                sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+                freeports.append(sk.getsockname()[1])
         _, self.tmpfilebase = mkstemp()
         self.conf = ConfigParser()
         _, self.tmpfilebase = mkstemp()
         self.conf = ConfigParser()
+        self.conf["common"] = {
+            "protocols": "zx303proto,beesure",
+        }
         self.conf["collector"] = {
         self.conf["collector"] = {
-            "port": str(freeport),
+            "port": str(freeports[0]),
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
         }
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
         }
+        self.conf["storage"] = {
+            "dbfn": self.tmpfilebase + ".storage.sqlite",
+            "events": "yes",
+        }
+        self.conf["opencellid"] = {
+            "dbfn": self.tmpfilebase + ".opencellid.sqlite",
+            "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
+        }
+        self.conf["rectifier"] = {
+            "lookaside": "opencellid",
+            "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
+        }
+        self.conf["wsgateway"] = {
+            "port": str(freeports[1]),
+        }
+        init_protocols(self.conf)
         self.children = []
         for srvname in args:
             if srvname == "collector":
                 kwargs = {"handle_hibernate": False}
             else:
                 kwargs = {}
         self.children = []
         for srvname in args:
             if srvname == "collector":
                 kwargs = {"handle_hibernate": False}
             else:
                 kwargs = {}
-            cls = import_module("gps303." + srvname, package=".")
+            cls = import_module("loctrkd." + srvname, package=".")
+            if verbose:
+                cls.log.addHandler(StreamHandler(stderr))
+                cls.log.setLevel(DEBUG)
+            else:
+                cls.log.setLevel(WARNING)
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
+        if httpd:
+            server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
+
+            def run(server: HTTPServer) -> None:
+                try:
+                    server.serve_forever()
+                except KeyboardInterrupt:
+                    # TODO: this still leaves unclosed socket in the server
+                    server.shutdown()
+
+            p = Process(target=run, args=(server,))
+            p.start()
+            self.children.append(("httpd", p))
         sleep(1)
         sleep(1)
+        for srvname, p in self.children:
+            self.assertTrue(
+                p.is_alive(),
+                f"{srvname} did not start, exit code {p.exitcode}",
+            )
 
     def tearDown(self) -> None:
 
     def tearDown(self) -> None:
-        sleep(1)
         for srvname, p in self.children:
             if p.pid is not None:
                 kill(p.pid, SIGINT)
             p.join()
         for srvname, p in self.children:
             if p.pid is not None:
                 kill(p.pid, SIGINT)
             p.join()
-            print(srvname, "terminated with return code", p.exitcode)
-        for sfx in (".pub", ".pul"):
-            unlink(self.tmpfilebase + sfx)
+        for sfx in (
+            "",
+            ".pub",
+            ".rect.pub",
+            ".pul",
+            ".storage.sqlite",
+            ".opencellid.sqlite",
+        ):
+            try:
+                unlink(self.tmpfilebase + sfx)
+            except OSError:
+                pass
+        for srvname, p in self.children:
+            self.assertEqual(
+                p.exitcode,
+                0,
+                f"{srvname} terminated with exit code {p.exitcode}",
+            )
+
+
+class Fuzz(TestWithServers):
+    def setUp(self, *args: str, **kwargs: Any) -> None:
+        super().setUp("collector")
+        self.rnd = Random()
+        for fam, typ, pro, cnm, skadr in getaddrinfo(
+            "127.0.0.1",
+            self.conf.getint("collector", "port"),
+            family=AF_INET,
+            type=SOCK_STREAM,
+        ):
+            break  # Just take the first element
+        self.sock = socket(AF_INET, SOCK_STREAM)
+        self.sock.connect(skadr)
+
+    def tearDown(self) -> None:
+        sleep(1)  # give collector some time
+        send_and_drain(self.sock, None)
+        self.sock.close()
+        sleep(1)  # Let the server close their side
+        super().tearDown()
+
+
+def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
+    if buf is not None:
+        sock.send(buf)
+    try:
+        sock.recv(4096, MSG_DONTWAIT)
+    except BlockingIOError:
+        pass