]> www.average.org Git - loctrkd.git/blobdiff - test/common.py
test: opencellid downloader
[loctrkd.git] / test / common.py
index dccecfc1bb9283c88740602c85c72cacb79306f9..b9f768c8f55d84573a9149f5de248f955039d32b 100644 (file)
@@ -1,30 +1,59 @@
 """ Common housekeeping for tests that rely on daemons """
 
 from configparser import ConfigParser, SectionProxy
-from contextlib import closing
+from contextlib import closing, ExitStack
+from http.server import HTTPServer, SimpleHTTPRequestHandler
 from importlib import import_module
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
-from socket import AF_INET6, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, socket
+from socket import (
+    AF_INET6,
+    MSG_DONTWAIT,
+    SOCK_DGRAM,
+    SOL_SOCKET,
+    SO_REUSEADDR,
+    socket,
+    SocketType,
+)
+from sys import exit
 from tempfile import mkstemp
 from time import sleep
+from typing import Optional
 from unittest import TestCase
 
+NUMPORTS = 3
+
 
 class TestWithServers(TestCase):
-    def setUp(self, *args: str) -> None:
-        with closing(socket(AF_INET6, SOCK_DGRAM)) as sock:
-            sock.bind(("", 0))
-            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
-            freeport = sock.getsockname()[1]
+    def setUp(self, *args: str, httpd: bool = False) -> None:
+        freeports = []
+        with ExitStack() as stack:
+            for _ in range(NUMPORTS):
+                sk = stack.enter_context(closing(socket(AF_INET6, SOCK_DGRAM)))
+                sk.bind(("", 0))
+                sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+                freeports.append(sk.getsockname()[1])
         _, self.tmpfilebase = mkstemp()
         self.conf = ConfigParser()
         self.conf["collector"] = {
-            "port": str(freeport),
+            "port": str(freeports[0]),
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
         }
+        self.conf["storage"] = {
+            "dbfn": self.tmpfilebase + ".storage.sqlite",
+        }
+        self.conf["opencellid"] = {
+            "dbfn": self.tmpfilebase + ".opencellid.sqlite",
+            "downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
+        }
+        self.conf["lookaside"] = {
+            "backend": "opencellid",
+        }
+        self.conf["wsgateway"] = {
+            "port": str(freeports[1]),
+        }
         self.children = []
         for srvname in args:
             if srvname == "collector":
@@ -35,14 +64,48 @@ class TestWithServers(TestCase):
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
+        if httpd:
+            server = HTTPServer(("", freeports[2]), SimpleHTTPRequestHandler)
+
+            def run(server):
+                try:
+                    server.serve_forever()
+                except KeyboardInterrupt:
+                    # TODO: this still leaves unclosed socket in the server
+                    server.shutdown()
+
+            p = Process(target=run, args=(server,))
+            p.start()
+            self.children.append(("httpd", p))
         sleep(1)
 
     def tearDown(self) -> None:
-        sleep(1)
         for srvname, p in self.children:
             if p.pid is not None:
                 kill(p.pid, SIGINT)
             p.join()
-            print(srvname, "terminated with return code", p.exitcode)
-        for sfx in (".pub", ".pul"):
-            unlink(self.tmpfilebase + sfx)
+            self.assertEqual(
+                p.exitcode,
+                0,
+                srvname + " terminated with non-zero return code",
+            )
+        for sfx in (
+            "",
+            ".pub",
+            ".pul",
+            ".storage.sqlite",
+            ".opencellid.sqlite",
+        ):
+            try:
+                unlink(self.tmpfilebase + sfx)
+            except OSError:
+                pass
+
+
+def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
+    if buf is not None:
+        sock.send(buf)
+    try:
+        sock.recv(4096, MSG_DONTWAIT)
+    except BlockingIOError:
+        pass