]> www.average.org Git - loctrkd.git/blobdiff - test/common.py
test: add fuzzer for beesure protocol
[loctrkd.git] / test / common.py
index 7a863106309e61072511641a7a975f5e344abbdf..a31ff094d9d806ebfa5b1572d49587a2a20c7bd8 100644 (file)
@@ -9,18 +9,22 @@ from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
 from socket import (
+    AF_INET,
     AF_INET6,
+    getaddrinfo,
     MSG_DONTWAIT,
     SOCK_DGRAM,
+    SOCK_STREAM,
     SOL_SOCKET,
     SO_REUSEADDR,
     socket,
     SocketType,
 )
 from sys import exit, stderr
+from random import Random
 from tempfile import mkstemp
 from time import sleep
-from typing import Optional
+from typing import Any, Optional
 from unittest import TestCase
 
 from loctrkd.common import init_protocols
@@ -42,7 +46,7 @@ class TestWithServers(TestCase):
         _, self.tmpfilebase = mkstemp()
         self.conf = ConfigParser()
         self.conf["common"] = {
-            "protocols": "zx303proto",
+            "protocols": "zx303proto,beesure",
         }
         self.conf["collector"] = {
             "port": str(freeports[0]),
@@ -117,6 +121,28 @@ class TestWithServers(TestCase):
                 pass
 
 
+class Fuzz(TestWithServers):
+    def setUp(self, *args: str, **kwargs: Any) -> None:
+        super().setUp("collector")
+        self.rnd = Random()
+        for fam, typ, pro, cnm, skadr in getaddrinfo(
+            "127.0.0.1",
+            self.conf.getint("collector", "port"),
+            family=AF_INET,
+            type=SOCK_STREAM,
+        ):
+            break  # Just take the first element
+        self.sock = socket(AF_INET, SOCK_STREAM)
+        self.sock.connect(skadr)
+
+    def tearDown(self) -> None:
+        sleep(1)  # give collector some time
+        send_and_drain(self.sock, None)
+        self.sock.close()
+        sleep(1)  # Let the server close their side
+        super().tearDown()
+
+
 def send_and_drain(sock: SocketType, buf: Optional[bytes]) -> None:
     if buf is not None:
         sock.send(buf)