]> www.average.org Git - loctrkd.git/blobdiff - test/common.py
tests: separate fuzz tests in two modules
[loctrkd.git] / test / common.py
index 7a863106309e61072511641a7a975f5e344abbdf..131169ccedb80b42d3864f5fe055c191c303261f 100644 (file)
@@ -9,18 +9,22 @@ from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
 from socket import (
+    AF_INET,
     AF_INET6,
+    getaddrinfo,
     MSG_DONTWAIT,
     SOCK_DGRAM,
+    SOCK_STREAM,
     SOL_SOCKET,
     SO_REUSEADDR,
     socket,
     SocketType,
 )
 from sys import exit, stderr
+from random import Random
 from tempfile import mkstemp
 from time import sleep
-from typing import Optional
+from typing import Any, Optional
 from unittest import TestCase
 
 from loctrkd.common import init_protocols
@@ -117,6 +121,28 @@ class TestWithServers(TestCase):
                 pass
 
 
+class Fuzz(TestWithServers):
+    def setUp(self, *args: str, **kwargs: Any) -> None:
+        super().setUp("collector")
+        self.rnd = Random()
+        for fam, typ, pro, cnm, skadr in getaddrinfo(
+            "127.0.0.1",
+            self.conf.getint("collector", "port"),
+            family=AF_INET,
+            type=SOCK_STREAM,
+        ):
+            break  # Just take the first element
+        self.sock = socket(AF_INET, SOCK_STREAM)
+        self.sock.connect(skadr)
+
+    def tearDown(self) -> None:
+        sleep(1)  # give collector some time
+        send_and_drain(self.sock, None)
+        self.sock.close()
+        sleep(1)  # Let the server close their side
+        super().tearDown()
+
+
 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
     if buf is not None:
         sock.send(buf)