diff --git a/debian/control b/debian/control
index e5fa747..f36067c 100644
--- a/debian/control
+++ b/debian/control
@@ -1,25 +1,26 @@
 Source: swh-loader-antelink
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python,
                python3-all,
                python3-nose,
                python3-setuptools,
                python3-swh.core (>= 0.0.15~),
+               python3-swh.model (>= 0.0.14~),
                python3-swh.storage (>= 0.0.34~),
                python3-swh.scheduler (>= 0.0.6~),
                python3-click,
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DLDANT/

 Package: python3-swh.loader.antelink
 Architecture: all
 Depends: python3-swh.core (>= 0.0.15~),
          python3-swh.storage (>= 0.0.34~),
          python3-swh.scheduler (>= 0.0.6~),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage Antelink Loader

diff --git a/requirements-swh.txt b/requirements-swh.txt
index e4c53d2..a9f79d1 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core >= 0.0.15
+swh.model >= 0.0.14
 swh.storage >= 0.0.34
 swh.scheduler >= 0.0.6

diff --git a/swh/loader/antelink/compute_checksums.py b/swh/loader/antelink/compute_checksums.py
index d09139a..cc75ccf 100755
--- a/swh/loader/antelink/compute_checksums.py
+++ b/swh/loader/antelink/compute_checksums.py
@@ -1,117 +1,117 @@
 #!/usr/bin/python3

 """Module in charge of computing metadata from sesi's antelink backup.

 This outputs the results as csv content with:
 sha1,sha1_git,sha256,length,path,corrupted

 Note:
 - The path is the one from sesi.
 - The sesi machine has no access to swh's network, which is why we
   output to files first.

 """

 import logging
 import os
 import re
 import sys

-from swh.core import hashutil
 from swh.loader.antelink import utils
+from swh.model import hashutil


 dry_run = False
 LOG_LEVEL = logging.WARN  # logging.INFO
 SHA1_RE = re.compile(r'^[0-9a-fA-F]{40}$')
 MAX_SIZE_TARBALL = None  # 2*1024*1024*1024  # 2G tarball threshold


 def is_sha1(s):
     return bool(re.match(SHA1_RE, s))


 def main():
     """Expects filepaths to be piped in.

     Each filepath is then:
     - checked for being named <sha1>.gz (otherwise skipped)
     - checked for existence (otherwise skipped)
     - checked against the filesize threshold (if too big, it is
       logged and the computation is skipped for now)
     - hashed on the uncompressed content, and its length computed

     The output is as follows:
     sha1,sha1_git,sha256,length,path,corrupted

     For a huge file, or a file whose hash computation failed, the
     output is:
     ,,,,path,

     (The computation will have to be replayed later.)

     """
     for line in sys.stdin:
         path = line.rstrip()
         logging.debug('Treating file %s' % path)

         filename = os.path.basename(path)
         sha1_filename = filename.rstrip('.gz')

         if not is_sha1(sha1_filename):
             logging.debug('Skipping non-SHA1 file %s' % sha1_filename)
             continue

         if not os.path.isfile(path):
             logging.warn('file %s is not a regular file, skipping it' % path)
             continue

         if dry_run:
             logging.warn('dry run, do nothing...')
             continue

         filesize = os.lstat(path).st_size

         if MAX_SIZE_TARBALL and filesize >= MAX_SIZE_TARBALL:
             logging.warn('Huge compressed file (%s, %s) detected... '
                          'Skipping computation for now.' % (path, filesize))
             # print out the path without computations
             print(','.join(['', '', '', '', path, '']))
             continue
         else:
             logging.info('File gz (%s, %s) detected' % (path, filesize))

         try:
             data = utils.compute_hash(path)
         except Exception as e:
             logging.error('Problem during hash computation for (%s, %s)... '
                           '%s' % (path, filesize, e))
             print(','.join(['', '', '', '', path, '']))
             continue

         content_sha1 = hashutil.hash_to_hex(data['sha1'])
         corrupted = False
         if sha1_filename != content_sha1:
             logging.error('File gz (%s, %s) corrupted (content sha1: %s)' %
                           (path, filesize, content_sha1))
             corrupted = True

         data.update({
             'path': path,
             'length': str(data['length']),
             'corrupted': str(corrupted),
         })

         print(','.join([hashutil.hash_to_hex(data['sha1']),
                         hashutil.hash_to_hex(data['sha1_git']),
                         hashutil.hash_to_hex(data['sha256']),
                         data['length'],
                         data['path'],
                         data['corrupted']]))


 if __name__ == '__main__':
     process_log_file = sys.argv[1]
     logging.basicConfig(level=LOG_LEVEL,
                         handlers=[logging.FileHandler(process_log_file),
                                   logging.StreamHandler()])

     main()
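Aside on the flow above: utils.compute_hash hashes the *uncompressed* content, and the result is compared against the sha1 encoded in the filename. Below is a stdlib-only sketch of that flow with hypothetical helper names, not the swh.model implementation (sha1_git is the sha1 of the git blob header plus content, which swh.model computes internally). Note also that rstrip('.gz') in the loop strips a character *set* rather than a suffix; it is only safe here because hex digests never end in '.', 'g' or 'z'.

#!/usr/bin/python3
# Hedged, stdlib-only sketch of compute_checksums' core computation.
import gzip
import hashlib
import os


def compute_hashes(path):
    """Hash the *uncompressed* content of a .gz file (cf. utils.compute_hash)."""
    with gzip.open(path, 'rb') as f:
        content = f.read()  # the real code streams in HASH_BLOCK_SIZE chunks
    # sha1_git is the sha1 of the git blob header followed by the content
    git_header = b'blob %d\x00' % len(content)
    return {'sha1': hashlib.sha1(content).hexdigest(),
            'sha1_git': hashlib.sha1(git_header + content).hexdigest(),
            'sha256': hashlib.sha256(content).hexdigest(),
            'length': len(content)}


def csv_row(path):
    """Build the sha1,sha1_git,sha256,length,path,corrupted output line."""
    filename = os.path.basename(path)
    # strip the '.gz' *suffix* explicitly; rstrip('.gz') strips a character
    # set and only works because hex digests never end in '.', 'g' or 'z'
    expected_sha1 = filename[:-3] if filename.endswith('.gz') else filename
    data = compute_hashes(path)
    corrupted = data['sha1'] != expected_sha1
    return ','.join([data['sha1'], data['sha1_git'], data['sha256'],
                     str(data['length']), path, str(corrupted)])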
diff --git a/swh/loader/antelink/inject_checksums.py b/swh/loader/antelink/inject_checksums.py
index 7cf788d..ab6ba7a 100644
--- a/swh/loader/antelink/inject_checksums.py
+++ b/swh/loader/antelink/inject_checksums.py
@@ -1,82 +1,82 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of loading the data retrieved from sesi's backup.

 This expects to receive filepaths from sys.stdin. Those filepaths
 must point to existing files whose lines are of the form:
 sha1,sha1_git,sha256,length,path,corrupted

 This is the output format of the module
 swh.loader.antelink.compute_checksums.

 The data are stored in table content_sesi_all.

 """

 import os
 import sys

-from swh.core import hashutil
+from swh.model import hashutil
 from swh.core.utils import grouper

 from swh.loader.antelink import utils
 from swh.loader.antelink.db import Db

 BLOCK_SIZE = 250000


 def load_file(path):
     """Load file and yield sha1, pathname couples."""
     with open(path, 'r') as f:
         for line in f:
             data = line.rstrip().split(',')
             path = data[4]
             # some lines can be empty on sha1, sha1_git, sha256 (huge
             # file, or problem during hash computation)
             origin_sha1 = utils.sha1_from_path(path)
             if data[0]:
-                yield {'origin_sha1': hashutil.hex_to_hash(origin_sha1),
-                       'sha1': hashutil.hex_to_hash(data[0]),
-                       'sha1_git': hashutil.hex_to_hash(data[1]),
-                       'sha256': hashutil.hex_to_hash(data[2]),
+                yield {'origin_sha1': hashutil.hash_to_bytes(origin_sha1),
+                       'sha1': hashutil.hash_to_bytes(data[0]),
+                       'sha1_git': hashutil.hash_to_bytes(data[1]),
+                       'sha256': hashutil.hash_to_bytes(data[2]),
                        'length': data[3],
                        'path': path,
                        'corrupted': data[5]}
             else:
                 print('Path %s skipped.' % path)


 def store_file_content(db_url, path):
     """The first round finished with errors. Adapting the code and
     re-running this command will complete the first round properly.

     """
     db = Db.connect(db_url)
     with db.transaction() as cur:
         for data in grouper(load_file(path), BLOCK_SIZE):
             db.copy_to(data, 'content_sesi',
                        ['origin_sha1', 'sha1', 'sha1_git', 'sha256',
                         'length', 'path', 'corrupted'], cur)


 if __name__ == '__main__':
     db_url = "%s" % sys.argv[1]
     for filepath in sys.stdin:
         filepath = filepath.rstrip()
         if not os.path.exists(filepath):
             print('Path %s does not exist.' % filepath)
             continue
         if not os.path.isfile(filepath):
             print('Path %s does not reference a file.'
                   % filepath)
             continue

         try:
             store_file_content(db_url, filepath)
             print('%s ok' % filepath)
         except Exception as e:
             print('%s ko %s' % (filepath, e))
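The hex_to_hash to hash_to_bytes rename is mechanical: both convert a 40-character hex digest into its 20-byte form for storage. A rough stand-in using only the stdlib, assuming (as the names suggest) that the swh.model versions add validation on top of the plain conversion:

# Hedged sketch: hash_to_bytes/hash_to_hex are assumed to behave like the
# stdlib conversions below, with extra validation in the real library.
def hash_to_bytes(hex_digest):
    raw = bytes.fromhex(hex_digest)
    assert len(raw) in (20, 32), 'expected a sha1/sha1_git or sha256 digest'
    return raw


def hash_to_hex(raw):
    return raw.hex()


hex_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'  # sha1 of b''
assert hash_to_hex(hash_to_bytes(hex_sha1)) == hex_sha1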
diff --git a/swh/loader/antelink/s3injecter.py b/swh/loader/antelink/s3injecter.py
index cfd765c..a0bb82e 100644
--- a/swh/loader/antelink/s3injecter.py
+++ b/swh/loader/antelink/s3injecter.py
@@ -1,68 +1,69 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import os

-from swh.core import config, hashutil
+from swh.core import config
+from swh.model import hashutil
 from swh.storage import get_storage

 from swh.loader.antelink import utils


 class AntelinkS3Injecter(config.SWHConfig):
     """A bulk loader for downloading some file from s3.

     """
     DEFAULT_CONFIG = {
         'storage_class': ('str', 'remote_storage'),
         'storage_args': ('list[str]', ['http://localhost:5002/']),
         's3_folder': ('string', '/srv/storage/space/antelink/s3'),
     }

     def __init__(self, config):
         self.config = config

         s3_folder = self.config['s3_folder']
         if not s3_folder.endswith('/'):
             self.config['s3_folder'] = s3_folder + '/'

         self.storage = get_storage(config['storage_class'],
                                    config['storage_args'])

         self.log = logging.getLogger(
             'swh.antelink.loader.AntelinkS3Injecter')

     def process_paths(self, paths):
         for localpath in paths:
             if not os.path.exists(localpath):
                 self.log.error('%s does not exist!' % localpath)
                 continue

             try:
                 data = utils.to_content(localpath, log=self.log)

                 # Check for corruption on sha1
                 origin_sha1 = utils.sha1_from_path(localpath)
                 sha1 = hashutil.hash_to_hex(data['sha1'])
                 if origin_sha1 != sha1:
                     self.log.warn('%s corrupted - %s != %s. Skipping!'
                                   % (localpath, origin_sha1, sha1))
                     continue

                 self.log.debug('%s -> swh' % sha1)
                 yield data
             except Exception as e:
                 self.log.error('Problem during checksums computation %s - %s'
                                % (localpath, e))
                 continue

     def process(self, paths):
         # Then process them and store in swh
         data = self.process_paths(
             (self.config['s3_folder'] + p for p in paths))
         self.storage.content_add(data)
         self.log.info('s3 - %s contents -> swh' % len(paths))
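process_paths is a lazy filter: storage.content_add pulls from it, and any file whose recomputed sha1 disagrees with the sha1 in its filename is dropped with a warning. A stdlib-only sketch of the same contract, with file access injectable so it can be exercised in memory; the real to_content also attaches sha1_git, sha256 and the raw data:

import gzip
import hashlib
import os


def sha1_from_path(path):
    """'/x/da39...09.gz' -> 'da39...09' (same contract as utils.sha1_from_path)."""
    return os.path.basename(path).split('.')[0]


def verified_contents(paths, read=lambda p: gzip.open(p, 'rb').read()):
    """Yield (path, content) only when the content matches the filename sha1."""
    for path in paths:
        content = read(path)
        if hashlib.sha1(content).hexdigest() != sha1_from_path(path):
            continue  # corrupted: the filename promises a different digest
        yield path, content


# usage with an in-memory stand-in for the filesystem:
fake = {'da39a3ee5e6b4b0d3255bfef95601890afd80709.gz': b''}
ok = list(verified_contents(fake, read=lambda p: fake[p]))
assert ok == [('da39a3ee5e6b4b0d3255bfef95601890afd80709.gz', b'')]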
""" m = {} for localpath, size in paths: - sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath)) + sha1 = hashutil.hash_to_bytes(utils.sha1_from_path(localpath)) m[sha1] = [localpath, size] return m def retrieve_unknown_sha1s(swhstorage, gen_data): # Compute blocks of 1000 sha1s for paths in grouper(gen_data, n=1000): data = process_paths(paths) sha1s_tocheck = list(data.keys()) if len(sha1s_tocheck) > 0: # let those inexistent sha1s flow for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck): yield data[sha1][0], data[sha1][1] @click.command() @click.option('--db-url', help='Db access.') @click.option('--block-size', default=104857600, help='Default block size in bytes (100Mib).') @click.option('--block-max-files', default=1000, help='Default max number of files (default: 1000).') @click.option('--limit', default=None, help='Limit data to fetch.') @click.option('--dry-run', is_flag=True, help='Dry run.') @click.option('--final', is_flag=True, help='To deal with remaining s3 files.') @click.option('--huge', is_flag=True, help='To deal with big files.') @click.option('--storage-class', default='remote_storage') @click.option('--storage-args', default='http://uffizi.internal.softwareheritage.org:5002/') def compute_s3_jobs(db_url, block_size, block_max_files, limit, dry_run, final, huge, storage_class, storage_args): from swh.scheduler.celery_backend.config import app from swh.loader.antelink import tasks # noqa # right inputs if isinstance(block_size, str): block_size = int(block_size) if isinstance(block_max_files, str): block_max_files = int(block_max_files) if limit and isinstance(limit, str): limit = int(limit) if dry_run: print('** DRY RUN **') swhstorage = get_storage(storage_class, storage_args.split(',')) if db_url: store = storage.Storage(db_url) gen_data = retrieve_unknown_sha1s( swhstorage, store.read_content_s3_not_in_sesi_nor_in_swh(huge, final, limit)) else: gen_data = retrieve_unknown_sha1s(swhstorage, utils.gen_path_length_from_stdin()) nb_total_blocks = 0 for paths, size in utils.split_data_per_size(gen_data, block_size, block_max_files): nb_total_blocks += 1 print('%s paths (%s bytes) sent.' % (len(paths), size)) if dry_run: continue app.tasks[task_name].delay(paths) print('Number of jobs: %s' % nb_total_blocks) if __name__ == '__main__': compute_s3_jobs() diff --git a/swh/loader/antelink/sesiinjecter.py b/swh/loader/antelink/sesiinjecter.py index 50cc5d5..884b9ca 100644 --- a/swh/loader/antelink/sesiinjecter.py +++ b/swh/loader/antelink/sesiinjecter.py @@ -1,62 +1,63 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os -from swh.core import config, hashutil +from swh.core import config +from swh.model import hashutil from swh.storage import get_storage from swh.loader.antelink import utils class AntelinkSesiInjecter(config.SWHConfig): """A bulk loader for downloading some file from s3. 
""" DEFAULT_CONFIG = { 'storage_class': ('str', 'remote_storage'), 'storage_args': ('list[str]', ['http://localhost:5002/']), } def __init__(self, config): self.config = config self.storage = get_storage(config['storage_class'], config['storage_args']) self.log = logging.getLogger( 'swh.antelink.loader.AntelinkSesiInjecter') def process_paths(self, paths): for localpath in paths: if not os.path.exists(localpath): self.log.warn('%s does not exist!' % localpath) continue try: data = utils.to_content(localpath, log=self.log) # Check for corruption on sha1 origin_sha1 = utils.sha1_from_path(localpath) sha1 = hashutil.hash_to_hex(data['sha1']) if origin_sha1 != sha1: self.log.warn('%s corrupted - %s != %s. Skipping!' % (localpath, origin_sha1, sha1)) continue self.log.debug('%s -> swh' % sha1) yield data except Exception as e: self.log.error('Problem during checksums computation %s - %s' % (localpath, e)) continue def process(self, paths): # Then process them and store in swh data = self.process_paths(paths) self.storage.content_add(data) self.log.info('sesi - %s contents -> swh' % len(paths)) diff --git a/swh/loader/antelink/sesiproducer.py b/swh/loader/antelink/sesiproducer.py index fda92c3..8e94fca 100755 --- a/swh/loader/antelink/sesiproducer.py +++ b/swh/loader/antelink/sesiproducer.py @@ -1,96 +1,96 @@ -# Copyright (C) 2015-2016 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -from swh.core import hashutil from swh.core.utils import grouper +from swh.model import hashutil from swh.storage import get_storage from swh.loader.antelink import storage, utils def process_paths(paths): """Compute map from sha1 to localpath. """ m = {} for localpath, size in paths: - sha1 = hashutil.hex_to_hash(utils.sha1_from_path(localpath)) + sha1 = hashutil.hash_to_bytes(utils.sha1_from_path(localpath)) m[sha1] = [localpath, size] return m def retrieve_unknown_sha1s(swhstorage, gendata): # Compute blocks of 1000 sha1s for paths in grouper(gendata, n=1000): data = process_paths(paths) sha1s_tocheck = list(data.keys()) if len(sha1s_tocheck) > 0: # let those inexistent sha1s flow for sha1 in swhstorage.content_missing_per_sha1(sha1s_tocheck): yield data[sha1][0], data[sha1][1] @click.command() @click.option('--db-url', help="""Optional db access. If not specified, wait for stdin entries.""") @click.option('--block-size', default=104857600, help='Default block size in bytes (100Mib).') @click.option('--block-max-files', default=1000, help='Default max number of files (default: 1000).') @click.option('--limit', default=None, help='Limit data to fetch.') @click.option('--dry-run', is_flag=True, help='Dry run.') @click.option('--huge', is_flag=True, help='Deal with huge files.') @click.option('--storage-class', default='remote_storage') @click.option('--storage-args', default='http://uffizi.internal.softwareheritage.org:5002/') def send_jobs(db_url, block_size, block_max_files, limit, dry_run, huge, storage_class, storage_args): """Send paths for worker to retrieve from sesi machine. 
""" from swh.scheduler.celery_backend.config import app from swh.loader.antelink import tasks # noqa # right inputs if isinstance(block_size, str): block_size = int(block_size) if isinstance(block_max_files, str): block_max_files = int(block_max_files) if limit and isinstance(limit, str): limit = int(limit) if dry_run: print('** DRY RUN **') if db_url: store = storage.Storage(db_url) gen_data = store.read_content_sesi_not_in_swh(huge, limit) else: gen_data = utils.gen_path_length_from_stdin() if huge: task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterHugeTsk' else: task_name = 'swh.loader.antelink.tasks.AntelinkSesiInjecterTsk' swhstorage = get_storage(storage_class, storage_args.split(',')) gen_data = retrieve_unknown_sha1s(swhstorage, gen_data) nb_total_blocks = 0 for paths, size in utils.split_data_per_size(gen_data, block_size, block_max_files): nb_total_blocks += 1 print('%s paths (%s bytes) sent.' % (len(paths), size)) if dry_run: continue app.tasks[task_name].delay(paths) print('Number of jobs: %s' % nb_total_blocks) if __name__ == '__main__': send_jobs() diff --git a/swh/loader/antelink/utils.py b/swh/loader/antelink/utils.py index 6c90737..e7ea8b5 100644 --- a/swh/loader/antelink/utils.py +++ b/swh/loader/antelink/utils.py @@ -1,126 +1,126 @@ -# Copyright (C) 2015-2016 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import os import sys -from swh.core import hashutil +from swh.model import hashutil def compute_len(f): """Compute file-like f object's size. Returns: Length of the file-like f object. """ total = 0 while True: chunk = f.read(hashutil.HASH_BLOCK_SIZE) if not chunk: break total += len(chunk) return total def hashfile(f, with_data=False): """hash the content of a file-like object. """ length = compute_len(f) f.seek(0) if with_data: localdata = [] def add_chunk(chunk, localdata=localdata): localdata.append(chunk) - data = hashutil._hash_file_obj(f, length, chunk_cb=add_chunk) + data = hashutil.hash_file(f, length, chunk_cb=add_chunk) data['data'] = b''.join(localdata) else: - data = hashutil._hash_file_obj(f, length) + data = hashutil.hash_file(f, length) data['length'] = length return data def compute_hash(path, with_data=False): """Compute the gzip file's hashes and length. Args: path: path to the gzip file to hash Returns: dictionary of sha1, sha1_git, sha256 and length. """ with gzip.open(path, 'rb') as f: return hashfile(f, with_data=with_data) def sha1_from_path(path): """Path expected to ends with .gz. Ex: /some/path/to/.gz Returns: sha1 extracted from the pathname. """ return os.path.basename(path).split('.')[0] def to_content(path, log=None): """Load path into a content for swh. """ data = compute_hash(path, with_data=True) data['update'] = 'visible' return data def split_data_per_size(gen_data, block_size, block_max_files): """Compute the paths to retrieve from sesi and inject in swh. It will compute ~block_size (bytes) of files (paths) to retrieve and send it to the queue for workers to download and inject in swh. 
""" accu_size = 0 paths = [] nb_files = 0 for path, length in gen_data: accu_size += length paths.append(path) nb_files += 1 if accu_size >= block_size or nb_files >= block_max_files: yield paths, accu_size paths = [] accu_size = 0 nb_files = 0 # if remaining paths if accu_size > 0 or paths: yield paths, accu_size def gen_path_length_from_stdin(): """Compute the paths to retrieve from sesi and inject in swh. It will compute ~block_size (bytes) of files (paths) to retrieve and send it to the queue for workers to download and inject in swh. """ for line in sys.stdin: line = line.rstrip() data = line.split(' ') if len(data) > 1: yield data[0], int(data[1]) else: yield data[0]