diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
-swh.core[http] >= 0.3
+swh.core[http] >= 1.0.0
 swh.objstorage >= 0.2.2
 swh.journal >= 0.4.2
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -88,7 +88,7 @@
 NUM_CONTENTS = 10


-def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorage):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -100,7 +100,7 @@
     contents = {}
     for i in range(NUM_CONTENTS):
         content = b"\x00" * 19 + bytes([i])
-        sha1 = objstorages["src"].add(content)
+        sha1 = objstorage.add(content)
         contents[sha1] = content
         producer.produce(
             topic=kafka_prefix + ".content",
@@ -121,7 +121,9 @@
     kafka_server: Tuple[Popen, int],
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     result = invoke(
         "replay",
@@ -152,7 +154,9 @@
     caplog,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
@@ -192,7 +196,9 @@
     caplog,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     # Setup log capture to fish the consumer settings out of the log messages
     caplog.set_level(logging.DEBUG, "swh.journal.client")
@@ -239,7 +245,9 @@
     kafka_server: Tuple[Popen, int],
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     excluded_contents = list(contents)[0::2]  # picking half of them
     with tempfile.NamedTemporaryFile(mode="w+b") as fd:
@@ -293,12 +301,13 @@
     caplog,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
             break
-        objstorages["dst"].add(content, obj_id=sha1)

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
@@ -371,8 +380,9 @@
     monkeypatch_retry_sleep,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
-
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )
     failures = {}
     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
@@ -414,7 +424,9 @@
     monkeypatch_retry_sleep,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     add_failures = {}
     get_failures = {}
@@ -503,7 +515,9 @@
     caplog,
 ):
-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     num_contents_deleted = 5
     contents_deleted = set()
diff --git a/swh/objstorage/replayer/tests/test_statsd.py b/swh/objstorage/replayer/tests/test_statsd.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_statsd.py
@@ -0,0 +1,121 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import re
+
+import pytest
+
+from swh.journal.client import JournalClient
+from swh.journal.writer import get_journal_writer
+from swh.model.model import Content
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.replayer import replay
+from swh.objstorage.replayer.replay import process_replay_objects_content
+
+
+@pytest.fixture
+def statsd(monkeypatch, statsd):
+    # Point the replay module at the test statsd client, whose fake socket
+    # records every datagram it sends (statsd.socket.payloads).
+    monkeypatch.setattr(replay, "statsd", statsd)
+    yield statsd
+
+
+def test_replay_statsd(kafka_server, kafka_prefix, kafka_consumer_group, statsd):
+    objstorage1 = get_objstorage(cls="memory")
+    objstorage2 = get_objstorage(cls="memory")
+
+    writer = get_journal_writer(
+        cls="kafka",
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
+
+    # Fill the source objstorage with a bunch of content objects. The possible
+    # replaying decisions are skipped, excluded, in_dst, not_in_src, failed and
+    # copied; in the end there should be 2 content objects for each decision
+    # exercised here (not_in_src and failed are not covered yet, see the TODO
+    # at the end):
+    # contents[0:2] are properly copied
+    # contents[2:4] are excluded
+    # contents[4:6] are in dst
+    # contents[6:8] are hidden (hence counted as skipped)
+    contents = [
+        Content.from_data(
+            f"foo{i}".encode(), status="hidden" if 6 <= i < 8 else "visible"
+        )
+        for i in range(8)
+    ]
+
+    for content in contents:
+        objstorage1.add(content.data)
+        writer.write_addition("content", content)
+    excluded = [c.sha1 for c in contents[2:4]]
+
+    def exclude_fn(cnt_d):
+        return cnt_d["sha1"] in excluded
+
+    for content in contents[4:6]:
+        objstorage2.add(content.data)
+
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+        # stop_after_objects=len(objects),
+    )
+
+    worker_fn = functools.partial(
+        process_replay_objects_content,
+        src=objstorage1,
+        dst=objstorage2,
+        exclude_fn=exclude_fn,
+    )
+    replayer.process(worker_fn)
+
+    # We cannot expect any particular order for the replayed objects, so the
+    # statsd reports won't be sorted according to contents; instead, just count
+    # the expected number of occurrences of each statsd message.
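+    # Each datagram recorded by the statsd fixture is expected to look like
+    # "<metric>:<value>|<type>[|#<tag>:<value>,...]" (DogStatsD-style tags),
+    # which is what the regular expressions below match against.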
+    prefix = "swh_content_replayer"
+    expected_report_re = (
+        f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:get$",
+        f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:put$",
+        f"^{prefix}_bytes:4[|]c$",
+    )
+    expected_reports = (
+        2,
+        2,
+        2,
+    )
+    decisions = ("copied", "skipped", "excluded", "in_dst", "not_in_src", "failed")
+    decision_re = (
+        "^swh_content_replayer_operations_total:1[|]c[|]#decision:(?P<decision>"
+        + "|".join(decisions)
+        + ")(?P<tags>,.+)?$"
+    )
+
+    operations = dict.fromkeys(decisions, 0)
+    reports = dict.fromkeys(expected_report_re, 0)
+
+    for report in (r.decode() for r in statsd.socket.payloads):
+        m = re.match(decision_re, report)
+        if m:
+            operations[m.group("decision")] += 1
+        else:
+            for expected in expected_report_re:
+                m = re.match(expected, report)
+                if m:
+                    reports[expected] += 1
+
+    assert operations["skipped"] == 2
+    assert operations["excluded"] == 2
+    assert operations["in_dst"] == 2
+    assert operations["copied"] == 2
+    for report_re, expected in zip(expected_report_re, expected_reports):
+        assert reports[report_re] == expected
+
+    # TODO:
+    assert operations["not_in_src"] == 0
+    assert operations["failed"] == 0