1 """ Send packets to the collector and check what ends up in storage """
3 from random import Random
4 from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
5 from sqlite3 import connect, Row
8 from .common import send_and_drain, TestWithServers
9 from gps303.gps303proto import *
# Integration test case: pushes protocol packets to the collector over TCP and
# then inspects the "events" table of the storage database.
# NOTE(review): this view of the file is missing lines and indentation, so the
# comments below describe only the statements that are actually visible.
12 class Storage(TestWithServers):
# Prepare the fixture and open a client connection to the collector.
# `args` is accepted but no visible statement uses it; the four names handed
# to the parent setUp are fixed (presumably the services to start - confirm
# against TestWithServers in .common).
13 def setUp(self, *args: str) -> None:
14 super().setUp("collector", "storage", "lookaside", "termconfig")
# Resolve the collector's address. The port is the integer "port" value looked
# up under "collector" in self.conf; the other getaddrinfo() arguments are not
# visible in this view.
15 for fam, typ, pro, cnm, skadr in getaddrinfo(
17 self.conf.getint("collector", "port"),
# Only `skadr` from the first result is used below.
# NOTE(review): if getaddrinfo() yielded nothing, `skadr` would be unbound at
# the connect() call.
21 break # Just take the first element
# IPv4 TCP client socket, kept on the instance for use by the test method.
22 self.sock = socket(AF_INET, SOCK_STREAM)
23 self.sock.connect(skadr)
# Waits one second before the rest of the teardown, which is not visible here.
# NOTE(review): `sleep` is not among the imports visible in this view -
# presumably imported on a line that is missing; confirm.
25 def tearDown(self) -> None:
26 sleep(1) # give collector some time
# Send packed messages to the collector, then read back what was recorded.
# Visible payloads are LOGIN.In and HIBERNATION.In; the line between them is
# missing from this view.
29 def test_storage(self) -> None:
31 LOGIN.In(imei="9999123456780000", ver=9).packed,
33 HIBERNATION.In().packed,
# Each packed message is sent wrapped in a b"xx" prefix and a b"\r\n" suffix.
35 send_and_drain(self.sock, b"xx" + buf + b"\r\n")
# Open the SQLite file whose path is the "dbfn" value under "storage" in
# self.conf.
# NOTE(review): sqlite3's connection context manager ends the transaction on
# exit but does not close the connection.
39 with connect(self.conf.get("storage", "dbfn")) as db:
41 for is_incoming, packet in db.execute(
42 "select is_incoming, packet from events"
# Decode every stored packet, passing the stored direction flag through.
44 msg = parse_message(packet, is_incoming=is_incoming)
# Expected set of message classes. It contains LOGIN.Out and STATUS.Out, so
# presumably the collector's replies are stored as well - confirm. How `got`
# is built is not visible in this view.
48 got, {LOGIN.Out, HIBERNATION.In, LOGIN.In, STATUS.Out, STATUS.In}
52 if __name__ == "__main__":