D8740.id31548.diff

diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.9
+swh.journal >= 1.2
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -598,6 +598,15 @@
     def __init__(self, config=None):
         self.config = config
         self.check_config(config)
+        self._db = None
+        self.writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]})
+        assert self.writer.journal is not None
+
+    @property
+    def db(self):
+        if self._db is None:
+            self._db = BaseDb.connect(self.config["storage"]["db"])
+        return self._db
 
     def check_config(self, config):
         missing_keys = []
@@ -653,10 +662,6 @@
             object_type, start_object, end_object
         )
 
-        db = BaseDb.connect(self.config["storage"]["db"])
-        writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]})
-        assert writer.journal is not None
-
         for range_start, range_end in RANGE_GENERATORS[object_type](
             start_object, end_object
         ):
@@ -667,10 +672,10 @@
                 _format_range_bound(range_end),
             )
 
-            objects = fetch(db, object_type, start=range_start, end=range_end)
+            objects = fetch(self.db, object_type, start=range_start, end=range_end)
 
             if not dry_run:
-                writer.write_additions(object_type, objects)
+                self.writer.write_additions(object_type, objects)
             else:
                 # only consume the objects iterator to check for any potential
                 # decoding/encoding errors
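
The backfill.py change above turns the backfiller's database connection into a lazily created property and builds the JournalWriter once in __init__, so a bad journal configuration is caught when the backfiller is constructed (the assert on writer.journal) and tests can reach backfiller.writer directly. A minimal, self-contained sketch of the same pattern, with connect_db and make_writer as hypothetical stand-ins for BaseDb.connect and JournalWriter:

def connect_db(dsn):
    # Placeholder for BaseDb.connect: opened only when first needed.
    print(f"connecting to {dsn}")
    return object()

def make_writer(cfg):
    # Placeholder for JournalWriter: validate configuration immediately.
    assert "brokers" in cfg, "fail fast on bad journal configuration"
    return object()

class LazyBackfiller:
    def __init__(self, config):
        self.config = config
        self._db = None
        # Created eagerly: misconfiguration surfaces at construction time.
        self.writer = make_writer(config["journal_writer"])

    @property
    def db(self):
        # Opened once, on first access, then reused across run() calls.
        if self._db is None:
            self._db = connect_db(self.config["storage"]["db"])
        return self._db

backfiller = LazyBackfiller(
    {"journal_writer": {"brokers": ["broker:9092"]}, "storage": {"db": "dbname=softwareheritage"}}
)
backfiller.db  # first access opens the connection
backfiller.db  # subsequent accesses reuse it
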
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -5,7 +5,17 @@
 import datetime
 from enum import Enum
 
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+)
 
 import attr
 from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -34,6 +44,9 @@
 )
 from swh.model.swhids import ExtendedSWHID, ObjectType
 
+if TYPE_CHECKING:
+    from swh.storage.writer import JournalWriter
+
 
 class ListOrder(Enum):
     """Specifies the order for paginated endpoints returning sorted results."""
@@ -76,6 +89,8 @@
 
 @runtime_checkable
 class StorageInterface(Protocol):
+    journal_writer: Optional["JournalWriter"]
+
     @remote_api_endpoint("check_config")
     def check_config(self, *, check_write: bool) -> bool:
         """Check that the storage is configured and ready to go."""
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -235,6 +235,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer-1",
         "prefix": prefix1,
+        "auto_flush": False,
     }
     swh_storage_backend_config["journal_writer"] = journal1
     storage = get_storage(**swh_storage_backend_config)
@@ -242,6 +243,8 @@
     for object_type, objects in TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         method(objects)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     # now apply the backfiller on the storage to fill the journal under prefix2
     backfiller_config = {
@@ -249,6 +252,7 @@
             "brokers": [kafka_server],
             "client_id": "kafka_writer-2",
             "prefix": prefix2,
+            "auto_flush": False,
         },
         "storage": swh_storage_backend_config,
     }
@@ -257,6 +261,7 @@
     backfiller = JournalBackfiller(backfiller_config)
     for object_type in TEST_OBJECTS:
         backfiller.run(object_type, None, None)
+    backfiller.writer.journal.flush()
 
     # Trace log messages for unhandled object types in the replayer
     caplog.set_level(logging.DEBUG, "swh.storage.replay")
diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -25,6 +25,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": False,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "pipeline",
@@ -58,6 +59,8 @@
             expected_messages += len(objs)
         else:
             assert False, obj_type
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     existing_topics = set(
         topic
@@ -97,6 +100,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": True,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "pipeline",
@@ -117,6 +121,8 @@
         method = getattr(storage, obj_type + "_add")
         method(objs)
         expected_messages += len(objs)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     existing_topics = set(
         topic
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -70,6 +70,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "memory",
@@ -106,6 +107,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     caplog.set_level(logging.ERROR, "swh.journal.replay")
@@ -145,6 +147,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     # Create collision in input data
     # These should not be written in the destination
@@ -341,6 +344,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": True,
+        "auto_flush": False,
     }
 
     src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config}
@@ -355,6 +359,8 @@
         method = getattr(storage, obj_type + "_add")
         method(objs)
         nb_sent += len(objs)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     # Fill a destination storage from Kafka, potentially using privileged topics
     dst_storage = get_storage(cls="memory")
@@ -404,6 +410,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
@@ -467,6 +474,8 @@
         src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr)
         nb_sent += 1
 
+    src.journal_writer.journal.flush()
+
     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
     worker_fn = functools.partial(process_replay_objects, storage=dst)
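
Throughout the test changes, the Kafka journal writer is configured with "auto_flush": False and flushed exactly once, explicitly, after all objects have been written and before the replayer reads them back; this relies on the journal writer's auto_flush option, which is presumably why requirements-swh-journal.txt bumps swh.journal to >= 1.2 at the top of this diff. A minimal sketch of that flush discipline; the broker address and topic prefix below are placeholders:

from swh.storage import get_storage

writer_config = {
    "cls": "kafka",
    "brokers": ["localhost:9092"],    # placeholder broker address
    "client_id": "kafka_writer",
    "prefix": "swh.journal.objects",  # placeholder topic prefix
    "auto_flush": False,              # do not flush after every write
}
storage = get_storage(cls="memory", journal_writer=writer_config)

# ... add objects through the usual <object_type>_add endpoints ...

assert storage.journal_writer is not None
storage.journal_writer.journal.flush()  # one explicit flush before replaying
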
