Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9346508
D3140.id11158.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
D3140.id11158.diff
View Options
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -101,6 +101,8 @@
objects = fix_objects(object_type, objects)
if object_type == "content":
+        # for backward compatibility: skipped contents may still arrive in the
+        # "content" topic, although they should now be delivered in the
+        # dedicated skipped_content topic
contents: List[BaseContent] = []
skipped_contents: List[BaseContent] = []
for content in objects:
@@ -109,9 +111,11 @@
skipped_contents.append(c)
else:
contents.append(c)
-
collision_aware_content_add(storage.skipped_content_add, skipped_contents)
collision_aware_content_add(storage.content_add_metadata, contents)
+ if object_type == "skipped_content":
+ skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
+ collision_aware_content_add(storage.skipped_content_add, skipped_contents)
elif object_type == "origin_visit":
visits: List[OriginVisit] = []
origins: List[Origin] = []
@@ -121,7 +125,7 @@
origins.append(Origin(url=visit.origin))
storage.origin_add(origins)
storage.origin_visit_upsert(visits)
- elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+ elif object_type in ("directory", "revision", "release", "snapshot", "origin",):
method = getattr(storage, object_type + "_add")
method(object_converter_fn[object_type](o) for o in objects)
else:
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -15,7 +15,7 @@
import pytest
-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
from swh.model.model import Content
from swh.storage import get_storage
@@ -311,7 +311,7 @@
assert vin == vout
-def test_write_replay_origin_visit():
+def test_write_replay_origin_visit(kafka_server):
"""Test origin_visit when the 'origin' is just a string."""
now = datetime.datetime.now(tz=UTC)
visits = [
@@ -388,3 +388,80 @@
}
]
_test_write_replay_origin_visit(visits)
+
+
+def test_replay_skipped_content(kafka_server, kafka_prefix):
+ """Test the 'skipped_content' topic is properly replayed."""
+ _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content")
+
+
+def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix):
+ """Test the 'content' topic can be used to replay SkippedContent objects."""
+ _check_replay_skipped_content(kafka_server, kafka_prefix, "content")
+
+
+def _check_replay_skipped_content(kafka_server, kafka_prefix, topic):
+ skipped_contents = _gen_skipped_contents(100)
+ nb_sent = len(skipped_contents)
+
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test producer",
+ "acks": "all",
+ }
+ )
+ replayer = JournalClient(
+ brokers=[kafka_server],
+ group_id="test consumer",
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ )
+ assert f"{kafka_prefix}.skipped_content" in replayer.subscription
+
+ for i, obj in enumerate(skipped_contents):
+ obj.pop("data", None)
+ producer.produce(
+ topic=f"{kafka_prefix}.{topic}",
+ key=key_to_kafka({"sha1": obj["sha1"]}),
+ value=value_to_kafka(obj),
+ )
+ producer.flush()
+
+ storage = get_storage(cls="memory")
+
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_inserted = 0
+ while nb_inserted < nb_sent:
+ nb_inserted += replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ for content in skipped_contents:
+ assert not storage.content_find({"sha1": content["sha1"]})
+
+ # no skipped_content_find API endpoint, so use this instead
+ assert not list(storage.skipped_content_missing(skipped_contents))
+
+
+def _updated(d1, d2):
+ d1.update(d2)
+ d1.pop("data", None)
+ return d1
+
+
+def _gen_skipped_contents(n=10):
+    # we do not use the hypothesis strategy here because it does not play well with
+    # pytest fixtures, and it makes test execution very slow
+ algos = DEFAULT_ALGORITHMS | {"length"}
+ now = datetime.datetime.now()
+ return [
+ _updated(
+ MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
+ {
+ "status": "absent",
+ "reason": "why not",
+ "origin": f"https://somewhere/{i}",
+ "ctime": now,
+ },
+ )
+ for i in range(n)
+ ]
diff --git a/swh/storage/writer.py b/swh/storage/writer.py
--- a/swh/storage/writer.py
+++ b/swh/storage/writer.py
@@ -61,7 +61,7 @@
self.content_add(contents)
def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
- self.write_additions("content", contents)
+ self.write_additions("skipped_content", contents)
def directory_add(self, directories: Iterable[Directory]) -> None:
self.write_additions("directory", directories)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 4:07 PM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222932
Attached To
D3140: journal: add a skipped_content topic dedicated to SkippedContent objects
Event Timeline
Log In to Comment