diff --git a/bin/load-history-from-snapshot b/bin/load-history-from-snapshot index 6e76b64..1fddcc6 100755 --- a/bin/load-history-from-snapshot +++ b/bin/load-history-from-snapshot @@ -1,55 +1,56 @@ #!/usr/bin/python3 import glob import logging import os import sys from swh.loader.debian.listers.snapshot import SnapshotDebianOrg from swh.loader.debian.loader import ( - process_source_packages, try_flush_partial, flush_revision) + process_source_packages, try_flush_partial, flush_release, flush_revision) from swh.storage.storage import Storage logging.basicConfig() log = logging.getLogger('swh.loader.debian.load_history_from_snapshot') keyrings = glob.glob('/usr/share/keyrings/*') s = SnapshotDebianOrg(connstr='service=snapshot', basedir=os.path.expanduser('~/tmp/snapshot.d.o')) source_package_dir = sys.argv[1] package_names = sys.argv[2:] pkgs = s.copy_package_files(package_names, source_package_dir, log=log) sorted_pkgs = sorted((p for p in pkgs.values() if os.path.exists(p['dsc'])), key=lambda p: (p['name'], p['version'])) storage = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-debian/objects') partial = {} for partial in process_source_packages(sorted_pkgs, keyrings, log=log): print( partial['packages'][-1]['version'], len(partial['objects']['directory']), '(%s)' % len(partial['objects']['directory_seen']), len(partial['objects']['content']), '(%s)' % len(partial['objects']['content_seen']), ) try_flush_partial(storage, partial, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, log=log) if partial: try_flush_partial(storage, partial, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, force=True, log=log) packages = flush_revision(storage, partial) + flush_release(storage, packages) diff --git a/swh/loader/debian/converters.py b/swh/loader/debian/converters.py index fe198c6..4fa3d1c 100644 --- a/swh/loader/debian/converters.py +++ b/swh/loader/debian/converters.py @@ -1,218 +1,259 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import copy import datetime import email.utils import os from swh.model import identifiers from swh.loader.dir.converters import tree_to_directory from swh.loader.dir.git.git import GitType ROBOT_AUTHOR = { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', } def blob_to_shallow_content(obj, other_objs): """Convert a blob as sent by swh.walker.dir to a blob ready to be sent (without contents) Args: - obj: The blob object returned by swh.walker.dir - other_objs: unused. Returns: A "shallow_content": a content, without the data, which saves memory space.
""" ret = obj.copy() if 'length' not in ret: ret['length'] = os.lstat(obj['path']).st_size ret['perms'] = obj['perms'].value ret['type'] = obj['type'].value return ret def shallow_content_to_content(obj, content_max_length_one): """Add the necessary data to the shallow_content created by the previous function Args: - obj: shallow_content dict as returned by blob_to_shallow_content - content_max_length_one: length limit of a persisted content Returns: A content suitable for persistence in swh.storage """ content = obj.copy() if content['length'] > content_max_length_one: content['status'] = 'absent' content['reason'] = 'Content too large' elif 'data' not in content: content['status'] = 'visible' content['data'] = open(content['path'], 'rb').read() del content['path'] return content def package_to_revision(package): """Convert a package dictionary to a revision suitable for storage. Args: - - package: a dictionary with the following keys: + package: a dictionary with the following keys: + metadata: the metadata for the package, containing + package_info: + changelog + pgp_signature + directory Returns: A revision suitable for persistence in swh.storage """ metadata = package['metadata'] message = 'Synthetic revision for Debian source package %s version %s' % ( package['name'], package['version']) def prepare(obj): if isinstance(obj, list): return [prepare(item) for item in obj] elif isinstance(obj, dict): return {k: prepare(v) for k, v in obj.items()} elif isinstance(obj, datetime.datetime): return obj.isoformat() elif isinstance(obj, bytes): return obj.decode('utf-8') else: return copy.deepcopy(obj) ret = { 'author': metadata['package_info']['changelog']['person'], 'date': metadata['package_info']['changelog']['date'], 'committer': ROBOT_AUTHOR, 'committer_date': metadata['package_info']['pgp_signature']['date'], 'type': 'dsc', 'directory': package['directory'], 'message': message.encode('utf-8'), 'synthetic': True, 'parents': [], 'metadata': prepare(metadata), } rev_id = bytes.fromhex(identifiers.revision_identifier(ret)) ret['id'] = rev_id return ret +def package_to_release(package): + """Convert a package dictionary to a revision suitable for storage. + + Args: + package: a dictionary with the following keys: + metadata: the metadata for the package, containing + package_info + changelog + person + date + revision + + Returns: + A revision suitable for persistence in swh.storage + """ + package_info = package['metadata']['package_info'] + + message = 'Synthetic release for Debian source package %s version %s' % ( + package['name'], package['version']) + + ret = { + 'author': package_info['changelog']['person'], + 'date': package_info['changelog']['date'], + 'target': package['revision']['id'], + 'target_type': 'revision', + 'message': message.encode('utf-8'), + 'synthetic': True, + 'name': str(package['version']), + } + + rev_id = bytes.fromhex(identifiers.release_identifier(ret)) + ret['id'] = rev_id + + return ret + + def dedup_objects(objects, remove_duplicates=True): """Deduplicate the objects from dictionary `objects`. Args: - objects: a dictionary of objects indexed by path - remove_duplicates: if True, remove the duplicate objects from the filesystem Returns: A dictionary, indexed by object type, of dictionaries indexed by object id of deduplicated objects. 
""" converter_map = { GitType.TREE: tree_to_directory, GitType.BLOB: blob_to_shallow_content, } type_map = { GitType.TREE: 'directory', GitType.BLOB: 'content', } ret = defaultdict(dict) for members in objects.values(): for member in members: conv = converter_map[member['type']](member, objects) ret_type = type_map[member['type']] ret_key = conv.get('sha1_git') or conv['id'] if ret_key not in ret[ret_type]: ret[ret_type][ret_key] = conv elif remove_duplicates and 'path' in conv: # Nuke duplicate files os.unlink(conv['path']) return ret def merge_objects(accumulator, updates, remove_duplicates=True): """Merge the objects from `updates` in `accumulator`. This function mutates accumulator. It is designed so that the "content" and "directory" members of accumulator can be flushed periodically, for instance to send the data to the database in chunks. "content_seen" and "directory_seen" contain all the ids of the objects that have been seen so far. - Args: - accumulator: a dict to accumulate several updates in, with keys: - content (dict) - directory (dict) - content_seen (set) - directory_seen (set) - updates: the objects to add to accumulator (has two keys, content and directory) - remove_duplicates: if True, removes the objects from updates that have already be seen in accumulator. - Returns: None (accumulator is mutated). """ for key in ['content', 'directory']: seen_key = key + '_seen' cur_updates = updates[key] to_update = accumulator[key] seen = accumulator[seen_key] for update_key in cur_updates: if update_key not in seen: to_update[update_key] = cur_updates[update_key] seen.add(update_key) elif remove_duplicates and key == 'content': # Nuke the files that haven't changed since a previous run... os.unlink(cur_updates[update_key]['path']) def uid_to_person(uid, encode=True): """Convert an uid to a person suitable for insertion. 
Args: uid: an uid of the form "Name " encode: whether to convert the output to bytes or not Returns: a dictionary with keys: name: the name associated to the uid email: the mail associated to the uid """ ret = { 'name': '', 'email': '', } name, mail = email.utils.parseaddr(uid) if name and email: ret['name'] = name ret['email'] = mail else: ret['name'] = uid if encode: for key in ('name', 'email'): ret[key] = ret[key].encode('utf-8') return ret diff --git a/swh/loader/debian/listers/snapshot.py b/swh/loader/debian/listers/snapshot.py index 38c06d5..e28398a 100644 --- a/swh/loader/debian/listers/snapshot.py +++ b/swh/loader/debian/listers/snapshot.py @@ -1,248 +1,256 @@ #!/usr/bin/python3 # -*- encoding: utf-8 -*- # # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import os import shutil from deb822 import Dsc from debian.debian_support import Version import psycopg2 class SnapshotDebianOrg: """Methods to use the snapshot.debian.org mirror""" def __init__( self, connstr='service=snapshot', basedir='/srv/storage/space/mirrors/snapshot.debian.org', ): self.db = psycopg2.connect(connstr) self.basedir = basedir def _hash_to_path(self, hash): """Convert a hash to a file path on disk""" depth = 2 fragments = [hash[2*i:2*(i+1)] for i in range(depth)] dirname = os.path.join(self.basedir, 'files', *fragments) return os.path.join(dirname, hash) def list_packages(self, count=1, previous_name=''): """List `count` source package names present in the snapshot.d.o db starting with previous_name (excluded)""" package_names_query = """ select distinct(name) from srcpkg where name > %s order by name limit %s """ with self.db.cursor() as cur: cur.execute(package_names_query, (previous_name, count)) return [name for (name,) in cur] def list_package_files(self, names): """Retrieve the file metadata for all the versions of the given source packages. """ files_query = """ select srcpkg.srcpkg_id as src_id, srcpkg.name as src_name, srcpkg.version as src_version, file.hash, file.name from srcpkg left join file_srcpkg_mapping using (srcpkg_id) left join file using (hash) where srcpkg.name in %s """ res = {} db_names = tuple(names) with self.db.cursor() as cur: cur.execute(files_query, (db_names,)) for srcpkg_id, srcpkg_name, srcpkg_version, hash, name in cur: if srcpkg_id not in res: res[srcpkg_id] = { 'id': srcpkg_id, 'name': srcpkg_name, 'version': Version(srcpkg_version), 'files': [], } if hash and name: res[srcpkg_id]['files'].append({ 'hash': hash, 'name': name, }) return res def list_files_by_name(self, files): """List the files by name""" files_query = """ select distinct name, hash from file where name in %s """ ret = defaultdict(list) if not files: return ret with self.db.cursor() as cur: cur.execute(files_query, (tuple(files),)) for name, hash in cur: ret[name].append(hash) return ret def copy_files_to_dirs(self, files, pool): """Copy the files from the snapshot storage to the directory `dirname`, via `pool`. 
- Step 1: copy hashed files to pool - Step 2: link hashed files from pool to destdir with the given name Args: - files: iterable of {hash, name, destdir} dictionaries - pool: the pool directory where hashed files are stored Raises: - FileNotFoundError if a hashed file doesn't exist at the source """ hashes = set(file['hash'] for file in files) print("%d files to copy" % len(hashes)) cnt = 0 for hash in hashes: dst1 = os.path.join(pool, hash) if not os.path.exists(dst1): src = self._hash_to_path(hash) shutil.copy(src, dst1) cnt += 1 if cnt % 100 == 0: print("%d files copied" % cnt) if cnt % 100 != 0: print("%d files copied" % cnt) for file in files: src1 = os.path.join(pool, file['hash']) dst = os.path.join(file['destdir'], file['name']) if not os.path.exists(dst): os.link(src1, dst) def copy_package_files(self, packages, basedir, log=None): """Copy all the files for the packages `packages` in `basedir`. Step 1: create a pool as basedir/.pool Step 2: for each package version, create a directory basedir/package_version Step 3: copy all the files for each package version to basedir/package_version/ using copy_files_to_dirs (and the previously created pool) Step 4: parse all the dsc files and retrieve the remaining files Args: - packages: an id -> source_package mapping where each source_package is a dict containing: - name (str): source package name - version (debian_support.Version): source package version - files (list): list of {hash, filename} dicts - basedir: the base directory for file copies - log: a logging.Logger object Returns: - an id -> source_package mapping, where each source_package has been augmented with the full path to its dsc file in the 'dsc' key. """ src_packages = self.list_package_files(packages) ret = {} pool = os.path.join(basedir, '.pool') os.makedirs(pool, exist_ok=True) + pkgs_with_really_missing_files = defaultdict(list) + files = [] for id, pkg in src_packages.items(): srcpkg_name = pkg['name'] srcpkg_version = str(pkg['version']) srcpkg_files = pkg['files'] dirname = os.path.join(basedir, '%s_%s' % (srcpkg_name, srcpkg_version)) os.makedirs(dirname, exist_ok=True) if ':' in srcpkg_version: dsc_version = srcpkg_version.split(':', 1)[1] else: dsc_version = srcpkg_version intended_dsc = '%s_%s.dsc' % (srcpkg_name, dsc_version) for file in srcpkg_files: file = file.copy() file['destdir'] = dirname files.append(file) ret_pkg = pkg.copy() ret_pkg['dsc'] = os.path.join(dirname, intended_dsc) ret[id] = ret_pkg self.copy_files_to_dirs(files, pool) + for id, pkg in ret.items(): + if not os.path.exists(pkg['dsc']): + intended_dsc = os.path.basename(pkg['dsc']) + pkgs_with_really_missing_files[id].append(intended_dsc) + missing_files = [] for id, pkg in ret.items(): + if id in pkgs_with_really_missing_files: + continue destdir = os.path.dirname(pkg['dsc']) with open(pkg['dsc'], 'rb') as fh: dsc = Dsc(fh) for file in dsc['Files']: if not os.path.isfile(os.path.join(destdir, file['name'])): missing_files.append((destdir, file, id)) missing_file_names = set(f[1]['name'] for f in missing_files) retrieved_files = self.list_files_by_name(missing_file_names) - pkgs_with_really_missing_files = defaultdict(list) missing_files_to_copy = [] for destdir, file, id in missing_files: filename = file['name'] missing_hashes = retrieved_files[filename] if len(missing_hashes) != 1: pkgs_with_really_missing_files[id].append(filename) continue missing_file = file.copy() missing_file['hash'] = missing_hashes[0] missing_file['destdir'] = destdir missing_files_to_copy.append(missing_file) 
self.copy_files_to_dirs(missing_files_to_copy, pool) for pkg_id, filenames in pkgs_with_really_missing_files.items(): - pkg = ret[id] - del ret[id] + pkg = ret[pkg_id] + del ret[pkg_id] if log: - log.warn('Missing files in package %s_%s' % + log.warn('Missing files in package %s_%s: %s' % (pkg['name'], pkg['version'], ', '.join(filenames)), extra={ 'swh_type': 'deb_snapshot_missing_files', 'swh_pkgname': pkg['name'], 'swh_pkgver': pkg['version'], 'swh_missing_files': filenames, }) return ret diff --git a/swh/loader/debian/loader.py b/swh/loader/debian/loader.py index 338a698..dd00bb4 100644 --- a/swh/loader/debian/loader.py +++ b/swh/loader/debian/loader.py @@ -1,424 +1,445 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import os import re import subprocess import shutil import tempfile from dateutil.parser import parse as parse_date from debian.changelog import Changelog from debian.deb822 import Dsc from swh.core import hashutil from swh.loader.dir.git.git import ( walk_and_compute_sha1_from_directory, ROOT_TREE_KEY) from . import converters UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*') def extract_src_pkg(dsc_path, destdir, log=None): """Extract a Debian source package to a given directory Note that after extraction the target directory will be the root of the extracted package, rather than containing it. Args: dsc_path: path to .dsc file destdir: directory where to extract the package log: a logging.Logger object Returns: None """ if log: log.debug('extract Debian source package %s in %s' % (dsc_path, destdir.decode('utf-8')), extra={ 'swh_type': 'deb_extract', 'swh_dsc': dsc_path, 'swh_destdir': destdir.decode('utf-8'), }) destdir_tmp = b''.join([destdir, b'.tmp']) logfile = b''.join([destdir, b'.log']) cmd = ['dpkg-source', '--no-copy', '--no-check', '--ignore-bad-version', '-x', dsc_path, destdir_tmp] with open(logfile, 'w') as stdout: subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT) os.rename(destdir_tmp, destdir) def get_file_info(filepath): """Retrieve the original file information from the file at filepath. Args: filepath: the path to the original file Returns: A dict with the information about the original file: name: the file name sha1, sha1_git, sha256: the hashes for the original file length: the length of the original file """ name = os.path.basename(filepath) if isinstance(name, bytes): name = name.decode('utf-8') ret = { 'name': name, } hashes = hashutil.hashfile(filepath) for hash in hashes: ret[hash] = hashutil.hash_to_hex(hashes[hash]) ret['length'] = os.lstat(filepath).st_size return ret def get_gpg_info_signature(gpg_info): """Extract the signature date from a deb822.GpgInfo object Args: gpg_info: a deb822.GpgInfo object Returns: a dictionary with the following keys: signature_date: a timezone-aware datetime.DateTime object with the date the signature was made signature_keyid: the keyid of the signing key signature_uid: the uid of the signing key, if found """ uid = None if 'VALIDSIG' in gpg_info: key_id = gpg_info['VALIDSIG'][0] timestamp = gpg_info['VALIDSIG'][2] for key in gpg_info.uidkeys: if key in gpg_info: uid = gpg_info[key][-1] break elif 'ERRSIG' in gpg_info: key_id = gpg_info['ERRSIG'][0] timestamp = gpg_info['ERRSIG'][4] else: raise ValueError('Cannot find signature in gpg_info ' 'object. 
Keys: %s' % gpg_info.keys()) dt = datetime.datetime.utcfromtimestamp(int(timestamp)) dt = dt.replace(tzinfo=datetime.timezone.utc) ret = { 'date': dt, 'keyid': key_id, } ret['person'] = converters.uid_to_person(uid, encode=False) return ret def get_package_metadata(package, extracted_path, keyrings, log=None): """Get the package metadata from the source package at dsc_path, extracted in extracted_path. Args: package: the package dict (with a dsc_path key) extracted_path: the path where the package got extracted keyrings: a list of keyrings to use for gpg actions log: a logging.Logger object Returns: a dict with the following keys history: list of (package_name, package_version) tuples parsed from the package changelog source_files: information about all the files in the source package """ ret = {} # Parse the dsc file to retrieve all the original artifact files dsc_path = package['dsc'] with open(dsc_path, 'rb') as dsc: parsed_dsc = Dsc(dsc) source_files = [get_file_info(dsc_path)] dsc_dir = os.path.dirname(dsc_path) for file in parsed_dsc['files']: file_path = os.path.join(dsc_dir, file['name']) file_info = get_file_info(file_path) source_files.append(file_info) ret['original_artifact'] = source_files # Parse the changelog to retrieve the rest of the package information changelog_path = os.path.join(extracted_path, b'debian/changelog') with open(changelog_path, 'rb') as changelog: try: parsed_changelog = Changelog(changelog) except UnicodeDecodeError: if log: log.warn('Unknown encoding for changelog %s,' ' falling back to iso' % changelog_path.decode('utf-8'), extra={ 'swh_type': 'deb_changelog_encoding', 'swh_changelog': changelog_path.decode('utf-8'), }) # need to reset as Changelog scrolls to the end of the file changelog.seek(0) parsed_changelog = Changelog(changelog, encoding='iso-8859-15') package_info = { 'name': package['name'], 'version': str(package['version']), 'lister_metadata': { 'lister': 'snapshot.debian.org', 'id': package['id'], }, 'changelog': { 'person': converters.uid_to_person(parsed_changelog.author), 'date': parse_date(parsed_changelog.date), 'history': [(block.package, str(block.version)) for block in parsed_changelog][1:], } } gpg_info = parsed_dsc.get_gpg_info(keyrings=keyrings) package_info['pgp_signature'] = get_gpg_info_signature(gpg_info) maintainers = [ converters.uid_to_person(parsed_dsc['Maintainer'], encode=False), ] maintainers.extend( converters.uid_to_person(person, encode=False) for person in UPLOADERS_SPLIT.split(parsed_dsc.get('Uploaders', '')) ) package_info['maintainers'] = maintainers ret['package_info'] = package_info return ret def process_source_package(package, keyrings, log=None): """Process a source package into its constituent components. The source package will be decompressed in a temporary directory. Args: package: a dict with the following keys: name: source package name version: source package version dsc: the full path of the package's DSC file. keyrings: a list of keyrings to use for gpg actions log: a logging.Logger object Returns: A tuple with two elements: package: the original package dict augmented with the following keys: metadata: the metadata from get_package_metadata basedir: the temporary directory in which the package was decompressed directory: the sha1_git of the root directory of the package objects: a dictionary of the parsed directories and files, both indexed by id Raises: - FileNotFoundError if the dsc file does not exist. 
""" if not os.path.exists(package['dsc']): raise FileNotFoundError('%s does not exist' % package['dsc']) basedir = tempfile.mkdtemp() debdir = os.path.join(basedir, '%s_%s' % (package['name'], package['version'])) # the swh.loader.dir internals legitimately want bytes for paths debdir = debdir.encode('utf-8') extract_src_pkg(package['dsc'], debdir, log=log) parsed_objects = walk_and_compute_sha1_from_directory(debdir) root_tree = parsed_objects[ROOT_TREE_KEY][0] package = package.copy() package['basedir'] = basedir package['directory'] = root_tree['sha1_git'] package['metadata'] = get_package_metadata(package, debdir, keyrings, log=log) return package, converters.dedup_objects(parsed_objects) def process_source_packages(packages, keyrings, log=None): """Execute process_source_package, but for lists of packages. Args: packages: an iterable of packages as expected by process_source_package keyrings: a list of keyrings to use for gpg actions log: a logging.Logger object Returns: a generator of partial results. Partial results have the following keys: objects: the accumulator for merge_objects. This member can be mutated to clear the pending contents and directories. tempdirs: the temporary directories processed so far. This list can be flushed if temporary directories are removed on the fly. packages: the list of packages processed so far. """ objects = { 'content': {}, 'directory': {}, 'content_seen': set(), 'directory_seen': set(), } ret_packages = [] tempdirs = [] for package in packages: ret_package, package_objs = process_source_package(package, keyrings) ret_packages.append(ret_package) converters.merge_objects(objects, package_objs) tempdirs.append(ret_package['basedir']) yield { 'objects': objects, 'packages': ret_packages, 'tempdirs': tempdirs, } def flush_content(storage, partial_result, content_max_length_one, log=None): """Flush the contents from a partial_result to storage Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages content_max_length_one: the maximum length of a persisted content log: a logging.Logger object This function mutates partial_result to empty the content dict """ contents = partial_result['objects']['content'] missing_ids = storage.content_missing(contents.values(), key_hash='sha1_git') if missing_ids: full_contents = ( converters.shallow_content_to_content(contents[i], content_max_length_one) for i in missing_ids) storage.content_add(full_contents) partial_result['objects']['content'] = {} def flush_directory(storage, partial_result, log=None): """Flush the directories from a partial_result to storage Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages log: a logging.Logger object This function mutates partial_result to empty the directory dict """ storage.directory_add(partial_result['objects']['directory'].values()) partial_result['objects']['directory'] = {} def flush_revision(storage, partial_result, log=None): """Flush the revisions from a partial_result to storage Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages log: a logging.Logger object Returns: The package objects augmented with a revision argument """ packages = [package.copy() for package in partial_result['packages']] revisions = [] for package in packages: revision = converters.package_to_revision(package) revisions.append(revision) package['revision'] = revision 
storage.revision_add(revisions) return packages +def flush_release(storage, packages, log=None): + """Flush the releases for a list of packages to storage + + Args: + storage: an instance of swh.storage.Storage + packages: a list of packages as returned by flush_revision + log: a logging.Logger object + Returns: + The package objects augmented with a release key + """ + releases = [] + for package in packages: + release = converters.package_to_release(package) + releases.append(release) + package['release'] = release + + storage.release_add(releases) + + return packages + + + def remove_tempdirs(partial_result, log=None): """Remove the temporary files for the packages listed""" for tempdir in partial_result['tempdirs']: if os.path.isdir(tempdir): shutil.rmtree(tempdir) # Use the slice trick to empty the list in-place partial_result['tempdirs'][:] = [] def try_flush_partial(storage, partial_result, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, force=False, log=None): """Conditionally flush the partial result to storage. Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages content_packet_size: the number of contents that triggers a flush content_packet_length: the cumulated size of contents that triggers a flush content_max_length_one: the maximum length of a persisted content directory_packet_size: the number of directories that triggers a flush force: force a flush regardless of packet sizes log: a logging.Logger object """ n_content = len(partial_result['objects']['content']) n_directory = len(partial_result['objects']['directory']) if force: # Don't compute the length if we don't care len_contents = 0 else: len_contents = sum( content['length'] for content in partial_result['objects']['content'].values() if content['length'] <= content_max_length_one ) # Flush both contents and directories at once to be able to clear # tempfiles while we work if force or n_content >= content_packet_size or \ len_contents >= content_packet_length or \ n_directory >= directory_packet_size: flush_content(storage, partial_result, content_max_length_one, log=log) flush_directory(storage, partial_result, log=log) remove_tempdirs(partial_result, log=log)
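Reviewer note, not part of the diff: a minimal sketch of the synthetic release dict that package_to_release() assembles before computing its identifier. All values below are hypothetical (package 'hello', version '2.10-1', a made-up maintainer and a zeroed revision id); only the keys and value types mirror the converter added above.

import datetime

# Hypothetical example of the dict built by package_to_release() for a
# fictitious source package; every value here is illustrative only.
release = {
    # changelog person, encoded to bytes by converters.uid_to_person()
    'author': {'name': b'Jane Maintainer', 'email': b'jane@example.org'},
    # changelog date, a timezone-aware datetime
    'date': datetime.datetime(2015, 11, 20, 12, 0,
                              tzinfo=datetime.timezone.utc),
    # id of the synthetic 'dsc' revision created by package_to_revision()
    'target': b'\x00' * 20,
    'target_type': 'revision',
    'message': b'Synthetic release for Debian source package hello'
               b' version 2.10-1',
    'synthetic': True,
    'name': '2.10-1',
}
# package_to_release() then derives the release id from
# identifiers.release_identifier(), and flush_release() hands the
# resulting releases to storage.release_add().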