diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py index 95d949b..256401a 100644 --- a/swh/journal/backfill.py +++ b/swh/journal/backfill.py @@ -1,137 +1,151 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -"""Module defining journal checker classes. +"""Module defining journal backfiller classes. -Those checker goal is to send back all, or missing objects from the -journal queues. +The backfiller's goal is to send back part or all of the objects +from the storage to the journal topics. At the moment, a first naive implementation is the JournalBackfiller. It simply reads the objects from the storage and sends every object identifier back to the journal. """ import psycopg2 from kafka import KafkaProducer -from swh.core.config import SWHConfig from .serializers import key_to_kafka TYPE_TO_PRIMARY_KEY = { 'origin': ['id'], - 'content': ['sha1', 'sha1_git', 'sha256'], + 'content': ['sha1', 'sha1_git', 'sha256', 'blake2s256'], 'directory': ['id'], 'revision': ['id'], 'release': ['id'], 'origin_visit': ['origin', 'visit'], 'skipped_content': ['sha1', 'sha1_git', 'sha256'], } def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() if isinstance(entry, tuple): return [entry_to_bytes(value) for value in entry] return entry def fetch(db_conn, obj_type): """Fetch all obj_type's identifiers from db. This opens one connection, stream objects and when done, close the connection. Raises: ValueError if obj_type is not supported Yields: Identifiers for the specific object_type """ primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) if not primary_key: raise ValueError('The object type %s is not supported. 
' 'Only possible values are %s' % ( obj_type, TYPE_TO_PRIMARY_KEY.keys())) primary_key_str = ','.join(primary_key) query = 'select %s from %s order by %s' % ( primary_key_str, obj_type, primary_key_str) server_side_cursor_name = 'swh.journal.%s' % obj_type with psycopg2.connect(db_conn) as db: cursor = db.cursor(name=server_side_cursor_name) cursor.execute(query) for o in cursor: yield dict(zip(primary_key, entry_to_bytes(o))) -class JournalBackfiller(SWHConfig): +DEFAULT_CONFIG = { + 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), + 'temporary_prefix': ('str', 'swh.tmp_journal.new'), + 'publisher_id': ('str', 'swh.journal.publisher.test'), + 'object_types': ('list[str]', ['content', 'revision', 'release']), + 'storage_dbconn': ('str', 'service=swh-dev'), +} + + +MANDATORY_KEYS = [ + 'brokers', 'temporary_prefix', 'publisher_id', 'object_types', + 'storage_dbconn', +] + + +class JournalBackfiller: """Class in charge of reading the storage's objects and sends those back to the publisher queue. This is designed to be run periodically. """ - DEFAULT_CONFIG = { - 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), - 'temporary_prefix': ('str', 'swh.tmp_journal.new'), - 'publisher_id': ('str', 'swh.journal.publisher.test'), - 'object_types': ('list[str]', ['content', 'revision', 'release']), - 'storage_dbconn': ('str', 'service=swh-dev'), - } - - CONFIG_BASE_FILENAME = 'journal/checker' - - def __init__(self, extra_configuration=None): - self.config = config = self.parse_config_file() - if extra_configuration: - config.update(extra_configuration) - + def __init__(self, config=None): + self.config = config + self.check_config(config) self.object_types = self.config['object_types'] - for obj_type in self.object_types: - if obj_type not in TYPE_TO_PRIMARY_KEY: - raise ValueError('The object type %s is not supported. 
' - 'Possible values are %s' % ( - obj_type, - ', '.join(TYPE_TO_PRIMARY_KEY))) - self.storage_dbconn = self.config['storage_dbconn'] self.producer = KafkaProducer( bootstrap_servers=config['brokers'], value_serializer=key_to_kafka, client_id=config['publisher_id'], ) + def check_config(self, config): + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) + + object_types = config['object_types'] + for obj_type in object_types: + if obj_type not in TYPE_TO_PRIMARY_KEY: + raise ValueError('The object type %s is not supported. ' + 'Possible values are %s' % ( + obj_type, + ', '.join(TYPE_TO_PRIMARY_KEY))) + def _read_storage(self): """Read storage's objects and generates tuple as object_type, dict of object. Yields: tuple of object_type, object as dict """ for obj_type in self.object_types: for obj in fetch(self.storage_dbconn, obj_type): yield obj_type, obj def run(self): """Reads storage's subscribed object types and send them to the publisher's reading queue. 
""" for obj_type, obj in self._read_storage(): topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) self.producer.send(topic, value=obj) if __name__ == '__main__': - JournalBackfiller().run() + print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 5c1a02a..c2189f9 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,88 +1,103 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging import os from swh.core import config from swh.storage import get_storage from swh.journal.replay import StorageReplayer +from swh.journal.backfill import JournalBackfiller CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") @click.pass_context def cli(ctx, config_file, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). 
""" if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if not config_file: raise ValueError('You must either pass a config-file parameter ' 'or set SWH_CONFIG_FILENAME to target ' 'the config-file') if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) ctx.ensure_object(dict) logging.basicConfig( level=log_level, format='%(asctime)s %(levelname)s %(name)s %(message)s', ) logging.getLogger('kafka').setLevel(logging.INFO) ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.') @click.option('--prefix', type=str, default='swh.journal.objects', help='Prefix of Kafka topic names to read from.') @click.option('--consumer-id', type=str, help='Name of the consumer/group id for reading from Kafka.') @click.pass_context def replay(ctx, brokers, prefix, consumer_id, max_messages): """Fill a new storage by reading a journal. 
""" conf = ctx.obj['config'] storage = get_storage(**conf.pop('storage')) replayer = StorageReplayer(brokers, prefix, consumer_id) try: replayer.fill(storage, max_messages=max_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') +@cli.command() +@click.pass_context +def backfiller(ctx): + """Manipulate backfiller + + """ + conf = ctx.obj['config'] + backfiller = JournalBackfiller(conf) + try: + backfiller.run() + except KeyboardInterrupt: + ctx.exit(0) + + def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py new file mode 100644 index 0000000..1b37f7b --- /dev/null +++ b/swh/journal/tests/test_backfill.py @@ -0,0 +1,42 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.journal.backfill import JournalBackfiller, TYPE_TO_PRIMARY_KEY + + +TEST_CONFIG = { + 'brokers': ['localhost'], + 'temporary_prefix': 'swh.tmp_journal.new', + 'publisher_id': 'swh.journal.publisher.test', + 'object_types': ['content', 'revision', 'release'], + 'storage_dbconn': 'service=swh-dev', +} + + +def test_config_ko_missing_mandatory_key(): + for key in TEST_CONFIG.keys(): + config = TEST_CONFIG.copy() + config.pop(key) + + with pytest.raises(ValueError) as e: + JournalBackfiller(config) + + error = ('Configuration error: The following keys must be' + ' provided: %s' % (','.join([key]), )) + assert e.value.args[0] == error + + +def test_config_ko_unknown_object_type(): + wrong_config = TEST_CONFIG.copy() + wrong_config['object_types'] = ['something-wrong'] + with pytest.raises(ValueError) as e: + JournalBackfiller(wrong_config) + + error = ('The object type something-wrong is not supported. 
' + 'Possible values are %s' % ( + ', '.join(TYPE_TO_PRIMARY_KEY))) + assert e.value.args[0] == error