1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET6, MSG_DONTWAIT, SOCK_STREAM
6 from typing import Optional
8 from .common import TestWithServers
13 class Fuzz(TestWithServers):
14 def setUp(self, *args: str) -> None:
15 super().setUp("collector")
17 for fam, typ, pro, cnm, skadr in getaddrinfo(
19 self.conf.getint("collector", "port"),
23 break # Just take the first element
24 self.sock = socket(AF_INET6, SOCK_STREAM)
25 self.sock.connect(skadr)
27 def tearDown(self) -> None:
28 sleep(1) # give collector some time
29 self._send_and_drain(None)
31 print("finished fuzzing")
34 def _send_and_drain(self, buf: Optional[bytes]) -> None:
38 self.sock.recv(4096, MSG_DONTWAIT)
39 except BlockingIOError:
def test_stream(self) -> None:
    """Push REPEAT chunks of random bytes through ``_send_and_drain``.

    Each chunk has a random length between 1 and 5000 bytes (inclusive)
    and carries no framing of any kind.
    """
    for _round in range(REPEAT):
        # Draw the length first, then the payload, from the test's own RNG.
        chunk_len = self.rnd.randint(1, 5000)
        self._send_and_drain(self.rnd.randbytes(chunk_len))
def test_msgs(self) -> None:
    """Push REPEAT framed random messages through ``_send_and_drain``.

    Every message is the literal prefix ``b"xx"``, followed by 0 to 300
    random bytes (inclusive), followed by a ``b"\\r\\n"`` terminator.
    """
    for _round in range(REPEAT):
        body_len = self.rnd.randint(0, 300)
        # Assemble prefix + random body + CRLF into a single buffer.
        message = b"".join((b"xx", self.rnd.randbytes(body_len), b"\r\n"))
        self._send_and_drain(message)
55 if __name__ == "__main__":