diff --git a/bin/swh-loader-tar-producer b/bin/swh-loader-tar-producer index ee04611..2ca6d82 100755 --- a/bin/swh-loader-tar-producer +++ b/bin/swh-loader-tar-producer @@ -1,174 +1,221 @@ #!/usr/bin/env python3 # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import argparse +import itertools +import random import sys from swh.core import config from swh.loader.tar import build, file def compute_message_from(app, conf, root_dir, tarpath, filename, dry_run=False): """Compute and post the message to worker for the archive tarpath. Args: app: instance of the celery app conf: dictionary holding static metadata root_dir: root directory tarball: the archive's representation dry_run: will compute but not send messages Returns: None Raises: ValueError when release number computation error arise. """ origin = build.compute_origin(conf['url_scheme'], conf['type'], root_dir, tarpath) revision = build.compute_revision() occurrences = [build.occurrence_with_mtime(GNU_AUTHORITY, tarpath), build.occurrence_with_ctime(SWH_AUTHORITY, tarpath)] release = build.compute_release(filename, tarpath) if not dry_run: app.tasks['swh.loader.tar.tasks.LoadTarRepository'].delay(tarpath, origin, revision, release, occurrences) -def produce_archive_messages_from(app, conf, path, mirror_file=None, +def grouper(iterable, n, fillvalue=None): + """Collect data into fixed-length chunks or blocks. + + Args: + iterable: an iterable + n: size of block + fillvalue: value to use for the last block + + Returns: + fixed-length chunks of blocks as iterables + + """ + args = [iter(iterable)] * n + return itertools.zip_longest(*args, fillvalue=fillvalue) + + +def random_blocks(iterable, block=100, fillvalue=None): + """Given an iterable: + - slice the iterable in data set of block-sized elements + - randomized the data set + - yield each element + + Args: + iterable: iterable of data + block: number of elements per block + fillvalue: a fillvalue for the last block if not enough values in + last block + + Returns: + An iterable of element. + + """ + for iterable in grouper(iterable, block, fillvalue=fillvalue): + l = list(iterable) + random.shuffle(l) + for e in l: + yield e + + +def produce_archive_messages_from(app, conf, path, + mirror_file=None, dry_run=False): """From path, produce archive tarball messages to celery. Will print error message when some computation arise on archive and continue. Args: app: instance of the celery app conf: dictionary holding static metadata path: top directory to list archives from. mirror_file: a filtering file of tarballs to load dry_run: will compute but not send messages Returns: None Raises: None """ + LIMIT = conf['limit'] + BLOCK = int(conf['block_messages']) count = 0 path_source_tarballs = mirror_file if mirror_file else path - archives = list(file.list_archives_from(path_source_tarballs)) - archives.shuffle() - for tarpath, fname in archives: + archives = file.list_archives_from(path_source_tarballs) + random_archives = random_blocks(archives, + BLOCK, + fillvalue=(None, None)) + + if LIMIT: + random_archives = itertools.islice(random_archives, LIMIT) + + for tarpath, fname in [(t, f) for t, f in random_archives if t and f]: count += 1 try: compute_message_from(app, conf, path, tarpath, fname, dry_run) except ValueError: print('Problem with the following archive: %s' % tarpath) - if LIMIT and count >= LIMIT: - return count - return count def load_config(conf_file): """Load the configuration from file. Args: conf_file: path to a configuration file with the following content: [main] # mirror's root directory holding tarballs to load into swh mirror_root_directory = /home/storage/space/mirrors/gnu.org/gnu/ # origin setup's possible scheme url url_scheme = rsync://ftp.gnu.org/gnu/ # origin type used for those tarballs type = ftp # For tryouts purposes (no limit if not specified) limit = 1 Returns: dictionary of data present in the configuration file. """ conf = config.read(conf_file, default_conf={'limit': ('int', None)}) url_scheme = conf['url_scheme'] mirror_dir = conf['mirror_root_directory'] # remove trailing / in configuration (to ease ulterior computation) if url_scheme[-1] == '/': conf.update({ 'url_scheme': url_scheme[0:-1] }) if mirror_dir[-1] == '/': conf.update({ 'mirror_root_directory': mirror_dir[0:-1] }) return conf def parse_args(): """Parse the configuration from the cli. """ cli = argparse.ArgumentParser( description='Tarball producer of local fs tarballs.') cli.add_argument('--dry-run', '-n', action='store_true', help='Dry run (print repo only)') cli.add_argument('--config', '-c', help='configuration file path') args = cli.parse_args() return args if __name__ == '__main__': args = parse_args() config_file = args.config if not config_file: print('Missing configuration file option.') sys.exit(1) # instantiate celery app with its configuration from swh.core.worker import app from swh.loader.tar import tasks # noqa conf = load_config(config_file) # state... SWH_AUTHORITY = conf['swh_authority'] GNU_AUTHORITY = conf['gnu_authority'] nb_tarballs = produce_archive_messages_from( app, conf, conf['mirror_root_directory'], conf.get('mirror_subset_archives'), args.dry_run) print('%s tarball(s) sent to worker.' % nb_tarballs) diff --git a/resources/producer/tar-gnu.ini b/resources/producer/tar-gnu.ini index 492ed03..efc7a41 100644 --- a/resources/producer/tar-gnu.ini +++ b/resources/producer/tar-gnu.ini @@ -1,29 +1,31 @@ [main] # Mirror's root directory holding tarballs to load into swh mirror_root_directory = /home/storage/space/mirrors/gnu.org/gnu/ # Origin setup's possible scheme url url_scheme = rsync://ftp.gnu.org/gnu/ # Origin type used for tarballs type = ftp # File containing a subset list tarballs from mirror_root_directory to load. # The file's format is one absolute path name to a tarball per line. # NOTE: # - This file must contain data consistent with the mirror_root_directory # - if this option is not provided, the mirror_root_directory is scanned # completely as usual # mirror_subset_archives = /home/storage/missing-archives # Authorities gnu_authority = 4706c92a-8173-45d9-93d7-06523f249398 swh_authority = 5f4d4c51-498a-4e28-88b3-b3e4e8396cba +# Randomize blocks of messages and send for consumption +# block_messages = 100 # DEV options # Tryouts purposes (no limit if not specified) # limit = 10 diff --git a/resources/producer/tar-old-gnu.ini b/resources/producer/tar-old-gnu.ini index 83f5a0d..9b848f6 100644 --- a/resources/producer/tar-old-gnu.ini +++ b/resources/producer/tar-old-gnu.ini @@ -1,28 +1,31 @@ [main] # Mirror's root directory holding tarballs to load into swh mirror_root_directory = /home/storage/space/mirrors/gnu.org/old-gnu/ # Origin setup's possible scheme url url_scheme = rsync://ftp.gnu.org/old-gnu/ # Origin type used for tarballs type = ftp # File containing a subset list tarballs from mirror_root_directory to load. # The file's format is one absolute path name to a tarball per line. # NOTE: # - This file must contain data consistent with the mirror_root_directory # - if this option is not provided, the mirror_root_directory is scanned # completely as usual # mirror_subset_archives = /home/storage/missing-archives # Authorities gnu_authority = 4706c92a-8173-45d9-93d7-06523f249398 swh_authority = 5f4d4c51-498a-4e28-88b3-b3e4e8396cba +# Randomize blocks of messages and send for consumption +block_messages = 100 + # DEV options # Tryouts purposes (no limit if not specified) # limit = 10