Page MenuHomeSoftware Heritage

D3140.id11158.diff
No OneTemporary

D3140.id11158.diff

diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -101,6 +101,8 @@
objects = fix_objects(object_type, objects)
if object_type == "content":
+ # for bw compat: skipped contents may still arrive on the content topic, but
+ # they should now be delivered in the skipped_content topic
contents: List[BaseContent] = []
skipped_contents: List[BaseContent] = []
for content in objects:
@@ -109,9 +111,11 @@
skipped_contents.append(c)
else:
contents.append(c)
-
collision_aware_content_add(storage.skipped_content_add, skipped_contents)
collision_aware_content_add(storage.content_add_metadata, contents)
+ if object_type == "skipped_content":
+ skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
+ collision_aware_content_add(storage.skipped_content_add, skipped_contents)
elif object_type == "origin_visit":
visits: List[OriginVisit] = []
origins: List[Origin] = []
@@ -121,7 +125,7 @@
origins.append(Origin(url=visit.origin))
storage.origin_add(origins)
storage.origin_visit_upsert(visits)
- elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+ elif object_type in ("directory", "revision", "release", "snapshot", "origin",):
method = getattr(storage, object_type + "_add")
method(object_converter_fn[object_type](o) for o in objects)
else:
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -15,7 +15,7 @@
import pytest
-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
from swh.model.model import Content
from swh.storage import get_storage
@@ -311,7 +311,7 @@
assert vin == vout
-def test_write_replay_origin_visit():
+def test_write_replay_origin_visit(kafka_server):
"""Test origin_visit when the 'origin' is just a string."""
now = datetime.datetime.now(tz=UTC)
visits = [
@@ -388,3 +388,80 @@
}
]
_test_write_replay_origin_visit(visits)
+
+
+def test_replay_skipped_content(kafka_server, kafka_prefix):
+ """Test the 'skipped_content' topic is properly replayed."""
+ _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content")
+
+
+def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix):
+ """Test the 'content' topic can be used to replay SkippedContent objects."""
+ _check_replay_skipped_content(kafka_server, kafka_prefix, "content")
+
+
+def _check_replay_skipped_content(kafka_server, kafka_prefix, topic):
+ skipped_contents = _gen_skipped_contents(100)
+ nb_sent = len(skipped_contents)
+
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test producer",
+ "acks": "all",
+ }
+ )
+ replayer = JournalClient(
+ brokers=[kafka_server],
+ group_id="test consumer",
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ )
+ assert f"{kafka_prefix}.skipped_content" in replayer.subscription
+
+ for i, obj in enumerate(skipped_contents):
+ obj.pop("data", None)
+ producer.produce(
+ topic=f"{kafka_prefix}.{topic}",
+ key=key_to_kafka({"sha1": obj["sha1"]}),
+ value=value_to_kafka(obj),
+ )
+ producer.flush()
+
+ storage = get_storage(cls="memory")
+
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_inserted = 0
+ while nb_inserted < nb_sent:
+ nb_inserted += replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ for content in skipped_contents:
+ assert not storage.content_find({"sha1": content["sha1"]})
+
+ # no skipped_content_find API endpoint, so use this instead
+ assert not list(storage.skipped_content_missing(skipped_contents))
+
+
+def _updated(d1, d2):
+ d1.update(d2)
+ d1.pop("data", None)
+ return d1
+
+
+def _gen_skipped_contents(n=10):
+ # we do not use the hypothesis strategy here because it does not play well with
+ # pytest fixtures, and it makes test execution very slow
+ algos = DEFAULT_ALGORITHMS | {"length"}
+ now = datetime.datetime.now()
+ return [
+ _updated(
+ MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
+ {
+ "status": "absent",
+ "reason": "why not",
+ "origin": f"https://somewhere/{i}",
+ "ctime": now,
+ },
+ )
+ for i in range(n)
+ ]
diff --git a/swh/storage/writer.py b/swh/storage/writer.py
--- a/swh/storage/writer.py
+++ b/swh/storage/writer.py
@@ -61,7 +61,7 @@
self.content_add(contents)
def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
- self.write_additions("content", contents)
+ self.write_additions("skipped_content", contents)
def directory_add(self, directories: Iterable[Directory]) -> None:
self.write_additions("directory", directories)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 4:07 PM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222932

Event Timeline