Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/tests/conftest.py
Show All 22 Lines | |||||
class GraphServerProcess(multiprocessing.Process): | class GraphServerProcess(multiprocessing.Process): | ||||
def __init__(self, q, *args, **kwargs): | def __init__(self, q, *args, **kwargs): | ||||
self.q = q | self.q = q | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
def run(self): | def run(self): | ||||
try: | try: | ||||
backend = Backend(graph_path=str(TEST_GRAPH_PATH)) | backend = Backend(graph_path=str(TEST_GRAPH_PATH)) | ||||
with backend: | |||||
with loop_context() as loop: | with loop_context() as loop: | ||||
app = make_app(backend=backend, debug=True) | app = make_app(backend=backend, debug=True) | ||||
client = TestClient(TestServer(app), loop=loop) | client = TestClient(TestServer(app), loop=loop) | ||||
loop.run_until_complete(client.start_server()) | loop.run_until_complete(client.start_server()) | ||||
url = client.make_url("/graph/") | url = client.make_url("/graph/") | ||||
self.q.put(url) | self.q.put(url) | ||||
loop.run_forever() | loop.run_forever() | ||||
except Exception as e: | except Exception as e: | ||||
self.q.put(e) | self.q.put(e) | ||||
@pytest.fixture(scope="module", params=["remote", "naive"]) | @pytest.fixture(scope="module", params=["remote", "naive"]) | ||||
def graph_client(request): | def graph_client(request): | ||||
if request.param == "remote": | if request.param == "remote": | ||||
queue = multiprocessing.Queue() | queue = multiprocessing.Queue() | ||||
Show All 19 Lines |