D1541: add a content replayer service
File: D1541.id5145.diff (6 KB, text/plain)
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -11,9 +11,11 @@
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.storage import get_storage
+from swh.objstorage import get_objstorage
from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
from swh.journal.backfill import JournalBackfiller
@@ -48,6 +50,24 @@
ctx.obj['config'] = conf
+def get_journal_client(ctx, brokers, prefix, group_id):
+ conf = ctx.obj['config']
+ if brokers is None:
+ brokers = conf.get('journal', {}).get('brokers')
+ if not brokers:
+ ctx.fail('You must specify at least one kafka broker.')
+ if not isinstance(brokers, (list, tuple)):
+ brokers = [brokers]
+
+ if prefix is None:
+ prefix = conf.get('journal', {}).get('prefix')
+
+ if group_id is None:
+ group_id = conf.get('journal', {}).get('group_id')
+
+ return JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+
+
@cli.command()
@click.option('--max-messages', '-m', default=None, type=int,
help='Maximum number of objects to replay. Default is to '
@@ -75,20 +95,7 @@
except KeyError:
ctx.fail('You must have a storage configured in your config file.')
- if brokers is None:
- brokers = conf.get('journal', {}).get('brokers')
- if not brokers:
- ctx.fail('You must specify at least one kafka broker.')
- if not isinstance(brokers, (list, tuple)):
- brokers = [brokers]
-
- if prefix is None:
- prefix = conf.get('journal', {}).get('prefix')
-
- if group_id is None:
- group_id = conf.get('journal', {}).get('group_id')
-
- client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+ client = get_journal_client(ctx, brokers, prefix, group_id)
worker_fn = functools.partial(process_replay_objects, storage=storage)
try:
@@ -136,6 +143,57 @@
ctx.exit(0)
+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+ hidden=True, # prefer config file
+ help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+ hidden=True, # prefer config file
+ help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+ hidden=True, # prefer config file
+ help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+ """Fill a destination Object Storage (typically a mirror) by reading a Journal
+ and retrieving objects from an existing source ObjStorage.
+
+ There can be several 'replayers' filling a given ObjStorage as long as they
+ use the same `group-id`.
+
+ This service retrieves object ids to copy from the 'content' topic. It
+ will only copy an object's content if the object's description in the
+ kafka message has its status set to 'visible'.
+ """
+ logger = logging.getLogger(__name__)
+ conf = ctx.obj['config']
+ try:
+ objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
+ except KeyError:
+ ctx.fail('You must have a source objstorage configured in '
+ 'your config file.')
+ try:
+ objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
+ except KeyError:
+ ctx.fail('You must have a destination objstorage configured '
+ 'in your config file.')
+
+ client = get_journal_client(ctx, brokers, prefix, group_id)
+ worker_fn = functools.partial(process_replay_objects_content,
+ src=objstorage_src,
+ dst=objstorage_dst)
+
+ try:
+ nb_messages = 0
+ while True:
+ nb_messages += client.process(worker_fn)
+ logger.info('Processed %d messages.', nb_messages)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print('Done.')
+
+
def main():
logging.basicConfig()
return cli(auto_envvar_prefix='SWH_JOURNAL')
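
For reference, the new content-replay command above resolves all of its settings from the config file, through the same get_journal_client helper as the existing replay command. A minimal sketch of such a config follows; only the key names (journal.brokers, journal.prefix, journal.group_id, objstorage_src, objstorage_dst) come from the code, while the YAML layout, objstorage classes, and paths are illustrative assumptions:

journal:
  brokers:
    - kafka1.example.org:9092   # at least one broker is required
  prefix: swh.journal.objects   # prefix of the topic names to read from
  group_id: content-replayer    # shared by all replayers filling one mirror

# hypothetical source/destination; any objstorage class should do
objstorage_src:
  cls: pathslicing
  args:
    root: /srv/softwareheritage/objects
    slicing: 0:2/2:4/4:6
objstorage_dst:
  cls: pathslicing
  args:
    root: /srv/mirror/objects
    slicing: 0:2/2:4/4:6
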
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
import logging
from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO
logger = logging.getLogger(__name__)
@@ -38,4 +39,18 @@
}
for obj in objects])
else:
- assert False
+ logger.warning('Received a series of %s, this should not happen',
+ object_type)
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+ for (object_type, objects) in all_objects.items():
+ if object_type != 'content':
+ logger.warning('Received a series of %s, this should not happen',
+ object_type)
+ continue
+ for obj in objects:
+ if obj['status'] == 'visible':
+ obj_id = obj[ID_HASH_ALGO]
+ obj = src.get(obj_id)
+ dst.add(obj, obj_id=obj_id)
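
To illustrate the contract of process_replay_objects_content in isolation, here is a sketch using a dict-backed stand-in for the source and destination objstorages. The InMemoryObjStorage class is hypothetical, written only for this example; it exposes just the get()/add() calls the replayer uses:

from swh.journal.replay import process_replay_objects_content
from swh.objstorage.objstorage import ID_HASH_ALGO  # 'sha1' at time of writing

class InMemoryObjStorage:
    """Hypothetical stand-in exposing only get() and add()."""
    def __init__(self, state=None):
        self.state = dict(state or {})

    def get(self, obj_id):
        return self.state[obj_id]

    def add(self, content, obj_id=None):
        self.state[obj_id] = content

src = InMemoryObjStorage({b'\x01' * 20: b'hello'})
dst = InMemoryObjStorage()

# One replayed batch: only the 'visible' object must be copied; the
# 'hidden' one is skipped without ever touching the source objstorage.
batch = {
    'content': [
        {ID_HASH_ALGO: b'\x01' * 20, 'status': 'visible'},
        {ID_HASH_ALGO: b'\x02' * 20, 'status': 'hidden'},
    ],
}
process_replay_objects_content(batch, src=src, dst=dst)
assert dst.state == {b'\x01' * 20: b'hello'}
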
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -85,3 +85,6 @@
assert storage.snapshot_get(snapshot['id']) == {
**snapshot, 'next_branch': None}
+
+
+# TODO: write a test for the content-replay command
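
A possible starting point for that TODO (a hypothetical sketch, not part of this diff): patch get_journal_client so the command needs no kafka broker, feed it one batch through a stub client, and check that the command exits cleanly. The --config-file option on the top-level cli group and the 'memory' objstorage class are assumptions about the surrounding swh tooling:

from unittest.mock import patch

from click.testing import CliRunner

from swh.journal.cli import cli

CLI_CONFIG = '''
objstorage_src:
  cls: memory
  args: {}
objstorage_dst:
  cls: memory
  args: {}
'''

class StubJournalClient:
    """Feeds queued batches to the worker, then stops the command's loop."""
    def __init__(self, batches):
        self.batches = list(batches)

    def process(self, worker_fn):
        if not self.batches:
            raise KeyboardInterrupt  # content_replay turns this into exit(0)
        batch = self.batches.pop(0)
        worker_fn(batch)
        return sum(len(objs) for objs in batch.values())

def test_content_replay_smoke(tmp_path):
    config = tmp_path / 'config.yml'
    config.write_text(CLI_CONFIG)
    stub = StubJournalClient([{'content': []}])
    with patch('swh.journal.cli.get_journal_client', return_value=stub):
        result = CliRunner().invoke(
            cli, ['--config-file', str(config), 'content-replay'])
    assert result.exit_code == 0
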
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -16,6 +16,7 @@
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
from swh.journal.direct_writer import DirectKafkaWriter
from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
from swh.journal.serializers import (
key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
@@ -101,3 +102,32 @@
# TODO: add test for hash collision
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+ queue = []
+ replayer = MockedJournalClient(queue)
+
+ storage1 = Storage()
+ storage1.journal_writer = MockedKafkaWriter(queue)
+
+ for (obj_type, obj) in objects:
+ obj = obj.copy()
+ if obj_type == 'content':
+ storage1.content_add([obj])
+
+ queue_size = sum(len(partition)
+ for batch in queue
+ for partition in batch.values())
+
+ storage2 = Storage()
+ worker_fn = functools.partial(process_replay_objects_content,
+ src=storage1.objstorage,
+ dst=storage2.objstorage)
+ nb_messages = 0
+ while nb_messages < queue_size:
+ nb_messages += replayer.process(worker_fn)
+
+ assert storage1.objstorage.state == storage2.objstorage.state