Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7342983
D1769.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Subscribers
None
D1769.id.diff
View Options
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -137,6 +137,9 @@
@cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+ help='Maximum number of objects to replay. Default is to '
+ 'run forever.')
@click.option('--concurrency', type=int,
default=8,
help='Concurrentcy level.')
@@ -150,7 +153,7 @@
help='Name of the group id for reading from Kafka.'
'(deprecated, use the config file instead)')
@click.pass_context
-def content_replay(ctx, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
"""Fill a destination Object Storage (typically a mirror) by reading a Journal
and retrieving objects from an existing source ObjStorage.
@@ -184,13 +187,13 @@
try:
nb_messages = 0
- while True:
+ while max_messages is None or nb_messages < max_messages:  # None (default) = run forever; 0 = replay nothing
nb_messages += client.process(worker_fn)
logger.info('Processed %d messages.' % nb_messages)
except KeyboardInterrupt:
ctx.exit(0)
else:
- logger.info('Done.')
+ click.echo('Done.')
def main():
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import functools
import re
import tempfile
from subprocess import Popen
@@ -13,6 +14,7 @@
from kafka import KafkaProducer
import pytest
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage.in_memory import Storage
from swh.journal.cli import cli
@@ -23,6 +25,14 @@
storage:
cls: memory
args: {}
+objstorage_src:
+ cls: mocked
+ args:
+ name: src
+objstorage_dst:
+ cls: mocked
+ args:
+ name: dst
'''
@@ -87,4 +97,71 @@
**snapshot, 'next_branch': None}
-# TODO: write a test for the content-replay command
+def _patch_objstorages(names):
+ objstorages = {name: InMemoryObjStorage() for name in names}
+
+ def get_mock_objstorage(cls, args):
+ assert cls == 'mocked', cls
+ return objstorages[args['name']]
+
+ def decorator(f):
+ @functools.wraps(f)
+ @patch('swh.journal.cli.get_objstorage')
+ def newf(get_objstorage_mock, *args, **kwargs):
+ get_objstorage_mock.side_effect = get_mock_objstorage
+ f(*args, objstorages=objstorages, **kwargs)
+
+ return newf
+
+ return decorator
+
+
+def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+ producer = KafkaProducer(
+ bootstrap_servers='localhost:{}'.format(kafka_port),
+ key_serializer=key_to_kafka,
+ value_serializer=value_to_kafka,
+ client_id='test-producer',
+ )
+
+ contents = {}
+ for i in range(10):
+ content = b'\x00'*19 + bytes([i])
+ sha1 = objstorages['src'].add(content)
+ contents[sha1] = content
+ producer.send(topic=kafka_prefix+'.content', key=sha1, value={
+ 'sha1': sha1,
+ 'status': 'visible',
+ })
+
+ producer.flush()
+
+ return contents
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content(
+ objstorages,
+ storage: Storage,
+ kafka_prefix: str,
+ kafka_server: Tuple[Popen, int]):
+ (_, kafka_port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
+
+ contents = _fill_objstorage_and_kafka(
+ kafka_port, kafka_prefix, objstorages)
+
+ result = invoke(False, [
+ 'content-replay',
+ '--broker', 'localhost:%d' % kafka_port,
+ '--group-id', 'test-cli-consumer',
+ '--prefix', kafka_prefix,
+ '--max-messages', '10',
+ ])
+ expected = r'Done\.\n'  # escaped dot: match only the literal "Done." line
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages['dst'], sha1
+ assert objstorages['dst'].get(sha1) == content
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 6:30 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217179
Attached To
D1769: Add test for the CLI endpoint content_replay.
Event Timeline
Log In to Comment