diff --git a/bin/load-history-from-snapshot b/bin/load-history-from-snapshot index 3a6e464..6e76b64 100755 --- a/bin/load-history-from-snapshot +++ b/bin/load-history-from-snapshot @@ -1,49 +1,55 @@ #!/usr/bin/python3 +import glob import logging import os import sys from swh.loader.debian.listers.snapshot import SnapshotDebianOrg -from swh.loader.debian.loader import process_source_packages, try_flush_partial +from swh.loader.debian.loader import ( + process_source_packages, try_flush_partial, flush_revision) from swh.storage.storage import Storage logging.basicConfig() log = logging.getLogger('swh.loader.debian.load_history_from_snapshot') +keyrings = glob.glob('/usr/share/keyrings/*') + s = SnapshotDebianOrg(connstr='service=snapshot', basedir=os.path.expanduser('~/tmp/snapshot.d.o')) source_package_dir = sys.argv[1] package_names = sys.argv[2:] pkgs = s.copy_package_files(package_names, source_package_dir, log=log) sorted_pkgs = sorted((p for p in pkgs.values() if os.path.exists(p['dsc'])), key=lambda p: (p['name'], p['version'])) storage = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-debian/objects') partial = {} -for partial in process_source_packages(sorted_pkgs, log=log): +for partial in process_source_packages(sorted_pkgs, keyrings, log=log): print( partial['packages'][-1]['version'], len(partial['objects']['directory']), '(%s)' % len(partial['objects']['directory_seen']), len(partial['objects']['content']), '(%s)' % len(partial['objects']['content_seen']), ) try_flush_partial(storage, partial, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, log=log) if partial: try_flush_partial(storage, partial, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, force=True, log=log) + + packages = flush_revision(storage, partial) diff --git a/swh/loader/debian/converters.py b/swh/loader/debian/converters.py index 5ed0d5f..fe198c6 100644 --- a/swh/loader/debian/converters.py +++ b/swh/loader/debian/converters.py @@ -1,169 +1,218 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict +import copy +import datetime import email.utils import os +from swh.model import identifiers from swh.loader.dir.converters import tree_to_directory from swh.loader.dir.git.git import GitType +ROBOT_AUTHOR = { + 'name': b'Software Heritage', + 'email': b'robot@softwareheritage.org', +} + def blob_to_shallow_content(obj, other_objs): """Convert a blob as sent by swh.walker.dir to a blob ready to be sent (without contents) Args: - obj: The blob object returned by swh.walker.dir - other_objs: unused. Returns: A "shallow_content": a content, without the data, which saves memory space. 
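A standalone illustration of the "shallow content" idea described here (not part of the patch; the temporary file and the field subset are made up for the example): the walker keeps the path and size of each file and defers reading the actual bytes until the content is flushed to storage.

import os
import tempfile

# Throwaway file standing in for a file found in an extracted source package.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b'hello world\n')
    path = f.name

# A shallow content records where the data lives and how big it is,
# without loading the data itself into memory.
shallow = {
    'path': path,
    'length': os.lstat(path).st_size,
}
print(shallow['length'])  # 12

os.unlink(path)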
""" ret = obj.copy() if 'length' not in ret: ret['length'] = os.lstat(obj['path']).st_size ret['perms'] = obj['perms'].value ret['type'] = obj['type'].value return ret def shallow_content_to_content(obj, content_max_length_one): """Add the necessary data to the shallow_content created by the previous function Args: - obj: shallow_content dict as returned by blob_to_shallow_content - content_max_length_one: length limit of a persisted content Returns: A content suitable for persistence in swh.storage """ content = obj.copy() if content['length'] > content_max_length_one: content['status'] = 'absent' content['reason'] = 'Content too large' elif 'data' not in content: content['status'] = 'visible' content['data'] = open(content['path'], 'rb').read() del content['path'] return content +def package_to_revision(package): + """Convert a package dictionary to a revision suitable for storage. + + Args: + - package: a dictionary with the following keys: + + Returns: + A revision suitable for persistence in swh.storage + """ + + metadata = package['metadata'] + message = 'Synthetic revision for Debian source package %s version %s' % ( + package['name'], package['version']) + + def prepare(obj): + if isinstance(obj, list): + return [prepare(item) for item in obj] + elif isinstance(obj, dict): + return {k: prepare(v) for k, v in obj.items()} + elif isinstance(obj, datetime.datetime): + return obj.isoformat() + elif isinstance(obj, bytes): + return obj.decode('utf-8') + else: + return copy.deepcopy(obj) + + ret = { + 'author': metadata['package_info']['changelog']['person'], + 'date': metadata['package_info']['changelog']['date'], + 'committer': ROBOT_AUTHOR, + 'committer_date': metadata['package_info']['pgp_signature']['date'], + 'type': 'dsc', + 'directory': package['directory'], + 'message': message.encode('utf-8'), + 'synthetic': True, + 'parents': [], + 'metadata': prepare(metadata), + } + + rev_id = bytes.fromhex(identifiers.revision_identifier(ret)) + ret['id'] = rev_id + + return ret + + def dedup_objects(objects, remove_duplicates=True): """Deduplicate the objects from dictionary `objects`. Args: - objects: a dictionary of objects indexed by path - remove_duplicates: if True, remove the duplicate objects from the filesystem Returns: A dictionary, indexed by object type, of dictionaries indexed by object id of deduplicated objects. """ converter_map = { GitType.TREE: tree_to_directory, GitType.BLOB: blob_to_shallow_content, } type_map = { GitType.TREE: 'directory', GitType.BLOB: 'content', } ret = defaultdict(dict) for members in objects.values(): for member in members: conv = converter_map[member['type']](member, objects) ret_type = type_map[member['type']] ret_key = conv.get('sha1_git') or conv['id'] if ret_key not in ret[ret_type]: ret[ret_type][ret_key] = conv elif remove_duplicates and 'path' in conv: # Nuke duplicate files os.unlink(conv['path']) return ret def merge_objects(accumulator, updates, remove_duplicates=True): """Merge the objects from `updates` in `accumulator`. This function mutates accumulator. It is designed so that the "content" and "directory" members of accumulator can be flushed periodically, for instance to send the data to the database in chunks. "content_seen" and "directory_seen" contain all the ids of the objects that have been seen so far. 
- Args: - accumulator: a dict to accumulate several updates in, with keys: - content (dict) - directory (dict) - content_seen (set) - directory_seen (set) - updates: the objects to add to accumulator (has two keys, content and directory) - remove_duplicates: if True, removes the objects from updates that have already be seen in accumulator. - Returns: None (accumulator is mutated). """ for key in ['content', 'directory']: seen_key = key + '_seen' cur_updates = updates[key] to_update = accumulator[key] seen = accumulator[seen_key] for update_key in cur_updates: if update_key not in seen: to_update[update_key] = cur_updates[update_key] seen.add(update_key) elif remove_duplicates and key == 'content': # Nuke the files that haven't changed since a previous run... os.unlink(cur_updates[update_key]['path']) -def uid_to_person(uid, key=None): +def uid_to_person(uid, encode=True): """Convert an uid to a person suitable for insertion. Args: uid: an uid of the form "Name " - key: the key in which the values are stored - + encode: whether to convert the output to bytes or not Returns: a dictionary with keys: - key_name (or name if key is None): the name associated to the uid - key_email (or email if key is None): the mail associated to the uid + name: the name associated to the uid + email: the mail associated to the uid """ - if key is not None: - name_key = '%s_name' % key - mail_key = '%s_email' % key - else: - name_key = 'name' - mail_key = 'mail' - ret = { - name_key: '', - mail_key: '', + 'name': '', + 'email': '', } name, mail = email.utils.parseaddr(uid) if name and email: - ret[name_key] = name - ret[mail_key] = mail + ret['name'] = name + ret['email'] = mail else: - ret[name_key] = uid + ret['name'] = uid + + if encode: + for key in ('name', 'email'): + ret[key] = ret[key].encode('utf-8') return ret diff --git a/swh/loader/debian/loader.py b/swh/loader/debian/loader.py index edf1d50..1ffa43d 100644 --- a/swh/loader/debian/loader.py +++ b/swh/loader/debian/loader.py @@ -1,377 +1,420 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import glob import os +import re import subprocess import shutil import tempfile from dateutil.parser import parse as parse_date from debian.changelog import Changelog from debian.deb822 import Dsc from swh.core import hashutil from swh.loader.dir.git.git import ( walk_and_compute_sha1_from_directory, ROOT_TREE_KEY) from . import converters -DEBIAN_KEYRINGS = glob.glob('/usr/share/keyrings/*') + +UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*') def extract_src_pkg(dsc_path, destdir, log=None): """Extract a Debian source package to a given directory Note that after extraction the target directory will be the root of the extracted package, rather than containing it. 
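To illustrate the two helpers introduced above (the addresses are made up; this is not part of the patch): uid_to_person() relies on email.utils.parseaddr to split a "Name <email>" uid, and UPLOADERS_SPLIT cuts a Debian Uploaders field on the commas that follow a closing '>', since the names themselves may contain commas.

import email.utils
import re

UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*')  # same pattern as in loader.py

uid = 'Jane Maintainer <jane@example.org>'
print(email.utils.parseaddr(uid))
# ('Jane Maintainer', 'jane@example.org')

uploaders = 'Doe, John <john@example.org>, Jane Maintainer <jane@example.org>'
print(UPLOADERS_SPLIT.split(uploaders))
# ['Doe, John <john@example.org>', 'Jane Maintainer <jane@example.org>']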
Args: dsc_path: path to .dsc file destdir: directory where to extract the package log: a logging.Logger object Returns: None """ if log: log.debug('extract Debian source package %s in %s' % (dsc_path, destdir.decode('utf-8')), extra={ 'swh_type': 'deb_extract', 'swh_dsc': dsc_path, 'swh_destdir': destdir.decode('utf-8'), }) destdir_tmp = b''.join([destdir, b'.tmp']) logfile = b''.join([destdir, b'.log']) cmd = ['dpkg-source', '--no-copy', '--no-check', '--ignore-bad-version', '-x', dsc_path, destdir_tmp] with open(logfile, 'w') as stdout: subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT) os.rename(destdir_tmp, destdir) def get_file_info(filepath): """Retrieve the original file information from the file at filepath. Args: filepath: the path to the original file Returns: A dict with the information about the original file: name: the file name sha1, sha1_git, sha256: the hashes for the original file length: the length of the original file """ name = os.path.basename(filepath) if isinstance(name, bytes): name = name.decode('utf-8') ret = { 'name': name, } hashes = hashutil.hashfile(filepath) for hash in hashes: ret[hash] = hashutil.hash_to_hex(hashes[hash]) ret['length'] = os.lstat(filepath).st_size return ret def get_gpg_info_signature(gpg_info): """Extract the signature date from a deb822.GpgInfo object Args: gpg_info: a deb822.GpgInfo object Returns: a dictionary with the following keys: signature_date: a timezone-aware datetime.DateTime object with the date the signature was made signature_keyid: the keyid of the signing key signature_uid: the uid of the signing key, if found """ uid = None if 'VALIDSIG' in gpg_info: key_id = gpg_info['VALIDSIG'][0] timestamp = gpg_info['VALIDSIG'][2] for key in gpg_info.uidkeys: if key in gpg_info: uid = gpg_info[key][-1] break elif 'ERRSIG' in gpg_info: key_id = gpg_info['ERRSIG'][0] timestamp = gpg_info['ERRSIG'][4] else: raise ValueError('Cannot find signature in gpg_info ' 'object. Keys: %s' % gpg_info.keys()) dt = datetime.datetime.utcfromtimestamp(int(timestamp)) dt = dt.replace(tzinfo=datetime.timezone.utc) ret = { - 'committer_date': dt, - 'committer_offset': 0, - 'committer_keyid': key_id, + 'date': dt, + 'keyid': key_id, } - ret.update(converters.uid_to_person(uid, 'committer')) + ret['person'] = converters.uid_to_person(uid, encode=False) return ret -def get_package_metadata(dsc_path, extracted_path, log=None): +def get_package_metadata(package, extracted_path, keyrings, log=None): """Get the package metadata from the source package at dsc_path, extracted in extracted_path. 
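A small, hedged illustration of the timestamp handling in get_gpg_info_signature() above: gpg reports the signature time as a Unix timestamp string, which is turned into a timezone-aware UTC datetime before being used as the committer date. The timestamp value below is made up.

import datetime

timestamp = '1443657600'  # as found in a VALIDSIG (or ERRSIG) status line
dt = datetime.datetime.utcfromtimestamp(int(timestamp))
dt = dt.replace(tzinfo=datetime.timezone.utc)
print(dt.isoformat())  # 2015-10-01T00:00:00+00:00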
Args: - dsc_path: the path to the package's dsc file + package: the package dict (with a dsc_path key) extracted_path: the path where the package got extracted + keyrings: a list of keyrings to use for gpg actions log: a logging.Logger object Returns: a dict with the following keys history: list of (package_name, package_version) tuples parsed from the package changelog source_files: information about all the files in the source package """ ret = {} + # Parse the dsc file to retrieve all the original artifact files + dsc_path = package['dsc'] + with open(dsc_path, 'rb') as dsc: + parsed_dsc = Dsc(dsc) + + source_files = [get_file_info(dsc_path)] + + dsc_dir = os.path.dirname(dsc_path) + for file in parsed_dsc['files']: + file_path = os.path.join(dsc_dir, file['name']) + file_info = get_file_info(file_path) + source_files.append(file_info) + + ret['original_artifact'] = source_files + + # Parse the changelog to retrieve the rest of the package information changelog_path = os.path.join(extracted_path, b'debian/changelog') with open(changelog_path, 'rb') as changelog: try: parsed_changelog = Changelog(changelog) except UnicodeDecodeError: if log: log.warn('Unknown encoding for changelog %s,' ' falling back to iso' % changelog_path.decode('utf-8'), extra={ 'swh_type': 'deb_changelog_encoding', 'swh_changelog': changelog_path.decode('utf-8'), }) # need to reset as Changelog scrolls to the end of the file changelog.seek(0) parsed_changelog = Changelog(changelog, encoding='iso-8859-15') - ret['history'] = [(block.package, str(block.version)) - for block in parsed_changelog] - - ret.update(converters.uid_to_person(parsed_changelog.author, 'author')) - date = parse_date(parsed_changelog.date) - ret['author_date'] = date - ret['author_offset'] = date.tzinfo.utcoffset(date).seconds // 60 - - with open(dsc_path, 'rb') as dsc: - parsed_dsc = Dsc(dsc) - - source_files = [get_file_info(dsc_path)] - - dsc_dir = os.path.dirname(dsc_path) - for file in parsed_dsc['files']: - file_path = os.path.join(dsc_dir, file['name']) - file_info = get_file_info(file_path) - source_files.append(file_info) - - ret['source_files'] = source_files + package_info = { + 'name': package['name'], + 'version': str(package['version']), + 'id': package['id'], + 'changelog': { + 'person': converters.uid_to_person(parsed_changelog.author), + 'date': parse_date(parsed_changelog.date), + 'history': [(block.package, str(block.version)) + for block in parsed_changelog][1:], + } + } - gpg_info = parsed_dsc.get_gpg_info(keyrings=DEBIAN_KEYRINGS) + gpg_info = parsed_dsc.get_gpg_info(keyrings=keyrings) + package_info['pgp_signature'] = get_gpg_info_signature(gpg_info) - ret.update(get_gpg_info_signature(gpg_info)) + maintainers = [ + converters.uid_to_person(parsed_dsc['Maintainer'], encode=False), + ] + maintainers.extend( + converters.uid_to_person(person, encode=False) + for person in UPLOADERS_SPLIT.split(parsed_dsc.get('Uploaders', '')) + ) + package_info['maintainers'] = maintainers + ret['package_info'] = package_info return ret -def process_source_package(package, log=None): +def process_source_package(package, keyrings, log=None): """Process a source package into its constituent components. The source package will be decompressed in a temporary directory. Args: package: a dict with the following keys: name: source package name version: source package version dsc: the full path of the package's DSC file. 
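The changelog handling above can be exercised on its own. The sketch below runs python-debian and dateutil on a made-up two-entry changelog (not a real package) to show where the author, date and version history come from, and what the [1:] slice drops.

from debian.changelog import Changelog
from dateutil.parser import parse as parse_date

changelog_text = """\
hello (2.10-1) unstable; urgency=medium

  * New upstream release.

 -- Jane Maintainer <jane@example.org>  Thu, 01 Oct 2015 12:00:00 +0200

hello (2.9-1) unstable; urgency=medium

  * Initial upload.

 -- Jane Maintainer <jane@example.org>  Tue, 01 Sep 2015 12:00:00 +0200
"""

parsed = Changelog(changelog_text)
print(parsed.author)             # Jane Maintainer <jane@example.org>
print(parse_date(parsed.date))   # 2015-10-01 12:00:00+02:00
print([(block.package, str(block.version)) for block in parsed][1:])
# [('hello', '2.9-1')] -- older entries only; the first block is the version
# being loaded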
+ keyrings: a list of keyrings to use for gpg actions + log: a logging.Logger object Returns: A tuple with two elements: package: the original package dict augmented with the following keys: metadata: the metadata from get_package_metadata basedir: the temporary directory in which the package was decompressed directory: the sha1_git of the root directory of the package objects: a dictionary of the parsed directories and files, both indexed by id Raises: - FileNotFoundError if the dsc file does not exist. """ if not os.path.exists(package['dsc']): raise FileNotFoundError('%s does not exist' % package['dsc']) basedir = tempfile.mkdtemp() debdir = os.path.join(basedir, '%s_%s' % (package['name'], package['version'])) # the swh.loader.dir internals legitimately want bytes for paths debdir = debdir.encode('utf-8') extract_src_pkg(package['dsc'], debdir, log=log) parsed_objects = walk_and_compute_sha1_from_directory(debdir) root_tree = parsed_objects[ROOT_TREE_KEY][0] package = package.copy() package['basedir'] = basedir package['directory'] = root_tree['sha1_git'] - package['metadata'] = get_package_metadata(package['dsc'], debdir, log=log) + package['metadata'] = get_package_metadata(package, debdir, keyrings, + log=log) return package, converters.dedup_objects(parsed_objects) -def process_source_packages(packages, log=None): +def process_source_packages(packages, keyrings, log=None): """Execute process_source_package, but for lists of packages. Args: packages: an iterable of packages as expected by process_source_package + keyrings: a list of keyrings to use for gpg actions log: a logging.Logger object Returns: a generator of partial results. Partial results have the following keys: objects: the accumulator for merge_objects. This member can be mutated to clear the pending contents and directories. tempdirs: the temporary directories processed so far. This list can be flushed if temporary directories are removed on the fly. packages: the list of packages processed so far. 
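A minimal, self-contained sketch (not from the patch; ids and payloads are invented) of the bookkeeping behind these partial results, as performed by converters.merge_objects() minus the duplicate-file removal: new object ids go into both the pending dict and the corresponding *_seen set, and ids already seen are skipped, so clearing the pending dicts after a flush never causes an object to be sent twice.

objects = {
    'content': {}, 'directory': {},
    'content_seen': set(), 'directory_seen': set(),
}

def merge(accumulator, updates):
    # Simplified version of converters.merge_objects().
    for key in ('content', 'directory'):
        seen = accumulator[key + '_seen']
        for obj_id, obj in updates[key].items():
            if obj_id not in seen:
                accumulator[key][obj_id] = obj
                seen.add(obj_id)

merge(objects, {'content': {b'\x01': {'length': 42}}, 'directory': {}})
objects['content'].clear()   # what a flush does to the pending dict
merge(objects, {'content': {b'\x01': {'length': 42}}, 'directory': {}})
print(len(objects['content']), len(objects['content_seen']))  # 0 1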
""" objects = { 'content': {}, 'directory': {}, 'content_seen': set(), 'directory_seen': set(), } ret_packages = [] tempdirs = [] for package in packages: - ret_package, package_objs = process_source_package(package) + ret_package, package_objs = process_source_package(package, keyrings) ret_packages.append(ret_package) converters.merge_objects(objects, package_objs) tempdirs.append(ret_package['basedir']) yield { 'objects': objects, 'packages': ret_packages, 'tempdirs': tempdirs, } def flush_content(storage, partial_result, content_max_length_one, log=None): """Flush the contents from a partial_result to storage Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages content_max_length_one: the maximum length of a persisted content log: a logging.Logger object This function mutates partial_result to empty the content dict """ contents = partial_result['objects']['content'] missing_ids = storage.content_missing(contents.values(), key_hash='sha1_git') if missing_ids: full_contents = ( converters.shallow_content_to_content(contents[i], content_max_length_one) for i in missing_ids) storage.content_add(full_contents) partial_result['objects']['content'] = {} def flush_directory(storage, partial_result, log=None): """Flush the directories from a partial_result to storage Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages log: a logging.Logger object This function mutates partial_result to empty the directory dict """ storage.directory_add(partial_result['objects']['directory'].values()) partial_result['objects']['directory'] = {} +def flush_revision(storage, partial_result, log=None): + """Flush the revisions from a partial_result to storage + + Args: + storage: an instance of swh.storage.Storage + partial_result: a partial result as yielded by process_source_packages + log: a logging.Logger object + Returns: + The package objects augmented with a revision argument + """ + packages = [package.copy() for package in partial_result['packages']] + revisions = [] + for package in packages: + revision = converters.package_to_revision(package) + revisions.append(revision) + package['revision'] = revision + + storage.revision_add(revisions) + + return packages + + def remove_tempdirs(partial_result, log=None): """Remove the temporary files for the packages listed""" for tempdir in partial_result['tempdirs']: if os.path.isdir(tempdir): shutil.rmtree(tempdir) # Use the slice trick to empty the list in-place partial_result['tempdirs'][:] = [] def try_flush_partial(storage, partial_result, content_packet_size=10000, content_packet_length=1024 * 1024 * 1024 * 1024, content_max_length_one=100 * 1024 * 1024, directory_packet_size=25000, force=False, log=None): """Conditionally flush the partial result to storage. 
Args: storage: an instance of swh.storage.Storage partial_result: a partial result as yielded by process_source_packages content_packet_size: the number of contents that triggers a flush content_packet_length: the cumulated size of contents that triggers a flush content_max_length_one: the maximum length of a persisted content directory_packet_size: the number of directories that triggers a flush force: force a flush regardless of packet sizes log: a logging.Logger object """ n_content = len(partial_result['objects']['content']) n_directory = len(partial_result['objects']['directory']) if force: # Don't compute the length if we don't care len_contents = 0 else: len_contents = sum( content['length'] for content in partial_result['objects']['content'].values() if content['length'] <= content_max_length_one ) # Flush both contents and directories at once to be able to clear # tempfiles while we work if force or n_content >= content_packet_size or \ len_contents >= content_packet_length or \ n_directory >= directory_packet_size: flush_content(storage, partial_result, content_max_length_one, log=log) flush_directory(storage, partial_result, log=log) remove_tempdirs(partial_result, log=log)
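To make the flush condition above concrete (the sizes below are invented): contents larger than content_max_length_one will be stored as 'absent' without their data, so they are left out of the cumulated length that is compared against content_packet_length.

content_max_length_one = 100 * 1024 * 1024  # same default as try_flush_partial

pending_contents = [
    {'length': 512},
    {'length': 200 * 1024 * 1024},  # too large: stored as 'absent', not counted
    {'length': 2048},
]

len_contents = sum(c['length'] for c in pending_contents
                   if c['length'] <= content_max_length_one)
print(len_contents)  # 2560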