diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index 80ea7d0..98291c2 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,145 +1,209 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import functools
 import logging
 import os
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
+from swh.objstorage import get_objstorage
 
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
 
 
 @click.group(name='journal', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     """Software Heritage Journal tools.
 
     The journal is a persistent logger of changes to the archive, with
     publish-subscribe support.
 
     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
 
     if config_file:
         if not os.path.exists(config_file):
             raise ValueError('%s does not exist' % config_file)
         conf = config.read(config_file)
     else:
         conf = {}
 
     ctx.ensure_object(dict)
 
     log_level = ctx.obj.get('log_level', logging.INFO)
     logging.root.setLevel(log_level)
     logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
 
 
 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               hidden=True,  # prefer config file
               help='Kafka broker to connect to.')
 @click.option('--prefix', type=str, default=None,
               hidden=True,  # prefer config file
               help='Prefix of Kafka topic names to read from.')
 @click.option('--group-id', '--consumer-id', type=str,
               hidden=True,  # prefer config file
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
 def replay(ctx, brokers, prefix, group_id, max_messages):
     """Fill a Storage by reading a Journal.
 
     There can be several 'replayers' filling a Storage as long as they use
     the same `group-id`.
     """
     logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
     try:
         storage = get_storage(**conf.pop('storage'))
     except KeyError:
         ctx.fail('You must have a storage configured in your config file.')
 
     if brokers is None:
         brokers = conf.get('journal', {}).get('brokers')
     if not brokers:
         ctx.fail('You must specify at least one kafka broker.')
     if not isinstance(brokers, (list, tuple)):
         brokers = [brokers]
 
     if prefix is None:
         prefix = conf.get('journal', {}).get('prefix')
 
     if group_id is None:
         group_id = conf.get('journal', {}).get('group_id')
 
     client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
 
     try:
         nb_messages = 0
         while not max_messages or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             logger.info('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')
 
 
 @cli.command()
 @click.argument('object_type')
 @click.option('--start-object', default=None)
 @click.option('--end-object', default=None)
 @click.option('--dry-run', is_flag=True, default=False)
 @click.pass_context
 def backfiller(ctx, object_type, start_object, end_object, dry_run):
     """Run the backfiller
 
     The backfiller lists objects from a Storage and produces journal entries
     from there.
 
     Typically used to rebuild a journal or compensate for missing objects in
     a journal (e.g. due to a downtime of the latter).
 
     The configuration file requires the following entries:
 
     - brokers: a list of kafka endpoints (the journal) in which entries will
       be added.
     - storage_dbconn: URL to connect to the storage DB.
     - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
     - client_id: the kafka client ID.
     """
     conf = ctx.obj['config']
     backfiller = JournalBackfiller(conf)
     try:
         backfiller.run(
             object_type=object_type,
             start_object=start_object, end_object=end_object,
             dry_run=dry_run)
     except KeyboardInterrupt:
         ctx.exit(0)
 
 
+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+    """Fill a destination Object Storage (typically a mirror) by reading a
+    Journal and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as
+    they use the same `group-id`.
+
+    This service retrieves object ids to copy from the 'content' topic. It
+    will only copy an object's content if the object's description in the
+    kafka message has the status:visible set.
+    """
+    logger = logging.getLogger(__name__)
+    conf = ctx.obj['config']
+    try:
+        objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
+    except KeyError:
+        ctx.fail('You must have a source objstorage configured in '
+                 'your config file.')
+    try:
+        objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
+    except KeyError:
+        ctx.fail('You must have a destination objstorage configured '
+                 'in your config file.')
+
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=objstorage_src,
+                                  dst=objstorage_dst)
+
+    try:
+        nb_messages = 0
+        while True:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')
 
 
 if __name__ == '__main__':
     main()
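For context, the new content-replay command pops 'objstorage_src' and 'objstorage_dst' from the configuration file and reads its Kafka settings from the 'journal' section, like the existing replay command. A configuration sketch for a mirror deployment could look like the following; the top-level keys match what the code above reads, but the objstorage classes, paths, broker addresses and group id are made-up placeholders:

    objstorage_src:
        cls: pathslicing
        args:
            root: /srv/softwareheritage/objects
            slicing: 0:2/2:4/4:6
    objstorage_dst:
        cls: pathslicing
        args:
            root: /srv/softwareheritage/mirror/objects
            slicing: 0:2/2:4/4:6
    journal:
        brokers:
            - kafka1.example.org:9092
        group_id: swh-content-replayer
        prefix: swh.journal.objects

The CLI flags shown in the diff (--broker, --prefix, --group-id) are hidden and kept mainly for tests; the config file is the preferred way to pass these settings.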
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index 7d8c5a2..3321237 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,41 +1,55 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO
 
 logger = logging.getLogger(__name__)
 
 
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         _insert_objects(object_type, objects, storage)
 
 
 def _insert_objects(object_type, objects, storage):
     if object_type == 'content':
         # TODO: insert 'content' in batches
         for object_ in objects:
             try:
                 storage.content_add_metadata([object_])
             except HashCollision as e:
                 logger.error('Hash collision: %s', e.args)
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
         # to handle?
         method = getattr(storage, object_type + '_add')
         method(objects)
     elif object_type == 'origin_visit':
         storage.origin_visit_upsert([
             {
                 **obj,
                 'origin': storage.origin_add_one(obj['origin'])
             }
             for obj in objects])
     else:
         assert False
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.warning('Received a series of %s, this should not happen',
+                           object_type)
+            continue
+        for obj in objects:
+            if obj['status'] == 'visible':
+                obj_id = obj[ID_HASH_ALGO]
+                obj = src.get(obj_id)
+                dst.add(obj, obj_id=obj_id)
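To make the data flow of process_replay_objects_content concrete, here is a minimal sketch (not part of the diff) that feeds it one batch by hand. It assumes the 'memory' backend is available from the swh.objstorage factory; the content bytes are made up, and ID_HASH_ALGO resolves to 'sha1' in swh.objstorage:

    from swh.objstorage import get_objstorage
    from swh.journal.replay import process_replay_objects_content

    src = get_objstorage('memory', {})
    dst = get_objstorage('memory', {})

    data = b'some content'
    obj_id = src.add(data)  # the objstorage computes and returns the sha1 id

    # One batch, shaped like what JournalClient passes to its worker_fn:
    # a mapping from object type to a list of object dicts.
    process_replay_objects_content(
        {'content': [{'sha1': obj_id, 'status': 'visible'}]},
        src=src, dst=dst)

    assert dst.get(obj_id) == data

Objects whose status is not 'visible' (hidden or absent contents) are skipped, so a mirror built this way only receives contents the source archive exposes.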
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index 6530571..966a57f 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,87 +1,90 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import re
 import tempfile
 from subprocess import Popen
 from typing import Tuple
 from unittest.mock import patch
 
 from click.testing import CliRunner
 from kafka import KafkaProducer
 import pytest
 
 from swh.storage.in_memory import Storage
 
 from swh.journal.cli import cli
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
 CLI_CONFIG = '''
 storage:
     cls: memory
     args: {}
 '''
 
 
 @pytest.fixture
 def storage():
     """An instance of swh.storage.in_memory.Storage that gets injected
     into the CLI functions."""
     storage = Storage()
     with patch('swh.journal.cli.get_storage') as get_storage_mock:
         get_storage_mock.return_value = storage
         yield storage
 
 
 def invoke(catch_exceptions, args):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
         result = runner.invoke(cli, args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test_replay(
         storage: Storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test-producer',
     )
 
     snapshot = {'id': b'foo', 'branches': {
         b'HEAD': {
             'target_type': 'revision',
             'target': b'bar',
         }
     }}
     producer.send(
         topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
 
     result = invoke(False, [
         'replay',
         '--broker', 'localhost:%d' % port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '1',
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
+
+
+# TODO: write a test for the content-replay command
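A possible starting point for that TODO, sketched only as an illustration and reusing the imports already present in test_cli.py: extend the test configuration with the two objstorage entries the command expects, and patch swh.journal.cli.get_objstorage the same way the storage fixture above patches get_storage. The names below are hypothetical:

    from swh.objstorage import get_objstorage

    CONTENT_REPLAY_CONFIG = CLI_CONFIG + '''
    objstorage_src:
        cls: memory
        args: {}
    objstorage_dst:
        cls: memory
        args: {}
    '''


    @pytest.fixture
    def objstorages():
        """Two in-memory objstorages, returned by the patched get_objstorage
        in the order content_replay builds them (src first, then dst)."""
        src = get_objstorage('memory', {})
        dst = get_objstorage('memory', {})
        with patch('swh.journal.cli.get_objstorage') as get_objstorage_mock:
            get_objstorage_mock.side_effect = [src, dst]
            yield {'src': src, 'dst': dst}

The test itself would then fill src, publish the corresponding content ids on the '<prefix>.content' topic, and invoke 'content-replay' much like test_replay does, before asserting that dst holds the same objects as src.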
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index 74de5ad..86e5e49 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,103 +1,133 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import namedtuple
 import functools
 
 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists
 
 from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
 from swh.storage import HashCollision
 
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
 
 
 FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
 FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
 
 
 class MockedKafkaWriter(DirectKafkaWriter):
     def __init__(self, queue):
         self._prefix = 'prefix'
         self.queue = queue
 
     def send(self, topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
         partition = FakeKafkaPartition(topic)
         msg = FakeKafkaMessage(key=key, value=value)
         if self.queue and {partition} == set(self.queue[-1]):
             # The last message is of the same object type, group them
             self.queue[-1][partition].append(msg)
         else:
             self.queue.append({partition: [msg]})
 
 
 class MockedKafkaConsumer:
     def __init__(self, queue):
         self.queue = queue
         self.committed = False
 
     def poll(self):
         return self.queue.pop(0)
 
     def commit(self):
         if self.queue == []:
             self.committed = True
 
 
 class MockedJournalClient(JournalClient):
     def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
     queue = []
     replayer = MockedJournalClient(queue)
 
     storage1 = Storage()
     storage1.journal_writer = MockedKafkaWriter(queue)
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             origin_id = storage1.origin_add_one(obj.pop('origin'))
             if 'visit' in obj:
                 del obj['visit']
             storage1.origin_visit_add(origin_id, **obj)
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass
 
     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())
 
     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
     assert replayer.consumer.committed
 
     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name
 
 
 # TODO: add test for hash collision
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    storage1 = Storage()
+    storage1.journal_writer = MockedKafkaWriter(queue)
+
+    for (obj_type, obj) in objects:
+        obj = obj.copy()
+        if obj_type == 'content':
+            storage1.content_add([obj])
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage2 = Storage()
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=storage1.objstorage,
+                                  dst=storage2.objstorage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    assert storage1.objstorage.state == storage2.objstorage.state
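Once deployed with a configuration such as the one sketched after the cli.py hunk, the new command runs as a long-lived service. Since cli.py keeps its `if __name__ == '__main__'` entry point, an invocation could look like the following (the filename is a placeholder):

    python -m swh.journal.cli --config-file replayer.yml content-replay

Several such processes sharing the same group_id form one Kafka consumer group and split the 'content' partitions between them, which is the scaling model described in the command's docstring.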