]> www.average.org Git - loctrkd.git/blobdiff - test/test_fuzz.py
opencellid: use temp table rather than memory db
[loctrkd.git] / test / test_fuzz.py
index 0456fda0352ddede2b315fb351bf431b500929ba..ecedb2ae6dc34c4c4320ac95488c5170cdbcbdb0 100644 (file)
@@ -1,55 +1,47 @@
 """ Send junk to the collector """
 
 from random import Random
-from socket import getaddrinfo, socket, AF_INET6, MSG_DONTWAIT, SOCK_STREAM
+from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
 from time import sleep
-from typing import Optional
+from typing import Any
 import unittest
-from .common import TestWithServers
+from .common import send_and_drain, TestWithServers
 
 REPEAT: int = 1000000
 
 
 class Fuzz(TestWithServers):
-    def setUp(self, *args: str) -> None:
+    def setUp(self, *args: str, **kwargs: Any) -> None:
         super().setUp("collector")
         self.rnd = Random()
         for fam, typ, pro, cnm, skadr in getaddrinfo(
-            "::1",
+            "127.0.0.1",
             self.conf.getint("collector", "port"),
-            family=AF_INET6,
+            family=AF_INET,
             type=SOCK_STREAM,
         ):
             break  # Just take the first element
-        self.sock = socket(AF_INET6, SOCK_STREAM)
+        self.sock = socket(AF_INET, SOCK_STREAM)
         self.sock.connect(skadr)
 
     def tearDown(self) -> None:
         sleep(1)  # give collector some time
-        self._send_and_drain(None)
+        send_and_drain(self.sock, None)
         self.sock.close()
-        print("finished fuzzing")
+        sleep(1)  # Let the server close their side
         super().tearDown()
 
-    def _send_and_drain(self, buf: Optional[bytes]) -> None:
-        if buf is not None:
-            self.sock.send(buf)
-        try:
-            self.sock.recv(4096, MSG_DONTWAIT)
-        except BlockingIOError:
-            pass
-
     def test_stream(self) -> None:
         for _ in range(REPEAT):
             size = self.rnd.randint(1, 5000)
             buf = self.rnd.randbytes(size)
-            self._send_and_drain(buf)
+            send_and_drain(self.sock, buf)
 
     def test_msgs(self) -> None:
         for _ in range(REPEAT):
             size = self.rnd.randint(0, 300)
             buf = b"xx" + self.rnd.randbytes(size) + b"\r\n"
-            self._send_and_drain(buf)
+            send_and_drain(self.sock, buf)
 
 
 if __name__ == "__main__":