diff --git a/swh/loader/antelink/inject_checksums.py b/swh/loader/antelink/inject_checksums.py
index c79b7d9..7cf788d 100644
--- a/swh/loader/antelink/inject_checksums.py
+++ b/swh/loader/antelink/inject_checksums.py
@@ -1,81 +1,82 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of loading the data retrieved from sesi's backup.

 This expects to receive filepaths from sys.stdin. Those filepath must
 point to existing file which are of the form:

     sha1,sha1_git,sha256,length,path,corrupted

 This is the output format of module swh.loader.antelink.compute_checksums.

 The data are stored in table content_sesi_all.

 """

 import os
 import sys

 from swh.core import hashutil
+from swh.core.utils import grouper
 from swh.loader.antelink import utils
 from swh.loader.antelink.db import Db

 BLOCK_SIZE = 250000


 def load_file(path):
     """Load file and yield sha1, pathname couple."""
     with open(path, 'r') as f:
         for line in f:
             data = line.rstrip().split(',')
             path = data[4]
             # some line can be empty on sha1, sha1_git, sha256 (when
             # huge file or pb during hash computation)
             origin_sha1 = utils.sha1_from_path(path)
             if data[0]:
                 yield {'origin_sha1': hashutil.hex_to_hash(origin_sha1),
                        'sha1': hashutil.hex_to_hash(data[0]),
                        'sha1_git': hashutil.hex_to_hash(data[1]),
                        'sha256': hashutil.hex_to_hash(data[2]),
                        'length': data[3],
                        'path': path,
                        'corrupted': data[5]}
             else:
                 print('Path %s skipped.' % path)


 def store_file_content(db_url, path):
     """The first round finished, there were errors. Adapting the code and
     running this command will finish appropriately the first round.

     """
     db = Db.connect(db_url)
     with db.transaction() as cur:
-        for data in utils.split_data(load_file(path), BLOCK_SIZE):
+        for data in grouper(load_file(path), BLOCK_SIZE):
             db.copy_to(data, 'content_sesi',
                        ['origin_sha1', 'sha1', 'sha1_git', 'sha256',
                         'length', 'path', 'corrupted'], cur)


 if __name__ == '__main__':
     db_url = "%s" % sys.argv[1]
     for filepath in sys.stdin:
         filepath = filepath.rstrip()
         if not os.path.exists(filepath):
             print('Path %s does not exist.' % filepath)
             continue
         if not os.path.isfile(filepath):
             print('Path %s does not reference a file.'
                   % filepath)
             continue
         try:
             store_file_content(db_url, filepath)
             print('%s ok' % filepath)
         except Exception as e:
             print('%s ko %s' % (filepath, e))
diff --git a/swh/loader/antelink/loads3ls.py b/swh/loader/antelink/loads3ls.py
index d8eab8b..3c1639f 100644
--- a/swh/loader/antelink/loads3ls.py
+++ b/swh/loader/antelink/loads3ls.py
@@ -1,74 +1,75 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of loading the 'aws s3 ls' data output in
 antelink.content_s3 table."""

 import os
 import sys

+from swh.core.utils import grouper
 from swh.loader.antelink import utils
 from swh.loader.antelink.db import Db

 BLOCK_SIZE = None  # 75000


 def load_data(path):
     """Load file and yield sha1, pathname couple."""
     with open(path, 'r') as f:
         for line in f:
             data = line.rstrip().split(' ')
             l = len(data)
             pathname = data[l - 1]
             if pathname.endswith('.gz'):
                 sha1 = bytes.fromhex(utils.sha1_from_path(path))
                 length = data[l - 2]
                 yield {'sha1': sha1,
                        'path': pathname,
                        'length': length}


 def store_file_to_antelink_db_per_block(db, path):
     with db.transaction() as cur:
-        for data in utils.split_data(load_data(path), BLOCK_SIZE):
+        for data in grouper(load_data(path), BLOCK_SIZE):
             db.copy_to(data, 'content_s3', ['sha1', 'path', 'length'], cur)


 def store_file_to_antelink_db(db, path):
     with db.transaction() as cur:
         db.copy_to(load_data(path), 'content_s3',
                    ['sha1', 'path', 'length'], cur)


 def store_file_and_print_result(db, path):
     """Try and store the file in the db connection. This prints ok or ko
     depending on the result.

     """
     try:
         if BLOCK_SIZE:
             store_file_to_antelink_db_per_block(db, path)
         else:
             store_file_to_antelink_db(db, path)
         print('%s ok' % path)
     except Exception as e:
         print('%s ko %s' % (path, e))


 if __name__ == '__main__':
     db_url = "%s" % sys.argv[1]
     for line in sys.stdin:
         db = Db.connect(db_url)
         filepath = line.rstrip()
         if not os.path.exists(filepath):
             print('Path %s does not exist.'
                   % filepath)
         if os.path.isfile(filepath):
             store_file_and_print_result(db, filepath)
diff --git a/swh/loader/antelink/s3download_producer.py b/swh/loader/antelink/s3download_producer.py
index 3da2822..7099043 100755
--- a/swh/loader/antelink/s3download_producer.py
+++ b/swh/loader/antelink/s3download_producer.py
@@ -1,37 +1,38 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click

-from swh.loader.antelink import utils, storage
+from swh.core.utils import grouper
+from swh.loader.antelink import storage


 def s3_files_to_download(db_url, huge, final, limit):
     store = storage.Storage(db_url)
     files_gen = store.read_content_s3_not_in_sesi_nor_in_swh(
         huge=huge, final=final, limit=limit)
     for path, _ in files_gen:
         yield path


 @click.command()
 @click.option('--db-url', default='service=swh-antelink', help='Db access.')
 @click.option('--block-size', default=1000, help='Default block size to use.')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--final', is_flag=True, help='Add final s3 files.')
 @click.option('--huge', is_flag=True, help='Add huge s3 files.')
 def compute_s3_files(db_url, block_size, limit, final, huge):
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa

-    for paths in utils.split_data(
+    for paths in grouper(
             s3_files_to_download(db_url, huge, final, limit), block_size):
         app.tasks['swh.loader.antelink.tasks.AntelinkS3DownloaderTsk'].delay(
             list(paths))


 if __name__ == '__main__':
     compute_s3_files()
diff --git a/swh/loader/antelink/s3injecter_producer.py b/swh/loader/antelink/s3injecter_producer.py
index b414f16..d141d35 100755
--- a/swh/loader/antelink/s3injecter_producer.py
+++ b/swh/loader/antelink/s3injecter_producer.py
@@ -1,85 +1,86 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click

 from swh.core import hashutil
+from swh.core.utils import grouper
 from swh.storage import get_storage

 from swh.loader.antelink import utils, storage

 task_name = 'swh.loader.antelink.tasks.AntelinkS3InjecterTsk'


 def process_paths(paths):
     """Compute map from sha1 to localpath.

     """
     m = {}
     for localpath, size in paths:
         sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath))
         m[sha1] = [localpath, size]
     return m


 def retrieve_unknown_sha1s(swhstorage, gen_data):
     # Compute blocks of 1000 sha1s
-    for paths in utils.split_data(gen_data, block_size=1000):
+    for paths in grouper(gen_data, n=1000):
         data = process_paths(paths)
         sha1s_tocheck = list(data.keys())
         if len(sha1s_tocheck) > 0:
             # let those inexistent sha1s flow
             for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck):
                 yield data[sha1][0], data[sha1][1]


 @click.command()
 @click.option('--db-url', default='service=swh-antelink', help='Db access.')
 @click.option('--block-size', default=104857600,
               help='Default block size in bytes (100Mib).')
 @click.option('--block-max-files', default=1000,
               help='Default max number of files (default: 1000).')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--dry-run', is_flag=True, help='Dry run.')
 @click.option('--final', is_flag=True, help='To deal with remaining s3 files.')
 @click.option('--huge', is_flag=True, help='To deal with big files.')
 @click.option('--storage-class', default='remote_storage')
 @click.option('--storage-args',
               default='http://uffizi.internal.softwareheritage.org:5002/')
 def compute_s3_jobs(db_url, block_size, block_max_files, limit, dry_run,
                     final, huge, storage_class, storage_args):
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa

     # right inputs
     if isinstance(block_size, str):
         block_size = int(block_size)
     if isinstance(block_max_files, str):
         block_max_files = int(block_max_files)
     if limit and isinstance(limit, str):
         limit = int(limit)

     if dry_run:
         print('** DRY RUN **')

     store = storage.Storage(db_url)
     swhstorage = get_storage(storage_class, storage_args.split(','))

     gen_data = retrieve_unknown_sha1s(
         swhstorage,
         store.read_content_s3_not_in_sesi_nor_in_swh(huge, final, limit))

     nb_total_blocks = 0
     for paths, size in utils.split_data_per_size(gen_data, block_size,
                                                  block_max_files):
         nb_total_blocks += 1
         print('%s paths (%s bytes) sent.' % (len(paths), size))
         if dry_run:
             continue
         app.tasks[task_name].delay(paths)

     print('Number of jobs: %s' % nb_total_blocks)


 if __name__ == '__main__':
     compute_s3_jobs()
diff --git a/swh/loader/antelink/sesiproducer.py b/swh/loader/antelink/sesiproducer.py
index 86ff938..bf63a63 100755
--- a/swh/loader/antelink/sesiproducer.py
+++ b/swh/loader/antelink/sesiproducer.py
@@ -1,110 +1,110 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import sys

 from swh.core import hashutil
+from swh.core.utils import grouper
 from swh.storage import get_storage

 from swh.loader.antelink import storage, utils


 def gen_path_length_from_stdin():
     """Compute the paths to retrieve from sesi and inject in swh.
     It will compute ~block_size (bytes) of files (paths) to retrieve
     and send it to the queue for workers to download and inject in swh.

     """
     for line in sys.stdin:
         line = line.rstrip()
         data = line.split(' ')
         yield data[0], int(data[1])


 def process_paths(paths):
     """Compute map from sha1 to localpath.

     """
     m = {}
     for localpath, size in paths:
         sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath))
         m[sha1] = [localpath, size]
     return m


 def retrieve_unknown_sha1s(swhstorage, gendata):
     # Compute blocks of 1000 sha1s
-    for paths in utils.split_data(gendata, block_size=1000):
+    for paths in grouper(gendata, n=1000):
         data = process_paths(paths)
         sha1s_tocheck = list(data.keys())
         if len(sha1s_tocheck) > 0:
             # let those inexistent sha1s flow
             for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck):
                 yield data[sha1][0], data[sha1][1]


 @click.command()
 @click.option('--db-url', help="""Optional db access.
-              If not specified, wait for stdin entries.
-              """)
+              If not specified, wait for stdin entries.""")
 @click.option('--block-size', default=104857600,
               help='Default block size in bytes (100Mib).')
 @click.option('--block-max-files', default=1000,
               help='Default max number of files (default: 1000).')
 @click.option('--limit', default=None, help='Limit data to fetch.')
 @click.option('--dry-run', is_flag=True, help='Dry run.')
 @click.option('--huge', is_flag=True, help='Deal with huge files.')
 @click.option('--storage-class', default='remote_storage')
 @click.option('--storage-args',
               default='http://uffizi.internal.softwareheritage.org:5002/')
 def send_jobs(db_url, block_size, block_max_files, limit, dry_run, huge,
               storage_class, storage_args):
     """Send paths for worker to retrieve from sesi machine.

     """
     from swh.scheduler.celery_backend.config import app
     from swh.loader.antelink import tasks  # noqa

     # right inputs
     if isinstance(block_size, str):
         block_size = int(block_size)
     if isinstance(block_max_files, str):
         block_max_files = int(block_max_files)
     if limit and isinstance(limit, str):
         limit = int(limit)

     if dry_run:
         print('** DRY RUN **')

     if db_url:
         store = storage.Storage(db_url)
         gen_data = store.read_content_sesi_not_in_swh(huge, limit)
     else:
         gen_data = gen_path_length_from_stdin()

     if huge:
         task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterHugeTsk'
     else:
         task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterTsk'

     swhstorage = get_storage(storage_class, storage_args.split(','))

     gen_data = retrieve_unknown_sha1s(swhstorage, gen_data)

     nb_total_blocks = 0
     for paths, size in utils.split_data_per_size(gen_data, block_size,
                                                  block_max_files):
         nb_total_blocks += 1
         print('%s paths (%s bytes) sent.' % (len(paths), size))
         if dry_run:
             continue
         app.tasks[task_name].delay(paths)

     print('Number of jobs: %s' % nb_total_blocks)


 if __name__ == '__main__':
     send_jobs()
diff --git a/swh/loader/antelink/utils.py b/swh/loader/antelink/utils.py
index bb212cb..e90cdf5 100644
--- a/swh/loader/antelink/utils.py
+++ b/swh/loader/antelink/utils.py
@@ -1,139 +1,109 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import gzip
-import itertools
 import os

 from swh.core import hashutil


-def grouper(iterable, n, fillvalue=None):
-    """Collect data into fixed-length chunks or blocks.
-
-    Args:
-        iterable: an iterable
-        n: size of block
-        fillvalue: value to use for the last block
-
-    Returns:
-        fixed-length chunks of blocks as iterables
-
-    """
-    args = [iter(iterable)] * n
-    return itertools.zip_longest(*args, fillvalue=fillvalue)
-
-
 def compute_len(f):
     """Compute file-like f object's size.

     Returns:
         Length of the file-like f object.

     """
     total = 0
     while True:
         chunk = f.read(hashutil.HASH_BLOCK_SIZE)
         if not chunk:
             break
         total += len(chunk)
     return total


 def hashfile(f, with_data=False):
     """hash the content of a file-like object.

     """
     length = compute_len(f)
     f.seek(0)
     if with_data:
         localdata = []

         def add_chunk(chunk, localdata=localdata):
             localdata.append(chunk)

         data = hashutil._hash_file_obj(f, length, chunk_cb=add_chunk)
         data['data'] = b''.join(localdata)
     else:
         data = hashutil._hash_file_obj(f, length)
     data['length'] = length
     return data


 def compute_hash(path, with_data=False):
     """Compute the gzip file's hashes and length.

     Args:
         path: path to the gzip file to hash

     Returns:
         dictionary of sha1, sha1_git, sha256 and length.

     """
     with gzip.open(path, 'rb') as f:
         return hashfile(f, with_data=with_data)


-def split_data(data, block_size):
-    """Split the data of the generator of block with a given size.
-    The last block may be inferior of block_size.
-
-    Args:
-        data: generator of data to slice in blocks of size block-size
-        block_size: size block to use
-    """
-    splitdata = grouper(data, block_size, fillvalue=None)
-    for _data in splitdata:
-        yield (d for d in _data if d)
-
-
 def sha1_from_path(path):
     """Path expected to ends with .gz.
     Ex: /some/path/to/.gz

     Returns:
         sha1 extracted from the pathname.

     """
     return os.path.basename(path).split('.')[0]


 def to_content(path, log=None):
     """Load path into a content for swh.

     """
     data = compute_hash(path, with_data=True)
     data['update'] = 'visible'
     return data


 def split_data_per_size(gen_data, block_size, block_max_files):
     """Compute the paths to retrieve from sesi and inject in swh.
     It will compute ~block_size (bytes) of files (paths) to retrieve
     and send it to the queue for workers to download and inject in swh.

     """
     accu_size = 0
     paths = []
     nb_files = 0
     for path, length in gen_data:
         accu_size += length
         paths.append(path)
         nb_files += 1
         if accu_size >= block_size or nb_files >= block_max_files:
             yield paths, accu_size
             paths = []
             accu_size = 0
             nb_files = 0

     # if remaining paths
     if accu_size > 0 or paths:
         yield paths, accu_size