diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.2
+swh.journal >= 0.3.2
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -21,6 +21,7 @@
 from swh.core.db import BaseDb
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.storage.converters import db_to_release, db_to_revision
+from swh.storage.replay import object_converter_fn
 
 logger = logging.getLogger(__name__)
 
@@ -34,6 +35,7 @@
     "snapshot": "id",
     "origin": "id",
     "origin_visit": "origin_visit.origin",
+    "origin_visit_status": "origin_visit_status.origin",
 }
 
 COLUMNS = {
@@ -61,15 +63,15 @@
         ("revision.id", "id"),
         "date",
         "date_offset",
+        "date_neg_utc_offset",
         "committer_date",
         "committer_date_offset",
+        "committer_date_neg_utc_offset",
         "type",
         "directory",
         "message",
         "synthetic",
         "metadata",
-        "date_neg_utc_offset",
-        "committer_date_neg_utc_offset",
         (
             "array(select parent_id::bytea from revision_history rh "
             "where rh.id = revision.id order by rh.parent_rank asc)",
@@ -88,10 +90,10 @@
         ("release.id", "id"),
         "date",
         "date_offset",
+        "date_neg_utc_offset",
         "comment",
         ("release.name", "name"),
         "synthetic",
-        "date_neg_utc_offset",
         "target",
         "target_type",
         ("a.id", "author_id"),
@@ -100,12 +102,19 @@
         ("a.fullname", "author_fullname"),
     ],
     "snapshot": ["id", "object_id"],
-    "origin": ["type", "url"],
+    "origin": ["url"],
     "origin_visit": [
         "visit",
-        "origin.type",
-        "origin_visit.type",
-        "url",
+        "type",
+        ("origin.url", "origin"),
+        "date",
+        "snapshot",
+        "status",
+        "metadata",
+    ],
+    "origin_visit_status": [
+        "visit",
+        ("origin.url", "origin"),
         "date",
         "snapshot",
         "status",
@@ -121,6 +130,7 @@
         "person c on revision.committer=c.id",
     ],
     "origin_visit": ["origin on origin_visit.origin=origin.id"],
+    "origin_visit_status": ["origin on origin_visit_status.origin=origin.id"],
 }
 
 
@@ -171,10 +181,7 @@
     compatible objects.
""" - release = db_to_release(release) - if "author" in release and release["author"]: - del release["author"]["id"] - return release + return db_to_release(release) def snapshot_converter(db, snapshot): @@ -204,22 +211,11 @@ return snapshot -def origin_visit_converter(db, origin_visit): - origin = { - "type": origin_visit.pop("origin.type"), - "url": origin_visit.pop("url"), - } - origin_visit["origin"] = origin - origin_visit["type"] = origin_visit.pop("origin_visit.type") - return origin_visit - - CONVERTERS = { "directory": directory_converter, "revision": revision_converter, "release": release_converter, "snapshot": snapshot_converter, - "origin_visit": origin_visit_converter, } @@ -307,6 +303,7 @@ "snapshot": lambda start, end: byte_ranges(16, start, end), "origin": integer_ranges, "origin_visit": integer_ranges, + "origin_visit_status": integer_ranges, } @@ -478,7 +475,8 @@ _format_range_bound(range_end), ) - for obj in fetch(db, object_type, start=range_start, end=range_end,): + for obj_d in fetch(db, object_type, start=range_start, end=range_end,): + obj = object_converter_fn[object_type](obj_d) if dry_run: continue writer.write_addition(object_type=object_type, object_=obj) diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -4,9 +4,16 @@ # See top-level LICENSE file for more information import pytest +import functools +from unittest.mock import patch +from swh.storage import get_storage from swh.storage.backfill import JournalBackfiller, compute_query, PARTITION_KEY +from swh.storage.replay import process_replay_objects +from swh.storage.tests.test_replay import check_replayed +from swh.journal.client import JournalClient +from swh.journal.tests.journal_data import TEST_OBJECTS TEST_CONFIG = { "brokers": ["localhost"], @@ -108,9 +115,8 @@ assert column_aliases == [ "visit", - "origin.type", - "origin_visit.type", - "url", + "type", + "origin", "date", "snapshot", "status", @@ -120,7 +126,7 @@ assert ( query == """ -select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata +select visit,type,origin.url as origin,date,snapshot,status,metadata from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s @@ -137,10 +143,10 @@ "id", "date", "date_offset", + "date_neg_utc_offset", "comment", "name", "synthetic", - "date_neg_utc_offset", "target", "target_type", "author_id", @@ -152,9 +158,87 @@ assert ( query == """ -select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname +select release.id as id,date,date_offset,date_neg_utc_offset,comment,release.name as name,synthetic,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s """ # noqa ) + + +RANGE_GENERATORS = { + "content": lambda start, end: [(None, None)], + "skipped_content": lambda start, end: [(None, None)], + "directory": lambda start, end: [(None, None)], + "revision": lambda start, end: [(None, None)], + "release": lambda start, end: [(None, None)], + "snapshot": lambda start, end: [(None, None)], + "origin": lambda start, end: [(None, 10000)], + "origin_visit": lambda start, end: [(None, 10000)], + 
"origin_visit_status": lambda start, end: [(None, 10000)], +} + + +@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS) +def test_backfiller( + swh_storage_backend_config, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, +): + prefix1 = f"{kafka_prefix}-1" + prefix2 = f"{kafka_prefix}-2" + + journal1 = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer-1", + "prefix": prefix1, + } + swh_storage_backend_config["journal_writer"] = journal1 + storage = get_storage(**swh_storage_backend_config) + + # fill the storage and the journal (under prefix1) + for object_type, objects in TEST_OBJECTS.items(): + method = getattr(storage, object_type + "_add") + method(objects) + + # now apply the backfiller on the storage to fill the journal under prefix2 + backfiller_config = { + "brokers": [kafka_server], + "client_id": "kafka_writer-2", + "prefix": prefix2, + "storage_dbconn": swh_storage_backend_config["db"], + } + + # Backfilling + backfiller = JournalBackfiller(backfiller_config) + for object_type in TEST_OBJECTS: + backfiller.run(object_type, None, None) + + # now check journal content are the same under both topics + # use the replayer scaffolding to fill storages to make is a bit easier + # Replaying #1 + sto1 = get_storage(cls="memory") + replayer1 = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=prefix1, + stop_on_eof=True, + ) + worker_fn1 = functools.partial(process_replay_objects, storage=sto1) + replayer1.process(worker_fn1) + + # Replaying #2 + sto2 = get_storage(cls="memory") + replayer2 = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=prefix2, + stop_on_eof=True, + ) + worker_fn2 = functools.partial(process_replay_objects, storage=sto2) + replayer2.process(worker_fn2) + + # Compare storages + check_replayed(sto1, sto2)