D1541.id5145.diff
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -11,9 +11,11 @@
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
+from swh.objstorage import get_objstorage

 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
@@ -48,6 +50,24 @@
    ctx.obj['config'] = conf


+def get_journal_client(ctx, brokers, prefix, group_id):
+    conf = ctx.obj['config']
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+    if not isinstance(brokers, (list, tuple)):
+        brokers = [brokers]
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    return JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+
+
@cli.command()
@click.option('--max-messages', '-m', default=None, type=int,
              help='Maximum number of objects to replay. Default is to '
@@ -75,20 +95,7 @@
    except KeyError:
        ctx.fail('You must have a storage configured in your config file.')

-    if brokers is None:
-        brokers = conf.get('journal', {}).get('brokers')
-    if not brokers:
-        ctx.fail('You must specify at least one kafka broker.')
-    if not isinstance(brokers, (list, tuple)):
-        brokers = [brokers]
-
-    if prefix is None:
-        prefix = conf.get('journal', {}).get('prefix')
-
-    if group_id is None:
-        group_id = conf.get('journal', {}).get('group_id')
-
-    client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+    client = get_journal_client(ctx, brokers, prefix, group_id)

    worker_fn = functools.partial(process_replay_objects, storage=storage)
    try:
@@ -136,6 +143,57 @@
        ctx.exit(0)


+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+    """Fill a destination Object Storage (typically a mirror) by reading a
+    Journal and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as
+    they use the same `group-id`.
+
+    This service retrieves the ids of the objects to copy from the 'content'
+    topic. It only copies an object's content if the object's description in
+    the kafka message has its status set to 'visible'.
+    """
+    logger = logging.getLogger(__name__)
+    conf = ctx.obj['config']
+    try:
+        objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
+    except KeyError:
+        ctx.fail('You must have a source objstorage configured in '
+                 'your config file.')
+    try:
+        objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
+    except KeyError:
+        ctx.fail('You must have a destination objstorage configured '
+                 'in your config file.')
+
+    client = get_journal_client(ctx, brokers, prefix, group_id)
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=objstorage_src,
+                                  dst=objstorage_dst)
+
+    try:
+        nb_messages = 0
+        while True:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
def main():
    logging.basicConfig()
    return cli(auto_envvar_prefix='SWH_JOURNAL')
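
Note on configuration: content_replay pops 'objstorage_src' and 'objstorage_dst' out of the loaded config and hands each to get_objstorage(), while get_journal_client() reads the 'journal' section. A minimal sketch of a matching configuration, written as the Python dict that would end up in ctx.obj['config'] (the backend classes, path and broker host below are illustrative assumptions, not part of this diff):

# Hypothetical config dict, mirroring the YAML file loaded at startup.
conf = {
    'objstorage_src': {           # unpacked into get_objstorage(**...)
        'cls': 'pathslicing',     # illustrative backend choice
        'args': {'root': '/srv/softwareheritage/objects',
                 'slicing': '0:2/2:4/4:6'},
    },
    'objstorage_dst': {
        'cls': 'memory',          # illustrative backend choice
        'args': {},
    },
    'journal': {
        'brokers': ['kafka1.example.org'],   # illustrative host
        'prefix': 'swh.journal.objects',
        'group_id': 'swh-content-replayer',  # shared by cooperating replayers
    },
}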
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
 import logging

 from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO

 logger = logging.getLogger(__name__)
@@ -38,4 +39,18 @@
                }
                for obj in objects])
        else:
-            assert False
+            logger.warning('Received a series of %s, this should not happen',
+                           object_type)
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.warning('Received a series of %s, this should not happen',
+                           object_type)
+            continue
+        for obj in objects:
+            if obj['status'] == 'visible':
+                obj_id = obj[ID_HASH_ALGO]
+                obj = src.get(obj_id)
+                dst.add(obj, obj_id=obj_id)
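
For illustration, process_replay_objects_content() can be exercised without kafka by handing it an already-decoded batch; a minimal sketch, assuming the in-memory objstorage backend and that ID_HASH_ALGO resolves to 'sha1':

from swh.objstorage import get_objstorage
from swh.journal.replay import process_replay_objects_content

# Two in-memory objstorages standing in for the source and the mirror
# (the 'memory' backend name is an assumption about swh.objstorage).
src = get_objstorage(cls='memory', args={})
dst = get_objstorage(cls='memory', args={})
obj_id = src.add(b'some content')  # add() returns the content's sha1

# A decoded batch maps each object type to its list of messages;
# only 'content' messages with status 'visible' get copied.
batch = {'content': [{'sha1': obj_id, 'status': 'visible'}]}
process_replay_objects_content(batch, src=src, dst=dst)
assert dst.get(obj_id) == b'some content'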
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -85,3 +85,6 @@
    assert storage.snapshot_get(snapshot['id']) == {
        **snapshot, 'next_branch': None}
+
+
+# TODO: write a test for the content-replay command
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -16,6 +16,7 @@
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
from swh.journal.direct_writer import DirectKafkaWriter
from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
@@ -101,3 +102,32 @@
# TODO: add test for hash collision
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    storage1 = Storage()
+    storage1.journal_writer = MockedKafkaWriter(queue)
+
+    for (obj_type, obj) in objects:
+        obj = obj.copy()
+        if obj_type == 'content':
+            storage1.content_add([obj])
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage2 = Storage()
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=storage1.objstorage,
+                                  dst=storage2.objstorage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    assert storage1.objstorage.state == storage2.objstorage.state
