Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/tests/test_cli.py
Show All 29 Lines | |||||
CLI_CONFIG = { | CLI_CONFIG = { | ||||
"objstorage": {"cls": "mocked", "name": "src",}, | "objstorage": {"cls": "mocked", "name": "src",}, | ||||
"objstorage_dst": {"cls": "mocked", "name": "dst",}, | "objstorage_dst": {"cls": "mocked", "name": "dst",}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def monkeypatch_retry_sleep(monkeypatch): | def monkeypatch_retry_sleep(monkeypatch): | ||||
from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage | from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object | ||||
monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) | monkeypatch.setattr(get_object.retry, "sleep", lambda x: None) | ||||
monkeypatch.setattr(put_object.retry, "sleep", lambda x: None) | |||||
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) | monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) | ||||
def _patch_objstorages(names): | def _patch_objstorages(names): | ||||
objstorages = {name: InMemoryObjStorage() for name in names} | objstorages = {name: InMemoryObjStorage() for name in names} | ||||
def get_mock_objstorage(cls, **args): | def get_mock_objstorage(cls, **args): | ||||
assert cls == "mocked", cls | assert cls == "mocked", cls | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content( | def test_replay_content( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
): | ): | ||||
"""Check the content replayer in normal conditions""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
Show All 17 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_structured_log( | def test_replay_content_structured_log( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
"""Check the logs produced by the content replayer in normal conditions""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | ||||
Show All 10 Lines | ): | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
copied = set() | copied = set() | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "copied" in logtext: | if "stored" in logtext: | ||||
copied.add(record.args["obj_id"]) | copied.add(record.args["obj_id"]) | ||||
assert ( | assert ( | ||||
copied == expected_obj_ids | copied == expected_obj_ids | ||||
), "Mismatched logging; see captured log output for details." | ), "Mismatched logging; see captured log output for details." | ||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_static_group_id( | def test_replay_content_static_group_id( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
"""Check the content replayer in normal conditions | |||||
with KAFKA_GROUP_INSTANCE_ID set | |||||
""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
# Setup log capture to fish the consumer settings out of the log messages | # Setup log capture to fish the consumer settings out of the log messages | ||||
caplog.set_level(logging.DEBUG, "swh.journal.client") | caplog.set_level(logging.DEBUG, "swh.journal.client") | ||||
Show All 33 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_exclude( | def test_replay_content_exclude( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
): | ): | ||||
"""Check the content replayer in normal conditions | |||||
with a exclusion file (--exclude-sha1-file) | |||||
""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
excluded_contents = list(contents)[0::2] # picking half of them | excluded_contents = list(contents)[0::2] # picking half of them | ||||
with tempfile.NamedTemporaryFile(mode="w+b") as fd: | with tempfile.NamedTemporaryFile(mode="w+b") as fd: | ||||
fd.write(b"".join(sorted(excluded_contents))) | fd.write(b"".join(sorted(excluded_contents))) | ||||
Show All 40 Lines | def test_replay_content_check_dst( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
check_dst: bool, | check_dst: bool, | ||||
expected_copied: int, | expected_copied: int, | ||||
expected_in_dst: int, | expected_in_dst: int, | ||||
caplog, | caplog, | ||||
): | ): | ||||
"""Check the content replayer in normal conditions | |||||
with some objects already in the dst objstorage. | |||||
When check_dst is True, expect those not to be neither retrieved from the | |||||
src objstorage nor pushed in the dst objstorage. | |||||
""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
# add some objects in the dst objstorage | |||||
for i, (sha1, content) in enumerate(contents.items()): | for i, (sha1, content) in enumerate(contents.items()): | ||||
if i >= NUM_CONTENTS_DST: | if i >= NUM_CONTENTS_DST: | ||||
break | break | ||||
objstorages["dst"].add(content, obj_id=sha1) | objstorages["dst"].add(content, obj_id=sha1) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
"--check-dst" if check_dst else "--no-check-dst", | "--check-dst" if check_dst else "--no-check-dst", | ||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
copied = 0 | retrieved = 0 | ||||
stored = 0 | |||||
in_dst = 0 | in_dst = 0 | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "copied" in logtext: | if "retrieved" in logtext: | ||||
copied += 1 | retrieved += 1 | ||||
elif "stored" in logtext: | |||||
stored += 1 | |||||
elif "in dst" in logtext: | elif "in dst" in logtext: | ||||
in_dst += 1 | in_dst += 1 | ||||
assert ( | assert ( | ||||
copied == expected_copied and in_dst == expected_in_dst | retrieved == expected_copied | ||||
and stored == expected_copied | |||||
and in_dst == expected_in_dst | |||||
), "Unexpected amount of objects copied, see the captured log for details" | ), "Unexpected amount of objects copied, see the captured log for details" | ||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
assert sha1 in objstorages["dst"], sha1 | assert sha1 in objstorages["dst"], sha1 | ||||
assert objstorages["dst"].get(sha1) == content | assert objstorages["dst"].get(sha1) == content | ||||
class FlakyObjStorage(InMemoryObjStorage): | class FlakyObjStorage(InMemoryObjStorage): | ||||
"""Flaky objstorage | |||||
Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail | |||||
according to configured 'failures'. | |||||
'failures' is expected to be a dict which keys are couples (operation, | |||||
obj_id) and values are the number of time the operation 'operation' is | |||||
expected to fail for object 'obj_id' before being performed successfully. | |||||
An optional state ('state') can be also given as argument (see | |||||
InMemoryObjStorage). | |||||
""" | |||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
state = kwargs.pop("state") | state = kwargs.pop("state") | ||||
self.failures_left = Counter(kwargs.pop("failures")) | self.failures_left = Counter(kwargs.pop("failures")) | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
if state: | if state: | ||||
self.state = state | self.state = state | ||||
def flaky_operation(self, op, obj_id): | def flaky_operation(self, op, obj_id): | ||||
Show All 16 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_check_dst_retry( | def test_replay_content_check_dst_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
caplog, | |||||
): | ): | ||||
"""Check the content replayer with a flaky dst objstorage | |||||
for 'in' operations. | |||||
""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
# build a flaky dst objstorage in which the 'in' operation for the first | |||||
# NUM_CONTENT_DST objects will fail once | |||||
failures = {} | failures = {} | ||||
for i, (sha1, content) in enumerate(contents.items()): | for i, (sha1, content) in enumerate(contents.items()): | ||||
if i >= NUM_CONTENTS_DST: | if i >= NUM_CONTENTS_DST: | ||||
break | break | ||||
objstorages["dst"].add(content, obj_id=sha1) | objstorages["dst"].add(content, obj_id=sha1) | ||||
failures["in", sha1] = 1 | failures["in", sha1] = 1 | ||||
orig_dst = objstorages["dst"] | orig_dst = objstorages["dst"] | ||||
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) | objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | |||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--check-dst", | |||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
"--check-dst", | |||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
# check that exactly NUM_CONTENTS_DST 'in' operations have failed once | |||||
failed_in = 0 | |||||
for record in caplog.records: | |||||
logtext = record.getMessage() | |||||
if "Retry operation obj_in_objstorage" in logtext: | |||||
failed_in += 1 | |||||
elif "Retry operation" in logtext: | |||||
assert False, "No other failure expected than 'in' operations" | |||||
assert failed_in == NUM_CONTENTS_DST | |||||
# in the end, the replay process should be OK | |||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
assert sha1 in objstorages["dst"], sha1 | assert sha1 in objstorages["dst"], sha1 | ||||
assert objstorages["dst"].get(sha1) == content | assert objstorages["dst"].get(sha1) == content | ||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_failed_copy_retry( | def test_replay_content_failed_copy_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
): | ): | ||||
"""Check the content replayer with a flaky src and dst objstorages | |||||
for 'get' and 'add' operations. | |||||
""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
add_failures = {} | add_failures = {} | ||||
get_failures = {} | get_failures = {} | ||||
definitely_failed = set() | definitely_failed = set() | ||||
Show All 19 Lines | for i, sha1 in enumerate(contents): | ||||
else: | else: | ||||
get_failures["get", sha1] = num_failures | get_failures["get", sha1] = num_failures | ||||
# Only contents that have CONTENT_REPLAY_RETRIES or more are | # Only contents that have CONTENT_REPLAY_RETRIES or more are | ||||
# definitely failing | # definitely failing | ||||
if num_failures >= CONTENT_REPLAY_RETRIES: | if num_failures >= CONTENT_REPLAY_RETRIES: | ||||
definitely_failed.add(hash_to_hex(sha1)) | definitely_failed.add(hash_to_hex(sha1)) | ||||
assert add_failures | |||||
assert get_failures | |||||
assert definitely_failed | |||||
objstorages["dst"] = FlakyObjStorage( | objstorages["dst"] = FlakyObjStorage( | ||||
state=objstorages["dst"].state, failures=add_failures, | state=objstorages["dst"].state, failures=add_failures, | ||||
) | ) | ||||
objstorages["src"] = FlakyObjStorage( | objstorages["src"] = FlakyObjStorage( | ||||
state=objstorages["src"].state, failures=get_failures, | state=objstorages["src"].state, failures=get_failures, | ||||
) | ) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
Show All 11 Lines | ): | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
copied = 0 | copied = 0 | ||||
actually_failed = set() | actually_failed = set() | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "copied" in logtext: | if "stored" in logtext: | ||||
copied += 1 | copied += 1 | ||||
elif "Failed operation" in logtext: | elif "Failed operation" in logtext: | ||||
assert record.levelno == logging.ERROR | assert record.levelno == logging.ERROR | ||||
assert record.args["retries"] == CONTENT_REPLAY_RETRIES | assert record.args["retries"] == CONTENT_REPLAY_RETRIES | ||||
actually_failed.add(record.args["obj_id"]) | actually_failed.add(record.args["obj_id"]) | ||||
assert ( | assert ( | ||||
actually_failed == definitely_failed | actually_failed == definitely_failed | ||||
Show All 11 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_objnotfound( | def test_replay_content_objnotfound( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
"""Check the ContentNotFound is not considered a failure to retry""" | |||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
# delete a few objects from the src objstorage | |||||
num_contents_deleted = 5 | num_contents_deleted = 5 | ||||
contents_deleted = set() | contents_deleted = set() | ||||
for i, sha1 in enumerate(contents): | for i, sha1 in enumerate(contents): | ||||
if i >= num_contents_deleted: | if i >= num_contents_deleted: | ||||
break | break | ||||
del objstorages["src"].state[sha1] | del objstorages["src"].state[sha1] | ||||
contents_deleted.add(hash_to_hex(sha1)) | contents_deleted.add(hash_to_hex(sha1)) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
copied = 0 | copied = 0 | ||||
not_in_src = set() | not_in_src = set() | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "copied" in logtext: | if "stored" in logtext: | ||||
copied += 1 | copied += 1 | ||||
elif "object not found" in logtext: | elif "object not found" in logtext: | ||||
# Check that the object id can be recovered from logs | # Check that the object id can be recovered from logs | ||||
assert record.levelno == logging.ERROR | assert record.levelno == logging.ERROR | ||||
not_in_src.add(record.args["obj_id"]) | not_in_src.add(record.args["obj_id"]) | ||||
elif "Retry operation" in logtext: | |||||
assert False, "Not found objects should not be retried" | |||||
assert ( | assert ( | ||||
copied == NUM_CONTENTS - num_contents_deleted | copied == NUM_CONTENTS - num_contents_deleted | ||||
), "Unexpected number of contents copied" | ), "Unexpected number of contents copied" | ||||
assert ( | assert ( | ||||
not_in_src == contents_deleted | not_in_src == contents_deleted | ||||
), "Mismatch between deleted contents and not_in_src logs" | ), "Mismatch between deleted contents and not_in_src logs" | ||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
if sha1 not in objstorages["src"]: | if sha1 not in objstorages["src"]: | ||||
continue | continue | ||||
assert sha1 in objstorages["dst"], sha1 | assert sha1 in objstorages["dst"], sha1 | ||||
assert objstorages["dst"].get(sha1) == content | assert objstorages["dst"].get(sha1) == content |