from random import Random
from socket import getaddrinfo, socket, AF_INET, SOCK_STREAM
-from sqlite3 import connect, Row
+from sqlite3 import connect
from time import sleep
+from typing import Any
import unittest
from .common import send_and_drain, TestWithServers
from gps303.gps303proto import *
class Storage(TestWithServers):
- def setUp(self, *args: str) -> None:
+ def setUp(self, *args: str, **kwargs: Any) -> None:
super().setUp("collector", "storage", "lookaside", "termconfig")
for fam, typ, pro, cnm, skadr in getaddrinfo(
"127.0.0.1",
sleep(1)
got = set()
with connect(self.conf.get("storage", "dbfn")) as db:
- db.row_factory = Row
for is_incoming, packet in db.execute(
"select is_incoming, packet from events"
):