Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
Show First 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | |||||
# TODO: add test for hash collision | # TODO: add test for hash collision | ||||
@given(lists(object_dicts(), min_size=1)) | @given(lists(object_dicts(), min_size=1)) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_content(objects): | def test_write_replay_content(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
storage1 = Storage() | storage1 = Storage() | ||||
storage1.journal_writer = MockedKafkaWriter(queue) | storage1.journal_writer = MockedKafkaWriter(queue) | ||||
contents = [] | |||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'content': | if obj_type == 'content': | ||||
# avoid hash collision | |||||
if not storage1.content_find(obj): | |||||
storage1.content_add([obj]) | storage1.content_add([obj]) | ||||
contents.append(obj) | |||||
queue_size = len(queue) | queue_size = len(queue) | ||||
storage2 = Storage() | storage2 = Storage() | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=storage1.objstorage, | src=storage1.objstorage, | ||||
dst=storage2.objstorage) | dst=storage2.objstorage) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while nb_messages < queue_size: | while nb_messages < queue_size: | ||||
nb_messages += replayer.process(worker_fn) | nb_messages += replayer.process(worker_fn) | ||||
assert storage1.objstorage.state == storage2.objstorage.state | # only content with status visible will be copied in storage2 | ||||
expected_objstorage_state = { | |||||
c['sha1']: c['data'] for c in contents if c['status'] == 'visible' | |||||
} | |||||
assert expected_objstorage_state == storage2.objstorage.state |