D8740.id31548.diff (F9313150, 7 KB)
Attached to D8740: tests: only flush() the kafka journal writer once per test
diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.9
+swh.journal >= 1.2
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -598,6 +598,15 @@
     def __init__(self, config=None):
         self.config = config
         self.check_config(config)
+        self._db = None
+        self.writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]})
+        assert self.writer.journal is not None
+
+    @property
+    def db(self):
+        if self._db is None:
+            self._db = BaseDb.connect(self.config["storage"]["db"])
+        return self._db
 
     def check_config(self, config):
         missing_keys = []
@@ -653,10 +662,6 @@
             object_type, start_object, end_object
         )
 
-        db = BaseDb.connect(self.config["storage"]["db"])
-        writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]})
-        assert writer.journal is not None
-
         for range_start, range_end in RANGE_GENERATORS[object_type](
             start_object, end_object
         ):
@@ -667,10 +672,10 @@
                 _format_range_bound(range_end),
             )
 
-            objects = fetch(db, object_type, start=range_start, end=range_end)
+            objects = fetch(self.db, object_type, start=range_start, end=range_end)
 
             if not dry_run:
-                writer.write_additions(object_type, objects)
+                self.writer.write_additions(object_type, objects)
             else:
                 # only consume the objects iterator to check for any potential
                 # decoding/encoding errors
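
Note on the backfill.py hunks above: the JournalWriter and the database connection move from run() onto the JournalBackfiller instance, with the connection created lazily through a property, which is what lets the tests below reach backfiller.writer and flush it once per run. A minimal sketch of that lazy-property pattern, where connect() is a hypothetical stand-in for BaseDb.connect:

def connect(dsn: str) -> str:
    # Hypothetical stand-in for BaseDb.connect.
    return f"connection to {dsn}"


class Backfiller:
    # Sketch of the lazy-connection property introduced in backfill.py.

    def __init__(self, config: dict) -> None:
        self.config = config
        self._db = None  # no connection is opened at construction time

    @property
    def db(self):
        # Connect on first access and cache the handle, so every range
        # processed by run() reuses the same connection.
        if self._db is None:
            self._db = connect(self.config["storage"]["db"])
        return self._db


backfiller = Backfiller({"storage": {"db": "service=swh-storage"}})
assert backfiller.db is backfiller.db  # cached after the first access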
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -5,7 +5,17 @@
 
 import datetime
 from enum import Enum
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+)
 
 import attr
 from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -34,6 +44,9 @@
 )
 from swh.model.swhids import ExtendedSWHID, ObjectType
 
+if TYPE_CHECKING:
+    from swh.storage.writer import JournalWriter
+
 
 class ListOrder(Enum):
     """Specifies the order for paginated endpoints returning sorted results."""
@@ -76,6 +89,8 @@
 
 @runtime_checkable
 class StorageInterface(Protocol):
+    journal_writer: Optional["JournalWriter"]
+
     @remote_api_endpoint("check_config")
     def check_config(self, *, check_write: bool) -> bool:
         """Check that the storage is configured and ready to go."""
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -235,6 +235,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer-1",
         "prefix": prefix1,
+        "auto_flush": False,
     }
     swh_storage_backend_config["journal_writer"] = journal1
     storage = get_storage(**swh_storage_backend_config)
@@ -242,6 +243,8 @@
     for object_type, objects in TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         method(objects)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     # now apply the backfiller on the storage to fill the journal under prefix2
     backfiller_config = {
@@ -249,6 +252,7 @@
             "brokers": [kafka_server],
             "client_id": "kafka_writer-2",
             "prefix": prefix2,
+            "auto_flush": False,
         },
         "storage": swh_storage_backend_config,
     }
@@ -257,6 +261,7 @@
     backfiller = JournalBackfiller(backfiller_config)
     for object_type in TEST_OBJECTS:
         backfiller.run(object_type, None, None)
+    backfiller.writer.journal.flush()
 
     # Trace log messages for unhandled object types in the replayer
     caplog.set_level(logging.DEBUG, "swh.storage.replay")
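
What the test changes rely on: with "auto_flush": False in the journal_writer config, the writer no longer flushes after every write, so a test produces all of its messages up front and confirms delivery with a single flush() at the end. A toy model of that behaviour, not the swh.journal API, just its shape:

class ToyJournal:
    # Toy stand-in illustrating auto_flush; not the swh.journal API.

    def __init__(self, auto_flush: bool = True) -> None:
        self.auto_flush = auto_flush
        self.pending: list = []
        self.delivered: list = []

    def write_additions(self, objects: list) -> None:
        self.pending.extend(objects)
        if self.auto_flush:
            self.flush()  # old behaviour: one blocking flush per batch

    def flush(self) -> None:
        # Block until every buffered message is confirmed delivered.
        self.delivered.extend(self.pending)
        self.pending.clear()


journal = ToyJournal(auto_flush=False)
journal.write_additions(["rev-1"])
journal.write_additions(["rev-2"])
journal.flush()  # one flush for the whole test, as in the hunks above
assert journal.delivered == ["rev-1", "rev-2"]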
diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -25,6 +25,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": False,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "pipeline",
@@ -58,6 +59,8 @@
             expected_messages += len(objs)
         else:
             assert False, obj_type
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     existing_topics = set(
         topic
@@ -97,6 +100,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": True,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "pipeline",
@@ -117,6 +121,8 @@
         method = getattr(storage, obj_type + "_add")
         method(objs)
         expected_messages += len(objs)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     existing_topics = set(
         topic
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -70,6 +70,7 @@
         "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
+        "auto_flush": False,
     }
     storage_config: Dict[str, Any] = {
         "cls": "memory",
@@ -106,6 +107,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     caplog.set_level(logging.ERROR, "swh.journal.replay")
 
@@ -145,6 +147,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     # Create collision in input data
     # These should not be written in the destination
@@ -341,6 +344,7 @@
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
         "anonymize": True,
+        "auto_flush": False,
     }
 
     src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config}
@@ -355,6 +359,8 @@
         method = getattr(storage, obj_type + "_add")
         method(objs)
         nb_sent += len(objs)
+    assert storage.journal_writer is not None
+    storage.journal_writer.journal.flush()
 
     # Fill a destination storage from Kafka, potentially using privileged topics
     dst_storage = get_storage(cls="memory")
@@ -404,6 +410,7 @@
         method = getattr(src, object_type + "_add")
         method(objects)
         nb_sent += len(objects)
+    src.journal_writer.journal.flush()
 
     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
@@ -467,6 +474,8 @@
         src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr)
         nb_sent += 1
 
+    src.journal_writer.journal.flush()
+
     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
     worker_fn = functools.partial(process_replay_objects, storage=dst)
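
Why batching the flush matters: the kafka journal writer is built on a librdkafka producer, whose produce() call is asynchronous and cheap while flush() blocks until every outstanding delivery report has arrived. Flushing once per test instead of once per batch removes most of those blocking round-trips. A hedged sketch of that cost model using confluent-kafka directly; the broker address and topic name are assumptions:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker
for i in range(1000):
    # Asynchronous: the message is queued in librdkafka's buffer and the
    # call returns immediately.
    producer.produce("example-topic", value=f"msg-{i}".encode())
producer.flush()  # one blocking wait covering all 1000 delivery reports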