diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -101,6 +101,8 @@
     objects = fix_objects(object_type, objects)
 
     if object_type == "content":
+        # for bw compat, skipped contents may still arrive on the "content" topic;
+        # new producers write them to the "skipped_content" topic instead
         contents: List[BaseContent] = []
         skipped_contents: List[BaseContent] = []
         for content in objects:
@@ -112,6 +114,10 @@
         collision_aware_content_add(storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(storage.content_add_metadata, contents)
 
+    elif object_type == "skipped_content":
+        skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
+        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
+
     elif object_type == "origin_visit":
         visits: List[OriginVisit] = []
         origins: List[Origin] = []
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -15,7 +15,7 @@
 
 import pytest
 
-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
 from swh.model.model import Content
 
 from swh.storage import get_storage
@@ -388,3 +388,78 @@
         }
     ]
     _test_write_replay_origin_visit(visits)
+
+
+def test_replay_skipped_content(kafka_server, kafka_prefix):
+    """Test the 'skipped_content' topic is properly replayed."""
+    _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content")
+
+
+def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix):
+    """Test the 'content' topic can be used to replay SkippedContent objects."""
+    _check_replay_skipped_content(kafka_server, kafka_prefix, "content")
+
+
+def _check_replay_skipped_content(kafka_server, kafka_prefix, topic):
+    """Produce skipped contents on the given topic and check they get replayed."""
+    skipped_contents = _gen_skipped_contents(100)
+    nb_sent = len(skipped_contents)
+
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+    replayer = JournalClient(
+        brokers=[kafka_server],
+        group_id="test consumer",
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+    )
+    assert f"{kafka_prefix}.skipped_content" in replayer.subscription
+
+    for obj in skipped_contents:
+        producer.produce(
+            topic=f"{kafka_prefix}.{topic}",
+            key=key_to_kafka({"sha1": obj["sha1"]}),
+            value=value_to_kafka(obj),
+        )
+    producer.flush()
+
+    storage = get_storage(cls="memory")
+
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    for content in skipped_contents:
+        assert not storage.content_find({"sha1": content["sha1"]})
+
+    # no skipped_content_find API endpoint, so use this instead
+    assert not list(storage.skipped_content_missing(skipped_contents))
+
+
+def _updated(d1, d2):
+    d1.update(d2)
+    d1.pop("data", None)
+    return d1
+
+
+def _gen_skipped_contents(n=10):
+    # we do not use the hypothesis strategy here because it does not play well with
+    # pytest fixtures, and it makes test execution very slow
+    algos = DEFAULT_ALGORITHMS | {"length"}
+    now = datetime.datetime.now(tz=UTC)
+    return [
+        _updated(
+            MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
+            {
+                "status": "absent",
+                "reason": "why not",
+                "origin": f"https://somewhere/{i}",
+                "ctime": now,
+            },
+        )
+        for i in range(n)
+    ]
diff --git a/swh/storage/writer.py b/swh/storage/writer.py
--- a/swh/storage/writer.py
+++ b/swh/storage/writer.py
@@ -61,7 +61,7 @@
         self.content_add(contents)
 
     def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
-        self.write_additions("content", contents)
+        self.write_additions("skipped_content", contents)
 
     def directory_add(self, directories: Iterable[Directory]) -> None:
         self.write_additions("directory", directories)