diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -15,6 +15,7 @@
 from swh.journal import DEFAULT_PREFIX
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
 
 
@@ -135,6 +136,68 @@
         ctx.exit(0)
 
 
+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=DEFAULT_PREFIX,
+              hidden=True,  # prefer config file
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+    """Fill a destination Object Storage (typically a mirror) by reading a
+    Journal and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as
+    they use the same `group-id`.
+
+    This service retrieves object ids to copy from the 'content' topic. It
+    will only copy an object's content if the object's description in the
+    kafka message has its status set to 'visible'.
+    """
+    logger = logging.getLogger(__name__)
+    conf = ctx.obj['config']
+    try:
+        objstorage_src = get_storage(**conf.pop('objstorage_src'))
+    except KeyError:
+        ctx.fail('You must have a source objstorage configured in '
+                 'your config file.')
+    try:
+        objstorage_dst = get_storage(**conf.pop('objstorage_dst'))
+    except KeyError:
+        ctx.fail('You must have a destination objstorage configured '
+                 'in your config file.')
+
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    client = JournalClient(brokers, prefix, group_id)
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=objstorage_src,
+                                  dst=objstorage_dst)
+
+    try:
+        nb_messages = 0
+        while True:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -78,12 +78,6 @@
         self.max_messages = max_messages
         self._object_types = object_types
 
-    def poll(self):
-        return self.consumer.poll()
-
-    def commit(self):
-        self.consumer.commit()
-
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
@@ -94,7 +88,7 @@
                argument.
         """
         nb_messages = 0
-        polled = self.poll()
+        polled = self.consumer.poll()
         for (partition, messages) in polled.items():
             object_type = partition.topic.split('.')[-1]
             # Got a message from a topic we did not subscribe to.
@@ -104,5 +98,5 @@
 
             nb_messages += len(messages)
 
-        self.commit()
+        self.consumer.commit()
         return nb_messages
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
 import logging
 
 from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO
 
 logger = logging.getLogger(__name__)
 
@@ -38,3 +39,16 @@
                            for obj in objects])
         else:
             assert False
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.info('Received a series of %s, this should not happen',
+                        object_type)
+            continue
+        for obj in objects:
+            if obj['status'] == 'visible':
+                obj_id = obj[ID_HASH_ALGO]
+                obj = src.get(obj_id)
+                dst.add(obj, obj_id=obj_id)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -16,6 +16,7 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
 
@@ -26,38 +27,41 @@
 class MockedDirectKafkaWriter(DirectKafkaWriter):
     def __init__(self):
         self._prefix = 'prefix'
+        self.queue = []
+        self.committed = False
+
+    def send(self, topic, key, value):
+        key = kafka_to_key(key_to_kafka(key))
+        value = kafka_to_value(value_to_kafka(value))
+        partition = FakeKafkaPartition(topic)
+        msg = FakeKafkaMessage(key=key, value=value)
+        if self.queue and {partition} == set(self.queue[-1]):
+            # The last message is of the same object type; group them
+            self.queue[-1][partition].append(msg)
+        else:
+            self.queue.append({partition: [msg]})
+
+    def poll(self):
+        return self.queue.pop(0)
+
+    def commit(self):
+        if self.queue == []:
+            self.committed = True
 
 
 class MockedJournalClient(JournalClient):
     def __init__(self, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
+        self.consumer = MockedDirectKafkaWriter()
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order(objects):
-    committed = False
-    queue = []
-
-    def send(topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        queue.append({
-            FakeKafkaPartition(topic):
-                [FakeKafkaMessage(key=key, value=value)]
-        })
-
-    def poll():
-        return queue.pop(0)
-
-    def commit():
-        nonlocal committed
-        if queue == []:
-            committed = True
+def test_write_replay_same_order_batches(objects):
+    replayer = MockedJournalClient()
 
     storage1 = Storage()
-    storage1.journal_writer = MockedDirectKafkaWriter()
-    storage1.journal_writer.send = send
+    storage1.journal_writer = replayer.consumer
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
@@ -73,18 +77,17 @@
         except HashCollision:
             pass
 
+    queue_size = sum(len(partition)
+                     for batch in replayer.consumer.queue
+                     for partition in batch.values())
+
     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
-    replayer = MockedJournalClient()
-    replayer.poll = poll
-    replayer.commit = commit
-    queue_size = len(queue)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
-    assert nb_messages == queue_size
-    assert committed
+    assert replayer.consumer.committed
 
     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
@@ -92,70 +95,32 @@
             attr_name
 
-@given(lists(object_dicts(), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order_batches(objects):
-    committed = False
-    queue = []
-
-    def send(topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if queue and {partition} == set(queue[-1]):
-            # The last message is of the same object type, groupping them
-            queue[-1][partition].append(msg)
-        else:
-            queue.append({
-                FakeKafkaPartition(topic): [msg]
-            })
+# TODO: add test for hash collision
 
-    def poll():
-        return queue.pop(0)
 
-    def commit():
-        nonlocal committed
-        if queue == []:
-            committed = True
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+    replayer = MockedJournalClient()
 
     storage1 = Storage()
-    storage1.journal_writer = MockedDirectKafkaWriter()
-    storage1.journal_writer.send = send
+    storage1.journal_writer = replayer.consumer
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
-        if obj_type == 'origin_visit':
-            origin_id = storage1.origin_add_one(obj.pop('origin'))
-            if 'visit' in obj:
-                del obj['visit']
-            storage1.origin_visit_add(origin_id, **obj)
-        else:
-            method = getattr(storage1, obj_type + '_add')
-            try:
-                method([obj])
-            except HashCollision:
-                pass
+        if obj_type == 'content':
+            storage1.content_add([obj])
 
     queue_size = sum(len(partition)
-                     for batch in queue
+                     for batch in replayer.consumer.queue
                      for partition in batch.values())
 
     storage2 = Storage()
-    worker_fn = functools.partial(process_replay_objects, storage=storage2)
-    replayer = MockedJournalClient()
-    replayer.poll = poll
-    replayer.commit = commit
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=storage1.objstorage,
+                                  dst=storage2.objstorage)
 
     nb_messages = 0
    while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
-    assert committed
-
-    for attr_name in ('_contents', '_directories', '_revisions', '_releases',
-                      '_snapshots', '_origin_visits', '_origins'):
-        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
-            attr_name
-
-
-# TODO: add test for hash collision
+    assert storage1.objstorage.state == storage2.objstorage.state
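
Usage sketch (not part of the patch): a minimal, self-contained illustration of the contract that process_replay_objects_content expects from the batches handed over by JournalClient.process(). The FakeObjStorage class and the sample ids are hypothetical stand-ins, modelling only the get()/add() calls the replayer actually makes; ID_HASH_ALGO is assumed to resolve to a hash name such as 'sha1'.

# Hypothetical sketch; FakeObjStorage is not part of swh.objstorage.
from swh.objstorage.objstorage import ID_HASH_ALGO

from swh.journal.replay import process_replay_objects_content


class FakeObjStorage:
    """Bare-bones object storage exposing only the get()/add() calls
    that the replayer performs on src and dst."""

    def __init__(self, state=None):
        self.state = dict(state or {})

    def get(self, obj_id):
        return self.state[obj_id]

    def add(self, content, obj_id=None):
        self.state[obj_id] = content


visible_id = b'\x01' * 20
hidden_id = b'\x02' * 20

src = FakeObjStorage({visible_id: b'visible content',
                      hidden_id: b'hidden content'})
dst = FakeObjStorage()

# Shape of one batch passed to the worker_fn by JournalClient.process():
# a mapping from object type to the list of message values for that type.
batch = {
    'content': [
        {ID_HASH_ALGO: visible_id, 'status': 'visible'},  # copied
        {ID_HASH_ALGO: hidden_id, 'status': 'hidden'},    # skipped
    ],
}

process_replay_objects_content(batch, src=src, dst=dst)
assert dst.state == {visible_id: b'visible content'}

For the content_replay command itself, the configuration file is expected to provide 'objstorage_src', 'objstorage_dst' and an optional 'journal' section (brokers, prefix, group_id), matching the conf.pop()/conf.get() calls in the cli hunk above.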