diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt --- a/requirements-swh-journal.txt +++ b/requirements-swh-journal.txt @@ -1 +1 @@ -swh.journal >= 0.4 +swh.journal >= 0.5.1 diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -19,7 +19,6 @@ from typing import Any, Callable, Dict from swh.core.db import BaseDb -from swh.journal.writer import get_journal_writer from swh.model.model import ( BaseModel, Directory, @@ -37,6 +36,7 @@ db_to_revision, ) from swh.storage.replay import object_converter_fn +from swh.storage.writer import JournalWriter logger = logging.getLogger(__name__) @@ -526,7 +526,9 @@ ) db = BaseDb.connect(self.config["storage"]["db"]) - writer = get_journal_writer(cls="kafka", **self.config["journal_writer"]) + writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) + assert writer.journal is not None + for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object ): @@ -540,9 +542,9 @@ for obj in fetch(db, object_type, start=range_start, end=range_end,): if dry_run: continue - writer.write_addition(object_type=object_type, object_=obj) + writer.write_addition(object_type, obj) - writer.producer.flush() + writer.journal.producer.flush() if __name__ == "__main__": diff --git a/swh/storage/writer.py b/swh/storage/writer.py --- a/swh/storage/writer.py +++ b/swh/storage/writer.py @@ -29,6 +29,15 @@ # mypy limitation, see https://github.com/python/mypy/issues/1153 +def model_object_dict_sanitizer( + object_type: str, object_dict: Dict[str, Any] +) -> Dict[str, Any]: + object_dict = object_dict.copy() + if object_type == "content": + object_dict.pop("data", None) + return object_dict + + class JournalWriter: """Journal writer storage collaborator. It's in charge of adding objects to the journal. 
@@ -42,13 +51,19 @@ "You need the swh.journal package to use the " "journal_writer feature" ) - self.journal = get_journal_writer(**journal_writer) + self.journal = get_journal_writer( + value_sanitizer=model_object_dict_sanitizer, **journal_writer + ) else: self.journal = None - def write_additions(self, obj_type, values) -> None: + def write_addition(self, object_type: str, value: Any) -> None: + if self.journal: + self.journal.write_addition(object_type, value) + + def write_additions(self, object_type: str, values: Iterable[Any]) -> None: if self.journal: - self.journal.write_additions(obj_type, values) + self.journal.write_additions(object_type, values) def content_add(self, contents: Iterable[Content]) -> None: """Add contents to the journal. Drop the data field if provided.