D3299.diff
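
Summary: this diff teaches the swh-storage journal backfiller about the origin_visit_status table (new entries in PARTITION_KEY, COLUMNS, JOINS and RANGE_GENERATORS), reuses the shared converters from swh.storage.replay so backfilled topics carry model objects instead of bare dicts (dropping the ad-hoc origin_visit converter along the way), and adds an end-to-end test comparing a backfilled topic prefix against the journal written at load time. The swh.journal requirement is bumped accordingly.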

diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.2
+swh.journal >= 0.3.2
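
Note: the lower bound on swh.journal moves from 0.2 to 0.3.2. A hedged guess at the motivation: the backfiller now hands model objects straight to KafkaJournalWriter.write_addition, and the new test relies on the JournalClient interface used below. A quick runtime check for the new bound (hypothetical, not part of this diff):

# Hypothetical sanity check, not part of this diff: fail fast if the
# installed swh.journal does not satisfy the new lower bound.
import pkg_resources

pkg_resources.require("swh.journal>=0.3.2")  # raises VersionConflict if too old
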
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -21,6 +21,7 @@
from swh.core.db import BaseDb
from swh.journal.writer.kafka import KafkaJournalWriter
from swh.storage.converters import db_to_release, db_to_revision
+from swh.storage.replay import object_converter_fn
logger = logging.getLogger(__name__)
@@ -34,6 +35,7 @@
"snapshot": "id",
"origin": "id",
"origin_visit": "origin_visit.origin",
+ "origin_visit_status": "origin_visit_status.origin",
}
COLUMNS = {
@@ -61,15 +63,15 @@
("revision.id", "id"),
"date",
"date_offset",
+ "date_neg_utc_offset",
"committer_date",
"committer_date_offset",
+ "committer_date_neg_utc_offset",
"type",
"directory",
"message",
"synthetic",
"metadata",
- "date_neg_utc_offset",
- "committer_date_neg_utc_offset",
(
"array(select parent_id::bytea from revision_history rh "
"where rh.id = revision.id order by rh.parent_rank asc)",
@@ -88,10 +90,10 @@
("release.id", "id"),
"date",
"date_offset",
+ "date_neg_utc_offset",
"comment",
("release.name", "name"),
"synthetic",
- "date_neg_utc_offset",
"target",
"target_type",
("a.id", "author_id"),
@@ -100,12 +102,19 @@
("a.fullname", "author_fullname"),
],
"snapshot": ["id", "object_id"],
- "origin": ["type", "url"],
+ "origin": ["url"],
"origin_visit": [
"visit",
- "origin.type",
- "origin_visit.type",
- "url",
+ "type",
+ ("origin.url", "origin"),
+ "date",
+ "snapshot",
+ "status",
+ "metadata",
+ ],
+ "origin_visit_status": [
+ "visit",
+ ("origin.url", "origin"),
"date",
"snapshot",
"status",
@@ -121,6 +130,7 @@
"person c on revision.committer=c.id",
],
"origin_visit": ["origin on origin_visit.origin=origin.id"],
+ "origin_visit_status": ["origin on origin_visit_status.origin=origin.id"],
}
@@ -171,10 +181,7 @@
compatible objects.
"""
- release = db_to_release(release)
- if "author" in release and release["author"]:
- del release["author"]["id"]
- return release
+ return db_to_release(release)
def snapshot_converter(db, snapshot):
@@ -204,22 +211,11 @@
return snapshot
-def origin_visit_converter(db, origin_visit):
- origin = {
- "type": origin_visit.pop("origin.type"),
- "url": origin_visit.pop("url"),
- }
- origin_visit["origin"] = origin
- origin_visit["type"] = origin_visit.pop("origin_visit.type")
- return origin_visit
-
-
CONVERTERS = {
"directory": directory_converter,
"revision": revision_converter,
"release": release_converter,
"snapshot": snapshot_converter,
- "origin_visit": origin_visit_converter,
}
@@ -307,6 +303,7 @@
"snapshot": lambda start, end: byte_ranges(16, start, end),
"origin": integer_ranges,
"origin_visit": integer_ranges,
+ "origin_visit_status": integer_ranges,
}
@@ -478,7 +475,8 @@
_format_range_bound(range_end),
)
- for obj in fetch(db, object_type, start=range_start, end=range_end,):
+ for obj_d in fetch(db, object_type, start=range_start, end=range_end,):
+ obj = object_converter_fn[object_type](obj_d)
if dry_run:
continue
writer.write_addition(object_type=object_type, object_=obj)
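
For readers skimming the hunks above: the net effect on the write path is that each raw row dict coming out of fetch() is now run through the shared converter registry (swh.storage.replay.object_converter_fn) before being written to the journal, so backfilled topics carry the same model objects as the regular journal writer. A minimal sketch of the resulting inner loop; backfill_range is a hypothetical helper name, and `db` and `writer` (a KafkaJournalWriter) are assumed to be set up as in JournalBackfiller.run():

# Minimal sketch of the updated write path, adapted from the hunk above.
from swh.storage.backfill import fetch
from swh.storage.replay import object_converter_fn

def backfill_range(db, writer, object_type, range_start, range_end, dry_run=False):
    for obj_d in fetch(db, object_type, start=range_start, end=range_end):
        # convert the raw row dict into the corresponding model object
        obj = object_converter_fn[object_type](obj_d)
        if dry_run:
            continue
        writer.write_addition(object_type=object_type, object_=obj)
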
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -4,9 +4,16 @@
# See top-level LICENSE file for more information
import pytest
+import functools
+from unittest.mock import patch
+from swh.storage import get_storage
from swh.storage.backfill import JournalBackfiller, compute_query, PARTITION_KEY
+from swh.storage.replay import process_replay_objects
+from swh.storage.tests.test_replay import check_replayed
+from swh.journal.client import JournalClient
+from swh.journal.tests.journal_data import TEST_OBJECTS
TEST_CONFIG = {
"brokers": ["localhost"],
@@ -108,9 +115,8 @@
assert column_aliases == [
"visit",
- "origin.type",
- "origin_visit.type",
- "url",
+ "type",
+ "origin",
"date",
"snapshot",
"status",
@@ -120,7 +126,7 @@
assert (
query
== """
-select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata
+select visit,type,origin.url as origin,date,snapshot,status,metadata
from origin_visit
left join origin on origin_visit.origin=origin.id
where (origin_visit.origin) >= %s and (origin_visit.origin) < %s
@@ -137,10 +143,10 @@
"id",
"date",
"date_offset",
+ "date_neg_utc_offset",
"comment",
"name",
"synthetic",
- "date_neg_utc_offset",
"target",
"target_type",
"author_id",
@@ -152,9 +158,87 @@
assert (
query
== """
-select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname
+select release.id as id,date,date_offset,date_neg_utc_offset,comment,release.name as name,synthetic,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname
from release
left join person a on release.author=a.id
where (release.id) >= %s and (release.id) < %s
""" # noqa
)
+
+
+RANGE_GENERATORS = {
+ "content": lambda start, end: [(None, None)],
+ "skipped_content": lambda start, end: [(None, None)],
+ "directory": lambda start, end: [(None, None)],
+ "revision": lambda start, end: [(None, None)],
+ "release": lambda start, end: [(None, None)],
+ "snapshot": lambda start, end: [(None, None)],
+ "origin": lambda start, end: [(None, 10000)],
+ "origin_visit": lambda start, end: [(None, 10000)],
+ "origin_visit_status": lambda start, end: [(None, 10000)],
+}
+
+
+@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
+def test_backfiller(
+ swh_storage_backend_config,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: str,
+):
+ prefix1 = f"{kafka_prefix}-1"
+ prefix2 = f"{kafka_prefix}-2"
+
+ journal1 = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer-1",
+ "prefix": prefix1,
+ }
+ swh_storage_backend_config["journal_writer"] = journal1
+ storage = get_storage(**swh_storage_backend_config)
+
+ # fill the storage and the journal (under prefix1)
+ for object_type, objects in TEST_OBJECTS.items():
+ method = getattr(storage, object_type + "_add")
+ method(objects)
+
+ # now apply the backfiller on the storage to fill the journal under prefix2
+ backfiller_config = {
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer-2",
+ "prefix": prefix2,
+ "storage_dbconn": swh_storage_backend_config["db"],
+ }
+
+ # Backfilling
+ backfiller = JournalBackfiller(backfiller_config)
+ for object_type in TEST_OBJECTS:
+ backfiller.run(object_type, None, None)
+
+ # now check that the journal contents are the same under both topic prefixes;
+ # use the replayer scaffolding to fill storages to make it a bit easier
+ # Replaying #1
+ sto1 = get_storage(cls="memory")
+ replayer1 = JournalClient(
+ brokers=kafka_server,
+ group_id=f"{kafka_consumer_group}-1",
+ prefix=prefix1,
+ stop_on_eof=True,
+ )
+ worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
+ replayer1.process(worker_fn1)
+
+ # Replaying #2
+ sto2 = get_storage(cls="memory")
+ replayer2 = JournalClient(
+ brokers=kafka_server,
+ group_id=f"{kafka_consumer_group}-2",
+ prefix=prefix2,
+ stop_on_eof=True,
+ )
+ worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
+ replayer2.process(worker_fn2)
+
+ # Compare storages
+ check_replayed(sto1, sto2)
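
The test above verifies backfill correctness indirectly: both topic prefixes are replayed into fresh in-memory storages, and the two storages are compared with check_replayed. A standalone sketch of that replay scaffolding, assuming a reachable Kafka broker; the broker address, group id and topic prefix below are placeholders:

# Standalone sketch of the replay scaffolding used in the test above.
import functools

from swh.journal.client import JournalClient
from swh.storage import get_storage
from swh.storage.replay import process_replay_objects

storage = get_storage(cls="memory")
client = JournalClient(
    brokers="localhost:9092",        # placeholder broker
    group_id="example-replayer",     # placeholder consumer group
    prefix="swh.journal.objects",    # placeholder topic prefix
    stop_on_eof=True,                # drain each topic once, then return
)
# process_replay_objects inserts each batch of journal objects into `storage`
client.process(functools.partial(process_replay_objects, storage=storage))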
