]> www.average.org Git - loctrkd.git/blob - test/test_storage.py
0c5e260e9243a90265a814e0a08c9a542571c175
[loctrkd.git] / test / test_storage.py
1 """ Send junk to the collector """
2
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
5 from sqlite3 import connect
6 from time import sleep
7 import unittest
8 from .common import send_and_drain, TestWithServers
9
10
11 class Storage(TestWithServers):
12     def setUp(self, *args: str) -> None:
13         super().setUp("collector", "storage", "lookaside", "termconfig")
14         for fam, typ, pro, cnm, skadr in getaddrinfo(
15             "::1",
16             self.conf.getint("collector", "port"),
17             family=AF_INET6,
18             type=SOCK_STREAM,
19         ):
20             break  # Just take the first element
21         self.sock = socket(AF_INET6, SOCK_STREAM)
22         self.sock.connect(skadr)
23
24     def tearDown(self) -> None:
25         sleep(1)  # give collector some time
26         send_and_drain(self.sock, None)
27         self.sock.close()
28         super().tearDown()
29
30     def test_storage(self) -> None:
31         buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
32         send_and_drain(self.sock, buf)
33         # TODO: make a proper sequence
34         with connect(self.conf.get("storage", "dbfn")) as db:
35             c = db.cursor()
36             c.execute("select * from events")
37             events = [dict(row) for row in c]
38         print(events)
39
40
41 if __name__ == "__main__":
42     unittest.main()