diff --git a/docker/tests/conftest.py b/docker/tests/conftest.py index 445f78c..3f073cb 100644 --- a/docker/tests/conftest.py +++ b/docker/tests/conftest.py @@ -1,178 +1,180 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os.path import join import re import time from typing import Generator, Mapping, Tuple from urllib.parse import urljoin +from uuid import uuid4 as uuid import pytest import requests import testinfra APIURL = "http://127.0.0.1:5080/api/1/" SAMPLE_METADATA = """\ Test Software swh test-software No One some-metadata-provenance-url """ # wait-for-it timout WFI_TIMEOUT = 60 @pytest.fixture(scope="session") def docker_host(): return testinfra.get_host("local://") @pytest.fixture(scope="session") def compose_cmd(docker_host): + project_name = f"swh_test_{uuid()}" try: docker_host.check_output("docker compose version") - return "docker compose" + return f"docker compose -p {project_name}" except AssertionError: print("Fall back to old docker-compose command") - return "docker-compose" + return f"docker-compose -p {project_name}" # scope='session' so we use the same container for all the tests; @pytest.fixture(scope="session") def docker_compose(request, docker_host, compose_cmd): # start the whole cluster docker_host.check_output(f"{compose_cmd} up -d") # small hack: add a helper func to docker_host; so it's not necessary to # use all 3 docker_compose, docker_host and compose_cmd fixtures everywhere docker_host.check_compose_output = lambda command: docker_host.check_output( f"{compose_cmd} {command}" ) yield docker_host # and stop the cluster docker_host.check_output(f"{compose_cmd} down -v") @pytest.fixture(scope="session") def scheduler_host(request, docker_compose): # run a container in which test commands are executed docker_id = docker_compose.check_compose_output( "run -d swh-scheduler shell sleep 1h" ).strip() scheduler_host = testinfra.get_host("docker://" + docker_id) scheduler_host.check_output(f"wait-for-it swh-scheduler:5008 -t {WFI_TIMEOUT}") scheduler_host.check_output(f"wait-for-it swh-storage:5002 -t {WFI_TIMEOUT}") # return a testinfra connection to the container yield scheduler_host # at the end of the test suite, destroy the container docker_compose.check_output(f"docker rm -f {docker_id}") # scope='session' so we use the same container for all the tests; @pytest.fixture(scope="session") def deposit_host(request, docker_compose): # run a container in which test commands are executed docker_id = docker_compose.check_compose_output( "run -d swh-deposit shell sleep 1h" ).strip() deposit_host = testinfra.get_host("docker://" + docker_id) deposit_host.check_output("echo 'print(\"Hello World!\")\n' > /tmp/hello.py") deposit_host.check_output("tar -C /tmp -czf /tmp/archive.tgz /tmp/hello.py") deposit_host.check_output(f"echo '{SAMPLE_METADATA}' > /tmp/metadata.xml") deposit_host.check_output(f"wait-for-it swh-deposit:5006 -t {WFI_TIMEOUT}") # return a testinfra connection to the container yield deposit_host # at the end of the test suite, destroy the container docker_compose.check_output(f"docker rm -f {docker_id}") @pytest.fixture(scope="session") def git_url(): return "https://forge.softwareheritage.org/source/swh-core" @pytest.fixture(scope="session") def git_origin(docker_compose, scheduler_host, git_url): task = scheduler_host.check_output(f"swh scheduler task add load-git url={git_url}") taskid = re.search(r"^Task (?P\d+)$", task, flags=re.MULTILINE).group("id") assert int(taskid) > 0 for i in range(60): status = scheduler_host.check_output( f"swh scheduler task list --list-runs --task-id {taskid}" ) if "Executions:" in status: if "[eventful]" in status: break if "[started]" in status or "[scheduled]" in status: time.sleep(1) continue if "[failed]" in status: loader_logs = docker_compose.check_compose_output("logs swh-loader") assert False, ( "Loading execution failed\n" f"status: {status}\n" f"loader logs: " + loader_logs ) assert False, f"Loading execution failed, task status is {status}" return git_url # Utility functions def apiget(path: str, verb: str = "GET", **kwargs): """Query the API at path and return the json result or raise an AssertionError""" url = urljoin(APIURL, path) resp = requests.request(verb, url, **kwargs) assert resp.status_code == 200, f"failed to retrieve {url}: {resp.text}" return resp.json() def pollapi(path: str, verb: str = "GET", **kwargs): """Poll the API at path until it returns an OK result""" url = urljoin(APIURL, path) for i in range(60): resp = requests.request(verb, url, **kwargs) if resp.ok: break time.sleep(1) else: assert False, f"Polling {url} failed" return resp def getdirectory( dirid: str, currentpath: str = "" ) -> Generator[Tuple[str, Mapping], None, None]: """Recursively retrieve directory description from the archive""" directory = apiget(f"directory/{dirid}") for direntry in directory: path = join(currentpath, direntry["name"]) if direntry["type"] != "dir": yield (path, direntry) else: yield from getdirectory(direntry["target"], path)