diff --git a/conf/web.yml b/conf/web.yml
--- a/conf/web.yml
+++ b/conf/web.yml
@@ -20,7 +20,7 @@
   url: http://vault:5005/
 allowed_hosts:
-  - appserver
+  - "*"
 debug: yes
diff --git a/env/tests.env.template b/env/tests.env.template
--- a/env/tests.env.template
+++ b/env/tests.env.template
@@ -3,3 +3,4 @@
 SWH_MIRROR_TEST_KAFKA_PASSWORD=SOME_INTERESTING_SECRET
 SWH_MIRROR_TEST_KAFKA_BROKER=broker1.journal.staging.swh.network:9093
 SWH_MIRROR_TEST_OBJSTORAGE_URL=https://swh-prod:SOME_INTERESTING_SECRET@objstorage.softwareheritage.org/
+SWH_MIRROR_TEST_API_URL=http://127.0.0.1:5081/api/1
\ No newline at end of file
diff --git a/mirror.yml b/mirror.yml
--- a/mirror.yml
+++ b/mirror.yml
@@ -254,8 +254,6 @@
       # going above the number of partitions on the kafka cluster (so the 64
       # and 254 upper limits depending on the execution environment).
       replicas: 0
-      restart_policy:
-        condition: "none"
     networks:
       - swhtest-mirror
     env_file:
diff --git a/pyproject.toml b/pyproject.toml
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,3 +16,8 @@
 log_cli_level = "INFO"
 log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
 log_cli_date_format = "%Y-%m-%d %H:%M:%S"
+timeout = 1871
+timeout_method = "signal"
+
+[build-system]
+requires = ["setuptools", "wheel"]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,7 @@
 pytest
-pytest-testinfra
 pytest-dotenv
+pytest-timeout
 requests
 msgpack
+confluent-kafka
+python-on-whales
diff --git a/tests/conftest.py b/tests/conftest.py
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,11 +7,10 @@
 from os import chdir, environ
 from pathlib import Path
 from shutil import copy, copytree
-import time
 from uuid import uuid4
 
 import pytest
-import testinfra
+from python_on_whales import DockerClient, DockerException
 
 APIURL = "http://127.0.0.1:5080/api/1/"
 SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
@@ -22,6 +21,7 @@
 KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
 KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
 OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
+API_URL = environ["SWH_MIRROR_TEST_API_URL"]
 WFI_TIMEOUT = 60
 
 LOGGER = logging.getLogger(__name__)
@@ -34,17 +34,18 @@
 
 
 @pytest.fixture(scope="session")
-def docker_host():
-    return testinfra.get_host("local://")
+def docker_client():
+    return DockerClient()
 
 
 # scope='session' so we use the same container for all the tests;
 @pytest.fixture(scope="session")
-def mirror_stack(request, docker_host, tmp_path_factory):
+def mirror_stack(request, docker_client, tmp_path_factory):
     tmp_path = tmp_path_factory.mktemp("mirror")
    copytree(SRC_PATH / "conf", tmp_path / "conf")
     copytree(SRC_PATH / "env", tmp_path / "env")
     copy(SRC_PATH / "mirror.yml", tmp_path)
+    Path(tmp_path / "secret").write_bytes(b"not-so-secret\n")
     cwd = Path.cwd()
     chdir(tmp_path)
     # copy test-specific conf files
@@ -63,59 +64,51 @@
     stack_name = f"swhtest_{tmp_path.name}"
 
     LOGGER.info("Create missing secrets")
-    existing_secrets = [
-        line.strip()
-        for line in docker_host.check_output(
-            "docker secret ls --format '{{.Name}}'"
-        ).splitlines()
-    ]
     for srv in ("storage", "web", "vault", "scheduler"):
-        secret = f"swh-mirror-{srv}-db-password"
-        if secret not in existing_secrets:
-            LOGGER.info("Creating secret %s", secret)
-            docker_host.check_output(
-                f"echo not-so-secret | docker secret create {secret} -"
-            )
+        secret_name = f"swh-mirror-{srv}-db-password"
+        try:
+            docker_client.secret.create(secret_name, tmp_path / "secret")
+            LOGGER.info("Created secret %s", secret_name)
+        except DockerException as e:
+            if "code = AlreadyExists" not in e.stderr:
+                raise
+
     LOGGER.info("Remove config objects (if any)")
-    existing_configs = [
-        line.strip()
-        for line in docker_host.check_output(
-            "docker config ls --format '{{.Name}}'"
-        ).splitlines()
-    ]
-    for cfg in existing_configs:
-        if cfg.startswith(f"{stack_name}_"):
-            docker_host.check_output(f"docker config rm {cfg}")
+    existing_configs = docker_client.config.list(
+        filters={"label=com.docker.stack.namespace": stack_name}
+    )
+    for config in existing_configs:
+        config.remove()
 
     LOGGER.info("Deploy docker stack %s", stack_name)
-    docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")
+    docker_stack = docker_client.stack.deploy(stack_name, "mirror.yml")
 
-    yield stack_name
+    yield docker_stack
 
-    # breakpoint()
     if not request.config.getoption("keep_stack"):
         LOGGER.info("Remove stack %s", stack_name)
-        docker_host.check_output(f"docker stack rm {stack_name}")
-        # wait for services to be down
-        LOGGER.info("Wait for all services of %s to be down", stack_name)
-        while docker_host.check_output(
-            "docker service ls --format {{.Name}} "
-            f"--filter label=com.docker.stack.namespace={stack_name}"
-        ):
-            time.sleep(0.2)
-
-        # give a bit of time to docker to sync the state of service<->volumes
-        # relations so the next step runs ok
-        time.sleep(20)
-        LOGGER.info("Remove volumes of stack %s", stack_name)
-        for volume in docker_host.check_output(
-            "docker volume ls --format {{.Name}} "
-            f"--filter label=com.docker.stack.namespace={stack_name}"
-        ).splitlines():
+        docker_stack.remove()
+        stack_containers = docker_client.container.list(
+            filters={"label=com.docker.stack.namespace": stack_name}
+        )
+
+        try:
+            LOGGER.info("Waiting for all containers of %s to be down", stack_name)
+            docker_client.container.wait(stack_containers)
+        except DockerException as e:
+            # We have a TOCTOU issue, so skip the error if some containers have already
+            # been stopped by the time we wait for them.
+ if "No such container" not in e.stderr: + raise + LOGGER.info("Remove volumes of stack %s", stack_name) + stack_volumes = docker_client.volume.list( + filters={"label=com.docker.stack.namespace": stack_name} + ) + for volume in stack_volumes: try: - docker_host.check_output(f"docker volume rm {volume}") - except AssertionError: - LOGGER.error("Failed to remove volume %s", volume) + volume.remove() + except DockerException: + LOGGER.exception("Failed to remove volume %s", volume) chdir(cwd) diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py --- a/tests/test_graph_replayer.py +++ b/tests/test_graph_replayer.py @@ -8,86 +8,104 @@ import re import tarfile import time +from typing import Dict from urllib.parse import quote from confluent_kafka import Consumer, KafkaException import msgpack +from python_on_whales import DockerException import requests -from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER +from .conftest import API_URL, KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER -SERVICES = { +INITIAL_SERVICES_STATUS = { + "{}_amqp": "1/1", "{}_content-replayer": "0/0", - "{}_db-storage": "1/1", - "{}_db-web": "1/1", - "{}_grafana": "1/1", "{}_graph-replayer": "0/0", "{}_memcache": "1/1", + "{}_mailhog": "1/1", "{}_nginx": "1/1", "{}_objstorage": "1/1", - "{}_prometheus": "1/1", - "{}_prometheus-statsd-exporter": "1/1", "{}_redis": "1/1", "{}_storage": "1/1", + "{}_storage-db": "1/1", "{}_web": "1/1", - "{}_db-vault": "1/1", + "{}_web-db": "1/1", "{}_vault": "1/1", + "{}_vault-db": "1/1", "{}_vault-worker": "1/1", "{}_scheduler": "1/1", + "{}_scheduler-db": "1/1", "{}_scheduler-listener": "1/1", "{}_scheduler-runner": "1/1", } -ATTEMPTS = 600 -DELAY = 1 SCALE = 2 -API_URL = "http://127.0.0.1:5081/api/1" - - -def running_services(host, stack): - all_services = host.check_output( - "docker service ls --format '{{.Name}} {{.Replicas}}'" - ) - return dict( - line.split()[:2] - for line in all_services.splitlines() - if line.startswith(f"{stack}_") - ) - - -def check_running_services(host, stack, services): - LOGGER.info("Waiting for services %s", services) - mirror_services_ = {} - for i in range(ATTEMPTS): - mirror_services = running_services(host, stack) - mirror_services = {k: v for k, v in mirror_services.items() if k in services} - if mirror_services == services: - LOGGER.info("Got them all!") - break - if mirror_services != mirror_services_: - LOGGER.info("Not yet there %s", mirror_services) - mirror_services_ = mirror_services - time.sleep(0.5) - return mirror_services == services -def get_logs(host, service): - rows = host.check_output(f"docker service logs -t {service}").splitlines() - reg = re.compile( - rf"^(?P.+) {service}[.]" - r"(?P\d+)[.](?P\w+)@(?P\w+) +[|] " - r"(?P.+)$" - ) - return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None] +def service_target_replicas(service): + if "Replicated" in service.spec.mode: + return service.spec.mode["Replicated"]["Replicas"] + elif "Global" in service.spec.mode: + return 1 + else: + raise ValueError(f"Unknown mode {service.spec.mode}") -def wait_for_log_entry(host, service, logline, occurrences=1): - for i in range(ATTEMPTS): - logs = get_logs(host, service) - match = [entry for entry in logs if logline in entry["logline"]] - if match and len(match) >= occurrences: - return match - time.sleep(DELAY) - return [] +def is_task_running(task): + try: + return task.status.state == "running" + except DockerException as e: + # A task might already have disappeared before 
we can get its status. + # In that case, we know for sure it’s not running. + if "No such object" in e.stderr: + return False + else: + raise + + +def wait_services_status(stack, target_status: Dict[str, int]): + LOGGER.info("Waiting for services %s", target_status) + last_changed_status = {} + while True: + services = [ + service + for service in stack.services() + if service.spec.name in target_status + ] + status = { + service.spec.name: "%s/%s" + % ( + len([True for task in service.ps() if is_task_running(task)]), + service_target_replicas(service), + ) + for service in services + } + if status == target_status: + LOGGER.info("Got them all!") + break + if status != last_changed_status: + LOGGER.info("Not yet there %s", status) + last_changed_status = status + time.sleep(1) + return status == target_status + + +def wait_for_log_entry(docker_client, service, log_entry, occurrences=1): + count = 0 + for stream_type, stream_content in docker_client.service.logs( + service, follow=True, stream=True + ): + LOGGER.debug("%s output: %s", service.spec.name, stream_content) + if stream_type != "stdout": + continue + count += len( + re.findall( + re.escape(log_entry.encode("us-ascii", errors="replace")), + stream_content, + ) + ) + if count >= occurrences: + break def content_get(url, done): @@ -316,40 +334,43 @@ return stats -def test_mirror(host, mirror_stack): - services = {k.format(mirror_stack): v for k, v in SERVICES.items()} - check_running_services(host, mirror_stack, services) +def test_mirror(docker_client, mirror_stack): + initial_services_status = { + k.format(mirror_stack.name): v for k, v in INITIAL_SERVICES_STATUS.items() + } + wait_services_status(mirror_stack, initial_services_status) # run replayer services for service_type in ("content", "graph"): - service = f"{mirror_stack}_{service_type}-replayer" - LOGGER.info("Scale %s to 1", service) - host.check_output(f"docker service scale -d {service}=1") - if not check_running_services(host, mirror_stack, {service: "1/1"}): - breakpoint() - logs = wait_for_log_entry( - host, service, f"Starting the SWH mirror {service_type} replayer" + service = docker_client.service.inspect( + f"{mirror_stack}_{service_type}-replayer" + ) + LOGGER.info("Scale %s to 1", service.spec.name) + service.scale(1) + wait_services_status(mirror_stack, {service.spec.name: "1/1"}) + wait_for_log_entry( + docker_client, + service, + f"Starting the SWH mirror {service_type} replayer", + 1, ) - assert len(logs) == 1 - LOGGER.info("Scale %s to %d", service, SCALE) - host.check_output(f"docker service scale -d {service}={SCALE}") - check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"}) - logs = wait_for_log_entry( - host, service, f"Starting the SWH mirror {service_type} replayer", SCALE + LOGGER.info("Scale %s to %d", service.spec.name, SCALE) + service.scale(SCALE) + wait_for_log_entry( + docker_client, + service, + f"Starting the SWH mirror {service_type} replayer", + SCALE, ) - assert len(logs) == SCALE # wait for the replaying to be done (stop_on_oef is true) - LOGGER.info("Wait for %s to be done", service) - logs = wait_for_log_entry(host, service, "Done.", SCALE) - # >= SCALE below because replayer services may have been restarted - # (once done) before we scale them to 0 - if not (len(logs) >= SCALE): - breakpoint() - assert len(logs) >= SCALE - LOGGER.info("Scale %s to 0", service) - check_running_services(host, mirror_stack, {service: f"0/{SCALE}"}) + LOGGER.info("Wait for %s to be done", service.spec.name) + 
wait_for_log_entry(docker_client, service, "Done.", SCALE) + + LOGGER.info("Scale %s to 0", service.spec.name) + service.scale(0) + wait_services_status(mirror_stack, {service.spec.name: "0/0"}) # TODO: check there are no error reported in redis after the replayers are done
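Side note, not part of the patch: the new helpers replace shell-outs such as "docker service ls" and "docker stack deploy" with python-on-whales API calls. The sketch below strings together the same primitives the patched code relies on (DockerClient(), stack deploy and remove, Stack.services(), Service.ps(), task.status.state); the stack name "demo" and the compose.yml file are placeholders for illustration, and the polling loop mirrors the pattern wait_services_status() implements above.

# Illustrative sketch only: "demo" and compose.yml are hypothetical,
# not names from this patch.
import time

from python_on_whales import DockerClient

docker = DockerClient()

# Equivalent of `docker stack deploy -c compose.yml demo`.
stack = docker.stack.deploy("demo", compose_files=["compose.yml"])

# Poll until every service of the stack has at least one running task.
# (The patch additionally guards against tasks that disappear while
# being inspected; this sketch omits that for brevity.)
while True:
    pending = [
        service.spec.name
        for service in stack.services()
        if not any(task.status.state == "running" for task in service.ps())
    ]
    if not pending:
        break
    time.sleep(1)

print("all services up")
stack.remove()  # equivalent of `docker stack rm demo`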