Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
Show First 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def test_write_replay_same_order_batches(objects): | ||||
storage1.journal_writer = MockedKafkaWriter(queue) | storage1.journal_writer = MockedKafkaWriter(queue) | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
storage1.origin_add_one({'url': obj['origin']}) | storage1.origin_add_one({'url': obj['origin']}) | ||||
storage1.origin_visit_upsert([obj]) | storage1.origin_visit_upsert([obj]) | ||||
else: | else: | ||||
if obj_type == 'content' and obj.get('status') == 'absent': | |||||
obj_type = 'skipped_content' | |||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert replayer.max_messages == 0 | assert replayer.max_messages == 0 | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def test_write_replay_content(objects): | ||||
storage1.journal_writer = MockedKafkaWriter(queue) | storage1.journal_writer = MockedKafkaWriter(queue) | ||||
contents = [] | contents = [] | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'content': | if obj_type == 'content': | ||||
# avoid hash collision | # avoid hash collision | ||||
if not storage1.content_find(obj): | if not storage1.content_find(obj): | ||||
if obj.get('status') != 'absent': | |||||
storage1.content_add([obj]) | storage1.content_add([obj]) | ||||
contents.append(obj) | contents.append(obj) | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert replayer.max_messages == 0 | assert replayer.max_messages == 0 | ||||
replayer.max_messages = queue_size | replayer.max_messages = queue_size | ||||
storage2 = InMemoryStorage() | storage2 = InMemoryStorage() | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
Show All 12 Lines |