diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -41,22 +41,38 @@
         )
 
     def poll(self):
-        yield from self.consumer
+        return self.consumer.poll()
 
-    def fill(self, storage, max_messages=None):
-        num = 0
-        for message in self.poll():
-            object_type = message.topic.split('.')[-1]
-
-            # Got a message from a topic we did not subscribe to.
-            assert object_type in self._object_types, object_type
+    def commit(self):
+        self.consumer.commit()
 
-            self.insert_object(storage, object_type, message.value)
-
-            num += 1
-            if max_messages and num >= max_messages:
-                break
-        return num
+    def fill(self, storage, max_messages=None):
+        nb_messages = 0
+
+        def done():
+            nonlocal nb_messages
+            return max_messages and nb_messages >= max_messages
+
+        while not done():
+            polled = self.poll()
+            for (partition, messages) in polled.items():
+                assert messages
+                for message in messages:
+                    object_type = partition.topic.split('.')[-1]
+
+                    # Got a message from a topic we did not subscribe to.
+                    assert object_type in self._object_types, object_type
+
+                    self.insert_object(storage, object_type, message.value)
+
+                    nb_messages += 1
+                    if done():
+                        break
+                if done():
+                    break
+        self.commit()
+        logger.info('Processed %d messages.' % nb_messages)
+        return nb_messages
 
     def insert_object(self, storage, object_type, object_):
         if object_type in ('content', 'directory', 'revision', 'release',
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -17,7 +17,8 @@
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import StorageReplayer, OBJECT_TYPES
 
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'topic key value')
+FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
+FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
 
 
 class MockedDirectKafkaWriter(DirectKafkaWriter):
@@ -30,18 +31,27 @@
         self._object_types = object_types
 
 
-@given(lists(object_dicts()))
+@given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order(objects):
+    committed = False
     queue = []
 
     def send(topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
-        queue.append(FakeKafkaMessage(topic=topic, key=key, value=value))
+        queue.append({
+            FakeKafkaPartition(topic):
+            [FakeKafkaMessage(key=key, value=value)]
+        })
 
     def poll():
-        yield from queue
+        return queue.pop(0)
+
+    def commit():
+        nonlocal committed
+        if queue == []:
+            committed = True
 
     storage1 = Storage()
     storage1.journal_writer = MockedDirectKafkaWriter()
@@ -64,7 +74,10 @@
     storage2 = Storage()
     replayer = MockedStorageReplayer()
     replayer.poll = poll
-    replayer.fill(storage2)
+    replayer.commit = commit
+    replayer.fill(storage2, max_messages=len(queue))
+
+    assert committed
     for attr_name in ('_contents', '_directories', '_revisions',
                       '_releases', '_snapshots', '_origin_visits',
                       '_origins'):