Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/tests/test_cli.py
Show First 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | def test_replay_help(): | ||||
) | ) | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.match(expected, result.output, re.MULTILINE), result.output | assert re.match(expected, result.output, re.MULTILINE), result.output | ||||
# Number of synthetic contents each test feeds through the replayer.
NUM_CONTENTS = 10
def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorage):
    """Seed test fixtures: add NUM_CONTENTS synthetic contents to
    ``objstorage`` and announce each one on the ``<kafka_prefix>.content``
    Kafka topic.

    Args:
        kafka_server: Kafka bootstrap server address for the test broker.
        kafka_prefix: topic name prefix; messages go to ``kafka_prefix + ".content"``.
        objstorage: source object storage the contents are added to.

    Returns:
        dict mapping each content's sha1 (as returned by ``objstorage.add``)
        to its raw bytes, for later comparison by the replayer tests.
    """
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test-producer",
            # Require acks from all in-sync replicas so the contents are
            # durably in Kafka before the replayer under test consumes them.
            "acks": "all",
        }
    )
    contents = {}
    for i in range(NUM_CONTENTS):
        # 20-byte payloads that differ only in the last byte.
        content = b"\x00" * 19 + bytes([i])
        sha1 = objstorage.add(content)
        contents[sha1] = content
        producer.produce(
            topic=kafka_prefix + ".content",
            key=key_to_kafka(sha1),
            # NOTE(review): the message *value* is also serialized with
            # key_to_kafka here; confirm whether value_to_kafka was intended
            # (kept as-is to preserve behavior).
            value=key_to_kafka({"sha1": sha1, "status": "visible",}),
        )
    # Block until every queued message is delivered, so tests can rely on the
    # topic being fully populated when this returns.
    producer.flush()
    return contents
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content( | def test_replay_content( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_config={ | ||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
Show All 14 Lines | |||||
def test_replay_content_structured_log( | def test_replay_content_structured_log( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
Show All 23 Lines | |||||
def test_replay_content_static_group_id( | def test_replay_content_static_group_id( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
# Setup log capture to fish the consumer settings out of the log messages | # Setup log capture to fish the consumer settings out of the log messages | ||||
caplog.set_level(logging.DEBUG, "swh.journal.client") | caplog.set_level(logging.DEBUG, "swh.journal.client") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
Show All 30 Lines | |||||
@_patch_objstorages(["src", "dst"]) | @_patch_objstorages(["src", "dst"]) | ||||
def test_replay_content_exclude( | def test_replay_content_exclude( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
excluded_contents = list(contents)[0::2] # picking half of them | excluded_contents = list(contents)[0::2] # picking half of them | ||||
with tempfile.NamedTemporaryFile(mode="w+b") as fd: | with tempfile.NamedTemporaryFile(mode="w+b") as fd: | ||||
fd.write(b"".join(sorted(excluded_contents))) | fd.write(b"".join(sorted(excluded_contents))) | ||||
fd.seek(0) | fd.seek(0) | ||||
result = invoke( | result = invoke( | ||||
Show All 37 Lines | def test_replay_content_check_dst( | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
check_dst: bool, | check_dst: bool, | ||||
expected_copied: int, | expected_copied: int, | ||||
expected_in_dst: int, | expected_in_dst: int, | ||||
caplog, | caplog, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
for i, (sha1, content) in enumerate(contents.items()): | for i, (sha1, content) in enumerate(contents.items()): | ||||
if i >= NUM_CONTENTS_DST: | if i >= NUM_CONTENTS_DST: | ||||
break | break | ||||
objstorages["dst"].add(content, obj_id=sha1) | objstorages["dst"].add(content, obj_id=sha1) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | |||||
def test_replay_content_check_dst_retry( | def test_replay_content_check_dst_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
failures = {} | failures = {} | ||||
for i, (sha1, content) in enumerate(contents.items()): | for i, (sha1, content) in enumerate(contents.items()): | ||||
if i >= NUM_CONTENTS_DST: | if i >= NUM_CONTENTS_DST: | ||||
break | break | ||||
objstorages["dst"].add(content, obj_id=sha1) | objstorages["dst"].add(content, obj_id=sha1) | ||||
failures["in", sha1] = 1 | failures["in", sha1] = 1 | ||||
Show All 25 Lines | def test_replay_content_failed_copy_retry( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
monkeypatch_retry_sleep, | monkeypatch_retry_sleep, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
add_failures = {} | add_failures = {} | ||||
get_failures = {} | get_failures = {} | ||||
definitely_failed = set() | definitely_failed = set() | ||||
# We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. | # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. | ||||
# We generate failures for 2 different operations, get and add. | # We generate failures for 2 different operations, get and add. | ||||
num_retry_contents = 2 * CONTENT_REPLAY_RETRIES | num_retry_contents = 2 * CONTENT_REPLAY_RETRIES | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | |||||
def test_replay_content_objnotfound( | def test_replay_content_objnotfound( | ||||
objstorages, | objstorages, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_consumer_group: str, | kafka_consumer_group: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
caplog, | caplog, | ||||
): | ): | ||||
contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | |||||
) | |||||
num_contents_deleted = 5 | num_contents_deleted = 5 | ||||
contents_deleted = set() | contents_deleted = set() | ||||
for i, sha1 in enumerate(contents): | for i, sha1 in enumerate(contents): | ||||
if i >= num_contents_deleted: | if i >= num_contents_deleted: | ||||
break | break | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |