Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | |||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple | from typing import Tuple | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from kafka import KafkaProducer | from kafka import KafkaProducer | ||||
import pytest | import pytest | ||||
from swh.objstorage.backends.in_memory import InMemoryObjStorage | |||||
from swh.storage.in_memory import Storage | from swh.storage.in_memory import Storage | ||||
from swh.journal.cli import cli | from swh.journal.cli import cli | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
CLI_CONFIG = ''' | CLI_CONFIG = ''' | ||||
storage: | storage: | ||||
cls: memory | cls: memory | ||||
args: {} | args: {} | ||||
objstorage_src: | |||||
cls: mocked | |||||
args: | |||||
name: src | |||||
objstorage_dst: | |||||
cls: mocked | |||||
args: | |||||
name: dst | |||||
''' | ''' | ||||
@pytest.fixture | @pytest.fixture | ||||
def storage(): | def storage(): | ||||
"""An instance of swh.storage.in_memory.Storage that gets injected | """An instance of swh.storage.in_memory.Storage that gets injected | ||||
into the CLI functions.""" | into the CLI functions.""" | ||||
storage = Storage() | storage = Storage() | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def test_replay( | ||||
expected = r'Done.\n' | expected = r'Done.\n' | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
assert storage.snapshot_get(snapshot['id']) == { | assert storage.snapshot_get(snapshot['id']) == { | ||||
**snapshot, 'next_branch': None} | **snapshot, 'next_branch': None} | ||||
# TODO: write a test for the content-replay command | def _patch_objstorages(names): | ||||
objstorages = {name: InMemoryObjStorage() for name in names} | |||||
def get_mock_objstorage(cls, args): | |||||
assert cls == 'mocked', cls | |||||
return objstorages[args['name']] | |||||
def decorator(f): | |||||
@functools.wraps(f) | |||||
@patch('swh.journal.cli.get_objstorage') | |||||
def newf(get_objstorage_mock, *args, **kwargs): | |||||
get_objstorage_mock.side_effect = get_mock_objstorage | |||||
f(*args, objstorages=objstorages, **kwargs) | |||||
return newf | |||||
return decorator | |||||
def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): | |||||
producer = KafkaProducer( | |||||
bootstrap_servers='localhost:{}'.format(kafka_port), | |||||
key_serializer=key_to_kafka, | |||||
value_serializer=value_to_kafka, | |||||
client_id='test-producer', | |||||
) | |||||
contents = {} | |||||
for i in range(10): | |||||
content = b'\x00'*19 + bytes([i]) | |||||
sha1 = objstorages['src'].add(content) | |||||
contents[sha1] = content | |||||
producer.send(topic=kafka_prefix+'.content', key=sha1, value={ | |||||
'sha1': sha1, | |||||
'status': 'visible', | |||||
}) | |||||
producer.flush() | |||||
return contents | |||||
@_patch_objstorages(['src', 'dst']) | |||||
def test_replay_content( | |||||
objstorages, | |||||
storage: Storage, | |||||
kafka_prefix: str, | |||||
kafka_server: Tuple[Popen, int]): | |||||
(_, kafka_port) = kafka_server | |||||
kafka_prefix += '.swh.journal.objects' | |||||
contents = _fill_objstorage_and_kafka( | |||||
kafka_port, kafka_prefix, objstorages) | |||||
result = invoke(False, [ | |||||
'content-replay', | |||||
'--broker', 'localhost:%d' % kafka_port, | |||||
'--group-id', 'test-cli-consumer', | |||||
'--prefix', kafka_prefix, | |||||
'--max-messages', '10', | |||||
]) | |||||
expected = r'Done.\n' | |||||
assert result.exit_code == 0, result.output | |||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | |||||
for (sha1, content) in contents.items(): | |||||
assert sha1 in objstorages['dst'], sha1 | |||||
assert objstorages['dst'].get(sha1) == content |