Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/tests/test_cli.py
Show All 9 Lines | |||||
import re | import re | ||||
from subprocess import Popen | from subprocess import Popen | ||||
import tempfile | import tempfile | ||||
from typing import Tuple | from typing import Tuple | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import msgpack | |||||
import pytest | import pytest | ||||
import yaml | import yaml | ||||
from swh.journal.serializers import key_to_kafka | from swh.journal.serializers import key_to_kafka | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.objstorage.backends.in_memory import InMemoryObjStorage | from swh.objstorage.backends.in_memory import InMemoryObjStorage | ||||
from swh.objstorage.replayer.cli import objstorage_cli_group | from swh.objstorage.replayer.cli import objstorage_cli_group | ||||
from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES | from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES | ||||
▲ Show 20 Lines • Show All 386 Lines • ▼ Show 20 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_check_dst_retry( | def test_replay_content_check_dst_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
caplog, | caplog, | ||||
redis_proc, | |||||
redisdb, | |||||
): | ): | ||||
"""Check the content replayer with a flaky dst objstorage | """Check the content replayer with a flaky dst objstorage | ||||
for 'in' operations. | for 'in' operations. | ||||
""" | """ | ||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
Show All 14 Lines | result = invoke( | ||||
"replay", | "replay", | ||||
"--check-dst", | "--check-dst", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | |||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
# check that exactly NUM_CONTENTS_DST 'in' operations have failed once | # check that exactly NUM_CONTENTS_DST 'in' operations have failed once | ||||
failed_in = 0 | failed_in = 0 | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "Retry operation obj_in_objstorage" in logtext: | if "Retry operation obj_in_objstorage" in logtext: | ||||
failed_in += 1 | failed_in += 1 | ||||
elif "Retry operation" in logtext: | elif "Retry operation" in logtext: | ||||
assert False, "No other failure expected than 'in' operations" | assert False, "No other failure expected than 'in' operations" | ||||
assert failed_in == NUM_CONTENTS_DST | assert failed_in == NUM_CONTENTS_DST | ||||
# check nothing has been reported in redis | |||||
assert not redisdb.keys() | |||||
# in the end, the replay process should be OK | # in the end, the replay process should be OK | ||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
assert sha1 in objstorages["dst"], sha1 | assert sha1 in objstorages["dst"], sha1 | ||||
assert objstorages["dst"].get(sha1) == content | assert objstorages["dst"].get(sha1) == content | ||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_failed_copy_retry( | def test_replay_content_failed_copy_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
redis_proc, | |||||
redisdb, | |||||
): | ): | ||||
"""Check the content replayer with a flaky src and dst objstorages | """Check the content replayer with a flaky src and dst objstorages | ||||
for 'get' and 'add' operations. | for 'get' and 'add' operations, and a few non-recoverable failures (some | ||||
objects failed to be replayed). | |||||
""" | """ | ||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
add_failures = {} | add_failures = {} | ||||
get_failures = {} | get_failures = {} | ||||
definitely_failed = set() | definitely_failed = set() | ||||
Show All 40 Lines | ): | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | |||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
# check the logs looks as expected | |||||
copied = 0 | copied = 0 | ||||
actually_failed = set() | failed_put = set() | ||||
failed_get = set() | |||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "stored" in logtext: | if "stored" in logtext: | ||||
copied += 1 | copied += 1 | ||||
elif "Failed operation" in logtext: | elif "Failed operation" in logtext: | ||||
assert record.levelno == logging.ERROR | assert record.levelno == logging.ERROR | ||||
assert record.args["retries"] == CONTENT_REPLAY_RETRIES | assert record.args["retries"] == CONTENT_REPLAY_RETRIES | ||||
actually_failed.add(record.args["obj_id"]) | assert record.args["operation"] in ("get_object", "put_object") | ||||
if record.args["operation"] == "get_object": | |||||
failed_get.add(record.args["obj_id"]) | |||||
else: | |||||
failed_put.add(record.args["obj_id"]) | |||||
assert ( | assert ( | ||||
actually_failed == definitely_failed | failed_put | failed_get == definitely_failed | ||||
), "Unexpected object copy failures; see captured log for details" | ), "Unexpected object copy failures; see captured log for details" | ||||
# check failed objects are referenced in redis | |||||
assert set(redisdb.keys()) == { | |||||
f"blob:{objid}".encode() for objid in definitely_failed | |||||
} | |||||
# and have a consistent error report in redis | |||||
for key in redisdb.keys(): | |||||
report = msgpack.loads(redisdb[key]) | |||||
assert report["operation"] in ("get_object", "put_object") | |||||
if report["operation"] == "get_object": | |||||
assert report["obj_id"] in failed_get | |||||
else: | |||||
assert report["obj_id"] in failed_put | |||||
# check valid object are in the dst objstorage, but | |||||
# failed objects are not. | |||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
if hash_to_hex(sha1) in definitely_failed: | if hash_to_hex(sha1) in definitely_failed: | ||||
assert sha1 not in objstorages["dst"] | assert sha1 not in objstorages["dst"] | ||||
continue | continue | ||||
assert sha1 in objstorages["dst"], sha1 | assert sha1 in objstorages["dst"], sha1 | ||||
assert objstorages["dst"].get(sha1) == content | assert objstorages["dst"].get(sha1) == content | ||||
▲ Show 20 Lines • Show All 66 Lines • Show Last 20 Lines |