]> www.average.org Git - loctrkd.git/blobdiff - test/test_storage.py
test: better aquisition of free ports
[loctrkd.git] / test / test_storage.py
index 0c5e260e9243a90265a814e0a08c9a542571c175..80ccba61fed7ad87a373162bcdea545526c77030 100644 (file)
@@ -1,41 +1,53 @@
 """ Send junk to the collector """
 
 from random import Random
-from socket import getaddrinfo, socket, AF_INET6, SOCK_STREAM
-from sqlite3 import connect
+from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
+from sqlite3 import connect, Row
 from time import sleep
+from typing import Any
 import unittest
 from .common import send_and_drain, TestWithServers
+from gps303.gps303proto import *
 
 
 class Storage(TestWithServers):
-    def setUp(self, *args: str) -> None:
+    def setUp(self, *args: str, **kwargs: Any) -> None:
         super().setUp("collector", "storage", "lookaside", "termconfig")
         for fam, typ, pro, cnm, skadr in getaddrinfo(
-            "::1",
+            "127.0.0.1",
             self.conf.getint("collector", "port"),
-            family=AF_INET6,
+            family=AF_INET,
             type=SOCK_STREAM,
         ):
             break  # Just take the first element
-        self.sock = socket(AF_INET6, SOCK_STREAM)
+        self.sock = socket(AF_INET, SOCK_STREAM)
         self.sock.connect(skadr)
 
     def tearDown(self) -> None:
         sleep(1)  # give collector some time
-        send_and_drain(self.sock, None)
-        self.sock.close()
         super().tearDown()
 
     def test_storage(self) -> None:
-        buf = b"xx\r\x01\x03Y3\x90w\x19q\x85\x05\r\n"
-        send_and_drain(self.sock, buf)
-        # TODO: make a proper sequence
+        for buf in (
+            LOGIN.In(imei="9999123456780000", ver=9).packed,
+            STATUS.In().packed,
+            HIBERNATION.In().packed,
+        ):
+            send_and_drain(self.sock, b"xx" + buf + b"\r\n")
+        self.sock.close()
+        sleep(1)
+        got = set()
         with connect(self.conf.get("storage", "dbfn")) as db:
-            c = db.cursor()
-            c.execute("select * from events")
-            events = [dict(row) for row in c]
-        print(events)
+            db.row_factory = Row
+            for is_incoming, packet in db.execute(
+                "select is_incoming, packet from events"
+            ):
+                msg = parse_message(packet, is_incoming=is_incoming)
+                # print(msg)
+                got.add(type(msg))
+        self.assertEqual(
+            got, {LOGIN.Out, HIBERNATION.In, LOGIN.In, STATUS.Out, STATUS.In}
+        )
 
 
 if __name__ == "__main__":