diff --git a/pyproject.toml b/pyproject.toml
index eaa1b2f..cd6721d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,14 +1,18 @@
 [tool.black]
 target-version = ['py37']
 
 [tool.isort]
 multi_line_output = 3
 include_trailing_comma = true
 force_grid_wrap = 0
 use_parentheses = true
 ensure_newline_before_comments = true
 line_length = 88
 force_sort_within_sections = true
 
 [tool.pytest.ini_options]
 env_files = 'env/tests.env'
+log_cli = true
+log_cli_level = "INFO"
+log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
+log_cli_date_format = "%Y-%m-%d %H:%M:%S"
diff --git a/tests/conftest.py b/tests/conftest.py
index 2993987..dc9dce6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,118 +1,121 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import logging
 from os import chdir, environ
 from pathlib import Path
 from shutil import copy, copytree
 import time
 from uuid import uuid4
 
 import pytest
 import testinfra
 
 APIURL = "http://127.0.0.1:5080/api/1/"
 SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
 SRC_PATH = Path(__file__).resolve().parent.parent
 
 KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"]
 KAFKA_PASSWORD = environ["SWH_MIRROR_TEST_KAFKA_PASSWORD"]
 KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
 KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
 OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
 WFI_TIMEOUT = 60
 
+LOGGER = logging.getLogger(__name__)
+
 
 def pytest_addoption(parser, pluginmanager):
     parser.addoption(
         "--keep-stack", action="store_true", help="Do not teardown the docker stack"
     )
 
 
 @pytest.fixture(scope="session")
 def docker_host():
     return testinfra.get_host("local://")
 
 
 # scope='session' so we use the same container for all the tests;
 @pytest.fixture(scope="session")
 def mirror_stack(request, docker_host, tmp_path_factory):
     tmp_path = tmp_path_factory.mktemp("mirror")
     copytree(SRC_PATH / "conf", tmp_path / "conf")
     copytree(SRC_PATH / "env", tmp_path / "env")
     copy(SRC_PATH / "mirror.yml", tmp_path)
     cwd = Path.cwd()
     chdir(tmp_path)
     # copy test-specific conf files
     conftmpl = {
         "username": KAFKA_USERNAME,
         "password": KAFKA_PASSWORD,
         "group_id": KAFKA_GROUPID,
         "broker": KAFKA_BROKER,
         "objstorage_url": OBJSTORAGE_URL,
     }
 
     for conffile in (tmp_path / "conf").glob("*.yml.test"):
         with open(conffile.as_posix()[:-5], "w") as outf:
             outf.write(conffile.read_text().format(**conftmpl))
 
     # start the whole cluster
     stack_name = f"swhtest_{tmp_path.name}"
 
-    print("Create missing secrets")
+    LOGGER.info("Create missing secrets")
     existing_secrets = [
         line.strip()
         for line in docker_host.check_output(
             "docker secret ls --format '{{.Name}}'"
         ).splitlines()
     ]
     for srv in ("storage", "web", "vault", "scheduler"):
         secret = f"swh-mirror-{srv}-db-password"
         if secret not in existing_secrets:
-            print("Creating secret {secret}")
+            LOGGER.info("Creating secret %s", secret)
             docker_host.check_output(
                 f"echo not-so-secret | docker secret create {secret} -"
             )
 
-    print("Remove config objects (if any)")
+    LOGGER.info("Remove config objects (if any)")
     existing_configs = [
         line.strip()
         for line in docker_host.check_output(
             "docker config ls --format '{{.Name}}'"
         ).splitlines()
     ]
     for cfg in existing_configs:
         if cfg.startswith(f"{stack_name}_"):
             docker_host.check_output(f"docker config rm {cfg}")
 
-    print(f"Deploy docker stack {stack_name}")
+    LOGGER.info("Deploy docker stack %s", stack_name)
     docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")
 
     yield stack_name
 
     # breakpoint()
     if not request.config.getoption("keep_stack"):
-        print(f"Remove stack {stack_name}")
+        LOGGER.info("Remove stack %s", stack_name)
         docker_host.check_output(f"docker stack rm {stack_name}")
         # wait for services to be down
-        print(f"Wait for all services of {stack_name} to be down")
+        LOGGER.info("Wait for all services of %s to be down", stack_name)
         while docker_host.check_output(
             "docker service ls --format {{.Name}} "
             f"--filter label=com.docker.stack.namespace={stack_name}"
         ):
             time.sleep(0.2)
 
         # give a bit of time to docker to sync the state of service<->volumes
         # relations so the next step runs ok
         time.sleep(1)
-        print(f"Remove volumes of stack {stack_name}")
+        LOGGER.info("Remove volumes of stack %s", stack_name)
         for volume in docker_host.check_output(
             "docker volume ls --format {{.Name}} "
             f"--filter label=com.docker.stack.namespace={stack_name}"
         ).splitlines():
             try:
                 docker_host.check_output(f"docker volume rm {volume}")
             except AssertionError:
-                print(f"Failed to remove volume {volume}")
+                LOGGER.error("Failed to remove volume %s", volume)
 
     chdir(cwd)
diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
index 3a0b158..7dd233e 100644
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -1,444 +1,446 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from hashlib import sha1
 from io import BytesIO
 import re
 import tarfile
 import time
 from urllib.parse import quote
 
 from confluent_kafka import Consumer, KafkaException
 import msgpack
 import requests
 
-from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME
+from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
 
 SERVICES = {
     "{}_content-replayer": "0/0",
     "{}_db-storage": "1/1",
     "{}_db-web": "1/1",
     "{}_grafana": "1/1",
     "{}_graph-replayer": "0/0",
     "{}_memcache": "1/1",
     "{}_nginx": "1/1",
     "{}_objstorage": "1/1",
     "{}_prometheus": "1/1",
     "{}_prometheus-statsd-exporter": "1/1",
     "{}_redis": "1/1",
     "{}_storage": "1/1",
     "{}_web": "1/1",
     "{}_db-vault": "1/1",
     "{}_vault": "1/1",
     "{}_vault-worker": "1/1",
     "{}_scheduler": "1/1",
     "{}_scheduler-listener": "1/1",
     "{}_scheduler-runner": "1/1",
 }
 ATTEMPTS = 200
 DELAY = 0.5
 SCALE = 2
 API_URL = "http://127.0.0.1:5081/api/1"
 
 
 def running_services(host, stack):
     all_services = host.check_output(
         "docker service ls --format '{{.Name}} {{.Replicas}}'"
     )
     return dict(
         line.split()[:2]
         for line in all_services.splitlines()
         if line.startswith(f"{stack}_")
     )
 
 
 def check_running_services(host, stack, services):
-    print("Waiting for service", services)
+    LOGGER.info("Waiting for services %s", services)
     mirror_services_ = {}
     for i in range(ATTEMPTS):
         mirror_services = running_services(host, stack)
         mirror_services = {k: v for k, v in mirror_services.items() if k in services}
         if mirror_services == services:
-            print("Got them all!")
+            LOGGER.info("Got them all!")
             break
         if mirror_services != mirror_services_:
-            print("Not yet there", mirror_services)
+            LOGGER.info("Not yet there %s", mirror_services)
             mirror_services_ = mirror_services
         time.sleep(0.5)
     return mirror_services == services
 
 
 def get_logs(host, service):
     rows = host.check_output(f"docker service logs -t {service}").splitlines()
     reg = re.compile(
         rf"^(?P<timestamp>.+) {service}[.]"
         r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
         r"(?P<logline>.+)$"
     )
     return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
 
 
 def wait_for_log_entry(host, service, logline, occurrences=1):
     for i in range(ATTEMPTS):
         logs = get_logs(host, service)
         match = [entry for entry in logs if logline in entry["logline"]]
         if match and len(match) >= occurrences:
             return match
         time.sleep(DELAY)
     return []
 
 
 def content_get(url, done):
     content = get(url)
     swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}"
     # checking the actual blob is present and valid
     # XXX: a bit sad...
     try:
         data = get(content["data_url"])
     except Exception as exc:
-        print("Failed loading", content["data_url"])
-        print(exc)
+        LOGGER.error("Failed loading %s", content["data_url"], exc_info=exc)
         raise
     assert len(data) == content["length"]
     assert sha1(data).hexdigest() == content["checksums"]["sha1"]
     if swhid not in done:
         done.add(swhid)
         yield content
 
 
 def directory_get(url, done):
     directory = get(url)
     id = url.split("/")[-2]
     swhid = f"swh:1:dir:{id}"
     if swhid not in done:
         done.add(swhid)
         for entry in directory:
             if entry["type"] == "file":
                 swhid = f"swh:1:cnt:{entry['target']}"
                 if swhid not in done:
                     yield from content_get(entry["target_url"], done)
             elif entry["type"] == "dir":
                 swhid = f"swh:1:dir:{entry['target']}"
                 if swhid not in done:
                     yield from directory_get(entry["target_url"], done)
 
 
 def revision_get(url, done):
     revision = get(url)
     swhid = f"swh:1:rev:{revision['id']}"
     if swhid not in done:
         done.add(swhid)
         yield revision
         swhid = f"swh:1:dir:{revision['directory']}"
         if swhid not in done:
             yield from directory_get(revision["directory_url"], done)
         for parent in revision["parents"]:
             if f"swh:1:rev:{parent['id']}" not in done:
                 yield from revision_get(parent["url"], done)
 
 
 def snapshot_get(url, done):
     snapshot = get(url)
     for branchname, branch in snapshot["branches"].items():
         if branch:
             yield from resolve_target(
                 branch["target_type"],
                 branch["target"],
                 branch["target_url"],
                 done,
             )
 
 
 def origin_get(url, done=None):
     if done is None:
         done = set()
     visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true")
     if not visit.get("snapshot"):
         return
     swhid = f"swh:1:snp:{visit['snapshot']}"
     if swhid not in done:
         done.add(swhid)
         snapshot_url = visit["snapshot_url"]
         if snapshot_url:
             yield from snapshot_get(snapshot_url, done)
 
 
 def resolve_target(target_type, target, target_url, done):
     if target_type == "revision":
         if f"swh:1:rev:{target}" not in done:
             yield from revision_get(target_url, done)
     elif target_type == "release":
         if f"swh:1:rel:{target}" not in done:
             yield from release_get(target_url, done)
     elif target_type == "directory":
         if f"swh:1:dir:{target}" not in done:
             yield from directory_get(target_url, done)
     elif target_type == "content":
         if f"swh:1:cnt:{target}" not in done:
             yield from content_get(target_url, done)
     elif target_type == "snapshot":
         if f"swh:1:snp:{target}" not in done:
             yield from snapshot_get(target_url, done)
     # elif target_type == "alias":
     #     if f"swh:1:snp:{target}" not in done:
     #         yield from snapshot_get(target_url, done)
 
 
 def release_get(url, done):
     release = get(url)
     swhid = f"swh:1:rel:{release['id']}"
     if swhid not in done:
         done.add(swhid)
         yield release
         yield from resolve_target(
             release["target_type"], release["target"], release["target_url"], done
         )
 
 
 def branch_get(url):
     branches = set()
     visits = get(f"{API_URL}/origin/{url}/visits/")
     for visit in visits:
         snapshot_url = visit.get("snapshot_url")
         while snapshot_url:
             snapshot = get(snapshot_url)
             for name, tgt in snapshot["branches"].items():
                 if tgt is not None:
                     branches.add(
                         (name, tgt["target_type"], tgt["target"], tgt["target_url"])
                     )
             snapshot_url = snapshot["next_branch"]
     return len(visits), branches
 
 
 timing_stats = []
 
 
 def get(url):
     t0 = time.time()
     resp = requests.get(url)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result
 
 
 def post(url):
     t0 = time.time()
     resp = requests.post(url)
     assert resp.status_code in (200, 201, 202)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result
 
 
 def get_stats(origin):
     result = {"origin": origin}
     swhids = set()
     list(origin_get(origin, done=swhids))
     result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")])
     result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")])
     result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")])
     visits, branches = branch_get(origin)
     result["visit"] = visits
     result["release"] = len([br for br in branches if br[1] == "release"])
     result["alias"] = len([br for br in branches if br[1] == "alias"])
     result["branch"] = len([br for br in branches if br[1] == "revision"])
     return result, swhids
 
 
 def get_expected_stats():
     cfg = {
         "bootstrap.servers": "broker1.journal.staging.swh.network:9093",
         "sasl.username": KAFKA_USERNAME,
         "sasl.password": KAFKA_PASSWORD,
         "group.id": KAFKA_GROUPID,
         "security.protocol": "sasl_ssl",
         "sasl.mechanism": "SCRAM-SHA-512",
         "session.timeout.ms": 600000,
         "max.poll.interval.ms": 3600000,
         "message.max.bytes": 1000000000,
         "auto.offset.reset": "earliest",
         "enable.auto.commit": True,
         "enable.partition.eof": True,
     }
 
     partitions = set()
 
     def on_assign(cons, parts):
-        print("assignment", parts)
+        LOGGER.info("assignment %s", parts)
         for p in parts:
             partitions.add(p.partition)
 
     consumer = Consumer(cfg)
     consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign)
 
     stats = {}
     try:
         while True:
             msg = consumer.poll(timeout=10.0)
             if msg is None:
                 if not partitions:
                     break
                 continue
             if msg.error():
                 if msg.error().name() == "_PARTITION_EOF":
                     partitions.discard(msg.partition())
                     if not partitions:
                         break
                 else:
                     raise KafkaException(msg.error())
             else:
                 # Proper message
                 k = msgpack.unpackb(msg.key())
                 v = msgpack.unpackb(msg.value())
-                print(
-                    "%% %s [%d] at offset %d with key %s:\n"
-                    % (msg.topic(), msg.partition(), msg.offset(), k)
+                LOGGER.info(
+                    "%% %s [%d] at offset %d with key %s:\n",
+                    msg.topic(),
+                    msg.partition(),
+                    msg.offset(),
+                    k,
                 )
                 assert k == v["origin"]
                 stats[k] = v
     except KeyboardInterrupt:
         assert False, "%% Aborted by user"
 
     return stats
 
 
 def test_mirror(host, mirror_stack):
     services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
     check_running_services(host, mirror_stack, services)
 
     # run replayer services
     for service_type in ("content", "graph"):
         service = f"{mirror_stack}_{service_type}-replayer"
-        print(f"Scale {service} to 1")
+        LOGGER.info("Scale %s to 1", service)
         host.check_output(f"docker service scale -d {service}=1")
         if not check_running_services(host, mirror_stack, {service: "1/1"}):
             breakpoint()
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer"
         )
         assert len(logs) == 1
 
-        print(f"Scale {service} to {SCALE}")
+        LOGGER.info("Scale %s to %d", service, SCALE)
         host.check_output(f"docker service scale -d {service}={SCALE}")
         check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
         )
         assert len(logs) == SCALE
 
         # wait for the replaying to be done (stop_on_oef is true)
-        print(f"Wait for {service} to be done")
+        LOGGER.info("Wait for %s to be done", service)
         logs = wait_for_log_entry(host, service, "Done.", SCALE)
         # >= SCALE below because replayer services may have been restarted
         # (once done) before we scale them to 0
         if not (len(logs) >= SCALE):
             breakpoint()
         assert len(logs) >= SCALE
 
-        print(f"Scale {service} to 0")
+        LOGGER.info("Scale %s to 0", service)
         check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
 
     # TODO: check there are no error reported in redis after the replayers are done
 
     origins = get(f"{API_URL}/origins/")
 
     if False:
         # check replicated archive is in good shape
         expected_stats = get_expected_stats()
-        print("Check replicated archive")
+        LOGGER.info("Check replicated archive")
         # seems the graph replayer is OK, let's check the archive can tell something
         expected_origins = sorted(expected_stats)
         assert len(origins) == len(expected_origins)
         assert sorted(o["url"] for o in origins) == expected_origins
 
         for origin, expected in expected_stats.items():
             timing_stats.clear()
             assert origin == expected["origin"]
             origin_stats, swhids = get_stats(origin)
-            print(origin_stats)
-            print(f"{len(timing_stats)} REQS took {sum(timing_stats)}s")
+            LOGGER.info("%s", origin_stats)
+            LOGGER.info("%d REQS took %ss", len(timing_stats), sum(timing_stats))
             assert origin_stats == expected
-            print(f"{origin} is OK")
+            LOGGER.info("%s is OK", origin)
 
     # test the vault service
     cooks = []
     # first start all the cookings
     for origin in origins:
-        print(f"Cook HEAD for {origin['url']}")
+        LOGGER.info("Cook HEAD for %s", origin["url"])
         visit = get(
             f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true"
         )
         assert visit
         snp = get(visit["snapshot_url"])
         assert snp
         branches = snp.get("branches", {})
         head = branches.get("HEAD")
         assert head
         while True:
             if head["target_type"] == "alias":
                 head = branches[head["target"]]
             elif head["target_type"] == "release":
                 head = get(head["target_url"])
             elif head["target_type"] == "directory":
                 swhid = f"swh:1:dir:{head['target']}"
                 break
             elif head["target_type"] == "revision":
                 rev = get(head["target_url"])
                 swhid = f"swh:1:dir:{rev['directory']}"
                 break
             else:
                 breakpoint()
-        print(f"Directory is {swhid}")
+        LOGGER.info("Directory is %s", swhid)
         cook = post(f"{API_URL}/vault/flat/{swhid}/")
         assert cook
         assert cook["status"] in ("new", "pending")
         cooks.append((origin["url"], swhid, cook))
 
     # then wait for successful cooks
     while not all(cook["status"] == "done" for _, _, cook in cooks):
         origin, swhid, cook = cooks.pop(0)
         cook = get(f"{API_URL}/vault/flat/{swhid}")
         cooks.append((origin, swhid, cook))
 
     # should all be in "done" status
     for origin, swhid, cook in cooks:
         assert cook["status"] == "done"
         # so we can download it
         tarfilecontent = get(cook["fetch_url"])
         assert isinstance(tarfilecontent, bytes)
         tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent))
         filelist = tarfileobj.getnames()
         assert all(fname.startswith(swhid) for fname in filelist)
         for path in filelist[1:]:
             tarinfo = tarfileobj.getmember(path)
-            expected = get(
-                f"{API_URL}/directory/{quote(path[10:])}"
-            )  # remove the 'swh:1:dir:' part
+            url = f"{API_URL}/directory/{quote(path[10:])}"
+            expected = get(url)  # remove the 'swh:1:dir:' part
+            LOGGER.info("Retrieved from storage: %s → %s", url, expected)
             if expected["type"] == "dir":
                 assert tarinfo.isdir()
             elif expected["type"] == "file":
                 if expected["perms"] == 0o120000:
                     assert tarinfo.issym()
                     tgt = get(expected["target_url"])
                     symlnk = get(tgt["data_url"])
                     assert symlnk == tarinfo.linkpath.encode()
                 else:
                     assert tarinfo.isfile()
                     assert expected["length"] == tarinfo.size
                     assert (
                         sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest()
                         == expected["checksums"]["sha1"]
                     )
             else:
                 breakpoint()
                 pass