Page Menu | Home | Software Heritage

D1769.id.diff
No One | Temporary

D1769.id.diff

diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -137,6 +137,9 @@
@cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+ help='Maximum number of objects to replay. Default is to '
+ 'run forever.')
@click.option('--concurrency', type=int,
default=8,
help='Concurrentcy level.')
@@ -150,7 +153,7 @@
help='Name of the group id for reading from Kafka.'
'(deprecated, use the config file instead)')
@click.pass_context
-def content_replay(ctx, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
"""Fill a destination Object Storage (typically a mirror) by reading a Journal
and retrieving objects from an existing source ObjStorage.
@@ -184,13 +187,13 @@
try:
nb_messages = 0
- while True:
+ while not max_messages or nb_messages < max_messages:
nb_messages += client.process(worker_fn)
logger.info('Processed %d messages.' % nb_messages)
except KeyboardInterrupt:
ctx.exit(0)
else:
- logger.info('Done.')
+ print('Done.')
def main():
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import functools
import re
import tempfile
from subprocess import Popen
@@ -13,6 +14,7 @@
from kafka import KafkaProducer
import pytest
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage.in_memory import Storage
from swh.journal.cli import cli
@@ -23,6 +25,14 @@
storage:
cls: memory
args: {}
+objstorage_src:
+ cls: mocked
+ args:
+ name: src
+objstorage_dst:
+ cls: mocked
+ args:
+ name: dst
'''
@@ -87,4 +97,71 @@
**snapshot, 'next_branch': None}
-# TODO: write a test for the content-replay command
+def _patch_objstorages(names):
+ objstorages = {name: InMemoryObjStorage() for name in names}
+
+ def get_mock_objstorage(cls, args):
+ assert cls == 'mocked', cls
+ return objstorages[args['name']]
+
+ def decorator(f):
+ @functools.wraps(f)
+ @patch('swh.journal.cli.get_objstorage')
+ def newf(get_objstorage_mock, *args, **kwargs):
+ get_objstorage_mock.side_effect = get_mock_objstorage
+ f(*args, objstorages=objstorages, **kwargs)
+
+ return newf
+
+ return decorator
+
+
+def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+ producer = KafkaProducer(
+ bootstrap_servers='localhost:{}'.format(kafka_port),
+ key_serializer=key_to_kafka,
+ value_serializer=value_to_kafka,
+ client_id='test-producer',
+ )
+
+ contents = {}
+ for i in range(10):
+ content = b'\x00'*19 + bytes([i])
+ sha1 = objstorages['src'].add(content)
+ contents[sha1] = content
+ producer.send(topic=kafka_prefix+'.content', key=sha1, value={
+ 'sha1': sha1,
+ 'status': 'visible',
+ })
+
+ producer.flush()
+
+ return contents
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content(
+ objstorages,
+ storage: Storage,
+ kafka_prefix: str,
+ kafka_server: Tuple[Popen, int]):
+ (_, kafka_port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
+
+ contents = _fill_objstorage_and_kafka(
+ kafka_port, kafka_prefix, objstorages)
+
+ result = invoke(False, [
+ 'content-replay',
+ '--broker', 'localhost:%d' % kafka_port,
+ '--group-id', 'test-cli-consumer',
+ '--prefix', kafka_prefix,
+ '--max-messages', '10',
+ ])
+ expected = r'Done.\n'
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages['dst'], sha1
+ assert objstorages['dst'].get(sha1) == content

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 6:30 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217179

Event Timeline