diff --git a/docker/tests/conftest.py b/docker/tests/conftest.py
index aeb1d53..94eae58 100644
--- a/docker/tests/conftest.py
+++ b/docker/tests/conftest.py
@@ -1,167 +1,166 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from os.path import join
import re
-import subprocess
import time
from typing import Generator, Mapping, Tuple
from urllib.parse import urljoin
import pytest
import requests
import testinfra
APIURL = "http://127.0.0.1:5080/api/1/"
SAMPLE_METADATA = """\
Test Software
swh
test-software
No One
some-metadata-provenance-url
"""
+HOST = testinfra.get_host("local://")
+COMPOSE_CMD = "docker-compose"
# wait-for-it timout
WFI_TIMEOUT = 60
# scope='session' so we use the same container for all the tests;
@pytest.fixture(scope="session")
def docker_compose(request):
# start the whole cluster
- subprocess.check_output(["docker-compose", "up", "-d"])
+ HOST.check_output(f"{COMPOSE_CMD} up -d")
yield
# and stop it
- subprocess.check_call(["docker-compose", "down", "-v"])
+ HOST.check_output(f"{COMPOSE_CMD} down -v")
@pytest.fixture(scope="session")
def scheduler_host(request, docker_compose):
# run a container in which test commands are executed
docker_id = (
- subprocess.check_output(
- ["docker-compose", "run", "-d", "swh-scheduler", "shell", "sleep", "1h"]
+ HOST.check_output(
+ f"{COMPOSE_CMD} run -d swh-scheduler shell sleep 1h"
)
- .decode()
.strip()
)
scheduler_host = testinfra.get_host("docker://" + docker_id)
scheduler_host.check_output(f"wait-for-it swh-scheduler:5008 -t {WFI_TIMEOUT}")
scheduler_host.check_output(f"wait-for-it swh-storage:5002 -t {WFI_TIMEOUT}")
# return a testinfra connection to the container
yield scheduler_host
# at the end of the test suite, destroy the container
- subprocess.check_call(["docker", "rm", "-f", docker_id])
+ HOST.check_output(f"docker rm -f {docker_id}")
# scope='session' so we use the same container for all the tests;
@pytest.fixture(scope="session")
def deposit_host(request, docker_compose):
# run a container in which test commands are executed
docker_id = (
- subprocess.check_output(
- ["docker-compose", "run", "-d", "swh-deposit", "shell", "sleep", "1h"]
+ HOST.check_output(
+ f"{COMPOSE_CMD} run -d swh-deposit shell sleep 1h"
)
- .decode()
.strip()
)
deposit_host = testinfra.get_host("docker://" + docker_id)
deposit_host.check_output("echo 'print(\"Hello World!\")\n' > /tmp/hello.py")
deposit_host.check_output("tar -C /tmp -czf /tmp/archive.tgz /tmp/hello.py")
deposit_host.check_output(f"echo '{SAMPLE_METADATA}' > /tmp/metadata.xml")
deposit_host.check_output(f"wait-for-it swh-deposit:5006 -t {WFI_TIMEOUT}")
# return a testinfra connection to the container
yield deposit_host
# at the end of the test suite, destroy the container
- subprocess.check_call(["docker", "rm", "-f", docker_id])
+ HOST.check_output(f"docker rm -f {docker_id}")
@pytest.fixture(scope="session")
def git_url():
return "https://forge.softwareheritage.org/source/swh-core"
@pytest.fixture(scope="session")
def git_origin(scheduler_host, git_url):
task = scheduler_host.check_output(f"swh scheduler task add load-git url={git_url}")
taskid = re.search(r"^Task (?P\d+)$", task, flags=re.MULTILINE).group("id")
assert int(taskid) > 0
for i in range(60):
status = scheduler_host.check_output(
f"swh scheduler task list --list-runs --task-id {taskid}"
)
if "Executions:" in status:
if "[eventful]" in status:
break
if "[started]" in status or "[scheduled]" in status:
time.sleep(1)
continue
if "[failed]" in status:
- loader_logs = subprocess.check_output(
- ["docker-compose", "logs", "swh-loader"]
+ loader_logs = HOST.check_output(
+ f"{COMPOSE_CMD} logs swh-loader"
)
assert False, (
"Loading execution failed\n"
f"status: {status}\n"
- f"loader logs: " + loader_logs.decode(errors="replace")
+ f"loader logs: " + loader_logs
)
assert False, f"Loading execution failed, task status is {status}"
return git_url
# Utility functions
def apiget(path: str, verb: str = "GET", **kwargs):
"""Query the API at path and return the json result or raise an
AssertionError"""
url = urljoin(APIURL, path)
resp = requests.request(verb, url, **kwargs)
assert resp.status_code == 200, f"failed to retrieve {url}: {resp.text}"
return resp.json()
def pollapi(path: str, verb: str = "GET", **kwargs):
"""Poll the API at path until it returns an OK result"""
url = urljoin(APIURL, path)
for i in range(60):
resp = requests.request(verb, url, **kwargs)
if resp.ok:
break
time.sleep(1)
else:
assert False, f"Polling {url} failed"
return resp
def getdirectory(
dirid: str, currentpath: str = ""
) -> Generator[Tuple[str, Mapping], None, None]:
"""Recursively retrieve directory description from the archive"""
directory = apiget(f"directory/{dirid}")
for direntry in directory:
path = join(currentpath, direntry["name"])
if direntry["type"] != "dir":
yield (path, direntry)
else:
yield from getdirectory(direntry["target"], path)