swh/journal/tests/test_cli.py
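This changeset reworks the test helper invoke() to accept a journal_config dict, which is written to the 'journal' section of the temporary configuration file read via -C; the replay and content-replay tests then stop passing --broker, --group-id and --prefix on the command line and supply those settings through journal_config instead. The port variable in test_replay is renamed to kafka_port along the way.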
@@ [first 62 lines not shown] @@
 @pytest.fixture
 def monkeypatch_retry_sleep(monkeypatch):
     from swh.journal.replay import copy_object, obj_in_objstorage
     monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None)
     monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None)


-def invoke(*args, env=None):
+def invoke(*args, env=None, journal_config=None):
     config = copy.deepcopy(CLI_CONFIG)
+    if journal_config:
+        config['journal'] = journal_config
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         yaml.dump(config, config_fd)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + list(args)
         return runner.invoke(
             cli, args, obj={'log_level': logging.DEBUG}, env=env,
         )
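For reference, here is a minimal, self-contained sketch of what the reworked helper writes to the temporary file passed via -C. It is not part of the diff: CLI_CONFIG is a placeholder for the module-level test configuration, and the broker address, group id and prefix are illustrative values only.

    import copy
    import yaml

    # Placeholder for the module-level test config; the real CLI_CONFIG also
    # carries storage/objstorage settings for the replayer under test.
    CLI_CONFIG = {'objstorage_dst': {}}

    config = copy.deepcopy(CLI_CONFIG)
    config['journal'] = {                        # what journal_config injects
        'brokers': ['127.0.0.1:9092'],           # illustrative broker address
        'group_id': 'test-consumer-group',       # illustrative consumer group
        'prefix': 'test.swh.journal.objects',    # illustrative topic prefix
    }

    # The YAML text that ends up in the '-C' config file:
    print(yaml.dump(config))
    # journal:
    #   brokers:
    #   - 127.0.0.1:9092
    #   group_id: test-consumer-group
    #   prefix: test.swh.journal.objects
    # objstorage_dst: {}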
 def test_replay(
         storage,
         kafka_prefix: str,
         kafka_consumer_group: str,
         kafka_server: Tuple[Popen, int]):
-    (_, port) = kafka_server
+    (_, kafka_port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
     producer = Producer({
-        'bootstrap.servers': 'localhost:{}'.format(port),
+        'bootstrap.servers': 'localhost:{}'.format(kafka_port),
         'client.id': 'test-producer',
         'enable.idempotence': 'true',
     })

     snapshot = {'id': b'foo', 'branches': {
         b'HEAD': {
             'target_type': 'revision',
             'target': b'\x01'*20,
         }
     }} # type: Dict[str, Any]
     producer.produce(
         topic=kafka_prefix+'.snapshot',
         key=key_to_kafka(snapshot['id']),
         value=value_to_kafka(snapshot),
     )
     producer.flush()
     logger.debug('Flushed producer')

     result = invoke(
         'replay',
-        '--broker', '127.0.0.1:%d' % port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', '1',
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
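The same substitution recurs in every content-replay hunk below, so the pattern bears spelling out once. Schematically (a recap of the hunk above, using the test's own names; port is the pre-rename spelling of kafka_port):

    # Old call shape: Kafka connection parameters passed as CLI flags.
    result = invoke(
        'replay',
        '--broker', '127.0.0.1:%d' % port,
        '--group-id', kafka_consumer_group,
        '--prefix', kafka_prefix,
        '--stop-after-objects', '1',
    )

    # New call shape: only replay-specific options stay on the command line;
    # connection settings travel through the generated config file instead.
    result = invoke(
        'replay',
        '--stop-after-objects', '1',
        journal_config={
            'brokers': ['127.0.0.1:%d' % kafka_port],
            'group_id': kafka_consumer_group,
            'prefix': kafka_prefix,
        },
    )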
@@ [55 lines not shown] @@ def test_replay_content(
     (_, kafka_port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
     contents = _fill_objstorage_and_kafka(
         kafka_port, kafka_prefix, objstorages)

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         assert sha1 in objstorages['dst'], sha1
         assert objstorages['dst'].get(sha1) == content
@@ [13 lines not shown] @@ contents = _fill_objstorage_and_kafka(
         kafka_port, kafka_prefix, objstorages)

     caplog.set_level(logging.DEBUG, 'swh.journal.replay')

     expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = set()
     for record in caplog.records:
         logtext = record.getMessage()
@@ [19 lines not shown] @@ def test_replay_content_static_group_id(
     contents = _fill_objstorage_and_kafka(
         kafka_port, kafka_prefix, objstorages)

     # Setup log capture to fish the consumer settings out of the log messages
     caplog.set_level(logging.DEBUG, 'swh.journal.client')

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
-        env={'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
+        env={'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'},
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
+    )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     consumer_settings = None
     for record in caplog.records:
         if 'Consumer settings' in record.message:
             consumer_settings = record.args
Show All 28 Lines | def test_replay_content_exclude( | ||||
excluded_contents = list(contents)[0::2] # picking half of them | excluded_contents = list(contents)[0::2] # picking half of them | ||||
with tempfile.NamedTemporaryFile(mode='w+b') as fd: | with tempfile.NamedTemporaryFile(mode='w+b') as fd: | ||||
fd.write(b''.join(sorted(excluded_contents))) | fd.write(b''.join(sorted(excluded_contents))) | ||||
fd.seek(0) | fd.seek(0) | ||||
result = invoke( | result = invoke( | ||||
'content-replay', | 'content-replay', | ||||
'--broker', '127.0.0.1:%d' % kafka_port, | |||||
'--group-id', kafka_consumer_group, | |||||
'--prefix', kafka_prefix, | |||||
'--stop-after-objects', str(NUM_CONTENTS), | '--stop-after-objects', str(NUM_CONTENTS), | ||||
'--exclude-sha1-file', fd.name, | '--exclude-sha1-file', fd.name, | ||||
journal_config={ | |||||
'brokers': ['127.0.0.1:%d' % kafka_port], | |||||
'group_id': kafka_consumer_group, | |||||
'prefix': kafka_prefix, | |||||
}, | |||||
) | ) | ||||
expected = r'Done.\n' | expected = r'Done.\n' | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
if sha1 in excluded_contents: | if sha1 in excluded_contents: | ||||
assert sha1 not in objstorages['dst'], sha1 | assert sha1 not in objstorages['dst'], sha1 | ||||
@@ [31 lines not shown] @@ for i, (sha1, content) in enumerate(contents.items()):
             break
         objstorages['dst'].add(content, obj_id=sha1)

     caplog.set_level(logging.DEBUG, 'swh.journal.replay')

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
         '--check-dst' if check_dst else '--no-check-dst',
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     in_dst = 0
     for record in caplog.records:
@@ [64 lines not shown] @@ for i, (sha1, content) in enumerate(contents.items()):
             failures['in', sha1] = 1

     orig_dst = objstorages['dst']
     objstorages['dst'] = FlakyObjStorage(state=orig_dst.state,
                                          failures=failures)

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
         '--check-dst',
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         assert sha1 in objstorages['dst'], sha1
         assert objstorages['dst'].get(sha1) == content
@@ [53 lines not shown] @@ objstorages['src'] = FlakyObjStorage(
         state=objstorages['src'].state,
         failures=get_failures,
     )

     caplog.set_level(logging.DEBUG, 'swh.journal.replay')

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     actually_failed = set()
     for record in caplog.records:
@@ [41 lines not shown] @@ for i, sha1 in enumerate(contents):
         del objstorages['src'].state[sha1]
         contents_deleted.add(hash_to_hex(sha1))

     caplog.set_level(logging.DEBUG, 'swh.journal.replay')

     result = invoke(
         'content-replay',
-        '--broker', '127.0.0.1:%d' % kafka_port,
-        '--group-id', kafka_consumer_group,
-        '--prefix', kafka_prefix,
         '--stop-after-objects', str(NUM_CONTENTS),
+        journal_config={
+            'brokers': ['127.0.0.1:%d' % kafka_port],
+            'group_id': kafka_consumer_group,
+            'prefix': kafka_prefix,
+        },
     )

     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     not_in_src = set()
     for record in caplog.records:
@@ [21 trailing lines not shown] @@