diff --git a/swh/loader/antelink/s3download_producer.py b/swh/loader/antelink/s3download_producer.py
index 7099043..1ff96db 100755
--- a/swh/loader/antelink/s3download_producer.py
+++ b/swh/loader/antelink/s3download_producer.py
@@ -1,38 +1,41 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 
 from swh.core.utils import grouper
-from swh.loader.antelink import storage
+from swh.loader.antelink import storage, utils
 
 
-def s3_files_to_download(db_url, huge, final, limit):
-    store = storage.Storage(db_url)
+def s3_files_to_download(store, huge, final, limit):
     files_gen = store.read_content_s3_not_in_sesi_nor_in_swh(
         huge=huge, final=final, limit=limit)
     for path, _ in files_gen:
         yield path
 
 
 @click.command()
-@click.option('--db-url', default='service=swh-antelink', help='Db access.')
+@click.option('--db-url', help='Db access.')
 @click.option('--block-size', default=1000, help='Default block size to use.')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--final', is_flag=True, help='Add final s3 files.')
 @click.option('--huge', is_flag=True, help='Add huge s3 files.')
 def compute_s3_files(db_url, block_size, limit, final, huge):
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa
 
-    for paths in grouper(
-            s3_files_to_download(db_url, huge, final, limit),
-            block_size):
+    if db_url:
+        store = storage.Storage(db_url)
+        gen_data = s3_files_to_download(store, huge, final, limit)
+    else:
+        gen_data = utils.gen_path_length_from_stdin()
+
+    for paths in grouper(gen_data, block_size):
         app.tasks['swh.loader.antelink.tasks.AntelinkS3DownloaderTsk'].delay(
             list(paths))
 
 
 if __name__ == '__main__':
     compute_s3_files()
diff --git a/swh/loader/antelink/s3injecter_producer.py b/swh/loader/antelink/s3injecter_producer.py
index 9d20e9f..8350768 100755
--- a/swh/loader/antelink/s3injecter_producer.py
+++ b/swh/loader/antelink/s3injecter_producer.py
@@ -1,105 +1,90 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
-import sys
 
 from swh.core import hashutil
 from swh.core.utils import grouper
 from swh.storage import get_storage
 
 from swh.loader.antelink import utils, storage
 
 
 task_name = 'swh.loader.antelink.tasks.AntelinkS3InjecterTsk'
 
 
-def gen_path_length_from_stdin():
-    """Compute the paths to retrieve from sesi and inject in swh.
-
-    It will compute ~block_size (bytes) of files (paths) to retrieve
-    and send it to the queue for workers to download and inject in swh.
-
-    """
-    for line in sys.stdin:
-        line = line.rstrip()
-        data = line.split(' ')
-        yield data[0], int(data[1])
-
-
 def process_paths(paths):
     """Compute map from sha1 to localpath.
 
     """
     m = {}
     for localpath, size in paths:
         sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath))
         m[sha1] = [localpath, size]
     return m
 
 
 def retrieve_unknown_sha1s(swhstorage, gen_data):
     # Compute blocks of 1000 sha1s
     for paths in grouper(gen_data, n=1000):
         data = process_paths(paths)
         sha1s_tocheck = list(data.keys())
         if len(sha1s_tocheck) > 0:
             # let those inexistent sha1s flow
             for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck):
                 yield data[sha1][0], data[sha1][1]
 
 
 @click.command()
 @click.option('--db-url', help='Db access.')
 @click.option('--block-size', default=104857600,
               help='Default block size in bytes (100Mib).')
 @click.option('--block-max-files', default=1000,
               help='Default max number of files (default: 1000).')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--dry-run', is_flag=True, help='Dry run.')
 @click.option('--final', is_flag=True, help='To deal with remaining s3 files.')
 @click.option('--huge', is_flag=True, help='To deal with big files.')
 @click.option('--storage-class', default='remote_storage')
 @click.option('--storage-args',
               default='http://uffizi.internal.softwareheritage.org:5002/')
 def compute_s3_jobs(db_url, block_size, block_max_files, limit, dry_run,
                     final, huge, storage_class, storage_args):
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa
 
     # right inputs
     if isinstance(block_size, str):
         block_size = int(block_size)
     if isinstance(block_max_files, str):
         block_max_files = int(block_max_files)
     if limit and isinstance(limit, str):
         limit = int(limit)
 
     if dry_run:
         print('** DRY RUN **')
     swhstorage = get_storage(storage_class, storage_args.split(','))
     if db_url:
         store = storage.Storage(db_url)
         gen_data = retrieve_unknown_sha1s(
             swhstorage,
             store.read_content_s3_not_in_sesi_nor_in_swh(huge, final, limit))
     else:
-        gen_data = retrieve_unknown_sha1s(
-            swhstorage,
-            gen_path_length_from_stdin())
+        gen_data = retrieve_unknown_sha1s(swhstorage,
+                                          utils.gen_path_length_from_stdin())
 
     nb_total_blocks = 0
     for paths, size in utils.split_data_per_size(gen_data, block_size,
                                                  block_max_files):
         nb_total_blocks += 1
         print('%s paths (%s bytes) sent.' % (len(paths), size))
         if dry_run:
             continue
         app.tasks[task_name].delay(paths)
     print('Number of jobs: %s' % nb_total_blocks)
 
 
 if __name__ == '__main__':
     compute_s3_jobs()
diff --git a/swh/loader/antelink/sesiproducer.py b/swh/loader/antelink/sesiproducer.py
index bf63a63..fda92c3 100755
--- a/swh/loader/antelink/sesiproducer.py
+++ b/swh/loader/antelink/sesiproducer.py
@@ -1,110 +1,96 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
-import sys
 
 from swh.core import hashutil
 from swh.core.utils import grouper
 from swh.storage import get_storage
 
 from swh.loader.antelink import storage, utils
 
 
-def gen_path_length_from_stdin():
-    """Compute the paths to retrieve from sesi and inject in swh.
-
-    It will compute ~block_size (bytes) of files (paths) to retrieve
-    and send it to the queue for workers to download and inject in swh.
-
-    """
-    for line in sys.stdin:
-        line = line.rstrip()
-        data = line.split(' ')
-        yield data[0], int(data[1])
-
-
 def process_paths(paths):
     """Compute map from sha1 to localpath.
 
     """
     m = {}
     for localpath, size in paths:
         sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath))
         m[sha1] = [localpath, size]
     return m
 
 
 def retrieve_unknown_sha1s(swhstorage, gendata):
     # Compute blocks of 1000 sha1s
     for paths in grouper(gendata, n=1000):
         data = process_paths(paths)
         sha1s_tocheck = list(data.keys())
         if len(sha1s_tocheck) > 0:
             # let those inexistent sha1s flow
             for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck):
                 yield data[sha1][0], data[sha1][1]
 
 
 @click.command()
 @click.option('--db-url', help="""Optional db access. If not specified,
                                   wait for stdin entries.""")
 @click.option('--block-size', default=104857600,
               help='Default block size in bytes (100Mib).')
 @click.option('--block-max-files', default=1000,
               help='Default max number of files (default: 1000).')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--dry-run', is_flag=True, help='Dry run.')
 @click.option('--huge', is_flag=True, help='Deal with huge files.')
 @click.option('--storage-class', default='remote_storage')
 @click.option('--storage-args',
               default='http://uffizi.internal.softwareheritage.org:5002/')
 def send_jobs(db_url, block_size, block_max_files, limit, dry_run, huge,
               storage_class, storage_args):
     """Send paths for worker to retrieve from sesi machine.
 
     """
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa
 
     # right inputs
     if isinstance(block_size, str):
         block_size = int(block_size)
     if isinstance(block_max_files, str):
         block_max_files = int(block_max_files)
     if limit and isinstance(limit, str):
         limit = int(limit)
 
     if dry_run:
         print('** DRY RUN **')
 
     if db_url:
         store = storage.Storage(db_url)
         gen_data = store.read_content_sesi_not_in_swh(huge, limit)
     else:
-        gen_data = gen_path_length_from_stdin()
+        gen_data = utils.gen_path_length_from_stdin()
 
     if huge:
         task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterHugeTsk'
     else:
         task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterTsk'
 
     swhstorage = get_storage(storage_class, storage_args.split(','))
     gen_data = retrieve_unknown_sha1s(swhstorage, gen_data)
 
     nb_total_blocks = 0
     for paths, size in utils.split_data_per_size(gen_data, block_size,
                                                  block_max_files):
         nb_total_blocks += 1
         print('%s paths (%s bytes) sent.' % (len(paths), size))
         if dry_run:
             continue
         app.tasks[task_name].delay(paths)
 
     print('Number of jobs: %s' % nb_total_blocks)
 
 
 if __name__ == '__main__':
     send_jobs()
diff --git a/swh/loader/antelink/utils.py b/swh/loader/antelink/utils.py
index e90cdf5..6c90737 100644
--- a/swh/loader/antelink/utils.py
+++ b/swh/loader/antelink/utils.py
@@ -1,109 +1,126 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import gzip
 import os
+import sys
 
 from swh.core import hashutil
 
 
 def compute_len(f):
     """Compute file-like f object's size.
 
     Returns:
         Length of the file-like f object.
 
     """
     total = 0
     while True:
         chunk = f.read(hashutil.HASH_BLOCK_SIZE)
         if not chunk:
             break
         total += len(chunk)
     return total
 
 
 def hashfile(f, with_data=False):
     """hash the content of a file-like object.
 
     """
     length = compute_len(f)
     f.seek(0)
 
     if with_data:
         localdata = []
 
         def add_chunk(chunk, localdata=localdata):
             localdata.append(chunk)
 
         data = hashutil._hash_file_obj(f, length, chunk_cb=add_chunk)
         data['data'] = b''.join(localdata)
     else:
         data = hashutil._hash_file_obj(f, length)
     data['length'] = length
     return data
 
 
 def compute_hash(path, with_data=False):
     """Compute the gzip file's hashes and length.
 
     Args:
         path: path to the gzip file to hash
 
     Returns:
         dictionary of sha1, sha1_git, sha256 and length.
 
     """
     with gzip.open(path, 'rb') as f:
         return hashfile(f, with_data=with_data)
 
 
 def sha1_from_path(path):
     """Path expected to ends with .gz.

     Ex: /some/path/to/.gz
 
     Returns:
         sha1 extracted from the pathname.
 
     """
     return os.path.basename(path).split('.')[0]
 
 
 def to_content(path, log=None):
     """Load path into a content for swh.
 
     """
     data = compute_hash(path, with_data=True)
     data['update'] = 'visible'
     return data
 
 
 def split_data_per_size(gen_data, block_size, block_max_files):
     """Compute the paths to retrieve from sesi and inject in swh.
 
     It will compute ~block_size (bytes) of files (paths) to retrieve
     and send it to the queue for workers to download and inject in swh.
 
     """
     accu_size = 0
     paths = []
     nb_files = 0
     for path, length in gen_data:
         accu_size += length
         paths.append(path)
         nb_files += 1
         if accu_size >= block_size or nb_files >= block_max_files:
             yield paths, accu_size
             paths = []
             accu_size = 0
             nb_files = 0
 
     # if remaining paths
     if accu_size > 0 or paths:
         yield paths, accu_size
+
+
+def gen_path_length_from_stdin():
+    """Compute the paths to retrieve from sesi and inject in swh.
+
+    It will compute ~block_size (bytes) of files (paths) to retrieve
+    and send it to the queue for workers to download and inject in swh.
+
+    """
+    for line in sys.stdin:
+        line = line.rstrip()
+        data = line.split(' ')
+        if len(data) > 1:
+            yield data[0], int(data[1])
+        else:
+            yield data[0]