Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
Show All 21 Lines | |||||
class MockedDirectKafkaWriter(DirectKafkaWriter): | class MockedDirectKafkaWriter(DirectKafkaWriter): | ||||
def __init__(self): | def __init__(self): | ||||
self._prefix = 'prefix' | self._prefix = 'prefix' | ||||
class MockedStorageReplayer(StorageReplayer): | class MockedStorageReplayer(StorageReplayer): | ||||
def __init__(self, object_types=OBJECT_TYPES): | def __init__(self, storage, max_messages, object_types=OBJECT_TYPES): | ||||
self.storage = storage | |||||
self.max_messages = max_messages | |||||
self._object_types = object_types | self._object_types = object_types | ||||
@given(lists(object_dicts(), min_size=1)) | @given(lists(object_dicts(), min_size=1)) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_same_order(objects): | def test_write_replay_same_order(objects): | ||||
committed = False | committed = False | ||||
queue = [] | queue = [] | ||||
Show All 28 Lines | for (obj_type, obj) in objects: | ||||
else: | else: | ||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
storage2 = Storage() | storage2 = Storage() | ||||
replayer = MockedStorageReplayer() | replayer = MockedStorageReplayer(storage2, max_messages=len(queue)) | ||||
replayer.poll = poll | replayer.poll = poll | ||||
replayer.commit = commit | replayer.commit = commit | ||||
replayer.fill(storage2, max_messages=len(queue)) | replayer.process() | ||||
assert committed | assert committed | ||||
for attr_name in ('_contents', '_directories', '_revisions', '_releases', | for attr_name in ('_contents', '_directories', '_revisions', '_releases', | ||||
'_snapshots', '_origin_visits', '_origins'): | '_snapshots', '_origin_visits', '_origins'): | ||||
assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | ||||
attr_name | attr_name | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | for (obj_type, obj) in objects: | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
queue_size = sum(len(partition) | queue_size = sum(len(partition) | ||||
for batch in queue | for batch in queue | ||||
for partition in batch.values()) | for partition in batch.values()) | ||||
storage2 = Storage() | storage2 = Storage() | ||||
replayer = MockedStorageReplayer() | replayer = MockedStorageReplayer(storage2, max_messages=queue_size) | ||||
replayer.poll = poll | replayer.poll = poll | ||||
replayer.commit = commit | replayer.commit = commit | ||||
replayer.fill(storage2, max_messages=queue_size) | replayer.process() | ||||
assert committed | assert committed | ||||
for attr_name in ('_contents', '_directories', '_revisions', '_releases', | for attr_name in ('_contents', '_directories', '_revisions', '_releases', | ||||
'_snapshots', '_origin_visits', '_origins'): | '_snapshots', '_origin_visits', '_origins'): | ||||
assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | ||||
attr_name | attr_name | ||||
# TODO: add test for hash collision | # TODO: add test for hash collision |