Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
import copy | |||||
import functools | import functools | ||||
import logging | import logging | ||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Any, Dict, Tuple | from typing import Any, Dict, Tuple | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
import yaml | |||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.objstorage.backends.in_memory import InMemoryObjStorage | from swh.objstorage.backends.in_memory import InMemoryObjStorage | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.cli import cli | from swh.journal.cli import cli | ||||
from swh.journal.replay import CONTENT_REPLAY_RETRIES | from swh.journal.replay import CONTENT_REPLAY_RETRIES | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
CLI_CONFIG = ''' | CLI_CONFIG = { | ||||
storage: | 'storage': { | ||||
cls: memory | 'cls': 'memory', | ||||
args: {} | }, | ||||
objstorage_src: | 'objstorage_src': { | ||||
cls: mocked | 'cls': 'mocked', | ||||
args: | 'name': 'src', | ||||
name: src | }, | ||||
objstorage_dst: | 'objstorage_dst': { | ||||
cls: mocked | 'cls': 'mocked', | ||||
args: | 'name': 'dst', | ||||
name: dst | }, | ||||
''' | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def storage(): | def storage(): | ||||
"""An swh-storage object that gets injected into the CLI functions.""" | """An swh-storage object that gets injected into the CLI functions.""" | ||||
storage_config = { | storage_config = { | ||||
'cls': 'pipeline', | 'cls': 'pipeline', | ||||
'steps': [ | 'steps': [ | ||||
Show All 10 Lines | |||||
@pytest.fixture | @pytest.fixture | ||||
def monkeypatch_retry_sleep(monkeypatch): | def monkeypatch_retry_sleep(monkeypatch): | ||||
from swh.journal.replay import copy_object, obj_in_objstorage | from swh.journal.replay import copy_object, obj_in_objstorage | ||||
monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None) | monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None) | ||||
monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None) | monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None) | ||||
def invoke(catch_exceptions, args, env=None): | def invoke(catch_exceptions, args, env=None): | ||||
config = copy.deepcopy(CLI_CONFIG) | |||||
runner = CliRunner() | runner = CliRunner() | ||||
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | ||||
config_fd.write(CLI_CONFIG) | yaml.dump(config, config_fd) | ||||
config_fd.seek(0) | config_fd.seek(0) | ||||
args = ['-C' + config_fd.name] + args | args = ['-C' + config_fd.name] + args | ||||
result = runner.invoke( | result = runner.invoke( | ||||
cli, args, obj={'log_level': logging.DEBUG}, env=env, | cli, args, obj={'log_level': logging.DEBUG}, env=env, | ||||
) | ) | ||||
if not catch_exceptions and result.exception: | if not catch_exceptions and result.exception: | ||||
print(result.output) | print(result.output) | ||||
raise result.exception | raise result.exception | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def test_replay( | ||||
assert storage.snapshot_get(snapshot['id']) == { | assert storage.snapshot_get(snapshot['id']) == { | ||||
**snapshot, 'next_branch': None} | **snapshot, 'next_branch': None} | ||||
def _patch_objstorages(names): | def _patch_objstorages(names): | ||||
objstorages = {name: InMemoryObjStorage() for name in names} | objstorages = {name: InMemoryObjStorage() for name in names} | ||||
def get_mock_objstorage(cls, args): | def get_mock_objstorage(cls, **args): | ||||
assert cls == 'mocked', cls | assert cls == 'mocked', cls | ||||
return objstorages[args['name']] | return objstorages[args['name']] | ||||
def decorator(f): | def decorator(f): | ||||
@functools.wraps(f) | @functools.wraps(f) | ||||
@patch('swh.journal.cli.get_objstorage') | @patch('swh.journal.cli.get_objstorage') | ||||
def newf(get_objstorage_mock, *args, **kwargs): | def newf(get_objstorage_mock, *args, **kwargs): | ||||
get_objstorage_mock.side_effect = get_mock_objstorage | get_objstorage_mock.side_effect = get_mock_objstorage | ||||
▲ Show 20 Lines • Show All 478 Lines • Show Last 20 Lines |