diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt --- a/requirements-swh-journal.txt +++ b/requirements-swh-journal.txt @@ -1 +1 @@ -swh.journal >= 0.9 +swh.journal >= 1.2 diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -598,6 +598,15 @@ def __init__(self, config=None): self.config = config self.check_config(config) + self._db = None + self.writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) + assert self.writer.journal is not None + + @property + def db(self): + if self._db is None: + self._db = BaseDb.connect(self.config["storage"]["db"]) + return self._db def check_config(self, config): missing_keys = [] @@ -653,10 +662,6 @@ object_type, start_object, end_object ) - db = BaseDb.connect(self.config["storage"]["db"]) - writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) - assert writer.journal is not None - for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object ): @@ -667,10 +672,10 @@ _format_range_bound(range_end), ) - objects = fetch(db, object_type, start=range_start, end=range_end) + objects = fetch(self.db, object_type, start=range_start, end=range_end) if not dry_run: - writer.write_additions(object_type, objects) + self.writer.write_additions(object_type, objects) else: # only consume the objects iterator to check for any potential # decoding/encoding errors diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -5,7 +5,17 @@ import datetime from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Sequence, + Tuple, + TypeVar, +) import attr from typing_extensions import Protocol, TypedDict, runtime_checkable @@ -34,6 +44,9 @@ ) from swh.model.swhids import ExtendedSWHID, ObjectType +if TYPE_CHECKING: + from swh.storage.writer import JournalWriter + class ListOrder(Enum): """Specifies the order for paginated endpoints returning sorted results.""" @@ -76,6 +89,8 @@ @runtime_checkable class StorageInterface(Protocol): + journal_writer: Optional["JournalWriter"] + @remote_api_endpoint("check_config") def check_config(self, *, check_write: bool) -> bool: """Check that the storage is configured and ready to go.""" diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -235,6 +235,7 @@ "brokers": [kafka_server], "client_id": "kafka_writer-1", "prefix": prefix1, + "auto_flush": False, } swh_storage_backend_config["journal_writer"] = journal1 storage = get_storage(**swh_storage_backend_config) @@ -242,6 +243,8 @@ for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") method(objects) + assert storage.journal_writer is not None + storage.journal_writer.journal.flush() # now apply the backfiller on the storage to fill the journal under prefix2 backfiller_config = { @@ -249,6 +252,7 @@ "brokers": [kafka_server], "client_id": "kafka_writer-2", "prefix": prefix2, + "auto_flush": False, }, "storage": swh_storage_backend_config, } @@ -257,6 +261,7 @@ backfiller = JournalBackfiller(backfiller_config) for object_type in TEST_OBJECTS: backfiller.run(object_type, None, None) + backfiller.writer.journal.flush() # Trace log messages for unhandled object types in the replayer caplog.set_level(logging.DEBUG, "swh.storage.replay") diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py --- a/swh/storage/tests/test_kafka_writer.py +++ b/swh/storage/tests/test_kafka_writer.py @@ -25,6 +25,7 @@ "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, + "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "pipeline", @@ -58,6 +59,8 @@ expected_messages += len(objs) else: assert False, obj_type + assert storage.journal_writer is not None + storage.journal_writer.journal.flush() existing_topics = set( topic @@ -97,6 +100,7 @@ "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, + "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "pipeline", @@ -117,6 +121,8 @@ method = getattr(storage, obj_type + "_add") method(objs) expected_messages += len(objs) + assert storage.journal_writer is not None + storage.journal_writer.journal.flush() existing_topics = set( topic diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -70,6 +70,7 @@ "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, + "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "memory", @@ -106,6 +107,7 @@ method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) + src.journal_writer.journal.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") @@ -145,6 +147,7 @@ method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) + src.journal_writer.journal.flush() # Create collision in input data # These should not be written in the destination @@ -341,6 +344,7 @@ "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, + "auto_flush": False, } src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} @@ -355,6 +359,8 @@ method = getattr(storage, obj_type + "_add") method(objs) nb_sent += len(objs) + assert storage.journal_writer is not None + storage.journal_writer.journal.flush() # Fill a destination storage from Kafka, potentially using privileged topics dst_storage = get_storage(cls="memory") @@ -404,6 +410,7 @@ method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) + src.journal_writer.journal.flush() # Fill the destination storage from Kafka dst = get_storage(cls="memory") @@ -467,6 +474,8 @@ src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr) nb_sent += 1 + src.journal_writer.journal.flush() + # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst)