diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,10 @@
     url='https://forge.softwareheritage.org/diffusion/DJNL/',
     packages=find_packages(),
     scripts=[],
+    entry_points='''
+        [console_scripts]
+        swh-journal=swh.journal.cli:main
+    ''',
     install_requires=parse_requirements() + parse_requirements('swh'),
     setup_requires=['vcversioner'],
     extras_require={'testing': parse_requirements('test')},
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/cli.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+import os
+
+from swh.core import config
+from swh.journal.publisher import JournalPublisher
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+              type=click.Choice(logging._nameToLevel.keys()),
+              help="Log level (default to INFO)")
+@click.pass_context
+def cli(ctx, config_file, log_level):
+    """Software Heritage Journal CLI interface
+
+    The configuration file is read from --config-file or, when that
+    option is not given, from the SWH_CONFIG_FILENAME environment
+    variable.
+
+    """
+    if not config_file:
+        config_file = os.environ.get('SWH_CONFIG_FILENAME')
+    if not config_file:
+        raise click.ClickException(
+            'You must either pass a config-file parameter '
+            'or set SWH_CONFIG_FILENAME to target the config-file')
+
+    # --config-file is already checked by click.Path(exists=True), but a
+    # path taken from SWH_CONFIG_FILENAME is not.
+    if not os.path.exists(config_file):
+        raise click.ClickException('%s does not exist' % config_file)
+
+    conf = config.read(config_file)
+    ctx.ensure_object(dict)
+
+    # Configure the root logger: merely setting the level of this
+    # module's logger would leave every other logger unconfigured.
+    logging.basicConfig(
+        level=log_level,
+        format='%(asctime)s %(process)d %(levelname)s %(message)s',
+    )
+
+    # Keep the kafka client loggers at INFO, even with --log-level DEBUG.
+    _log = logging.getLogger('kafka')
+    _log.setLevel(logging.INFO)
+
+    ctx.obj['config'] = conf
+    ctx.obj['loglevel'] = log_level
+
+
+@cli.command()
+@click.pass_context
+def publisher(ctx):
+    """Run the journal publisher until interrupted
+
+    """
+    # Configuration keys that must be present and non-empty.
+    mandatory_keys = [
+        'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
+        'publisher_id', 'object_types', 'storage'
+    ]
+
+    conf = ctx.obj['config']
+    missing_keys = [key for key in mandatory_keys if not conf.get(key)]
+    if missing_keys:
+        raise click.ClickException(
+            'Configuration error: The following keys must be'
+            ' provided: %s' % (','.join(missing_keys), ))
+
+    journal_publisher = JournalPublisher(conf)
+    try:
+        while True:
+            journal_publisher.poll()
+    except KeyboardInterrupt:
+        ctx.exit(0)
+
+
+def main():
+    return cli(auto_envvar_prefix='SWH_JOURNAL')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -8,14 +8,13 @@
 
 from kafka import KafkaProducer, KafkaConsumer
 
-from swh.core.config import SWHConfig
 from swh.storage import get_storage
 from swh.storage.algos import snapshot
 
 from .serializers import kafka_to_key, key_to_kafka
 
 
-class JournalPublisher(SWHConfig):
+class JournalPublisher:
     """The journal publisher is a layer in charge of:
 
     - consuming messages from topics (1 topic per object_type)
@@ -26,37 +25,10 @@
     The main entry point for this class is the 'poll' method.
 
     """
-    DEFAULT_CONFIG = {
-        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-
-        'temporary_prefix': ('str', 'swh.tmp_journal.new'),
-        'final_prefix': ('str', 'swh.journal.objects'),
-
-        'consumer_id': ('str', 'swh.journal.publisher'),
-        'publisher_id': ('str', 'swh.journal.publisher'),
-
-        'object_types': ('list[str]', ['content', 'revision', 'release']),
-
-        'storage': ('dict', {
-            'cls': 'remote',
-            'args': {
-                'url': 'http://localhost:5002/',
-            }
-        }),
-
-        'max_messages': ('int', 10000),
-    }
-
-    CONFIG_BASE_FILENAME = 'journal/publisher'
-
-    def __init__(self, extra_configuration=None):
-        self.config = config = self.parse_config_file()
-        if extra_configuration:
-            config.update(extra_configuration)
-
+    def __init__(self, config):
+        self.config = config
         self._prepare_storage(config)
         self._prepare_journal(config)
-
-        self.max_messages = self.config['max_messages']
+        self.max_messages = self.config.get('max_messages', 10000)
 
     def _prepare_journal(self, config):
@@ -219,21 +191,4 @@
 
 
 if __name__ == '__main__':
-    import click
-
-    @click.command()
-    @click.option('--verbose', is_flag=True, default=False,
-                  help='Be verbose if asked.')
-    def main(verbose):
-        logging.basicConfig(
-            level=logging.DEBUG if verbose else logging.INFO,
-            format='%(asctime)s %(process)d %(levelname)s %(message)s'
-        )
-        _log = logging.getLogger('kafka')
-        _log.setLevel(logging.INFO)
-
-        publisher = JournalPublisher()
-        while True:
-            publisher.poll()
-
-    main()
+    print('Please use the "swh-journal publisher" command')
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -101,19 +101,18 @@
     }
 ]
 
+TEST_CONFIG = {
+    'brokers': ['localhost'],
+    'temporary_prefix': 'swh.tmp_journal.new',
+    'final_prefix': 'swh.journal.objects',
+    'consumer_id': 'swh.journal.test.publisher',
+    'publisher_id': 'swh.journal.test.publisher',
+    'object_types': ['content'],
+    'max_messages': 3,
+}
 
-class JournalPublisherTest(JournalPublisher):
-    def parse_config_file(self):
-        return {
-            'brokers': ['localhost'],
-            'temporary_prefix': 'swh.tmp_journal.new',
-            'final_prefix': 'swh.journal.objects',
-            'consumer_id': 'swh.journal.test.publisher',
-            'publisher_id': 'swh.journal.test.publisher',
-            'object_types': ['content'],
-            'max_messages': 3,
-        }
 
+class JournalPublisherTest(JournalPublisher):
     def _prepare_storage(self, config):
         self.storage = Storage()
         self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
@@ -130,6 +129,13 @@
 
         print("publisher.origin-visits", self.origin_visits)
 
+
+class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
+    """A journal publisher with:
+    - no kafka dependency
+    - in-memory storage
+    """
+
     def _prepare_journal(self, config):
         """No journal for now
 
@@ -139,7 +145,7 @@
 
 class TestPublisher(unittest.TestCase):
     def setUp(self):
-        self.publisher = JournalPublisherTest()
+        self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
         self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
         self.revisions = [{b'id': c['id']} for c in REVISIONS]
         self.releases = [{b'id': c['id']} for c in RELEASES]