Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
Show All 24 Lines | def test_write_replay_same_order_batches(objects): | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
storage1 = Storage() | storage1 = Storage() | ||||
storage1.journal_writer = MockedKafkaWriter(queue) | storage1.journal_writer = MockedKafkaWriter(queue) | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
origin_id = storage1.origin_add_one(obj.pop('origin')) | storage1.origin_add_one(obj['origin']) | ||||
if 'visit' in obj: | storage1.origin_visit_upsert([obj]) | ||||
del obj['visit'] | |||||
storage1.origin_visit_add(origin_id, **obj) | |||||
else: | else: | ||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
queue_size = sum(len(partition) | queue_size = sum(len(partition) | ||||
▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines |