diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
index 2be1773..a715fbc 100644
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -1,446 +1,449 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from hashlib import sha1
 from io import BytesIO
 import re
 import tarfile
 import time
 from urllib.parse import quote
 
 from confluent_kafka import Consumer, KafkaException
 import msgpack
 import requests
 
 from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
 
 SERVICES = {
+    "{}_amqp": "1/1",
     "{}_content-replayer": "0/0",
-    "{}_db-storage": "1/1",
-    "{}_db-web": "1/1",
     "{}_grafana": "1/1",
     "{}_graph-replayer": "0/0",
     "{}_memcache": "1/1",
+    "{}_mailhog": "1/1",
     "{}_nginx": "1/1",
     "{}_objstorage": "1/1",
     "{}_prometheus": "1/1",
     "{}_prometheus-statsd-exporter": "1/1",
     "{}_redis": "1/1",
     "{}_storage": "1/1",
+    "{}_storage-db": "1/1",
     "{}_web": "1/1",
-    "{}_db-vault": "1/1",
+    "{}_web-db": "1/1",
     "{}_vault": "1/1",
+    "{}_vault-db": "1/1",
     "{}_vault-worker": "1/1",
     "{}_scheduler": "1/1",
+    "{}_scheduler-db": "1/1",
     "{}_scheduler-listener": "1/1",
     "{}_scheduler-runner": "1/1",
 }
 
 ATTEMPTS = 600
 DELAY = 1
 SCALE = 2
 API_URL = "http://127.0.0.1:5081/api/1"
 
 
 def running_services(host, stack):
     all_services = host.check_output(
         "docker service ls --format '{{.Name}} {{.Replicas}}'"
     )
     return dict(
         line.split()[:2]
         for line in all_services.splitlines()
         if line.startswith(f"{stack}_")
     )
 
 
 def check_running_services(host, stack, services):
     LOGGER.info("Waiting for services %s", services)
     mirror_services_ = {}
     for i in range(ATTEMPTS):
         mirror_services = running_services(host, stack)
         mirror_services = {k: v for k, v in mirror_services.items() if k in services}
         if mirror_services == services:
             LOGGER.info("Got them all!")
             break
         if mirror_services != mirror_services_:
             LOGGER.info("Not yet there %s", mirror_services)
             mirror_services_ = mirror_services
         time.sleep(0.5)
     return mirror_services == services
 
 
 def get_logs(host, service):
     rows = host.check_output(f"docker service logs -t {service}").splitlines()
     reg = re.compile(
         rf"^(?P<timestamp>.+) {service}[.]"
         r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
         r"(?P<logline>.+)$"
     )
     return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
 
 
 def wait_for_log_entry(host, service, logline, occurrences=1):
     for i in range(ATTEMPTS):
         logs = get_logs(host, service)
         match = [entry for entry in logs if logline in entry["logline"]]
         if match and len(match) >= occurrences:
             return match
         time.sleep(DELAY)
     return []
 
 
 def content_get(url, done):
     content = get(url)
     swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}"
     # checking the actual blob is present and valid
     # XXX: a bit sad...
     try:
         data = get(content["data_url"])
     except Exception as exc:
         LOGGER.error("Failed loading %s", content["data_url"], exc_info=exc)
         raise
     assert len(data) == content["length"]
     assert sha1(data).hexdigest() == content["checksums"]["sha1"]
     if swhid not in done:
         done.add(swhid)
         yield content
 
 
 def directory_get(url, done):
     directory = get(url)
     id = url.split("/")[-2]
     swhid = f"swh:1:dir:{id}"
     if swhid not in done:
         done.add(swhid)
         for entry in directory:
             if entry["type"] == "file":
                 swhid = f"swh:1:cnt:{entry['target']}"
                 if swhid not in done:
                     yield from content_get(entry["target_url"], done)
             elif entry["type"] == "dir":
                 swhid = f"swh:1:dir:{entry['target']}"
                 if swhid not in done:
                     yield from directory_get(entry["target_url"], done)
 
 
 def revision_get(url, done):
     revision = get(url)
     swhid = f"swh:1:rev:{revision['id']}"
     if swhid not in done:
         done.add(swhid)
         yield revision
         swhid = f"swh:1:dir:{revision['directory']}"
         if swhid not in done:
             yield from directory_get(revision["directory_url"], done)
         for parent in revision["parents"]:
             if f"swh:1:rev:{parent['id']}" not in done:
                 yield from revision_get(parent["url"], done)
 
 
 def snapshot_get(url, done):
     snapshot = get(url)
     for branchname, branch in snapshot["branches"].items():
         if branch:
             yield from resolve_target(
                 branch["target_type"],
                 branch["target"],
                 branch["target_url"],
                 done,
             )
 
 
 def origin_get(url, done=None):
     if done is None:
         done = set()
     visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true")
     if not visit.get("snapshot"):
         return
     swhid = f"swh:1:snp:{visit['snapshot']}"
     if swhid not in done:
         done.add(swhid)
         snapshot_url = visit["snapshot_url"]
         if snapshot_url:
             yield from snapshot_get(snapshot_url, done)
 
 
 def resolve_target(target_type, target, target_url, done):
     if target_type == "revision":
         if f"swh:1:rev:{target}" not in done:
             yield from revision_get(target_url, done)
     elif target_type == "release":
         if f"swh:1:rel:{target}" not in done:
             yield from release_get(target_url, done)
     elif target_type == "directory":
         if f"swh:1:dir:{target}" not in done:
             yield from directory_get(target_url, done)
     elif target_type == "content":
         if f"swh:1:cnt:{target}" not in done:
             yield from content_get(target_url, done)
     elif target_type == "snapshot":
         if f"swh:1:snp:{target}" not in done:
             yield from snapshot_get(target_url, done)
     # elif target_type == "alias":
     #     if f"swh:1:snp:{target}" not in done:
     #         yield from snapshot_get(target_url, done)
 
 
 def release_get(url, done):
     release = get(url)
     swhid = f"swh:1:rel:{release['id']}"
     if swhid not in done:
         done.add(swhid)
         yield release
         yield from resolve_target(
             release["target_type"], release["target"], release["target_url"], done
         )
 
 
 def branch_get(url):
     branches = set()
     visits = get(f"{API_URL}/origin/{url}/visits/")
     for visit in visits:
         snapshot_url = visit.get("snapshot_url")
         while snapshot_url:
             snapshot = get(snapshot_url)
             for name, tgt in snapshot["branches"].items():
                 if tgt is not None:
                     branches.add(
                         (name, tgt["target_type"], tgt["target"], tgt["target_url"])
                     )
             snapshot_url = snapshot["next_branch"]
     return len(visits), branches
 
 
 timing_stats = []
 
 
 def get(url):
     t0 = time.time()
     resp = requests.get(url)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result
 
 
 def post(url):
     t0 = time.time()
     resp = requests.post(url)
     assert resp.status_code in (200, 201, 202)
     if resp.headers["content-type"].lower() == "application/json":
         result = resp.json()
     else:
         result = resp.content
     timing_stats.append(time.time() - t0)
     return result
 
 
 def get_stats(origin):
     result = {"origin": origin}
     swhids = set()
     list(origin_get(origin, done=swhids))
     result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")])
     result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")])
     result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")])
     visits, branches = branch_get(origin)
     result["visit"] = visits
     result["release"] = len([br for br in branches if br[1] == "release"])
     result["alias"] = len([br for br in branches if br[1] == "alias"])
     result["branch"] = len([br for br in branches if br[1] == "revision"])
     return result, swhids
 
 
 def get_expected_stats():
     cfg = {
         "bootstrap.servers": "broker1.journal.staging.swh.network:9093",
         "sasl.username": KAFKA_USERNAME,
         "sasl.password": KAFKA_PASSWORD,
         "group.id": KAFKA_GROUPID,
         "security.protocol": "sasl_ssl",
         "sasl.mechanism": "SCRAM-SHA-512",
         "session.timeout.ms": 600000,
         "max.poll.interval.ms": 3600000,
         "message.max.bytes": 1000000000,
         "auto.offset.reset": "earliest",
         "enable.auto.commit": True,
         "enable.partition.eof": True,
     }
 
     partitions = set()
 
     def on_assign(cons, parts):
         LOGGER.info("assignment %s", parts)
         for p in parts:
             partitions.add(p.partition)
 
     consumer = Consumer(cfg)
     consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign)
 
     stats = {}
     try:
         while True:
             msg = consumer.poll(timeout=10.0)
             if msg is None:
                 if not partitions:
                     break
                 continue
             if msg.error():
                 if msg.error().name() == "_PARTITION_EOF":
                     partitions.discard(msg.partition())
                     if not partitions:
                         break
                 else:
                     raise KafkaException(msg.error())
             else:
                 # Proper message
                 k = msgpack.unpackb(msg.key())
                 v = msgpack.unpackb(msg.value())
                 LOGGER.info(
                     "%% %s [%d] at offset %d with key %s:\n",
                     msg.topic(),
                     msg.partition(),
                     msg.offset(),
                     k,
                 )
                 assert k == v["origin"]
                 stats[k] = v
     except KeyboardInterrupt:
         assert False, "%% Aborted by user"
     return stats
 
 
 def test_mirror(host, mirror_stack):
     services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
     check_running_services(host, mirror_stack, services)
 
     # run replayer services
     for service_type in ("content", "graph"):
         service = f"{mirror_stack}_{service_type}-replayer"
         LOGGER.info("Scale %s to 1", service)
         host.check_output(f"docker service scale -d {service}=1")
         if not check_running_services(host, mirror_stack, {service: "1/1"}):
             breakpoint()
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer"
         )
         assert len(logs) == 1
 
         LOGGER.info("Scale %s to %d", service, SCALE)
         host.check_output(f"docker service scale -d {service}={SCALE}")
         check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
         logs = wait_for_log_entry(
             host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
         )
         assert len(logs) == SCALE
 
         # wait for the replaying to be done (stop_on_eof is true)
         LOGGER.info("Wait for %s to be done", service)
         logs = wait_for_log_entry(host, service, "Done.", SCALE)
         # >= SCALE below because replayer services may have been restarted
         # (once done) before we scale them to 0
         if not (len(logs) >= SCALE):
             breakpoint()
         assert len(logs) >= SCALE
 
         LOGGER.info("Scale %s to 0", service)
         host.check_output(f"docker service scale -d {service}=0")
         check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
 
     # TODO: check there are no errors reported in redis after the replayers are done
 
     origins = get(f"{API_URL}/origins/")
 
     if False:
         # check the replicated archive is in good shape
         expected_stats = get_expected_stats()
         LOGGER.info("Check replicated archive")
         # seems the graph replayer is OK, let's check the archive can tell
         # something
         expected_origins = sorted(expected_stats)
         assert len(origins) == len(expected_origins)
         assert sorted(o["url"] for o in origins) == expected_origins
 
         for origin, expected in expected_stats.items():
             timing_stats.clear()
             assert origin == expected["origin"]
             origin_stats, swhids = get_stats(origin)
             LOGGER.info("%s", origin_stats)
             LOGGER.info("%d REQS took %ss", len(timing_stats), sum(timing_stats))
             assert origin_stats == expected
             LOGGER.info("%s is OK", origin)
 
     # test the vault service
     cooks = []
     # first start all the cookings
     for origin in origins:
         LOGGER.info("Cook HEAD for %s", origin["url"])
         visit = get(
             f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true"
         )
         assert visit
         snp = get(visit["snapshot_url"])
         assert snp
         branches = snp.get("branches", {})
         head = branches.get("HEAD")
         assert head
         while True:
             if head["target_type"] == "alias":
                 head = branches[head["target"]]
             elif head["target_type"] == "release":
                 head = get(head["target_url"])
             elif head["target_type"] == "directory":
                 swhid = f"swh:1:dir:{head['target']}"
                 break
             elif head["target_type"] == "revision":
                 rev = get(head["target_url"])
                 swhid = f"swh:1:dir:{rev['directory']}"
                 break
             else:
                 breakpoint()
         LOGGER.info("Directory is %s", swhid)
         cook = post(f"{API_URL}/vault/flat/{swhid}/")
         assert cook
         assert cook["status"] in ("new", "pending")
         cooks.append((origin["url"], swhid, cook))
 
     # then wait for successful cooks
     while not all(cook["status"] == "done" for _, _, cook in cooks):
         origin, swhid, cook = cooks.pop(0)
         cook = get(f"{API_URL}/vault/flat/{swhid}")
         cooks.append((origin, swhid, cook))
 
     # should all be in "done" status
     for origin, swhid, cook in cooks:
         assert cook["status"] == "done"
         # so we can download it
         tarfilecontent = get(cook["fetch_url"])
         assert isinstance(tarfilecontent, bytes)
         tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent))
         filelist = tarfileobj.getnames()
         assert all(fname.startswith(swhid) for fname in filelist)
         for path in filelist[1:]:
             tarinfo = tarfileobj.getmember(path)
             url = f"{API_URL}/directory/{quote(path[10:])}"
             expected = get(url)  # remove the 'swh:1:dir:' part
             LOGGER.info("Retrieved from storage: %s → %s", url, expected)
             if expected["type"] == "dir":
                 assert tarinfo.isdir()
             elif expected["type"] == "file":
                 if expected["perms"] == 0o120000:
                     assert tarinfo.issym()
                     tgt = get(expected["target_url"])
                     symlnk = get(tgt["data_url"])
                     assert symlnk == tarinfo.linkpath.encode()
                 else:
                     assert tarinfo.isfile()
                     assert expected["length"] == tarinfo.size
                     assert (
                         sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest()
                         == expected["checksums"]["sha1"]
                     )
             else:
                 breakpoint()
                 pass