diff --git a/requirements.txt b/requirements.txt index b814c8e..1e62953 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html vcversioner retrying +click diff --git a/resources/producer/tar-gnu.ini b/resources/producer/tar-gnu.ini index 98e01c6..a1660bc 100644 --- a/resources/producer/tar-gnu.ini +++ b/resources/producer/tar-gnu.ini @@ -1,30 +1,24 @@ [main] # Mirror's root directory holding tarballs to load into swh mirror_root_directory = /home/storage/space/mirrors/gnu.org/gnu/ # Origin setup's possible scheme url url_scheme = rsync://ftp.gnu.org/gnu/ # Origin type used for tarballs type = ftp # File containing a subset list tarballs from mirror_root_directory to load. # The file's format is one absolute path name to a tarball per line. # NOTE: # - This file must contain data consistent with the mirror_root_directory # - if this option is not provided, the mirror_root_directory is scanned # completely as usual # mirror_subset_archives = /home/storage/missing-archives # Retrieval date information (rsync, etc...) date = Fri, 28 Aug 2015 13:13:26 +0200 # Randomize blocks of messages and send for consumption block_messages = 250 - -# DEV options - -# Tryouts purposes (no limit if not specified) -# limit = 10 - diff --git a/resources/producer/tar-old-gnu.ini b/resources/producer/tar-old-gnu.ini index db5cec1..54b78ff 100644 --- a/resources/producer/tar-old-gnu.ini +++ b/resources/producer/tar-old-gnu.ini @@ -1,30 +1,24 @@ [main] # Mirror's root directory holding tarballs to load into swh mirror_root_directory = /home/storage/space/mirrors/gnu.org/old-gnu/ # Origin setup's possible scheme url url_scheme = rsync://ftp.gnu.org/old-gnu/ # Origin type used for tarballs type = ftp # File containing a subset list tarballs from mirror_root_directory to load. # The file's format is one absolute path name to a tarball per line. # NOTE: # - This file must contain data consistent with the mirror_root_directory # - if this option is not provided, the mirror_root_directory is scanned # completely as usual # mirror_subset_archives = /home/tony/work/inria/repo/swh-environment/swh-loader-tar/old-gnu-missing # Retrieval date information (rsync, etc...) date = Fri, 28 Aug 2015 13:13:26 +0200 # Randomize blocks of messages and send for consumption block_messages = 100 - -# DEV options - -# Tryouts purposes (no limit if not specified) -#limit = 10 - diff --git a/swh/loader/tar/producer.py b/swh/loader/tar/producer.py index 7dc0835..67d9610 100755 --- a/swh/loader/tar/producer.py +++ b/swh/loader/tar/producer.py @@ -1,174 +1,115 @@ -#!/usr/bin/env python3 - # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import argparse -import sys +import click + +from swh.scheduler.utils import get_task from swh.core import config from swh.loader.tar import build, file -task_queue = 'swh.loader.tar.tasks.LoadTarRepository' +TASK_QUEUE = 'swh.loader.tar.tasks.LoadTarRepository' -def compute_message_from(app, conf, root_dir, tarpath, filename, - retrieval_date, dry_run=False): +def compute_message_from( + conf, root_dir, tarpath, retrieval_date, dry_run=False): """Compute and post the message to worker for the archive tarpath. Args: - app: instance of the celery app conf: dictionary holding static metadata root_dir: root directory tarball: the archive's representation retrieval_date: retrieval date of information dry_run: will compute but not send messages Returns: None """ - origin = build.compute_origin(conf['url_scheme'], - conf['type'], - root_dir, - tarpath) + origin = build.compute_origin( + conf['url_scheme'], conf['type'], root_dir, tarpath) revision = build.compute_revision(tarpath) occurrence = build.occurrence_with_date(retrieval_date, tarpath) + task = get_task(TASK_QUEUE) if not dry_run: - app.tasks[task_queue].delay(tarpath, - origin, - revision, - [occurrence]) + task.delay(tarpath, origin, revision, [occurrence]) -def produce_archive_messages_from(app, conf, path, - retrieval_date, - mirror_file=None, - dry_run=False): +def produce_archive_messages_from( + conf, path, retrieval_date, mirror_file=None, dry_run=False): """From path, produce archive tarball messages to celery. Will print error message when some computation arise on archive and continue. Args: - app: instance of the celery app conf: dictionary holding static metadata path: top directory to list archives from. retrieval_date: retrieval date of information mirror_file: a filtering file of tarballs to load dry_run: will compute but not send messages Returns: - None - - Raises: - None + Number of messages generated """ limit = conf['limit'] block = int(conf['block_messages']) count = 0 path_source_tarballs = mirror_file if mirror_file else path - for tarpath, fname in file.random_archives_from(path_source_tarballs, - block, - limit): + for tarpath, _ in file.random_archives_from( + path_source_tarballs, block, limit): try: - compute_message_from(app, conf, path, tarpath, fname, - retrieval_date, dry_run) + compute_message_from( + conf, path, tarpath, retrieval_date, dry_run) count += 1 except ValueError: print('Problem with the following archive: %s' % tarpath) return count -def load_config(conf_file): - """Load the configuration from file. - - Args: - conf_file: path to a configuration file with the following content: - [main] - - # mirror's root directory holding tarballs to load into swh - mirror_root_directory = /home/storage/space/mirrors/gnu.org/gnu/ - - # origin setup's possible scheme url - url_scheme = rsync://ftp.gnu.org/gnu/ - - # origin type used for those tarballs - type = ftp - - # For tryouts purposes (no limit if not specified) - limit = 1 - - Returns: - dictionary of data present in the configuration file. - +@click.command() +@click.option('--config-file', required=1, + help='Configuration file path') +@click.option('--dry-run/--no-dry-run', default=False, + help='Dry run (print repo only)') +@click.option('--limit', default=None, + help='Number of origins limit to send') +def main(config_file, dry_run, limit): + """Tarball producer of local fs tarballs. """ - conf = config.read(conf_file, - default_conf={'limit': ('int', None)}) + conf = config.read(config_file) url_scheme = conf['url_scheme'] mirror_dir = conf['mirror_root_directory'] # remove trailing / in configuration (to ease ulterior computation) if url_scheme[-1] == '/': - conf.update({ - 'url_scheme': url_scheme[0:-1] - }) + conf['url_scheme'] = url_scheme[0:-1] if mirror_dir[-1] == '/': - conf.update({ - 'mirror_root_directory': mirror_dir[0:-1] - }) + conf['mirror_root_directory'] = mirror_dir[0:-1] - return conf + if limit: + conf['limit'] = int(limit) + nb_tarballs = produce_archive_messages_from( + conf=conf, + path=conf['mirror_root_directory'], + retrieval_date=conf['date'], + mirror_file=conf.get('mirror_subset_archives'), + dry_run=dry_run) -def parse_args(): - """Parse the configuration from the cli. - - """ - cli = argparse.ArgumentParser( - description='Tarball producer of local fs tarballs.') - cli.add_argument('--dry-run', '-n', - action='store_true', - help='Dry run (print repo only)') - cli.add_argument('--config', '-c', help='configuration file path') - - args = cli.parse_args() - - return args + print('%s tarball(s) sent to worker.' % nb_tarballs) if __name__ == '__main__': - args = parse_args() - config_file = args.config - if not config_file: - print('Missing configuration file option.') - sys.exit(1) - - # instantiate celery app with its configuration - from swh.scheduler.celery_backend.config import app - from swh.loader.tar import tasks # noqa - - conf = load_config(config_file) - - retrieval_date = conf['date'] - - nb_tarballs = produce_archive_messages_from( - app, - conf, - conf['mirror_root_directory'], - retrieval_date, - conf.get('mirror_subset_archives'), - args.dry_run) - - print('%s tarball(s) sent to worker.' % nb_tarballs) + main()