""" Send junk to the collector """
from random import Random
-from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
-from sqlite3 import connect
+from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
+from sqlite3 import connect, Row
from time import sleep
+from typing import Any
import unittest
from .common import send_and_drain, TestWithServers
+from gps303.gps303proto import *
class Storage(TestWithServers):
- def setUp(self, *args: str) -> None:
+ def setUp(self, *args: str, **kwargs: Any) -> None:
super().setUp("collector", "storage", "lookaside", "termconfig")
for fam, typ, pro, cnm, skadr in getaddrinfo(
- "::1",
+ "127.0.0.1",
self.conf.getint("collector", "port"),
- family=AF_INET6,
+ family=AF_INET,
type=SOCK_STREAM,
):
break # Just take the first element
- self.sock = socket(AF_INET6, SOCK_STREAM)
+ self.sock = socket(AF_INET, SOCK_STREAM)
self.sock.connect(skadr)
def tearDown(self) -> None:
sleep(1) # give collector some time
- send_and_drain(self.sock, None)
- self.sock.close()
super().tearDown()
def test_storage(self) -> None:
- buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
- send_and_drain(self.sock, buf)
- # TODO: make a proper sequence
+ for buf in (
+ LOGIN.In(imei="9999123456780000", ver=9).packed,
+ STATUS.In().packed,
+ HIBERNATION.In().packed,
+ ):
+ send_and_drain(self.sock, b"xx" + buf + b"\r\n")
+ self.sock.close()
+ sleep(1)
+ got = set()
with connect(self.conf.get("storage", "dbfn")) as db:
- c = db.cursor()
- c.execute("select * from events")
- events = [dict(row) for row in c]
- print(events)
+ db.row_factory = Row
+ for is_incoming, packet in db.execute(
+ "select is_incoming, packet from events"
+ ):
+ msg = parse_message(packet, is_incoming=is_incoming)
+ # print(msg)
+ got.add(type(msg))
+ self.assertEqual(
+ got, {LOGIN.Out, HIBERNATION.In, LOGIN.In, STATUS.Out, STATUS.In}
+ )
if __name__ == "__main__":