diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6aaaa3e..0470475 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,36 +1,31 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v1.2.3 - hooks: - - id: trailing-whitespace - - id: flake8 - - id: check-json - - id: check-yaml + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + - id: check-json + - id: check-yaml -- repo: https://github.com/codespell-project/codespell - rev: v1.15.0 - hooks: - - id: codespell + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: [flake8-bugbear==22.3.23] -# unfortunately, we are far from being able to enable this... -# - repo: https://github.com/PyCQA/pydocstyle.git -# rev: 4.0.0 -# hooks: -# - id: pydocstyle -# name: pydocstyle -# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. -# entry: pydocstyle --convention=google -# language: python -# types: [python] + - repo: https://github.com/codespell-project/codespell + rev: v2.1.0 + hooks: + - id: codespell + name: Check source code spelling + args: [-L crate] + stages: [commit] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] + - repo: https://github.com/PyCQA/isort + rev: 5.10.1 + hooks: + - id: isort + + - repo: https://github.com/python/black + rev: 22.3.0 + hooks: + - id: black diff --git a/conf/content-replayer.yml.test b/conf/content-replayer.yml.test new file mode 100644 index 0000000..43407ae --- /dev/null +++ b/conf/content-replayer.yml.test @@ -0,0 +1,37 @@ +# this config file is a template used for tests, see tests/conftest.py + +objstorage: + cls: remote + url: {objstorage_url} + max_retries: 5 + pool_connections: 100 + pool_maxsize: 200 + +objstorage_dst: + cls: remote + url: http://objstorage:5003 + +journal_client: + cls: kafka + brokers: + - {broker} + group_id: {group_id}_content + prefix: swh.test.objects + sasl.username: {username} + sasl.password: {password} + security.protocol: sasl_ssl + sasl.mechanism: SCRAM-SHA-512 + session.timeout.ms: 600000 + max.poll.interval.ms: 3600000 + message.max.bytes: 1000000000 + privileged: true + batch_size: 2000 + stop_on_eof: true + +replayer: + error_reporter: + # used to track objects that the replayer really failed at replication from + # the source objstorage to the destination one + host: redis + port: 6379 + db: 0 diff --git a/conf/graph-replayer.yml.test b/conf/graph-replayer.yml.test new file mode 100644 index 0000000..96b9d61 --- /dev/null +++ b/conf/graph-replayer.yml.test @@ -0,0 +1,54 @@ +# this config file is a template used for tests, see in tests/ + +storage: + cls: pipeline + steps: + - cls: filter + - cls: tenacious + error_rate_limit: + # fail after 10 errors for 1000 operations + errors: 10 + window_size: 1000 + - cls: remote + url: http://storage:5002/ + max_retries: 5 + pool_connections: 100 + pool_maxsize: 200 + +journal_client: + cls: kafka + brokers: + - {broker} + group_id: {group_id}_graph + prefix: swh.test.objects + sasl.username: {username} + sasl.password: {password} + security.protocol: sasl_ssl + sasl.mechanism: SCRAM-SHA-512 + session.timeout.ms: 600000 + max.poll.interval.ms: 3600000 + message.max.bytes: 1000000000 + 
  object_types:
+    - content
+    - directory
+    - extid
+    - metadata_authority
+    - metadata_fetcher
+    - origin
+    - origin_visit
+    - origin_visit_status
+    - raw_extrinsic_metadata
+    - release
+    - revision
+    - skipped_content
+    - snapshot
+  privileged: true
+  stop_on_eof: true
+
+replayer:
+  error_reporter:
+    # used to track objects that the replayer ultimately failed to store in
+    # the destination storage
+    host: redis
+    port: 6379
+    db: 0
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..206af2c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,12 @@
+[tool.black]
+target-version = ['py37']
+
+[tool.isort]
+multi_line_output = 3
+include_trailing_comma = true
+force_grid_wrap = 0
+use_parentheses = true
+ensure_newline_before_comments = true
+line_length = 88
+force_sort_within_sections = true
+
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..2c446b4
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,8 @@
+[flake8]
+# E203: whitespace before ':'
+# E231: missing whitespace after ','
+# E501: line too long, use B950 warning from flake8-bugbear instead
+# W503: line break before binary operator
+select = C,E,F,W,B950
+ignore = E203,E231,E501,W503
+max-line-length = 88
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..856ec83
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,132 @@
+# mirror stack deployment tests
+
+These are tests for the deployment of a full Software Heritage mirror stack.
+
+As of today, only docker swarm based deployment tests are available.
+
+## docker swarm deployment tests
+
+This test uses
+[pytest-testinfra](https://github.com/pytest-dev/pytest-testinfra) to
+orchestrate the deployment and the checks made against the replicated
+Software Heritage archive.
+
+The test dataset is prepared as follows:
+
+- a few origins are loaded into a dedicated swh instance (using
+  swh-environment/docker) to build a test dataset,
+- the gathered objects are pushed to a dedicated set of kafka topics on swh's
+  staging kafka broker (swh.test.objects),
+- expected statistics for each origin are also computed and pushed to the
+  swh.test.objects.stats topic; these statistics are simply, for each origin,
+  the number of objects of each type (content, directory, revision, snapshot,
+  release) reachable from that origin.
+
+The test scenario is then the following:
+
+1. copy all docker config files into a temp dir and resolve the template ones
+   (especially conf/graph-replayer.yml.test and conf/content-replayer.yml.test;
+   see the mirror_stack fixture in conftest.py),
+2. create and deploy a docker stack from the mirror.yml compose file in that
+   temp dir; note that the replayer services are not started at this point
+   (their replication factor is set to 0 in mirror.yml),
+3. wait for all the services to be up,
+4. scale the content replayer service to 1, and wait for it to be up,
+5. scale the content replayer service to 4, and wait for all its replicas to
+   be up,
+6. wait for the content replaying to be done (test replayer services are
+   configured with stop_on_eof=true),
+7. scale the content replayer back to 0,
+8. repeat steps 4-7 for the graph-replayer (see the command sketch below),
+9. retrieve the expected stats for each origin from the dedicated
+   swh.test.objects.stats topic on kafka,
+10. compute these stats from the replicated archive; note that this step also
+    checks content object hashes against the replicated objstorage,
+11. compare the computed stats with the expected ones,
+12. spawn a vault (flat) cooking for each origin (latest snapshot's master),
+13. wait for the tgz artifacts to be generated by the vault-workers,
+14. download the resulting artifacts and run a few checks on their content.
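+
+For reference, steps 4-7 boil down to roughly the following docker commands,
+which the test drives through testinfra `check_output` calls rather than a
+shell. This is only a sketch: `swhtest_mirror` is a placeholder, the actual
+stack name is derived from the pytest temporary directory and printed by the
+test.
+
+```
+# bring the content replayer up, then follow its logs until every replica
+# reports being done (the replayers exit on their own thanks to stop_on_eof)
+docker service scale swhtest_mirror_content-replayer=1
+docker service scale swhtest_mirror_content-replayer=4
+docker service logs -t -f swhtest_mirror_content-replayer
+# once all replicas are done, scale the service back down
+docker service scale swhtest_mirror_content-replayer=0
+```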
+
+Obviously, this test heavily depends on the content of the ``swh.test.objects``
+topics on kafka, so some tooling is required to manage said test dataset.
+These tools are not part of this repo; they will be provided in the
+swh-environment git repo (they use the development docker environment).
+
+### Running the test
+
+The test is written with pytest-testinfra, so it is executed via the pytest
+command.
+
+Note that for this test run:
+
+- docker swarm must be enabled,
+
+- it will use dedicated test kafka topics on the staging kafka broker hosted by
+  software heritage (see the Journal TLS endpoint listed on
+  https://docs.softwareheritage.org/sysadm/network-architecture/service-urls.html#public-urls),
+
+- it requires a few environment variables to be set before running the test,
+  namely:
+  - `SWH_MIRROR_TEST_KAFKA_USERNAME`: login used to access the kafka broker,
+  - `SWH_MIRROR_TEST_KAFKA_PASSWORD`: password used to access the kafka
+    broker,
+  - `SWH_MIRROR_TEST_KAFKA_BROKER`: URL of the kafka broker (should be the one
+    described above),
+  - `SWH_MIRROR_TEST_OBJSTORAGE_URL`: the URL of the source object storage used
+    for the content replication; it would typically include access credentials,
+    e.g. `https://login:password@objstorage.softwareheritage.org/`,
+  - `SWH_IMAGE_TAG`: the docker image tag to be tested.
+
+- the `softwareheritage/base`, `softwareheritage/web`,
+  `softwareheritage/replayer` and `softwareheritage/test` images must be built
+  with the proper image tag (`$SWH_IMAGE_TAG`). See the
+  `../images/build_images.sh` script to rebuild images if need be.
+
+
+Assuming you have a properly set up environment:
+
+```
+# check the docker swarm cluster is ok
+~/swh-mirror$ docker node ls
+ID                            HOSTNAME   STATUS   AVAILABILITY   MANAGER STATUS   ENGINE VERSION
+w6uzfpxayyc8l9ksfud7dlq9p *   libra      Ready    Active         Leader           20.10.5+dfsg1
+# check images
+~/swh-mirror$ echo $SWH_IMAGE_TAG
+20220805-185133
+~/swh-mirror$ docker image ls -f reference="softwareheritage/*:$SWH_IMAGE_TAG"
+REPOSITORY                  TAG               IMAGE ID       CREATED      SIZE
+softwareheritage/replayer   20220805-185133   da2d12d57a65   5 days ago   223MB
+softwareheritage/test       20220805-185133   cb4449867d3a   5 days ago   682MB
+softwareheritage/web        20220805-185133   66c54d5c2611   5 days ago   364MB
+softwareheritage/base       20220805-185133   528010e1fc9c   5 days ago   682MB
+# check environment variables are set
+~/swh-mirror$ env | grep SWH_MIRROR_TEST
+SWH_MIRROR_TEST_KAFKA_PASSWORD=
+SWH_MIRROR_TEST_KAFKA_BROKER=broker1.journal.staging.swh.network:9093
+SWH_MIRROR_TEST_KAFKA_USERNAME=mirror-test-ro
+SWH_MIRROR_TEST_OBJSTORAGE_URL=https://:@objstorage.softwareheritage.org/
+```
+
+you should be able to execute the test:
+
+```
+~/swh-mirror$ pytest
+============================== test session starts ==============================
+platform linux -- Python 3.9.2, pytest-6.2.5, py-1.9.0, pluggy-1.0.0
+rootdir: /home/ddouard/swh/swh-docker
+plugins: django-4.5.2, dash-1.18.1, django-test-migrations-1.2.0, forked-1.4.0, redis-2.4.0, requests-mock-1.9.3, Faker-4.18.0, asyncio-0.18.1, xdist-2.1.0, hypothesis-6.4.3, testinfra-6.8.0, postgresql-3.1.3, flask-1.1.0, mock-3.7.0, swh.journal-1.0.1.dev10+gdb9d202, swh.core-2.13
+asyncio: mode=legacy
+collected 1 item
+
+tests/test_graph_replayer.py .                                            [100%]
+
+=============================== warnings summary ================================
+../../.virtualenvs/swh/lib/python3.9/site-packages/pytest_asyncio/plugin.py:191
+  /home/ddouard/.virtualenvs/swh/lib/python3.9/site-packages/pytest_asyncio/plugin.py:191: DeprecationWarning: The 'asyncio_mode' default value will change to 'strict' in future, please explicitly use 'asyncio_mode=strict' or 'asyncio_mode=auto' in pytest configuration file.
+    config.issue_config_time_warning(LEGACY_MODE, stacklevel=2)
+
+-- Docs: https://docs.pytest.org/en/stable/warnings.html
+=================== 1 passed, 1 warning in 923.19s (0:15:23) ====================
+```
+
+Note the test takes quite some time to execute, so be patient.
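+
+### Cleaning up
+
+The stack is normally removed at the end of the test session. If a run is
+interrupted, or if the `--keep-stack` option is used, the stack and its
+volumes are left behind. A rough manual cleanup sketch (again, `swhtest_mirror`
+is a placeholder; the actual stack name is printed by the test):
+
+```
+docker stack rm swhtest_mirror
+# wait for all the services to be gone, then drop the stack's volumes
+docker volume ls --format '{{.Name}}' \
+    --filter label=com.docker.stack.namespace=swhtest_mirror \
+    | xargs -r docker volume rm
+```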
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..2993987
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,118 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from os import chdir, environ
+from pathlib import Path
+from shutil import copy, copytree
+import time
+from uuid import uuid4
+
+import pytest
+import testinfra
+
+APIURL = "http://127.0.0.1:5080/api/1/"
+SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
+SRC_PATH = Path(__file__).resolve().parent.parent
+
+KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"]
+KAFKA_PASSWORD = environ["SWH_MIRROR_TEST_KAFKA_PASSWORD"]
+KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
+KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
+OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
+WFI_TIMEOUT = 60
+
+
+def pytest_addoption(parser, pluginmanager):
+    parser.addoption(
+        "--keep-stack", action="store_true", help="Do not teardown the docker stack"
+    )
+
+
+@pytest.fixture(scope="session")
+def docker_host():
+    return testinfra.get_host("local://")
+
+
+# scope='session' so we use the same stack for all the tests
+@pytest.fixture(scope="session")
+def mirror_stack(request, docker_host, tmp_path_factory):
+    tmp_path = tmp_path_factory.mktemp("mirror")
+    copytree(SRC_PATH / "conf", tmp_path / "conf")
+    copytree(SRC_PATH / "env", tmp_path / "env")
+    copy(SRC_PATH / "mirror.yml", tmp_path)
+    cwd = Path.cwd()
+    chdir(tmp_path)
+    # instantiate the test-specific conf files from their templates
+    conftmpl = {
+        "username": KAFKA_USERNAME,
+        "password": KAFKA_PASSWORD,
+        "group_id": KAFKA_GROUPID,
+        "broker": KAFKA_BROKER,
+        "objstorage_url": OBJSTORAGE_URL,
+    }
+
+    for conffile in (tmp_path / "conf").glob("*.yml.test"):
+        with open(conffile.as_posix()[:-5], "w") as outf:
+            outf.write(conffile.read_text().format(**conftmpl))
+    # start the whole cluster
+    stack_name = f"swhtest_{tmp_path.name}"
+
+    print("Create missing secrets")
+    existing_secrets = [
+        line.strip()
+        for line in docker_host.check_output(
+            "docker secret ls --format '{{.Name}}'"
+        ).splitlines()
+    ]
+    for srv in ("storage", "web", "vault", "scheduler"):
+        secret = f"swh-mirror-{srv}-db-password"
+        if secret not in existing_secrets:
+            print(f"Creating secret {secret}")
+            docker_host.check_output(
+                f"echo not-so-secret | docker secret create {secret} -"
+            )
+    print("Remove config objects (if any)")
+    existing_configs = [
+        line.strip()
+        for line in docker_host.check_output(
+            "docker config ls --format '{{.Name}}'"
+        ).splitlines()
+    ]
+    for cfg in existing_configs:
+ if cfg.startswith(f"{stack_name}_"): + docker_host.check_output(f"docker config rm {cfg}") + + print(f"Deploy docker stack {stack_name}") + docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}") + + yield stack_name + + # breakpoint() + if not request.config.getoption("keep_stack"): + print(f"Remove stack {stack_name}") + docker_host.check_output(f"docker stack rm {stack_name}") + # wait for services to be down + print(f"Wait for all services of {stack_name} to be down") + while docker_host.check_output( + "docker service ls --format {{.Name}} " + f"--filter label=com.docker.stack.namespace={stack_name}" + ): + time.sleep(0.2) + + # give a bit of time to docker to sync the state of service<->volumes + # relations so the next step runs ok + time.sleep(1) + print(f"Remove volumes of stack {stack_name}") + for volume in docker_host.check_output( + "docker volume ls --format {{.Name}} " + f"--filter label=com.docker.stack.namespace={stack_name}" + ).splitlines(): + + try: + docker_host.check_output(f"docker volume rm {volume}") + except AssertionError: + print(f"Failed to remove volume {volume}") + + chdir(cwd) diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py new file mode 100644 index 0000000..3a0b158 --- /dev/null +++ b/tests/test_graph_replayer.py @@ -0,0 +1,444 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from hashlib import sha1 +from io import BytesIO +import re +import tarfile +import time +from urllib.parse import quote + +from confluent_kafka import Consumer, KafkaException +import msgpack +import requests + +from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME + +SERVICES = { + "{}_content-replayer": "0/0", + "{}_db-storage": "1/1", + "{}_db-web": "1/1", + "{}_grafana": "1/1", + "{}_graph-replayer": "0/0", + "{}_memcache": "1/1", + "{}_nginx": "1/1", + "{}_objstorage": "1/1", + "{}_prometheus": "1/1", + "{}_prometheus-statsd-exporter": "1/1", + "{}_redis": "1/1", + "{}_storage": "1/1", + "{}_web": "1/1", + "{}_db-vault": "1/1", + "{}_vault": "1/1", + "{}_vault-worker": "1/1", + "{}_scheduler": "1/1", + "{}_scheduler-listener": "1/1", + "{}_scheduler-runner": "1/1", +} +ATTEMPTS = 200 +DELAY = 0.5 +SCALE = 2 +API_URL = "http://127.0.0.1:5081/api/1" + + +def running_services(host, stack): + all_services = host.check_output( + "docker service ls --format '{{.Name}} {{.Replicas}}'" + ) + return dict( + line.split()[:2] + for line in all_services.splitlines() + if line.startswith(f"{stack}_") + ) + + +def check_running_services(host, stack, services): + print("Waiting for service", services) + mirror_services_ = {} + for i in range(ATTEMPTS): + mirror_services = running_services(host, stack) + mirror_services = {k: v for k, v in mirror_services.items() if k in services} + if mirror_services == services: + print("Got them all!") + break + if mirror_services != mirror_services_: + print("Not yet there", mirror_services) + mirror_services_ = mirror_services + time.sleep(0.5) + return mirror_services == services + + +def get_logs(host, service): + rows = host.check_output(f"docker service logs -t {service}").splitlines() + reg = re.compile( + rf"^(?P.+) {service}[.]" + r"(?P\d+)[.](?P\w+)@(?P\w+) +[|] " + r"(?P.+)$" + ) + return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None] + + +def 
wait_for_log_entry(host, service, logline, occurrences=1): + for i in range(ATTEMPTS): + logs = get_logs(host, service) + match = [entry for entry in logs if logline in entry["logline"]] + if match and len(match) >= occurrences: + return match + time.sleep(DELAY) + return [] + + +def content_get(url, done): + content = get(url) + swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}" + # checking the actual blob is present and valid + # XXX: a bit sad... + try: + data = get(content["data_url"]) + except Exception as exc: + print("Failed loading", content["data_url"]) + print(exc) + raise + assert len(data) == content["length"] + assert sha1(data).hexdigest() == content["checksums"]["sha1"] + + if swhid not in done: + done.add(swhid) + yield content + + +def directory_get(url, done): + directory = get(url) + id = url.split("/")[-2] + swhid = f"swh:1:dir:{id}" + if swhid not in done: + done.add(swhid) + for entry in directory: + if entry["type"] == "file": + swhid = f"swh:1:cnt:{entry['target']}" + if swhid not in done: + yield from content_get(entry["target_url"], done) + elif entry["type"] == "dir": + swhid = f"swh:1:dir:{entry['target']}" + if swhid not in done: + yield from directory_get(entry["target_url"], done) + + +def revision_get(url, done): + revision = get(url) + swhid = f"swh:1:rev:{revision['id']}" + if swhid not in done: + done.add(swhid) + yield revision + swhid = f"swh:1:dir:{revision['directory']}" + if swhid not in done: + yield from directory_get(revision["directory_url"], done) + for parent in revision["parents"]: + if f"swh:1:rev:{parent['id']}" not in done: + yield from revision_get(parent["url"], done) + + +def snapshot_get(url, done): + snapshot = get(url) + for branchname, branch in snapshot["branches"].items(): + if branch: + yield from resolve_target( + branch["target_type"], + branch["target"], + branch["target_url"], + done, + ) + + +def origin_get(url, done=None): + if done is None: + done = set() + visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true") + if not visit.get("snapshot"): + return + swhid = f"swh:1:snp:{visit['snapshot']}" + if swhid not in done: + done.add(swhid) + snapshot_url = visit["snapshot_url"] + if snapshot_url: + yield from snapshot_get(snapshot_url, done) + + +def resolve_target(target_type, target, target_url, done): + if target_type == "revision": + if f"swh:1:rev:{target}" not in done: + yield from revision_get(target_url, done) + elif target_type == "release": + if f"swh:1:rel:{target}" not in done: + yield from release_get(target_url, done) + elif target_type == "directory": + if f"swh:1:dir:{target}" not in done: + yield from directory_get(target_url, done) + elif target_type == "content": + if f"swh:1:cnt:{target}" not in done: + yield from content_get(target_url, done) + elif target_type == "snapshot": + if f"swh:1:snp:{target}" not in done: + yield from snapshot_get(target_url, done) + # elif target_type == "alias": + # if f"swh:1:snp:{target}" not in done: + # yield from snapshot_get(target_url, done) + + +def release_get(url, done): + release = get(url) + swhid = f"swh:1:rel:{release['id']}" + if swhid not in done: + done.add(swhid) + yield release + yield from resolve_target( + release["target_type"], release["target"], release["target_url"], done + ) + + +def branch_get(url): + branches = set() + visits = get(f"{API_URL}/origin/{url}/visits/") + for visit in visits: + snapshot_url = visit.get("snapshot_url") + while snapshot_url: + snapshot = get(snapshot_url) + for name, tgt in 
snapshot["branches"].items(): + if tgt is not None: + branches.add( + (name, tgt["target_type"], tgt["target"], tgt["target_url"]) + ) + snapshot_url = snapshot["next_branch"] + return len(visits), branches + + +timing_stats = [] + + +def get(url): + t0 = time.time() + resp = requests.get(url) + if resp.headers["content-type"].lower() == "application/json": + result = resp.json() + else: + result = resp.content + timing_stats.append(time.time() - t0) + return result + + +def post(url): + t0 = time.time() + resp = requests.post(url) + assert resp.status_code in (200, 201, 202) + if resp.headers["content-type"].lower() == "application/json": + result = resp.json() + else: + result = resp.content + timing_stats.append(time.time() - t0) + return result + + +def get_stats(origin): + result = {"origin": origin} + + swhids = set() + list(origin_get(origin, done=swhids)) + result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")]) + result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")]) + result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")]) + + visits, branches = branch_get(origin) + result["visit"] = visits + result["release"] = len([br for br in branches if br[1] == "release"]) + result["alias"] = len([br for br in branches if br[1] == "alias"]) + result["branch"] = len([br for br in branches if br[1] == "revision"]) + + return result, swhids + + +def get_expected_stats(): + + cfg = { + "bootstrap.servers": "broker1.journal.staging.swh.network:9093", + "sasl.username": KAFKA_USERNAME, + "sasl.password": KAFKA_PASSWORD, + "group.id": KAFKA_GROUPID, + "security.protocol": "sasl_ssl", + "sasl.mechanism": "SCRAM-SHA-512", + "session.timeout.ms": 600000, + "max.poll.interval.ms": 3600000, + "message.max.bytes": 1000000000, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + "enable.partition.eof": True, + } + + partitions = set() + + def on_assign(cons, parts): + print("assignment", parts) + for p in parts: + partitions.add(p.partition) + + consumer = Consumer(cfg) + consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign) + stats = {} + try: + while True: + msg = consumer.poll(timeout=10.0) + if msg is None: + if not partitions: + break + continue + if msg.error(): + if msg.error().name() == "_PARTITION_EOF": + partitions.discard(msg.partition()) + if not partitions: + break + else: + raise KafkaException(msg.error()) + else: + # Proper message + k = msgpack.unpackb(msg.key()) + v = msgpack.unpackb(msg.value()) + print( + "%% %s [%d] at offset %d with key %s:\n" + % (msg.topic(), msg.partition(), msg.offset(), k) + ) + assert k == v["origin"] + stats[k] = v + except KeyboardInterrupt: + assert False, "%% Aborted by user" + return stats + + +def test_mirror(host, mirror_stack): + services = {k.format(mirror_stack): v for k, v in SERVICES.items()} + check_running_services(host, mirror_stack, services) + + # run replayer services + for service_type in ("content", "graph"): + service = f"{mirror_stack}_{service_type}-replayer" + print(f"Scale {service} to 1") + host.check_output(f"docker service scale -d {service}=1") + if not check_running_services(host, mirror_stack, {service: "1/1"}): + breakpoint() + logs = wait_for_log_entry( + host, service, f"Starting the SWH mirror {service_type} replayer" + ) + assert len(logs) == 1 + + print(f"Scale {service} to {SCALE}") + host.check_output(f"docker service scale -d {service}={SCALE}") + check_running_services(host, mirror_stack, {service: 
f"{SCALE}/{SCALE}"}) + logs = wait_for_log_entry( + host, service, f"Starting the SWH mirror {service_type} replayer", SCALE + ) + assert len(logs) == SCALE + + # wait for the replaying to be done (stop_on_oef is true) + print(f"Wait for {service} to be done") + logs = wait_for_log_entry(host, service, "Done.", SCALE) + # >= SCALE below because replayer services may have been restarted + # (once done) before we scale them to 0 + if not (len(logs) >= SCALE): + breakpoint() + assert len(logs) >= SCALE + print(f"Scale {service} to 0") + check_running_services(host, mirror_stack, {service: f"0/{SCALE}"}) + + # TODO: check there are no error reported in redis after the replayers are done + + origins = get(f"{API_URL}/origins/") + if False: + # check replicated archive is in good shape + expected_stats = get_expected_stats() + print("Check replicated archive") + # seems the graph replayer is OK, let's check the archive can tell something + expected_origins = sorted(expected_stats) + assert len(origins) == len(expected_origins) + assert sorted(o["url"] for o in origins) == expected_origins + + for origin, expected in expected_stats.items(): + timing_stats.clear() + assert origin == expected["origin"] + origin_stats, swhids = get_stats(origin) + print(origin_stats) + print(f"{len(timing_stats)} REQS took {sum(timing_stats)}s") + assert origin_stats == expected + print(f"{origin} is OK") + + # test the vault service + cooks = [] + # first start all the cookings + for origin in origins: + print(f"Cook HEAD for {origin['url']}") + visit = get( + f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true" + ) + assert visit + snp = get(visit["snapshot_url"]) + assert snp + branches = snp.get("branches", {}) + head = branches.get("HEAD") + assert head + + while True: + if head["target_type"] == "alias": + head = branches[head["target"]] + elif head["target_type"] == "release": + head = get(head["target_url"]) + elif head["target_type"] == "directory": + swhid = f"swh:1:dir:{head['target']}" + break + elif head["target_type"] == "revision": + rev = get(head["target_url"]) + swhid = f"swh:1:dir:{rev['directory']}" + break + else: + breakpoint() + + print(f"Directory is {swhid}") + cook = post(f"{API_URL}/vault/flat/{swhid}/") + assert cook + assert cook["status"] in ("new", "pending") + cooks.append((origin["url"], swhid, cook)) + # then wait for successful cooks + while not all(cook["status"] == "done" for _, _, cook in cooks): + origin, swhid, cook = cooks.pop(0) + cook = get(f"{API_URL}/vault/flat/{swhid}") + cooks.append((origin, swhid, cook)) + # should all be in "done" status + for origin, swhid, cook in cooks: + assert cook["status"] == "done" + # so we can download it + tarfilecontent = get(cook["fetch_url"]) + assert isinstance(tarfilecontent, bytes) + tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent)) + filelist = tarfileobj.getnames() + assert all(fname.startswith(swhid) for fname in filelist) + for path in filelist[1:]: + tarinfo = tarfileobj.getmember(path) + expected = get( + f"{API_URL}/directory/{quote(path[10:])}" + ) # remove the 'swh:1:dir:' part + if expected["type"] == "dir": + assert tarinfo.isdir() + elif expected["type"] == "file": + if expected["perms"] == 0o120000: + assert tarinfo.issym() + tgt = get(expected["target_url"]) + symlnk = get(tgt["data_url"]) + assert symlnk == tarinfo.linkpath.encode() + else: + assert tarinfo.isfile() + assert expected["length"] == tarinfo.size + assert ( + sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest() + == 
expected["checksums"]["sha1"] + ) + else: + breakpoint() + pass