swh/journal/tests/test_cli.py
Show All 15 Lines
from confluent_kafka import Producer
import pytest

from swh.model.hashutil import hash_to_hex
from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage import get_storage

from swh.journal.cli import cli
from swh.journal.replay import CONTENT_REPLAY_RETRIES
from swh.journal.serializers import key_to_kafka, value_to_kafka

logger = logging.getLogger(__name__)

CLI_CONFIG = '''
storage:
Show All 21 Lines
        ]
    }
    storage = get_storage(**storage_config)
    with patch('swh.journal.cli.get_storage') as get_storage_mock:
        get_storage_mock.return_value = storage
        yield storage


@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
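    """Disable the sleep between retries of the replay operations, so the
    retry tests below run without waiting between attempts."""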
    from swh.journal.replay import copy_object, obj_in_objstorage
    monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None)
    monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None)


def invoke(catch_exceptions, args, env=None):
    runner = CliRunner()
    with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
        config_fd.write(CLI_CONFIG)
        config_fd.seek(0)
        args = ['-C' + config_fd.name] + args
        result = runner.invoke(
            cli, args, obj={'log_level': logging.DEBUG}, env=env,
Show All 343 Lines
@_patch_objstorages(['src', 'dst'])
def test_replay_content_check_dst_retry(
        objstorages,
        storage,
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        monkeypatch_retry_sleep):
    (_, kafka_port) = kafka_server
    kafka_prefix += '.swh.journal.objects'
    contents = _fill_objstorage_and_kafka(
        kafka_port, kafka_prefix, objstorages)

    failures = {}
    for i, (sha1, content) in enumerate(contents.items()):
Show All 20 Lines
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    for (sha1, content) in contents.items():
        assert sha1 in objstorages['dst'], sha1
        assert objstorages['dst'].get(sha1) == content


@_patch_objstorages(['src', 'dst'])
def test_replay_content_failed_copy_retry(
        objstorages,
        storage,
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog,
        monkeypatch_retry_sleep):
    (_, kafka_port) = kafka_server
    kafka_prefix += '.swh.journal.objects'
    contents = _fill_objstorage_and_kafka(
        kafka_port, kafka_prefix, objstorages)

    add_failures = {}
    get_failures = {}
    definitely_failed = set()

    # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES
    # times. We generate failures for 2 different operations, get and add.
    num_retry_contents = 2 * CONTENT_REPLAY_RETRIES

    assert num_retry_contents < NUM_CONTENTS, (
        "Need to generate more test contents to properly test retry behavior"
    )
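    # The loop below emulates transient storage errors: for each of the
    # first num_retry_contents contents, the relevant operation fails
    # (i % CONTENT_REPLAY_RETRIES) + 1 times before succeeding. The first
    # CONTENT_REPLAY_RETRIES contents see their 'add' to the destination
    # fail, the next CONTENT_REPLAY_RETRIES see their 'get' from the source
    # fail. For example, if CONTENT_REPLAY_RETRIES were 3, contents 0, 1, 2
    # would fail 'add' 1, 2 and 3 times, and contents 3, 4, 5 would fail
    # 'get' 1, 2 and 3 times. A content whose failure count reaches
    # CONTENT_REPLAY_RETRIES exhausts the replayer's retry budget and is
    # expected never to be copied; all others should eventually make it to
    # the destination.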
    for i, sha1 in enumerate(contents):
        if i >= num_retry_contents:
            break

        # This generates a number of failures, up to CONTENT_REPLAY_RETRIES.
vlorentz: please add a comment explaining the error pattern this loop emulates
        num_failures = (i % CONTENT_REPLAY_RETRIES) + 1

        # This generates failures of add for the first
        # CONTENT_REPLAY_RETRIES objects, then failures of get.
        if i < CONTENT_REPLAY_RETRIES:
            add_failures['add', sha1] = num_failures
        else:
            get_failures['get', sha1] = num_failures

        # Only contents whose operation fails CONTENT_REPLAY_RETRIES times
        # or more are guaranteed to fail to be copied.
        if num_failures >= CONTENT_REPLAY_RETRIES:
            definitely_failed.add(hash_to_hex(sha1))
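
    # Wrap both object storages in flaky proxies: 'dst' will fail the
    # configured number of 'add' operations and 'src' the configured
    # number of 'get' operations, while keeping the underlying state.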
    objstorages['dst'] = FlakyObjStorage(
        state=objstorages['dst'].state,
        failures=add_failures,
    )
    objstorages['src'] = FlakyObjStorage(
        state=objstorages['src'].state,
        failures=get_failures,
    )

    caplog.set_level(logging.DEBUG, 'swh.journal.replay')

    result = invoke(False, [
        'content-replay',
        '--broker', '127.0.0.1:%d' % kafka_port,
        '--group-id', kafka_consumer_group,
        '--prefix', kafka_prefix,
        '--max-messages', str(NUM_CONTENTS),
    ])
    expected = r'Done.\n'
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
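
    # Scan the log records captured from the replayer: count successful
    # copies, and collect the ids of objects whose copy was abandoned
    # after CONTENT_REPLAY_RETRIES failed attempts.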
    copied = 0
    actually_failed = set()
    for record in caplog.records:
        logtext = record.getMessage()
        if 'copied' in logtext:
            copied += 1
        elif 'Failed operation' in logtext:
            assert record.levelno == logging.ERROR
            assert record.args['retries'] == CONTENT_REPLAY_RETRIES
            actually_failed.add(record.args['obj_id'])

    assert actually_failed == definitely_failed, (
        'Unexpected object copy failures; see captured log for details'
    )

    for (sha1, content) in contents.items():
        if hash_to_hex(sha1) in definitely_failed:
            assert sha1 not in objstorages['dst']
            continue
        assert sha1 in objstorages['dst'], sha1
        assert objstorages['dst'].get(sha1) == content


@_patch_objstorages(['src', 'dst'])
def test_replay_content_objnotfound(
        objstorages,
        storage,
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog):
    (_, kafka_port) = kafka_server
Show All 52 Lines