diff --git a/pyproject.toml b/pyproject.toml index 66c29dc..4cba88c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,21 +1,23 @@ [tool.black] target-version = ['py37'] [tool.isort] multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 use_parentheses = true ensure_newline_before_comments = true line_length = 88 force_sort_within_sections = true [tool.pytest.ini_options] env_files = 'env/tests.env' log_cli = true log_cli_level = "INFO" log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)" log_cli_date_format = "%Y-%m-%d %H:%M:%S" +timeout = 1871 +timeout_method = "signal" [build-system] -requires = ["setuptools", "wheel", "tree_sitter"] +requires = ["setuptools", "wheel"] diff --git a/requirements-test.txt b/requirements-test.txt index 8f1dac9..1f5dee3 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,7 @@ pytest -pytest-testinfra pytest-dotenv +pytest-timeout requests msgpack confluent-kafka +python-on-whales diff --git a/tests/README.md b/tests/README.md index 5c0049d..1872da7 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,194 +1,192 @@ # mirror stack deployment tests These are a set of tests for the deployment of a full software heritage mirror stack. As of today, only docker swarm based deployment tests are available. ## docker swarm deployment tests -This test is using -[pytest-testinfra](https://github.com/pytest-dev/pytest-testinfra) to -orchestrate the deployment and checks that are made against the replicated -Software Heritage Archive. +This test is using a Docker Swarm to orchestrate the deployment and checks that +are made against the replicated Software Heritage Archive. The idea of this test is: - a test dataset is built by loading a few origins in a dedicated swh instance (using the swh-environment/docker), - the gathered objects are pushed in a dedicated set of kafka topics on swh's staging kafka broker (swh.test.objects), - expected statistics for each origin are also computed and pushed in the swh.test.objects.stats topic; these statistics are simply the total number, for each origin, of each object type (content, directory, revision, snapshot, release) is reachable from that origin. Then, the test scenario is the following: 1. copy all docker config files and resolve template ones in a temp dir (especially conf/graph-replayer.yml.test and conf/content-replayer.yml.test, see the mirror_stack fixture in conftest.py), 2. create and deploy a docker stack from the mirror.yml compose file from the tmp dir; note that replayer services are not started at this point (their replication factor is set to 0 in mirror.yml), 3. wait for all the services to be up 4. scale the content replayer service to 1, and wait for the service to be up, 5. scale the content replayer service to 4, and wait for services to be up, 6. wait for the content replaying to be done (test replayer services are configured with stop_on_eof=true), 7. scale the content replayer to 0 8. repeat steps 4-7 for the graph-replayer 9. retrieve expected stats for each origin from a dedicated swh.test.objects.stats topic on kafka, 10. compute these stats from the replicated archive; note that this step also check content object hashes from the replicated objstorage, 11. compare computed stats with expected ones. 12. spawn vault (flat) cooking for each origin (latest snapshot's master) 13. wait for the tgz artifacts to be generated by vault-workers 14. download resulting artifacts and make a few checks on their content. Obviously, this test heavily depends on the content of ``swh.test.objects`` topics on kafka, thus some tooling is required to manage said test dataset. These tools are not part of this repo, but will be provided in the swh-environment git repo (these are using the development docker environment). ### Running the test The test is written using pytest-testinfra, thus relies on the pytest execution tool. Note that for this test run: - docker swarm must be enabled - it will use dedicated test kafka topics on the staging kafka broker hosted by software heritage (see the Journal TLS endpoint listed on https://docs.softwareheritage.org/sysadm/network-architecture/service-urls.html#public-urls), - it will require a few environment variables set before running the test, namely: - `SWH_MIRROR_TEST_KAFKA_USERNAME`: login used to access the kafka broker, - `SWH_MIRROR_TEST_KAFKA_PASSWORD`: password used to access the kafka broker, - `SWH_MIRROR_TEST_KAFKA_BROKER`: URL os the kafka broker (should be the one described above), - `SWH_MIRROR_TEST_OBJSTORAGE_URL`: the URL of the source object storage used for the content replication; it would typically include access credentials, e.g. `https://login:password@objstorage.softwareheritage.org/`, - `SWH_IMAGE_TAG`: the docker image tag to be tested. You can copy the template `env/tests.env.template` to `env/tests.env` to set them. - the `softwareheritage/base`, `softwareheritage/web`, `softwareheritage/replayer` and `softwareheritage/test` images must be built with the proper image tag (`$SWH_IMAGE_TAG`). See the `../images/build_images.sh` script to rebuild images if need be. Assuming you have a properly set up environment: ``` # check the docker swarm cluster is ok ~/swh-mirror$ docker node ls ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION w6uzfpxayyc8l9ksfud7dlq9p * libra Ready Active Leader 20.10.5+dfsg1 # check images ~/swh-mirror$ echo $SWH_IMAGE_TAG 20220805-185133 ~/swh-mirror$ docker image ls -f reference="softwareheritage/*:$SWH_IMAGE_TAG" REPOSITORY TAG IMAGE ID CREATED SIZE softwareheritage/replayer 20220805-185133 da2d12d57a65 5 days ago 223MB softwareheritage/test 20220805-185133 cb4449867d3a 5 days ago 682MB softwareheritage/web 20220805-185133 66c54d5c2611 5 days ago 364MB softwareheritage/base 20220805-185133 528010e1fc9c 5 days ago 682MB # check environment variables are set ~/swh-mirror$ env | grep SWH_MIRROR_TEST SWH_MIRROR_TEST_KAFKA_PASSWORD= SWH_MIRROR_TEST_KAFKA_BROKER=broker1.journal.staging.swh.network:9093 SWH_MIRROR_TEST_KAFKA_USERNAME=mirror-test-ro SWH_MIRROR_TEST_OBJSTORAGE_URL=https://:@objstorage.softwareheritage.org/ ``` you should be able to execute the test: ``` ~/swh-mirror$ pytest ============================== test session starts ============================== platform linux -- Python 3.9.2, pytest-6.2.5, py-1.9.0, pluggy-1.0.0 rootdir: /home/ddouard/swh/swh-docker plugins: django-4.5.2, dash-1.18.1, django-test-migrations-1.2.0, forked-1.4.0, redis-2.4.0, requests-mock-1.9.3, Faker-4.18.0, asyncio-0.18.1, xdist-2.1.0, hypothesis-6.4.3, testinfra-6.8.0, postgresql-3.1.3, flask-1.1.0, mock-3.7.0, swh.journal-1.0.1.dev10+gdb9d202, swh.core-2.13 asyncio: mode=legacy collected 1 item tests/test_graph_replayer.py . [100%] =============================== warnings summary ================================ ../../.virtualenvs/swh/lib/python3.9/site-packages/pytest_asyncio/plugin.py:191 /home/ddouard/.virtualenvs/swh/lib/python3.9/site-packages/pytest_asyncio/plugin.py:191: DeprecationWarning: The 'asyncio_mode' default value will change to 'strict' in future, please explicitly use 'asyncio_mode=strict' or 'asyncio_mode=auto' in pytest configuration file. config.issue_config_time_warning(LEGACY_MODE, stacklevel=2) -- Docs: https://docs.pytest.org/en/stable/warnings.html =================== 1 passed, 1 warning in 923.19s (0:15:23) ==================== ``` Note the test takes quite some time to execute, so be patient. Troubleshooting =============== ### Watch out for stale services If something goes wrong, you might want to check if you have any remaining Docker services setup: docker service ls If you want to shut them all down, you can use: docker service rm $(docker service ls --format '{{.Name}}') ### I want a shell! To run a shell in an image in the Swarm context, use the following: docker run --network=swhtest_mirror0_swh-mirror -ti --env-file env/common-python.env --env STATSD_TAGS="role:content-replayer,hostname:${HOSTNAME}" -v /tmp/pytest-of-lunar/pytest-current/mirrorcurrent/conf/content-replayer.yml:/etc/softwareheritage/config.yml softwareheritage/replayer:20220915-163058 shell ### Some containers are never started If you notice that some container stay at 0 replicas in `docker service ls`, it probably means the rule for services, as described in `mirror.yml`, cannot be fulfilled by the current nodes part of the swarm. Most likely, you are missing the labels locating the volumes needed by the containers. You might want to run: docker node update $HOSTNAME \ --label-add org.softwareheritage.mirror.monitoring=true \ --label-add org.softwareheritage.mirror.volumes.objstorage=true \ --label-add org.softwareheritage.mirror.volumes.redis=true \ --label-add org.softwareheritage.mirror.volumes.storage-db=true \ --label-add org.softwareheritage.mirror.volumes.web-db=true ### SWH services keep restarting If SWH services keep restarting, look at the service logs, but don’t forget to look at the logs for Docker service (using `journalctl -u docker.service` for example). If you see: error="task: non-zero exit (124)" It means that `wait-for-it` has reached its timeout. You should double check the network configuration, including the firewall. ### Failure while checking the Vault service If the test fail with the following exception: ~~~ > assert isinstance(tarfilecontent, bytes) E assert False E + where False = isinstance({'exception': 'NotFoundExc', 'reason': 'Cooked archive for swh:1:dir:c1695cab57e5bfe64ea4b0900c4575bf7240483d not found.', 'traceback': 'Traceback (most recent call last):\n File "/usr/lib/python3/dist-packages/rest_framework/views.py", line 492, in dispatch\n response = handler(request, *args, **kwargs)\n File "/usr/lib/python3/dist-packages/rest_framework/decorators.py", line 54, in handler\n return func(*args, **kwargs)\n File "/usr/lib/python3/dist-pac→ …/swh-mirror/tests/test_graph_replayer.py:423: AssertionError ~~~ It is most likely because of a stale database. Remove the vault volume using: docker volume rm swhtest_mirror0_vault-db In general, the test has been designed to be run on empty volumes. diff --git a/tests/conftest.py b/tests/conftest.py index cf02122..7734917 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,121 +1,113 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from os import chdir, environ from pathlib import Path from shutil import copy, copytree -import time from uuid import uuid4 import pytest -import testinfra +from python_on_whales import DockerClient, DockerException APIURL = "http://127.0.0.1:5080/api/1/" SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"] SRC_PATH = Path(__file__).resolve().parent.parent KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"] KAFKA_PASSWORD = environ["SWH_MIRROR_TEST_KAFKA_PASSWORD"] KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"] KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}" OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"] WFI_TIMEOUT = 60 LOGGER = logging.getLogger(__name__) def pytest_addoption(parser, pluginmanager): parser.addoption( "--keep-stack", action="store_true", help="Do not teardown the docker stack" ) @pytest.fixture(scope="session") -def docker_host(): - return testinfra.get_host("local://") +def docker_client(): + return DockerClient() # scope='session' so we use the same container for all the tests; @pytest.fixture(scope="session") -def mirror_stack(request, docker_host, tmp_path_factory): +def mirror_stack(request, docker_client, tmp_path_factory): tmp_path = tmp_path_factory.mktemp("mirror") copytree(SRC_PATH / "conf", tmp_path / "conf") copytree(SRC_PATH / "env", tmp_path / "env") copy(SRC_PATH / "mirror.yml", tmp_path) + Path(tmp_path / "secret").write_bytes(b"not-so-secret\n") cwd = Path.cwd() chdir(tmp_path) # copy test-specific conf files conftmpl = { "username": KAFKA_USERNAME, "password": KAFKA_PASSWORD, "group_id": KAFKA_GROUPID, "broker": KAFKA_BROKER, "objstorage_url": OBJSTORAGE_URL, } for conffile in (tmp_path / "conf").glob("*.yml.test"): with open(conffile.as_posix()[:-5], "w") as outf: outf.write(conffile.read_text().format(**conftmpl)) # start the whole cluster stack_name = f"swhtest_{tmp_path.name}" LOGGER.info("Create missing secrets") - existing_secrets = [ - line.strip() - for line in docker_host.check_output( - "docker secret ls --format '{{.Name}}'" - ).splitlines() - ] for srv in ("storage", "web", "vault", "scheduler"): - secret = f"swh-mirror-{srv}-db-password" - if secret not in existing_secrets: - LOGGER.info("Creating secret %s", secret) - docker_host.check_output( - f"echo not-so-secret | docker secret create {secret} -" - ) + secret_name = f"swh-mirror-{srv}-db-password" + try: + docker_client.secret.create(secret_name, tmp_path / "secret") + LOGGER.info("Created secret %s", secret_name) + except DockerException as e: + if "code = AlreadyExists" not in e.stderr: + raise + LOGGER.info("Remove config objects (if any)") - existing_configs = [ - line.strip() - for line in docker_host.check_output( - "docker config ls --format '{{.Name}}'" - ).splitlines() - ] - for cfg in existing_configs: - if cfg.startswith(f"{stack_name}_"): - docker_host.check_output(f"docker config rm {cfg}") + existing_configs = docker_client.config.list( + filters={"label=com.docker.stack.namespace": stack_name} + ) + for config in existing_configs: + config.remove() LOGGER.info("Deploy docker stack %s", stack_name) - docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}") + docker_stack = docker_client.stack.deploy(stack_name, "mirror.yml") - yield stack_name + yield docker_stack - # breakpoint() if not request.config.getoption("keep_stack"): LOGGER.info("Remove stack %s", stack_name) - docker_host.check_output(f"docker stack rm {stack_name}") - # wait for services to be down - LOGGER.info("Wait for all services of %s to be down", stack_name) - while docker_host.check_output( - "docker service ls --format {{.Name}} " - f"--filter label=com.docker.stack.namespace={stack_name}" - ): - time.sleep(0.2) - - # give a bit of time to docker to sync the state of service<->volumes - # relations so the next step runs ok - time.sleep(20) - LOGGER.info("Remove volumes of stack %s", stack_name) - for volume in docker_host.check_output( - "docker volume ls --format {{.Name}} " - f"--filter label=com.docker.stack.namespace={stack_name}" - ).splitlines(): + docker_stack.remove() + stack_containers = docker_client.container.list( + filters={"label=com.docker.stack.namespace": stack_name} + ) + + try: + LOGGER.info("Waiting for all containers of %s to be down", stack_name) + docker_client.container.wait(stack_containers) + except DockerException as e: + # We have a TOCTOU issue, so skip the error if some containers have already + # been stopped by the time we wait for them. + if "No such container" not in e.stderr: + raise + LOGGER.info("Remove volumes of stack %s", stack_name) + stack_volumes = docker_client.volume.list( + filters={"label=com.docker.stack.namespace": stack_name} + ) + for volume in stack_volumes: try: - docker_host.check_output(f"docker volume rm {volume}") - except AssertionError: - LOGGER.error("Failed to remove volume %s", volume) + volume.remove() + except DockerException: + LOGGER.exception("Failed to remove volume %s", volume) chdir(cwd) diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py index a715fbc..3f4b948 100644 --- a/tests/test_graph_replayer.py +++ b/tests/test_graph_replayer.py @@ -1,449 +1,471 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from hashlib import sha1 from io import BytesIO import re import tarfile import time +from typing import Dict from urllib.parse import quote from confluent_kafka import Consumer, KafkaException import msgpack +from python_on_whales import DockerException import requests from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER -SERVICES = { +INITIAL_SERVICES_STATUS = { "{}_amqp": "1/1", "{}_content-replayer": "0/0", "{}_grafana": "1/1", "{}_graph-replayer": "0/0", "{}_memcache": "1/1", "{}_mailhog": "1/1", "{}_nginx": "1/1", "{}_objstorage": "1/1", "{}_prometheus": "1/1", "{}_prometheus-statsd-exporter": "1/1", "{}_redis": "1/1", "{}_storage": "1/1", "{}_storage-db": "1/1", "{}_web": "1/1", "{}_web-db": "1/1", "{}_vault": "1/1", "{}_vault-db": "1/1", "{}_vault-worker": "1/1", "{}_scheduler": "1/1", "{}_scheduler-db": "1/1", "{}_scheduler-listener": "1/1", "{}_scheduler-runner": "1/1", } -ATTEMPTS = 600 -DELAY = 1 SCALE = 2 API_URL = "http://127.0.0.1:5081/api/1" -def running_services(host, stack): - all_services = host.check_output( - "docker service ls --format '{{.Name}} {{.Replicas}}'" - ) - return dict( - line.split()[:2] - for line in all_services.splitlines() - if line.startswith(f"{stack}_") - ) - - -def check_running_services(host, stack, services): - LOGGER.info("Waiting for services %s", services) - mirror_services_ = {} - for i in range(ATTEMPTS): - mirror_services = running_services(host, stack) - mirror_services = {k: v for k, v in mirror_services.items() if k in services} - if mirror_services == services: - LOGGER.info("Got them all!") - break - if mirror_services != mirror_services_: - LOGGER.info("Not yet there %s", mirror_services) - mirror_services_ = mirror_services - time.sleep(0.5) - return mirror_services == services - - -def get_logs(host, service): - rows = host.check_output(f"docker service logs -t {service}").splitlines() - reg = re.compile( - rf"^(?P.+) {service}[.]" - r"(?P\d+)[.](?P\w+)@(?P\w+) +[|] " - r"(?P.+)$" - ) - return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None] +def service_target_replicas(service): + if "Replicated" in service.spec.mode: + return service.spec.mode["Replicated"]["Replicas"] + elif "Global" in service.spec.mode: + return 1 + else: + raise ValueError(f"Unknown mode {service.spec.mode}") -def wait_for_log_entry(host, service, logline, occurrences=1): - for i in range(ATTEMPTS): - logs = get_logs(host, service) - match = [entry for entry in logs if logline in entry["logline"]] - if match and len(match) >= occurrences: - return match - time.sleep(DELAY) - return [] +def is_task_running(task): + try: + return task.status.state == "running" + except DockerException as e: + # A task might already have disappeared before we can get its status. + # In that case, we know for sure it’s not running. + if "No such object" in e.stderr: + return False + else: + raise + + +def wait_services_status(stack, target_status: Dict[str, int]): + LOGGER.info("Waiting for services %s", target_status) + last_changed_status = {} + while True: + services = [ + service + for service in stack.services() + if service.spec.name in target_status + ] + status = { + service.spec.name: "%s/%s" + % ( + len([True for task in service.ps() if is_task_running(task)]), + service_target_replicas(service), + ) + for service in services + } + if status == target_status: + LOGGER.info("Got them all!") + break + if status != last_changed_status: + LOGGER.info("Not yet there %s", status) + last_changed_status = status + time.sleep(1) + return status == target_status + + +def wait_for_log_entry(docker_client, service, log_entry, occurrences=1): + count = 0 + for stream_type, stream_content in docker_client.service.logs( + service, follow=True, stream=True + ): + LOGGER.debug("%s output: %s", service.spec.name, stream_content) + if stream_type != "stdout": + continue + count += len( + re.findall( + re.escape(log_entry.encode("us-ascii", errors="replace")), + stream_content, + ) + ) + if count >= occurrences: + break def content_get(url, done): content = get(url) swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}" # checking the actual blob is present and valid # XXX: a bit sad... try: data = get(content["data_url"]) except Exception as exc: LOGGER.error("Failed loading %s", content["data_url"], exc_info=exc) raise assert len(data) == content["length"] assert sha1(data).hexdigest() == content["checksums"]["sha1"] if swhid not in done: done.add(swhid) yield content def directory_get(url, done): directory = get(url) id = url.split("/")[-2] swhid = f"swh:1:dir:{id}" if swhid not in done: done.add(swhid) for entry in directory: if entry["type"] == "file": swhid = f"swh:1:cnt:{entry['target']}" if swhid not in done: yield from content_get(entry["target_url"], done) elif entry["type"] == "dir": swhid = f"swh:1:dir:{entry['target']}" if swhid not in done: yield from directory_get(entry["target_url"], done) def revision_get(url, done): revision = get(url) swhid = f"swh:1:rev:{revision['id']}" if swhid not in done: done.add(swhid) yield revision swhid = f"swh:1:dir:{revision['directory']}" if swhid not in done: yield from directory_get(revision["directory_url"], done) for parent in revision["parents"]: if f"swh:1:rev:{parent['id']}" not in done: yield from revision_get(parent["url"], done) def snapshot_get(url, done): snapshot = get(url) for branchname, branch in snapshot["branches"].items(): if branch: yield from resolve_target( branch["target_type"], branch["target"], branch["target_url"], done, ) def origin_get(url, done=None): if done is None: done = set() visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true") if not visit.get("snapshot"): return swhid = f"swh:1:snp:{visit['snapshot']}" if swhid not in done: done.add(swhid) snapshot_url = visit["snapshot_url"] if snapshot_url: yield from snapshot_get(snapshot_url, done) def resolve_target(target_type, target, target_url, done): if target_type == "revision": if f"swh:1:rev:{target}" not in done: yield from revision_get(target_url, done) elif target_type == "release": if f"swh:1:rel:{target}" not in done: yield from release_get(target_url, done) elif target_type == "directory": if f"swh:1:dir:{target}" not in done: yield from directory_get(target_url, done) elif target_type == "content": if f"swh:1:cnt:{target}" not in done: yield from content_get(target_url, done) elif target_type == "snapshot": if f"swh:1:snp:{target}" not in done: yield from snapshot_get(target_url, done) # elif target_type == "alias": # if f"swh:1:snp:{target}" not in done: # yield from snapshot_get(target_url, done) def release_get(url, done): release = get(url) swhid = f"swh:1:rel:{release['id']}" if swhid not in done: done.add(swhid) yield release yield from resolve_target( release["target_type"], release["target"], release["target_url"], done ) def branch_get(url): branches = set() visits = get(f"{API_URL}/origin/{url}/visits/") for visit in visits: snapshot_url = visit.get("snapshot_url") while snapshot_url: snapshot = get(snapshot_url) for name, tgt in snapshot["branches"].items(): if tgt is not None: branches.add( (name, tgt["target_type"], tgt["target"], tgt["target_url"]) ) snapshot_url = snapshot["next_branch"] return len(visits), branches timing_stats = [] def get(url): t0 = time.time() resp = requests.get(url) if resp.headers["content-type"].lower() == "application/json": result = resp.json() else: result = resp.content timing_stats.append(time.time() - t0) return result def post(url): t0 = time.time() resp = requests.post(url) assert resp.status_code in (200, 201, 202) if resp.headers["content-type"].lower() == "application/json": result = resp.json() else: result = resp.content timing_stats.append(time.time() - t0) return result def get_stats(origin): result = {"origin": origin} swhids = set() list(origin_get(origin, done=swhids)) result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")]) result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")]) result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")]) visits, branches = branch_get(origin) result["visit"] = visits result["release"] = len([br for br in branches if br[1] == "release"]) result["alias"] = len([br for br in branches if br[1] == "alias"]) result["branch"] = len([br for br in branches if br[1] == "revision"]) return result, swhids def get_expected_stats(): cfg = { "bootstrap.servers": "broker1.journal.staging.swh.network:9093", "sasl.username": KAFKA_USERNAME, "sasl.password": KAFKA_PASSWORD, "group.id": KAFKA_GROUPID, "security.protocol": "sasl_ssl", "sasl.mechanism": "SCRAM-SHA-512", "session.timeout.ms": 600000, "max.poll.interval.ms": 3600000, "message.max.bytes": 1000000000, "auto.offset.reset": "earliest", "enable.auto.commit": True, "enable.partition.eof": True, } partitions = set() def on_assign(cons, parts): LOGGER.info("assignment %s", parts) for p in parts: partitions.add(p.partition) consumer = Consumer(cfg) consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign) stats = {} try: while True: msg = consumer.poll(timeout=10.0) if msg is None: if not partitions: break continue if msg.error(): if msg.error().name() == "_PARTITION_EOF": partitions.discard(msg.partition()) if not partitions: break else: raise KafkaException(msg.error()) else: # Proper message k = msgpack.unpackb(msg.key()) v = msgpack.unpackb(msg.value()) LOGGER.info( "%% %s [%d] at offset %d with key %s:\n", msg.topic(), msg.partition(), msg.offset(), k, ) assert k == v["origin"] stats[k] = v except KeyboardInterrupt: assert False, "%% Aborted by user" return stats -def test_mirror(host, mirror_stack): - services = {k.format(mirror_stack): v for k, v in SERVICES.items()} - check_running_services(host, mirror_stack, services) +def test_mirror(docker_client, mirror_stack): + initial_services_status = { + k.format(mirror_stack.name): v for k, v in INITIAL_SERVICES_STATUS.items() + } + wait_services_status(mirror_stack, initial_services_status) # run replayer services for service_type in ("content", "graph"): - service = f"{mirror_stack}_{service_type}-replayer" - LOGGER.info("Scale %s to 1", service) - host.check_output(f"docker service scale -d {service}=1") - if not check_running_services(host, mirror_stack, {service: "1/1"}): - breakpoint() - logs = wait_for_log_entry( - host, service, f"Starting the SWH mirror {service_type} replayer" + service = docker_client.service.inspect( + f"{mirror_stack}_{service_type}-replayer" + ) + LOGGER.info("Scale %s to 1", service.spec.name) + service.scale(1) + wait_services_status(mirror_stack, {service.spec.name: "1/1"}) + wait_for_log_entry( + docker_client, + service, + f"Starting the SWH mirror {service_type} replayer", + 1, ) - assert len(logs) == 1 - LOGGER.info("Scale %s to %d", service, SCALE) - host.check_output(f"docker service scale -d {service}={SCALE}") - check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"}) - logs = wait_for_log_entry( - host, service, f"Starting the SWH mirror {service_type} replayer", SCALE + LOGGER.info("Scale %s to %d", service.spec.name, SCALE) + service.scale(SCALE) + wait_for_log_entry( + docker_client, + service, + f"Starting the SWH mirror {service_type} replayer", + SCALE, ) - assert len(logs) == SCALE # wait for the replaying to be done (stop_on_oef is true) - LOGGER.info("Wait for %s to be done", service) - logs = wait_for_log_entry(host, service, "Done.", SCALE) - # >= SCALE below because replayer services may have been restarted - # (once done) before we scale them to 0 - if not (len(logs) >= SCALE): - breakpoint() - assert len(logs) >= SCALE - LOGGER.info("Scale %s to 0", service) - check_running_services(host, mirror_stack, {service: f"0/{SCALE}"}) + LOGGER.info("Wait for %s to be done", service.spec.name) + wait_for_log_entry(docker_client, service, "Done.", SCALE) + + LOGGER.info("Scale %s to 0", service.spec.name) + service.scale(0) + wait_services_status(mirror_stack, {service.spec.name: "0/0"}) # TODO: check there are no error reported in redis after the replayers are done origins = get(f"{API_URL}/origins/") if False: # check replicated archive is in good shape expected_stats = get_expected_stats() LOGGER.info("Check replicated archive") # seems the graph replayer is OK, let's check the archive can tell something expected_origins = sorted(expected_stats) assert len(origins) == len(expected_origins) assert sorted(o["url"] for o in origins) == expected_origins for origin, expected in expected_stats.items(): timing_stats.clear() assert origin == expected["origin"] origin_stats, swhids = get_stats(origin) LOGGER.info("%s", origin_stats) LOGGER.info("%d REQS took %ss", len(timing_stats), sum(timing_stats)) assert origin_stats == expected LOGGER.info("%s is OK", origin) # test the vault service cooks = [] # first start all the cookings for origin in origins: LOGGER.info("Cook HEAD for %s", origin["url"]) visit = get( f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true" ) assert visit snp = get(visit["snapshot_url"]) assert snp branches = snp.get("branches", {}) head = branches.get("HEAD") assert head while True: if head["target_type"] == "alias": head = branches[head["target"]] elif head["target_type"] == "release": head = get(head["target_url"]) elif head["target_type"] == "directory": swhid = f"swh:1:dir:{head['target']}" break elif head["target_type"] == "revision": rev = get(head["target_url"]) swhid = f"swh:1:dir:{rev['directory']}" break else: breakpoint() LOGGER.info("Directory is %s", swhid) cook = post(f"{API_URL}/vault/flat/{swhid}/") assert cook assert cook["status"] in ("new", "pending") cooks.append((origin["url"], swhid, cook)) # then wait for successful cooks while not all(cook["status"] == "done" for _, _, cook in cooks): origin, swhid, cook = cooks.pop(0) cook = get(f"{API_URL}/vault/flat/{swhid}") cooks.append((origin, swhid, cook)) # should all be in "done" status for origin, swhid, cook in cooks: assert cook["status"] == "done" # so we can download it tarfilecontent = get(cook["fetch_url"]) assert isinstance(tarfilecontent, bytes) tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent)) filelist = tarfileobj.getnames() assert all(fname.startswith(swhid) for fname in filelist) for path in filelist[1:]: tarinfo = tarfileobj.getmember(path) url = f"{API_URL}/directory/{quote(path[10:])}" expected = get(url) # remove the 'swh:1:dir:' part LOGGER.info("Retrieved from storage: %s → %s", url, expected) if expected["type"] == "dir": assert tarinfo.isdir() elif expected["type"] == "file": if expected["perms"] == 0o120000: assert tarinfo.issym() tgt = get(expected["target_url"]) symlnk = get(tgt["data_url"]) assert symlnk == tarinfo.linkpath.encode() else: assert tarinfo.isfile() assert expected["length"] == tarinfo.size assert ( sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest() == expected["checksums"]["sha1"] ) else: breakpoint() pass