def compute_message_from(app, conf, root_dir, tarpath, filename,
                         dry_run=False):
    """Compute and post the message to worker for the archive tarpath.

    Args:
        app: instance of the celery app
        conf: dictionary holding static metadata
        root_dir: root directory of the mirror holding the archive
        tarpath: path to the archive to load
        filename: basename of the archive (used for release computation)
        dry_run: will compute but not send messages

    Returns:
        None

    Raises:
        ValueError when release number computation error arise.

    """
    origin = build.compute_origin(conf['url_scheme'], conf['type'],
                                  root_dir, tarpath)
    revision = build.compute_revision()
    # NOTE(review): GNU_AUTHORITY and SWH_AUTHORITY are module-level
    # globals assigned in the __main__ block from the configuration --
    # this function is only callable after that setup has run.
    occurrences = [build.occurrence_with_mtime(GNU_AUTHORITY, tarpath),
                   build.occurrence_with_ctime(SWH_AUTHORITY, tarpath)]
    release = build.compute_release(filename, tarpath)

    if not dry_run:
        app.tasks['swh.loader.tar.tasks.LoadTarRepository'].delay(
            tarpath, origin, revision, release, occurrences)
def produce_archive_messages_from(app, conf, path,
                                  mirror_file=None,
                                  dry_run=False):
    """From path, produce archive tarball messages to celery.

    Will print error message when some computation arise on archive
    and continue.

    Args:
        app: instance of the celery app
        conf: dictionary holding static metadata
        path: top directory to list archives from.
        mirror_file: a filtering file of tarballs to load
        dry_run: will compute but not send messages

    Returns:
        The number of archive messages produced (the caller reports it).

    """
    limit = conf['limit']
    block_size = int(conf['block_messages'])
    count = 0
    path_source_tarballs = mirror_file if mirror_file else path

    for tarpath, fname in file.random_archives_from(path_source_tarballs,
                                                    block_size,
                                                    limit):
        count += 1
        try:
            compute_message_from(app, conf, path, tarpath, fname, dry_run)
        except ValueError:
            # keep going: one broken archive must not abort the batch
            print('Problem with the following archive: %s' % tarpath)

    return count
def load_config(conf_file):
    """Load the configuration from file.

    Args:
        conf_file: path to a configuration file with the following content:
            [main]

            # mirror's root directory holding tarballs to load into swh
            mirror_root_directory = /home/storage/space/mirrors/gnu.org/gnu/

            # origin setup's possible scheme url
            url_scheme = rsync://ftp.gnu.org/gnu/

            # origin type used for those tarballs
            type = ftp

            # For tryouts purposes (no limit if not specified)
            limit = 1

    Returns:
        dictionary of data present in the configuration file.

    """
    conf = config.read(conf_file,
                       default_conf={'limit': ('int', None)})

    # Strip one trailing / from these entries (to ease ulterior
    # computation).
    for key in ('url_scheme', 'mirror_root_directory'):
        value = conf[key]
        if value[-1] == '/':
            conf[key] = value[:-1]

    return conf


def parse_args():
    """Parse the configuration from the cli.

    """
    parser = argparse.ArgumentParser(
        description='Tarball producer of local fs tarballs.')
    parser.add_argument('--dry-run', '-n',
                        action='store_true',
                        help='Dry run (print repo only)')
    parser.add_argument('--config', '-c',
                        help='configuration file path')
    return parser.parse_args()
def archives_from_dir(path):
    """Given a path to a directory, walk such directory and yield tuple of
    tarpath, fname.

    Args:
        path: top level directory

    Returns:
        Generator of tuple tarpath, filename with tarpath a tarball.

    """
    for dirpath, dirnames, filenames in os.walk(path):
        for fname in filenames:
            tarpath = os.path.join(dirpath, fname)
            # the entry may have vanished between the walk and this check
            if not os.path.exists(tarpath):
                continue

            if tarball.is_tarball(tarpath):
                yield tarpath, fname


def archives_from_file(mirror_file):
    """Given a path to a file containing one tarball per line, yield a tuple of
    tarpath, fname.

    Args:
        mirror_file: path to the file containing list of tarpath.

    Returns:
        Generator of tuple tarpath, filename with tarpath a tarball.

    """
    with open(mirror_file, 'r') as f:
        # iterate the file lazily instead of f.readlines(): avoids
        # materializing the entire mirror file in memory at once
        for line in f:
            tarpath = line.strip()
            if not os.path.exists(tarpath):
                print('WARN: %s does not exist. Skipped.' % tarpath)
                continue

            if tarball.is_tarball(tarpath):
                yield tarpath, os.path.basename(tarpath)


def archives_from(path):
    """From path, list tuple of tarpath, fname.

    Args:
        path: top directory to list archives from or custom file format.

    Returns:
        Generator of tuple tarpath, filename with tarpath a tarball.

    Raises:
        ValueError when path is neither a file nor a directory.

    """
    if os.path.isfile(path):
        yield from archives_from_file(path)
    elif os.path.isdir(path):
        yield from archives_from_dir(path)
    else:
        raise ValueError(
            'Input incorrect, %s must be a file or a directory.'
            % path)
def random_archives_from(path, block, limit=None):
    """Randomize by size block the archives.

    Args:
        path: top directory or mirror file to list archives from.
        block: number of archives shuffled together.
        limit: optional maximum number of archives to yield.

    Returns:
        Generator of randomized tuple tarpath, filename with tarpath a
        tarball.

    """
    archives = utils.random_blocks(archives_from(path),
                                   block,
                                   fillvalue=(None, None))

    if limit:
        archives = itertools.islice(archives, limit)

    # drop the (None, None) padding used to fill the last block
    yield from ((tarpath, fname)
                for tarpath, fname in archives
                if tarpath and fname)
def parse_filename(filename):
    """Parse a filename into its components.

    Parsing policy:
    We use Debian's release number heuristic: A release number starts
    with a digit, and is followed by alphanumeric characters or any of
    ., +, :, ~ and -

    We hardcode a list of possible extensions, as this release number
    scheme would match them too... We match on any combination of those.

    Greedy matching is done right to left (we only match the extension
    greedily with +, software_name and release_number are matched lazily
    with +? and *?).

    Args:
        filename: filename without path.

    Returns:
        Dictionary with the following keys:
        - software_name
        - release_number: can be None if it could not be found.
        - extension

    Raises:
        ValueError if the filename could not be parsed.

    """
    match = pattern.match(filename)
    if match is None:
        raise ValueError('Filename %s could not be parsed.' % filename)

    groups = match.groupdict()
    return {
        'software_name': groups['software_name1'] or groups['software_name2'],
        'release_number': groups['release_number'],
        'extension': groups['extension'],
    }


def release_number(filename):
    """Compute the release number from the filename.

    cf. parse_filename's docstring

    """
    return parse_filename(filename)['release_number']


def commonname(path0, path1, as_str=False):
    """Compute the commonname between the path0 and path1.

    """
    # split (not partition): keeps the original semantics when path0
    # occurs several times inside path1
    return path1.split(path0)[1]


def convert_to_hex(d):
    """Convert a flat dictionary with bytes in values to the same dictionary
    with hex as values.

    Args:
        dict: flat dictionary with sha bytes in their values.

    Returns:
        Mirror dictionary with values as string hex.

    """
    if not d:
        return d

    return {key: hashutil.hash_to_hex(h) for key, h in d.items()}
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks.

    Args:
        iterable: an iterable
        n: size of block
        fillvalue: value to use for the last block

    Returns:
        fixed-length chunks of blocks as iterables

    """
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)


def random_blocks(iterable, block=100, fillvalue=None):
    """Given an iterable:
    - slice the iterable in data set of block-sized elements
    - randomize the data set
    - yield each element

    Args:
        iterable: iterable of data
        block: number of elements per block
        fillvalue: a fillvalue for the last block if not enough values in
            last block

    Returns:
        An iterable of elements randomized per block of size block.

    """
    # Fixes over the previous version: the dead `count` accumulator
    # (incremented but never read) is dropped, the loop variable no
    # longer shadows the `iterable` parameter, and the ambiguous name
    # `l` (PEP 8 E741) is renamed.
    for chunk in grouper(iterable, block, fillvalue=fillvalue):
        shuffled = list(chunk)
        random.shuffle(shuffled)
        yield from shuffled