diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
index 2be1773..a715fbc 100644
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -1,446 +1,449 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hashlib import sha1
from io import BytesIO
import re
import tarfile
import time
from urllib.parse import quote
from confluent_kafka import Consumer, KafkaException
import msgpack
import requests
from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
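# Expected docker swarm services of the mirror stack ("{}" is replaced by the
# stack name), mapped to their expected "running/desired" replica counts as
# shown by `docker service ls`.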
SERVICES = {
+ "{}_amqp": "1/1",
"{}_content-replayer": "0/0",
- "{}_db-storage": "1/1",
- "{}_db-web": "1/1",
"{}_grafana": "1/1",
"{}_graph-replayer": "0/0",
"{}_memcache": "1/1",
+ "{}_mailhog": "1/1",
"{}_nginx": "1/1",
"{}_objstorage": "1/1",
"{}_prometheus": "1/1",
"{}_prometheus-statsd-exporter": "1/1",
"{}_redis": "1/1",
"{}_storage": "1/1",
+ "{}_storage-db": "1/1",
"{}_web": "1/1",
- "{}_db-vault": "1/1",
+ "{}_web-db": "1/1",
"{}_vault": "1/1",
+ "{}_vault-db": "1/1",
"{}_vault-worker": "1/1",
"{}_scheduler": "1/1",
+ "{}_scheduler-db": "1/1",
"{}_scheduler-listener": "1/1",
"{}_scheduler-runner": "1/1",
}
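# Polling parameters (ATTEMPTS iterations, DELAY seconds between log polls),
# number of replayer replicas to scale up to, and base URL of the mirror web API.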
ATTEMPTS = 600
DELAY = 1
SCALE = 2
API_URL = "http://127.0.0.1:5081/api/1"
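# Return a {service_name: replicas} mapping for the services of the given
# stack, as reported by `docker service ls`.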
def running_services(host, stack):
all_services = host.check_output(
"docker service ls --format '{{.Name}} {{.Replicas}}'"
)
return dict(
line.split()[:2]
for line in all_services.splitlines()
if line.startswith(f"{stack}_")
)
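# Poll the stack until the observed replica counts match `services`;
# return True if they matched before ATTEMPTS iterations elapsed.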
def check_running_services(host, stack, services):
LOGGER.info("Waiting for services %s", services)
mirror_services_ = {}
for i in range(ATTEMPTS):
mirror_services = running_services(host, stack)
mirror_services = {k: v for k, v in mirror_services.items() if k in services}
if mirror_services == services:
LOGGER.info("Got them all!")
break
if mirror_services != mirror_services_:
LOGGER.info("Not yet there %s", mirror_services)
mirror_services_ = mirror_services
time.sleep(0.5)
return mirror_services == services
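# Parse `docker service logs -t` output into dicts with timestamp, replica
# number, task id, host and log line.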
def get_logs(host, service):
rows = host.check_output(f"docker service logs -t {service}").splitlines()
reg = re.compile(
rf"^(?P<timestamp>.+) {service}[.]"
r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
r"(?P<logline>.+)$"
)
return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
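# Poll the service logs until `logline` appears at least `occurrences` times;
# return the matching entries, or [] on timeout.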
def wait_for_log_entry(host, service, logline, occurrences=1):
for i in range(ATTEMPTS):
logs = get_logs(host, service)
match = [entry for entry in logs if logline in entry["logline"]]
if match and len(match) >= occurrences:
return match
time.sleep(DELAY)
return []
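# Yield the content object at `url` if not already seen, after checking that
# its blob can be fetched and matches the advertised length and sha1.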
def content_get(url, done):
content = get(url)
swhid = f"swh:1:cnt:{content['checksums']['sha1_git']}"
# checking the actual blob is present and valid
# XXX: a bit sad...
try:
data = get(content["data_url"])
except Exception as exc:
LOGGER.error("Failed loading %s", content["data_url"], exc_info=exc)
raise
assert len(data) == content["length"]
assert sha1(data).hexdigest() == content["checksums"]["sha1"]
if swhid not in done:
done.add(swhid)
yield content
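# Recursively walk the directory listing at `url`, yielding unseen contents
# and descending into unseen subdirectories.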
def directory_get(url, done):
directory = get(url)
id = url.split("/")[-2]
swhid = f"swh:1:dir:{id}"
if swhid not in done:
done.add(swhid)
for entry in directory:
if entry["type"] == "file":
swhid = f"swh:1:cnt:{entry['target']}"
if swhid not in done:
yield from content_get(entry["target_url"], done)
elif entry["type"] == "dir":
swhid = f"swh:1:dir:{entry['target']}"
if swhid not in done:
yield from directory_get(entry["target_url"], done)
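# Yield the revision at `url`, then walk its root directory and its unseen
# parent revisions.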
def revision_get(url, done):
revision = get(url)
swhid = f"swh:1:rev:{revision['id']}"
if swhid not in done:
done.add(swhid)
yield revision
swhid = f"swh:1:dir:{revision['directory']}"
if swhid not in done:
yield from directory_get(revision["directory_url"], done)
for parent in revision["parents"]:
if f"swh:1:rev:{parent['id']}" not in done:
yield from revision_get(parent["url"], done)
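# Walk all branches of the snapshot at `url`, resolving each branch target.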
def snapshot_get(url, done):
snapshot = get(url)
for branchname, branch in snapshot["branches"].items():
if branch:
yield from resolve_target(
branch["target_type"],
branch["target"],
branch["target_url"],
done,
)
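# Entry point of the archive walk: start from the latest visit (with a
# snapshot) of the origin and walk everything reachable from that snapshot,
# recording visited SWHIDs in `done`.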
def origin_get(url, done=None):
if done is None:
done = set()
visit = get(f"{API_URL}/origin/{url}/visit/latest/?require_snapshot=true")
if not visit.get("snapshot"):
return
swhid = f"swh:1:snp:{visit['snapshot']}"
if swhid not in done:
done.add(swhid)
snapshot_url = visit["snapshot_url"]
if snapshot_url:
yield from snapshot_get(snapshot_url, done)
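# Dispatch on `target_type` and keep walking from `target` if it has not been
# visited yet.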
def resolve_target(target_type, target, target_url, done):
if target_type == "revision":
if f"swh:1:rev:{target}" not in done:
yield from revision_get(target_url, done)
elif target_type == "release":
if f"swh:1:rel:{target}" not in done:
yield from release_get(target_url, done)
elif target_type == "directory":
if f"swh:1:dir:{target}" not in done:
yield from directory_get(target_url, done)
elif target_type == "content":
if f"swh:1:cnt:{target}" not in done:
yield from content_get(target_url, done)
elif target_type == "snapshot":
if f"swh:1:snp:{target}" not in done:
yield from snapshot_get(target_url, done)
# elif target_type == "alias":
# if f"swh:1:snp:{target}" not in done:
# yield from snapshot_get(target_url, done)
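# Yield the release at `url` if unseen, then resolve its target.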
def release_get(url, done):
release = get(url)
swhid = f"swh:1:rel:{release['id']}"
if swhid not in done:
done.add(swhid)
yield release
yield from resolve_target(
release["target_type"], release["target"], release["target_url"], done
)
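# Collect every (name, target_type, target, target_url) branch tuple over all
# visits of the origin, following snapshot pagination via `next_branch`;
# return (number of visits, set of branches).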
def branch_get(url):
branches = set()
visits = get(f"{API_URL}/origin/{url}/visits/")
for visit in visits:
snapshot_url = visit.get("snapshot_url")
while snapshot_url:
snapshot = get(snapshot_url)
for name, tgt in snapshot["branches"].items():
if tgt is not None:
branches.add(
(name, tgt["target_type"], tgt["target"], tgt["target_url"])
)
snapshot_url = snapshot["next_branch"]
return len(visits), branches
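# Thin wrappers around requests that record per-request response times in
# timing_stats and return JSON or raw bytes depending on the content-type.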
timing_stats = []
def get(url):
t0 = time.time()
resp = requests.get(url)
if resp.headers["content-type"].lower() == "application/json":
result = resp.json()
else:
result = resp.content
timing_stats.append(time.time() - t0)
return result
def post(url):
t0 = time.time()
resp = requests.post(url)
assert resp.status_code in (200, 201, 202)
if resp.headers["content-type"].lower() == "application/json":
result = resp.json()
else:
result = resp.content
timing_stats.append(time.time() - t0)
return result
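# Compute per-origin object counts (contents, directories, revisions, visits,
# releases, aliases, branches) by walking the mirror archive through its web API.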
def get_stats(origin):
result = {"origin": origin}
swhids = set()
list(origin_get(origin, done=swhids))
result["cnt"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:cnt:")])
result["dir"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:dir:")])
result["rev"] = len([swhid for swhid in swhids if swhid.startswith("swh:1:rev:")])
visits, branches = branch_get(origin)
result["visit"] = visits
result["release"] = len([br for br in branches if br[1] == "release"])
result["alias"] = len([br for br in branches if br[1] == "alias"])
result["branch"] = len([br for br in branches if br[1] == "revision"])
return result, swhids
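# Read the expected per-origin statistics from the msgpack-encoded
# swh.test.objects.stats Kafka topic, consuming every assigned partition up to EOF.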
def get_expected_stats():
cfg = {
"bootstrap.servers": "broker1.journal.staging.swh.network:9093",
"sasl.username": KAFKA_USERNAME,
"sasl.password": KAFKA_PASSWORD,
"group.id": KAFKA_GROUPID,
"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-512",
"session.timeout.ms": 600000,
"max.poll.interval.ms": 3600000,
"message.max.bytes": 1000000000,
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
"enable.partition.eof": True,
}
partitions = set()
def on_assign(cons, parts):
LOGGER.info("assignment %s", parts)
for p in parts:
partitions.add(p.partition)
consumer = Consumer(cfg)
consumer.subscribe(["swh.test.objects.stats"], on_assign=on_assign)
stats = {}
try:
while True:
msg = consumer.poll(timeout=10.0)
if msg is None:
if not partitions:
break
continue
if msg.error():
if msg.error().name() == "_PARTITION_EOF":
partitions.discard(msg.partition())
if not partitions:
break
else:
raise KafkaException(msg.error())
else:
# Proper message
k = msgpack.unpackb(msg.key())
v = msgpack.unpackb(msg.value())
LOGGER.info(
"%% %s [%d] at offset %d with key %s:\n",
msg.topic(),
msg.partition(),
msg.offset(),
k,
)
assert k == v["origin"]
stats[k] = v
except KeyboardInterrupt:
assert False, "%% Aborted by user"
return stats
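# End-to-end mirror test: check all stack services are up, run the content and
# graph replayers to completion, then cook the HEAD directory of every origin
# with the vault and verify the resulting tarballs against the archive.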
def test_mirror(host, mirror_stack):
services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
check_running_services(host, mirror_stack, services)
# run replayer services
for service_type in ("content", "graph"):
service = f"{mirror_stack}_{service_type}-replayer"
LOGGER.info("Scale %s to 1", service)
host.check_output(f"docker service scale -d {service}=1")
if not check_running_services(host, mirror_stack, {service: "1/1"}):
breakpoint()
logs = wait_for_log_entry(
host, service, f"Starting the SWH mirror {service_type} replayer"
)
assert len(logs) == 1
LOGGER.info("Scale %s to %d", service, SCALE)
host.check_output(f"docker service scale -d {service}={SCALE}")
check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
logs = wait_for_log_entry(
host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
)
assert len(logs) == SCALE
# wait for the replaying to be done (stop_on_eof is true)
LOGGER.info("Wait for %s to be done", service)
logs = wait_for_log_entry(host, service, "Done.", SCALE)
# >= SCALE below because replayer services may have been restarted
# (once done) before we scale them to 0
if not (len(logs) >= SCALE):
breakpoint()
assert len(logs) >= SCALE
LOGGER.info("Scale %s to 0", service)
check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
# TODO: check there are no errors reported in redis after the replayers are done
origins = get(f"{API_URL}/origins/")
if False:
# check replicated archive is in good shape
expected_stats = get_expected_stats()
LOGGER.info("Check replicated archive")
# the graph replayer seems OK, let's check what the archive can tell us
expected_origins = sorted(expected_stats)
assert len(origins) == len(expected_origins)
assert sorted(o["url"] for o in origins) == expected_origins
for origin, expected in expected_stats.items():
timing_stats.clear()
assert origin == expected["origin"]
origin_stats, swhids = get_stats(origin)
LOGGER.info("%s", origin_stats)
LOGGER.info("%d REQS took %ss", len(timing_stats), sum(timing_stats))
assert origin_stats == expected
LOGGER.info("%s is OK", origin)
# test the vault service
cooks = []
# first start all the cookings
for origin in origins:
LOGGER.info("Cook HEAD for %s", origin["url"])
visit = get(
f"{API_URL}/origin/{origin['url']}/visit/latest/?require_snapshot=true"
)
assert visit
snp = get(visit["snapshot_url"])
assert snp
branches = snp.get("branches", {})
head = branches.get("HEAD")
assert head
while True:
if head["target_type"] == "alias":
head = branches[head["target"]]
elif head["target_type"] == "release":
head = get(head["target_url"])
elif head["target_type"] == "directory":
swhid = f"swh:1:dir:{head['target']}"
break
elif head["target_type"] == "revision":
rev = get(head["target_url"])
swhid = f"swh:1:dir:{rev['directory']}"
break
else:
breakpoint()
LOGGER.info("Directory is %s", swhid)
cook = post(f"{API_URL}/vault/flat/{swhid}/")
assert cook
assert cook["status"] in ("new", "pending")
cooks.append((origin["url"], swhid, cook))
# then wait for successful cooks
while not all(cook["status"] == "done" for _, _, cook in cooks):
origin, swhid, cook = cooks.pop(0)
cook = get(f"{API_URL}/vault/flat/{swhid}")
cooks.append((origin, swhid, cook))
# should all be in "done" status
for origin, swhid, cook in cooks:
assert cook["status"] == "done"
# so we can download it
tarfilecontent = get(cook["fetch_url"])
assert isinstance(tarfilecontent, bytes)
tarfileobj = tarfile.open(fileobj=BytesIO(tarfilecontent))
filelist = tarfileobj.getnames()
assert all(fname.startswith(swhid) for fname in filelist)
for path in filelist[1:]:
tarinfo = tarfileobj.getmember(path)
url = f"{API_URL}/directory/{quote(path[10:])}"
expected = get(url) # remove the 'swh:1:dir:' part
LOGGER.info("Retrieved from storage: %s → %s", url, expected)
if expected["type"] == "dir":
assert tarinfo.isdir()
elif expected["type"] == "file":
if expected["perms"] == 0o120000:
assert tarinfo.issym()
tgt = get(expected["target_url"])
symlnk = get(tgt["data_url"])
assert symlnk == tarinfo.linkpath.encode()
else:
assert tarinfo.isfile()
assert expected["length"] == tarinfo.size
assert (
sha1(tarfileobj.extractfile(tarinfo).read()).hexdigest()
== expected["checksums"]["sha1"]
)
else:
breakpoint()
pass