from contextlib import closing, ExitStack
from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
from contextlib import closing, ExitStack
from http.server import HTTPServer, SimpleHTTPRequestHandler
from importlib import import_module
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
from multiprocessing import Process
from os import kill, unlink
from signal import SIGINT
- def setUp(self, *args: str, httpd: bool = False) -> None:
+ def setUp(
+ self, *args: str, httpd: bool = False, verbose: bool = False
+ ) -> None:
freeports.append(sk.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
freeports.append(sk.getsockname()[1])
_, self.tmpfilebase = mkstemp()
self.conf = ConfigParser()
self.conf["collector"] = {
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
self.conf["collector"] = {
"port": str(freeports[0]),
"publishurl": "ipc://" + self.tmpfilebase + ".pub",
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
"downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
"dbfn": self.tmpfilebase + ".opencellid.sqlite",
"downloadurl": f"http://localhost:{freeports[2]}/test/262.csv.gz",
}
- self.conf["lookaside"] = {
- "backend": "opencellid",
+ self.conf["rectifier"] = {
+ "lookaside": "opencellid",
+ "publishurl": "ipc://" + self.tmpfilebase + ".rect.pub",
}
self.conf["wsgateway"] = {
"port": str(freeports[1]),
}
self.conf["wsgateway"] = {
"port": str(freeports[1]),
- cls = import_module("gps303." + srvname, package=".")
+ cls = import_module("loctrkd." + srvname, package=".")
+ if verbose:
+ cls.log.addHandler(StreamHandler(stderr))
+ cls.log.setLevel(DEBUG)
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))
p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
p.start()
self.children.append((srvname, p))