diff --git a/conf/web.yml b/conf/web.yml
index 439cfa0..ce0220a 100644
--- a/conf/web.yml
+++ b/conf/web.yml
@@ -1,61 +1,61 @@
 storage:
   cls: remote
   url: http://storage:5002/
-  timeout: 5
+  timeout: 30

 objstorage:
   cls: remote
   url: http://objstorage:5003/

 #indexer_storage:
 #  cls: remote
 #  url: http://indexer-storage:5007/
 #
 scheduler:
   cls: remote
   url: http://scheduler:5008/

 vault:
   cls: remote
   url: http://vault:5005/

 allowed_hosts:
   - appserver

 debug: yes
 serve_assets: yes

 production_db:
   name: postgresql:///?service=swh

 throttling:
   cache_uri: memcache:11211
   scopes:
     swh_api:
       limiter_rate:
         default: 120/h
       exempted_networks:
         - 0.0.0.0/0
     swh_api_origin_search:
       limiter_rate:
         default: 70/m
       exempted_networks:
         - 0.0.0.0/0
     swh_api_origin_visit_latest:
       limiter_rate:
         default: 700/m
       exempted_networks:
         - 0.0.0.0/0
     swh_vault_cooking:
       limiter_rate:
         default: 120/h
       exempted_networks:
         - 0.0.0.0/0
     swh_save_origin:
       limiter_rate:
         default: 120/h
       exempted_networks:
         - 0.0.0.0/0

 search: {}
diff --git a/tests/conftest.py b/tests/conftest.py
index dc9dce6..cf02122 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,121 +1,121 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from os import chdir, environ
 from pathlib import Path
 from shutil import copy, copytree
 import time
 from uuid import uuid4

 import pytest
 import testinfra

 APIURL = "http://127.0.0.1:5080/api/1/"
 SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
 SRC_PATH = Path(__file__).resolve().parent.parent

 KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"]
 KAFKA_PASSWORD = environ["SWH_MIRROR_TEST_KAFKA_PASSWORD"]
 KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
 KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
 OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
 WFI_TIMEOUT = 60

 LOGGER = logging.getLogger(__name__)


 def pytest_addoption(parser, pluginmanager):
     parser.addoption(
         "--keep-stack", action="store_true", help="Do not teardown the docker stack"
     )


 @pytest.fixture(scope="session")
 def docker_host():
     return testinfra.get_host("local://")


 # scope='session' so we use the same container for all the tests
 @pytest.fixture(scope="session")
 def mirror_stack(request, docker_host, tmp_path_factory):
     tmp_path = tmp_path_factory.mktemp("mirror")
     copytree(SRC_PATH / "conf", tmp_path / "conf")
     copytree(SRC_PATH / "env", tmp_path / "env")
     copy(SRC_PATH / "mirror.yml", tmp_path)
     cwd = Path.cwd()
     chdir(tmp_path)
     # copy test-specific conf files
     conftmpl = {
         "username": KAFKA_USERNAME,
         "password": KAFKA_PASSWORD,
         "group_id": KAFKA_GROUPID,
         "broker": KAFKA_BROKER,
         "objstorage_url": OBJSTORAGE_URL,
     }
     for conffile in (tmp_path / "conf").glob("*.yml.test"):
         with open(conffile.as_posix()[:-5], "w") as outf:
             outf.write(conffile.read_text().format(**conftmpl))

     # start the whole cluster
     stack_name = f"swhtest_{tmp_path.name}"

     LOGGER.info("Create missing secrets")
     existing_secrets = [
         line.strip()
         for line in docker_host.check_output(
             "docker secret ls --format '{{.Name}}'"
         ).splitlines()
     ]
     for srv in ("storage", "web", "vault", "scheduler"):
         secret = f"swh-mirror-{srv}-db-password"
         if secret not in existing_secrets:
             LOGGER.info("Creating secret %s", secret)
             docker_host.check_output(
                 f"echo not-so-secret | docker secret create {secret} -"
             )

     LOGGER.info("Remove config objects (if any)")
     existing_configs = [
         line.strip()
         for line in docker_host.check_output(
             "docker config ls --format '{{.Name}}'"
         ).splitlines()
     ]
     for cfg in existing_configs:
         if cfg.startswith(f"{stack_name}_"):
             docker_host.check_output(f"docker config rm {cfg}")

     LOGGER.info("Deploy docker stack %s", stack_name)
     docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")

     yield stack_name

     # breakpoint()
     if not request.config.getoption("keep_stack"):
         LOGGER.info("Remove stack %s", stack_name)
         docker_host.check_output(f"docker stack rm {stack_name}")
         # wait for services to be down
         LOGGER.info("Wait for all services of %s to be down", stack_name)
         while docker_host.check_output(
             "docker service ls --format {{.Name}} "
             f"--filter label=com.docker.stack.namespace={stack_name}"
         ):
             time.sleep(0.2)

         # give a bit of time to docker to sync the state of service<->volumes
         # relations so the next step runs ok
-        time.sleep(1)
+        time.sleep(20)

         LOGGER.info("Remove volumes of stack %s", stack_name)
         for volume in docker_host.check_output(
             "docker volume ls --format {{.Name}} "
             f"--filter label=com.docker.stack.namespace={stack_name}"
         ).splitlines():
             try:
                 docker_host.check_output(f"docker volume rm {volume}")
             except AssertionError:
                 LOGGER.error("Failed to remove volume %s", volume)
     chdir(cwd)
diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
index 7dd233e..2be1773 100644
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -1,446 +1,446 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from hashlib import sha1
 from io import BytesIO
 import re
 import tarfile
 import time
 from urllib.parse import quote

 from confluent_kafka import Consumer, KafkaException
 import msgpack
 import requests

 from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER

 SERVICES = {
     "{}_content-replayer": "0/0",
     "{}_db-storage": "1/1",
     "{}_db-web": "1/1",
     "{}_grafana": "1/1",
     "{}_graph-replayer": "0/0",
     "{}_memcache": "1/1",
     "{}_nginx": "1/1",
     "{}_objstorage": "1/1",
     "{}_prometheus": "1/1",
     "{}_prometheus-statsd-exporter": "1/1",
     "{}_redis": "1/1",
     "{}_storage": "1/1",
     "{}_web": "1/1",
     "{}_db-vault": "1/1",
     "{}_vault": "1/1",
     "{}_vault-worker": "1/1",
     "{}_scheduler": "1/1",
     "{}_scheduler-listener": "1/1",
     "{}_scheduler-runner": "1/1",
 }
-ATTEMPTS = 200
-DELAY = 0.5
+ATTEMPTS = 600
+DELAY = 1
 SCALE = 2
 API_URL = "http://127.0.0.1:5081/api/1"


 def running_services(host, stack):
     all_services = host.check_output(
         "docker service ls --format '{{.Name}} {{.Replicas}}'"
     )
     return dict(
         line.split()[:2]
         for line in all_services.splitlines()
         if line.startswith(f"{stack}_")
     )


 def check_running_services(host, stack, services):
     LOGGER.info("Waiting for services %s", services)
     mirror_services_ = {}
     for i in range(ATTEMPTS):
         mirror_services = running_services(host, stack)
         mirror_services = {k: v for k, v in mirror_services.items() if k in services}
         if mirror_services == services:
             LOGGER.info("Got them all!")
             break
         if mirror_services != mirror_services_:
             LOGGER.info("Not yet there %s", mirror_services)
             mirror_services_ = mirror_services
         time.sleep(0.5)
     return mirror_services == services


 def get_logs(host, service):
     rows = host.check_output(f"docker service logs -t {service}").splitlines()
     reg = re.compile(
         rf"^(?P<timestamp>.+) {service}[.]"
         r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
         r"(?P<logline>.+)$"
     )
     return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]


 def wait_for_log_entry(host, service, logline, occurrences=1):
     for i in range(ATTEMPTS):
         logs = get_logs(host, service)
         match = [entry for entry in logs if logline in entry["logline"]]
         if match and len(match) >= occurrences:
             return match
         time.sleep(DELAY)
     return []


 def content_get(url, done):
     content = get(url)
     swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}"
     # checking the actual blob is present and valid
     # XXX: a bit sad...
     try:
         data = get(content["data_url"])
     except Exception as exc:
         LOGGER.error("Failed loading %s", content["data_url"], exc_info=exc)
         raise
     assert len(data) == content["length"]
     assert sha1(data).hexdigest() == content["checksums"]["sha1"]
     if swhid not in done:
         done.add(swhid)
         yield content


 def directory_get(url, done):
     directory = get(url)
     id = url.split("/")[-2]
     swhid = f"swh:1:dir:{id}"
     if swhid not in done:
         done.add(swhid)
         for entry in directory:
             if entry["type"] == "file":
                 swhid = f"swh:1:cnt:{entry['target']}"
                 if swhid not in done:
                     yield from content_get(entry["target_url"], done)
             elif entry["type"] == "dir":
                 swhid = f"swh:1:dir:{entry['target']}"
                 if swhid not in done:
                     yield from directory_get(entry["target_url"], done)


 def revision_get(url, done):
     revision = get(url)
     swhid = f"swh:1:rev:{revision['id']}"
     if swhid not in done:
         done.add(swhid)
         yield revision
         swhid = f"swh:1:dir:{revision['directory']}"
         if swhid not in done:
             yield from directory_get(revision["directory_url"], done)
         for parent in revision["parents"]:
             if f"swh:1:rev:{parent['id']}" not in done:
                 yield from revision_get(parent["url"], done)


 def snapshot_get(url, done):
     snapshot = get(url)
     for branchname, branch in snapshot["branches"].items():
         if branch:
             yield from resolve_target(
                 branch["target_type"],
                 branch["target"],
                 branch["target_url"],
                 done,
             )


 def origin_get(url, done=None):
     if done is None:
         done = set()
     visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true")
     if not visit.get("snapshot"):
         return
     swhid = f"swh:1:snp:{visit['snapshot']}"
     if swhid not in done:
         done.add(swhid)
         snapshot_url = visit["snapshot_url"]
         if snapshot_url:
             yield from snapshot_get(snapshot_url, done)


 def resolve_target(target_type, target, target_url, done):
     if target_type == "revision":
         if f"swh:1:rev:{target}" not in done:
             yield from revision_get(target_url, done)
     elif target_type == "release":
         if f"swh:1:rel:{target}" not in done:
             yield from release_get(target_url, done)
     elif target_type == "directory":
         if f"swh:1:dir:{target}" not in done:
             yield from directory_get(target_url, done)
     elif target_type == "content":
         if f"swh:1:cnt:{target}" not in done:
             yield from content_get(target_url, done)
     elif target_type == "snapshot":
         if f"swh:1:snp:{target}" not in done:
             yield from snapshot_get(target_url, done)
     # elif target_type == "alias":
     #     if f"swh:1:snp:{target}" not in done:
     #         yield from snapshot_get(target_url, done)


 def release_get(url, done):
     release = get(url)
     swhid = f"swh:1:rel:{release['id']}"
     if swhid not in done:
         done.add(swhid)
         yield release
         yield from resolve_target(
             release["target_type"], release["target"], release["target_url"], done
         )


 def branch_get(url):
     branches = set()
     visits = get(f"{API_URL}/origin/{url}/visits/")
     for visit in visits:
         snapshot_url = visit.get("snapshot_url")
         while snapshot_url:
             snapshot = get(snapshot_url)
             for name, tgt in snapshot["branches"].items():
                 if tgt is not None:
                     branches.add(
                         (name, tgt["target_type"], tgt["target"], tgt["target_url"])
                     )
             snapshot_url = snapshot["next_branch"]
     return len(visits), branches


 timing_stats = []


 def get(url):
     t0 = time.time()
     resp = requests.get(url)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result


 def post(url):
     t0 = time.time()
     resp = requests.post(url)
     assert resp.status_code in (200, 201, 202)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result


 def get_stats(origin):
     result = {"origin": origin}
     swhids = set()
     list(origin_get(origin, done=swhids))
     result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")])
     result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")])
     result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")])
     visits, branches = branch_get(origin)
     result["visit"] = visits
     result["release"] = len([br for br in branches if br[1] == "release"])
     result["alias"] = len([br for br in branches if br[1] == "alias"])
     result["branch"] = len([br for br in branches if br[1] == "revision"])
     return result, swhids


 def get_expected_stats():
     cfg = {
         "bootstrap.servers": "broker1.journal.staging.swh.network:9093",
         "sasl.username": KAFKA_USERNAME,
         "sasl.password": KAFKA_PASSWORD,
         "group.id": KAFKA_GROUPID,
         "security.protocol": "sasl_ssl",
         "sasl.mechanism": "SCRAM-SHA-512",
         "session.timeout.ms": 600000,
         "max.poll.interval.ms": 3600000,
         "message.max.bytes": 1000000000,
         "auto.offset.reset": "earliest",
         "enable.auto.commit": True,
         "enable.partition.eof": True,
     }

     partitions = set()

     def on_assign(cons, parts):
         LOGGER.info("assignment %s", parts)
         for p in parts:
             partitions.add(p.partition)

     consumer = Consumer(cfg)
     consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign)

     stats = {}
     try:
         while True:
             msg = consumer.poll(timeout=10.0)
             if msg is None:
                 if not partitions:
                     break
                 continue
             if msg.error():
                 if msg.error().name() == "_PARTITION_EOF":
                     partitions.discard(msg.partition())
                     if not partitions:
                         break
                 else:
                     raise KafkaException(msg.error())
             else:
                 # Proper message
                 k = msgpack.unpackb(msg.key())
                 v = msgpack.unpackb(msg.value())
                 LOGGER.info(
                     "%% %s [%d] at offset %d with key %s:\n",
                     msg.topic(),
                     msg.partition(),
                     msg.offset(),
                     k,
                 )
                 assert k == v["origin"]
                 stats[k] = v
     except KeyboardInterrupt:
         assert False, "%% Aborted by user"
     return stats


 def test_mirror(host, mirror_stack):
     services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
     check_running_services(host, mirror_stack, services)

     # run replayer services
     for service_type in ("content", "graph"):
         service = f"{mirror_stack}_{service_type}-replayer"
         LOGGER.info("Scale %s to 1", service)
         host.check_output(f"docker service scale -d {service}=1")
         if not check_running_services(host, mirror_stack, {service: "1/1"}):
             breakpoint()
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer"
         )
         assert len(logs) == 1

         LOGGER.info("Scale %s to %d", service, SCALE)
         host.check_output(f"docker service scale -d {service}={SCALE}")
         check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
         )
         assert len(logs) == SCALE

         # wait for the replaying to be done (stop_on_eof is true)
         LOGGER.info("Wait for %s to be done", service)
         logs = wait_for_log_entry(host, service, "Done.", SCALE)
         # >= SCALE below because replayer services may have been restarted
         # (once done) before we scale them to 0
         if not (len(logs) >= SCALE):
             breakpoint()
         assert len(logs) >= SCALE

         LOGGER.info("Scale %s to 0", service)
         check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})

     # TODO: check there are no errors reported in redis after the replayers are done

     origins = get(f"{API_URL}/origins/")

     if False:
         # check replicated archive is in good shape
         expected_stats = get_expected_stats()
         LOGGER.info("Check replicated archive")
         # seems the graph replayer is OK, let's check the archive can tell something
         expected_origins = sorted(expected_stats)
         assert len(origins) == len(expected_origins)
         assert sorted(o["url"] for o in origins) == expected_origins
         for origin, expected in expected_stats.items():
             timing_stats.clear()
             assert origin == expected["origin"]
             origin_stats, swhids = get_stats(origin)
             LOGGER.info("%s", origin_stats)
             LOGGER.info("%d REQS took %ss", len(timing_stats), sum(timing_stats))
             assert origin_stats == expected
             LOGGER.info("%s is OK", origin)

     # test the vault service
     cooks = []
     # first start all the cookings
     for origin in origins:
         LOGGER.info("Cook HEAD for %s", origin["url"])
         visit = get(
             f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true"
         )
         assert visit
         snp = get(visit["snapshot_url"])
         assert snp
         branches = snp.get("branches", {})
         head = branches.get("HEAD")
         assert head
         while True:
             if head["target_type"] == "alias":
                 head = branches[head["target"]]
             elif head["target_type"] == "release":
                 head = get(head["target_url"])
             elif head["target_type"] == "directory":
                 swhid = f"swh:1:dir:{head['target']}"
                 break
             elif head["target_type"] == "revision":
                 rev = get(head["target_url"])
                 swhid = f"swh:1:dir:{rev['directory']}"
                 break
             else:
                 breakpoint()
         LOGGER.info("Directory is %s", swhid)
         cook = post(f"{API_URL}/vault/flat/{swhid}/")
         assert cook
         assert cook["status"] in ("new", "pending")
         cooks.append((origin["url"], swhid, cook))

     # then wait for successful cooks
     while not all(cook["status"] == "done" for _, _, cook in cooks):
         origin, swhid, cook = cooks.pop(0)
         cook = get(f"{API_URL}/vault/flat/{swhid}")
         cooks.append((origin, swhid, cook))

     # should all be in "done" status
     for origin, swhid, cook in cooks:
         assert cook["status"] == "done"
         # so we can download it
         tarfilecontent = get(cook["fetch_url"])
         assert isinstance(tarfilecontent, bytes)
         tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent))
         filelist = tarfileobj.getnames()
         assert all(fname.startswith(swhid) for fname in filelist)
         for path in filelist[1:]:
             tarinfo = tarfileobj.getmember(path)
             url = f"{API_URL}/directory/{quote(path[10:])}"
             expected = get(url)  # remove the 'swh:1:dir:' part
             LOGGER.info("Retrieved from storage: %s → %s", url, expected)
             if expected["type"] == "dir":
                 assert tarinfo.isdir()
             elif expected["type"] == "file":
                 if expected["perms"] == 0o120000:
                     assert tarinfo.issym()
                     tgt = get(expected["target_url"])
                     symlnk = get(tgt["data_url"])
                     assert symlnk == tarinfo.linkpath.encode()
                 else:
                     assert tarinfo.isfile()
                     assert expected["length"] == tarinfo.size
                     assert (
                         sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest()
                         == expected["checksums"]["sha1"]
                     )
             else:
                 breakpoint()
                 pass