1 """ Send junk to the collector """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
5 from sqlite3 import connect
8 from .common import send_and_drain, TestWithServers
# Test case that sends one fixed byte string to the collector over an IPv6
# stream socket and then reads the "events" table from the storage database.
# NOTE(review): this listing carries embedded line numbers with gaps and has
# lost its indentation, so some statements are not visible here; the comments
# below describe only the lines that are shown.
11 class Storage(TestWithServers):
# Run the base-class setUp(), then open a client socket to the collector.
# The *args parameter is accepted but not forwarded: the names handed to the
# base-class setUp() are fixed literals.
12 def setUp(self, *args: str) -> None:
# presumably these four strings name the components that TestWithServers
# launches -- TODO confirm against .common
13 super().setUp("collector", "storage", "lookaside", "termconfig")
# Resolve the collector address; the port is read from the "collector"
# section of self.conf.
14 for fam, typ, pro, cnm, skadr in getaddrinfo(
# NOTE(review): the host argument, any further getaddrinfo() arguments and the
# closing parenthesis are not visible in this excerpt.
16 self.conf.getint("collector", "port"),
# If getaddrinfo() returned an empty list, skadr would stay unbound and the
# connect() call below would raise NameError.
20 break # Just take the first element
# The socket family and type are hard-coded here rather than taken from the
# `fam` / `typ` values of the getaddrinfo() result.
21 self.sock = socket(AF_INET6, SOCK_STREAM)
# skadr is the socket address from the first getaddrinfo() result.
22 self.sock.connect(skadr)
# Pause for one second, then make a final send_and_drain() call with None
# instead of a payload.
24 def tearDown(self) -> None:
# NOTE(review): `sleep` is not among the imports visible in this excerpt;
# presumably it is imported on an elided line (time.sleep?) -- confirm.
25 sleep(1) # give collector some time
# presumably a None payload makes send_and_drain() only drain pending input
# without sending anything -- TODO confirm in .common
26 send_and_drain(self.sock, None)
# NOTE(review): no socket close and no super().tearDown() call are visible in
# this excerpt -- check the elided lines.
# Send one hard-coded 15-byte message, then load every row of the "events"
# table from the SQLite file named by the "dbfn" option of the "storage"
# config section.
30 def test_storage(self) -> None:
# Fixed payload: starts with b"xx" and ends with b"\r\n".
31 buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
32 send_and_drain(self.sock, buf)
33 # TODO: make a proper sequence
# NOTE: used as a context manager, a sqlite3 connection commits or rolls back
# on exit but is not closed.
34 with connect(self.conf.get("storage", "dbfn")) as db:
# NOTE(review): `c` is not defined on any visible line; presumably a cursor
# obtained from `db` on an elided line -- confirm.
36 c.execute("select * from events")
# dict(row) needs mapping-like rows (e.g. row_factory = sqlite3.Row); no
# row_factory assignment is visible here -- TODO confirm.
37 events = [dict(row) for row in c]
# NOTE(review): no assertion on `events` is visible in this excerpt.
41 if __name__ == "__main__":