diff --git a/bin/load-history-from-snapshot b/bin/load-history-from-snapshot
new file mode 100755
index 0000000..50856e3
--- /dev/null
+++ b/bin/load-history-from-snapshot
@@ -0,0 +1,44 @@
+#!/usr/bin/python3
+
+import os
+import sys
+
+from swh.loader.debian.listers.snapshot import SnapshotDebianOrg
+from swh.loader.debian.loader import process_source_packages, try_flush_partial
+from swh.storage.storage import Storage
+
+s = SnapshotDebianOrg(connstr='service=snapshot',
+                      basedir=os.path.expanduser('~/tmp/snapshot.d.o'))
+
+source_package_dir = sys.argv[1]
+package_names = sys.argv[2:]
+
+pkgs = s.copy_package_files(package_names, source_package_dir)
+sorted_pkgs = sorted((p
+                      for p in pkgs.values()
+                      if os.path.exists(p['dsc'])),
+                     key=lambda p: (p['name'], p['version']))
+
+storage = Storage('dbname=softwareheritage-dev',
+                  '/tmp/swh-loader-debian/objects')
+
+partial = {}
+for partial in process_source_packages(sorted_pkgs):
+    print(
+        partial['packages'][-1]['version'],
+        len(partial['objects']['directory']),
+        '(%s)' % len(partial['objects']['directory_seen']),
+        len(partial['objects']['content']),
+        '(%s)' % len(partial['objects']['content_seen']),
+    )
+
+    try_flush_partial(storage, partial, content_packet_size=10000,
+                      content_packet_length=1024 * 1024 * 1024,
+                      content_max_length_one=100 * 1024 * 1024,
+                      directory_packet_size=25000)
+
+if partial:
+    try_flush_partial(storage, partial, content_packet_size=10000,
+                      content_packet_length=1024 * 1024 * 1024,
+                      content_max_length_one=100 * 1024 * 1024,
+                      directory_packet_size=25000, force=True)
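
For context, the script takes the destination directory for source packages as its first argument and source package names after it; a hypothetical invocation (directory and package names invented) would be:

    ./bin/load-history-from-snapshot ~/tmp/source-packages hello 2ping

Each printed line shows the version of the last processed package plus the pending and already-seen directory and content counts, before try_flush_partial decides whether to push the pending objects to storage.
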
diff --git a/swh/loader/debian.py b/swh/loader/debian.py
deleted file mode 100644
index 02752e7..0000000
--- a/swh/loader/debian.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-import os
-import subprocess
-
-from swh.core.hashutil import hashfile
-
-
-def extract_src_pkg(dsc_path, destdir):
-    """extract a Debian source package to a given directory
-
-    Note that after extraction the target directory will be the root of the
-    extract package, rather than containing it.
-
-    Args:
-        dsc_path: path to .dsc file
-        destdir: directory where to extract the package
-
-    Returns:
-        None
-
-    """
-    logging.debug('extract Debian source package %s' % dsc_path)
-
-    destdir_tmp = destdir + '.tmp'
-    logfile = destdir + '.log'
-
-    cmd = ['dpkg-source', '--no-copy', '--no-check', '-x',
-           dsc_path, destdir_tmp]
-    with open(logfile, 'w') as log:
-        subprocess.check_call(cmd, stdout=log, stderr=subprocess.STDOUT)
-
-    os.rename(destdir_tmp, destdir)
-
-
-def load_content_from_dir(storage, srcpkg_dir):
-    hashes = {}
-    for root, _dirs, files in os.walk(srcpkg_dir):
-        for name in files:
-            path = os.path.join(root, name)
-            hashes[path] = hashfile(path)
-            hashes[path]['length'] = os.path.getsize(path)
-
-    storage.load_content(hashes)
diff --git a/swh/loader/debian/converters.py b/swh/loader/debian/converters.py
new file mode 100644
index 0000000..5ed0d5f
--- /dev/null
+++ b/swh/loader/debian/converters.py
@@ -0,0 +1,169 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+import email.utils
+import os
+
+from swh.loader.dir.converters import tree_to_directory
+from swh.loader.dir.git.git import GitType
+
+
+def blob_to_shallow_content(obj, other_objs):
+    """Convert a blob as sent by swh.loader.dir to a blob ready to be sent
+    (without contents)
+
+    Args:
+        - obj: the blob object returned by swh.loader.dir
+        - other_objs: unused.
+
+    Returns: A "shallow_content": a content, without the data, which saves
+        memory space.
+
+    """
+    ret = obj.copy()
+    if 'length' not in ret:
+        ret['length'] = os.lstat(obj['path']).st_size
+
+    ret['perms'] = obj['perms'].value
+    ret['type'] = obj['type'].value
+
+    return ret
+
+
+def shallow_content_to_content(obj, content_max_length_one):
+    """Add the necessary data to the shallow_content created by the
+    previous function
+
+    Args:
+        - obj: shallow_content dict as returned by blob_to_shallow_content
+        - content_max_length_one: length limit of a persisted content
+
+    Returns:
+        A content suitable for persistence in swh.storage
+    """
+
+    content = obj.copy()
+
+    if content['length'] > content_max_length_one:
+        content['status'] = 'absent'
+        content['reason'] = 'Content too large'
+    elif 'data' not in content:
+        content['status'] = 'visible'
+        content['data'] = open(content['path'], 'rb').read()
+
+    del content['path']
+
+    return content
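
To make the two-step content conversion concrete, here is a small sketch, assuming the module is importable as swh.loader.debian.converters; the sizes and paths are invented, and only the keys the function actually reads are shown:

    import tempfile

    from swh.loader.debian import converters

    limit = 100 * 1024 * 1024  # same value the loader uses for content_max_length_one

    # Above the limit: the file is never read and the content is recorded as absent
    too_big = {'length': limit + 1, 'path': '/nonexistent/huge-file'}
    print(converters.shallow_content_to_content(too_big, limit)['status'])  # 'absent'

    # At or below the limit: the data is read from disk and the content marked visible
    with tempfile.NamedTemporaryFile() as f:
        f.write(b'hello')
        f.flush()
        small = {'length': 5, 'path': f.name}
        full = converters.shallow_content_to_content(small, limit)
        print(full['status'], full['data'])  # visible b'hello'
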
+ + """ + converter_map = { + GitType.TREE: tree_to_directory, + GitType.BLOB: blob_to_shallow_content, + } + + type_map = { + GitType.TREE: 'directory', + GitType.BLOB: 'content', + } + + ret = defaultdict(dict) + for members in objects.values(): + for member in members: + conv = converter_map[member['type']](member, objects) + ret_type = type_map[member['type']] + ret_key = conv.get('sha1_git') or conv['id'] + if ret_key not in ret[ret_type]: + ret[ret_type][ret_key] = conv + elif remove_duplicates and 'path' in conv: + # Nuke duplicate files + os.unlink(conv['path']) + + return ret + + +def merge_objects(accumulator, updates, remove_duplicates=True): + """Merge the objects from `updates` in `accumulator`. + + This function mutates accumulator. It is designed so that the + "content" and "directory" members of accumulator can be flushed + periodically, for instance to send the data to the database in + chunks. "content_seen" and "directory_seen" contain all the ids of + the objects that have been seen so far. + + - Args: + - accumulator: a dict to accumulate several updates in, with keys: + - content (dict) + - directory (dict) + - content_seen (set) + - directory_seen (set) + - updates: the objects to add to accumulator (has two keys, + content and directory) + - remove_duplicates: if True, removes the objects from updates that + have already be seen in accumulator. + + - Returns: None (accumulator is mutated). + + """ + + for key in ['content', 'directory']: + seen_key = key + '_seen' + cur_updates = updates[key] + to_update = accumulator[key] + seen = accumulator[seen_key] + for update_key in cur_updates: + if update_key not in seen: + to_update[update_key] = cur_updates[update_key] + seen.add(update_key) + elif remove_duplicates and key == 'content': + # Nuke the files that haven't changed since a previous run... + os.unlink(cur_updates[update_key]['path']) + + +def uid_to_person(uid, key=None): + """Convert an uid to a person suitable for insertion. 
+
+
+def uid_to_person(uid, key=None):
+    """Convert a uid to a person suitable for insertion.
+
+    Args:
+        uid: a uid of the form "Name <email@address>"
+        key: the key in which the values are stored
+
+    Returns: a dictionary with keys:
+        key_name (or name if key is None): the name associated to the uid
+        key_email (or mail if key is None): the mail associated to the uid
+    """
+
+    if key is not None:
+        name_key = '%s_name' % key
+        mail_key = '%s_email' % key
+    else:
+        name_key = 'name'
+        mail_key = 'mail'
+
+    ret = {
+        name_key: '',
+        mail_key: '',
+    }
+
+    name, mail = email.utils.parseaddr(uid)
+
+    if name and mail:
+        ret[name_key] = name
+        ret[mail_key] = mail
+    else:
+        ret[name_key] = uid
+
+    return ret
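
The uid parsing can be exercised directly; the maintainer strings below are made up:

    from swh.loader.debian import converters

    print(converters.uid_to_person('Jane Doe <jane@example.org>', key='author'))
    # {'author_name': 'Jane Doe', 'author_email': 'jane@example.org'}

    # Strings that do not parse as an address end up entirely in the name field
    print(converters.uid_to_person('buildd autosigning key', key='committer'))
    # {'committer_name': 'buildd autosigning key', 'committer_email': ''}
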
diff --git a/swh/loader/debian/listers/snapshot.py b/swh/loader/debian/listers/snapshot.py
new file mode 100644
index 0000000..4e78db7
--- /dev/null
+++ b/swh/loader/debian/listers/snapshot.py
@@ -0,0 +1,184 @@
+#!/usr/bin/python3
+# -*- encoding: utf-8 -*-
+#
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import shutil
+
+from debian.debian_support import Version
+import psycopg2
+
+
+class SnapshotDebianOrg:
+    """Methods to use the snapshot.debian.org mirror"""
+
+    def __init__(
+            self,
+            connstr='service=snapshot',
+            basedir='/srv/storage/space/mirrors/snapshot.debian.org',
+    ):
+        self.db = psycopg2.connect(connstr)
+        self.basedir = basedir
+
+    def _hash_to_path(self, hash):
+        """Convert a hash to a file path on disk"""
+        depth = 2
+        fragments = [hash[2*i:2*(i+1)] for i in range(depth)]
+        dirname = os.path.join(self.basedir, 'files', *fragments)
+
+        return os.path.join(dirname, hash)
+
+    def list_packages(self, count=1, previous_name=''):
+        """List `count` source package names present in the snapshot.d.o db
+        starting with previous_name (excluded)"""
+
+        package_names_query = """
+        select distinct(name)
+        from srcpkg
+        where name > %s
+        order by name
+        limit %s
+        """
+
+        with self.db.cursor() as cur:
+            cur.execute(package_names_query, (previous_name, count))
+            return [name for (name,) in cur]
+
+    def list_package_files(self, names):
+        """Retrieve the file metadata for all the versions of the
+        given source packages.
+        """
+
+        files_query = """
+        select srcpkg.srcpkg_id as src_id, srcpkg.name as src_name,
+               srcpkg.version as src_version, file.hash, file.name
+        from srcpkg
+        left join file_srcpkg_mapping using (srcpkg_id)
+        left join file using (hash)
+        where srcpkg.name in %s
+        """
+
+        res = {}
+
+        db_names = tuple(names)
+
+        with self.db.cursor() as cur:
+            cur.execute(files_query, (db_names,))
+            for srcpkg_id, srcpkg_name, srcpkg_version, hash, name in cur:
+                if srcpkg_id not in res:
+                    res[srcpkg_id] = {
+                        'id': srcpkg_id,
+                        'name': srcpkg_name,
+                        'version': Version(srcpkg_version),
+                        'files': [],
+                    }
+                if hash and name:
+                    res[srcpkg_id]['files'].append({
+                        'hash': hash,
+                        'name': name,
+                    })
+
+        return res
+
+    def copy_files_to_dirs(self, files, pool):
+        """Copy the files from the snapshot storage to their destination
+        directories, via `pool`.
+
+        - Step 1: copy hashed files to pool
+        - Step 2: link hashed files from pool to destdir with the given name
+
+        Args:
+            - files: iterable of {hash, name, destdir} dictionaries
+            - pool: the pool directory where hashed files are stored
+
+        Raises:
+            - FileNotFoundError if a hashed file doesn't exist at the source
+
+        """
+
+        hashes = set(file['hash'] for file in files)
+
+        print("%d files to copy" % len(hashes))
+
+        cnt = 0
+        for hash in hashes:
+            dst1 = os.path.join(pool, hash)
+            if not os.path.exists(dst1):
+                src = self._hash_to_path(hash)
+                shutil.copy(src, dst1)
+                cnt += 1
+                if cnt % 100 == 0:
+                    print("%d files copied" % cnt)
+
+        if cnt % 100 != 0:
+            print("%d files copied" % cnt)
+
+        for file in files:
+            src1 = os.path.join(pool, file['hash'])
+            dst = os.path.join(file['destdir'], file['name'])
+            if not os.path.exists(dst):
+                os.link(src1, dst)
+
+    def copy_package_files(self, packages, basedir):
+        """Copy all the files for the packages `packages` in `basedir`.
+
+        Step 1: create a pool as basedir/.pool
+        Step 2: for each package version, create a directory
+                basedir/package_version
+        Step 3: copy all the files for each package version
+                to basedir/package_version/ using copy_files_to_dirs (and
+                the previously created pool)
+
+        Args:
+            - packages: an iterable of source package names
+            - basedir: the base directory for file copies
+        Returns:
+            - an id -> source_package mapping, as returned by
+              list_package_files, where each source_package has been
+              augmented with the full path to its dsc file in the
+              'dsc' key.
+        """
+
+        src_packages = self.list_package_files(packages)
+
+        files = []
+        ret = {}
+
+        pool = os.path.join(basedir, '.pool')
+        os.makedirs(pool, exist_ok=True)
+
+        for id, pkg in src_packages.items():
+            srcpkg_name = pkg['name']
+            srcpkg_version = str(pkg['version'])
+            srcpkg_files = pkg['files']
+
+            dirname = os.path.join(basedir, '%s_%s' % (srcpkg_name,
+                                                       srcpkg_version))
+            os.makedirs(dirname, exist_ok=True)
+
+            if ':' in srcpkg_version:
+                dsc_version = srcpkg_version.split(':', 1)[1]
+            else:
+                dsc_version = srcpkg_version
+            intended_dsc = '%s_%s.dsc' % (srcpkg_name, dsc_version)
+
+            for file in srcpkg_files:
+                file = file.copy()
+                file['destdir'] = dirname
+                files.append(file)
+
+            ret_pkg = pkg.copy()
+            ret_pkg['dsc'] = os.path.join(dirname, intended_dsc)
+            ret[id] = ret_pkg
+
+        self.copy_files_to_dirs(files, pool)
+
+        return ret
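
A hedged sketch of how the lister is meant to be driven; the connection service, mirror path, work directory and package names are placeholders, and the constructor needs a reachable snapshot.debian.org database plus a local file mirror:

    import os

    from swh.loader.debian.listers.snapshot import SnapshotDebianOrg

    lister = SnapshotDebianOrg(connstr='service=snapshot',
                               basedir='/srv/mirrors/snapshot.debian.org')

    # With the hard-coded depth of 2, a file whose hash is 'abcdef...' is
    # looked up at <basedir>/files/ab/cd/abcdef...
    print(lister._hash_to_path('abcdef0123456789'))

    # Copy every known version of a few packages under a work directory; the
    # result maps srcpkg ids to packages augmented with a 'dsc' path
    workdir = os.path.expanduser('~/tmp/source-packages')
    os.makedirs(workdir, exist_ok=True)
    for pkg in lister.copy_package_files(['hello', '2ping'], workdir).values():
        print(pkg['name'], pkg['version'], pkg['dsc'])
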
diff --git a/swh/loader/debian/loader.py b/swh/loader/debian/loader.py
new file mode 100644
index 0000000..cbce61e
--- /dev/null
+++ b/swh/loader/debian/loader.py
@@ -0,0 +1,369 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import glob
+import os
+import subprocess
+import shutil
+import tempfile
+
+from dateutil.parser import parse as parse_date
+from debian.changelog import Changelog
+from debian.deb822 import Dsc
+
+from swh.core import hashutil
+from swh.loader.dir.git.git import (
+    walk_and_compute_sha1_from_directory, ROOT_TREE_KEY)
+
+from . import converters
+
+DEBIAN_KEYRINGS = glob.glob('/usr/share/keyrings/*')
+
+
+def extract_src_pkg(dsc_path, destdir, log=None):
+    """Extract a Debian source package to a given directory
+
+    Note that after extraction the target directory will be the root of the
+    extracted package, rather than containing it.
+
+    Args:
+        dsc_path: path to .dsc file
+        destdir: directory where to extract the package
+        log: a logging.Logger object
+
+    Returns:
+        None
+
+    """
+    if log:
+        log.debug('extract Debian source package %s' % dsc_path)
+
+    destdir_tmp = b''.join([destdir, b'.tmp'])
+    logfile = b''.join([destdir, b'.log'])
+
+    cmd = ['dpkg-source',
+           '--no-copy', '--no-check',
+           '--ignore-bad-version',
+           '-x', dsc_path,
+           destdir_tmp]
+
+    with open(logfile, 'w') as stdout:
+        subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT)
+
+    os.rename(destdir_tmp, destdir)
+
+
+def get_file_info(filepath):
+    """Retrieve the original file information from the file at filepath.
+
+    Args:
+        filepath: the path to the original file
+    Returns:
+        A dict with the information about the original file:
+            name: the file name
+            sha1, sha1_git, sha256: the hashes for the original file
+            length: the length of the original file
+    """
+
+    name = os.path.basename(filepath)
+    if isinstance(name, bytes):
+        name = name.decode('utf-8')
+
+    ret = {
+        'name': name,
+    }
+
+    hashes = hashutil.hashfile(filepath)
+    for hash in hashes:
+        ret[hash] = hashutil.hash_to_hex(hashes[hash])
+
+    ret['length'] = os.lstat(filepath).st_size
+
+    return ret
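
For reference, a minimal sketch of the per-file metadata; the path is hypothetical, and the exact hash keys are whatever swh.core.hashutil.hashfile computes (typically sha1, sha1_git and sha256):

    from swh.loader.debian import loader

    info = loader.get_file_info('/srv/packages/hello_2.10-1.dsc')
    print(info['name'], info['length'])
    print(sorted(k for k in info if k not in ('name', 'length')))
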
+
+
+def get_gpg_info_signature(gpg_info):
+    """Extract the signature date from a deb822.GpgInfo object
+
+    Args:
+        gpg_info: a deb822.GpgInfo object
+    Returns: a dictionary with the following keys:
+        committer_date: a timezone-aware datetime.DateTime object with the
+            date the signature was made
+        committer_offset: the UTC offset of the signature date (always 0)
+        committer_keyid: the keyid of the signing key
+        committer_name, committer_email: parsed from the uid of the signing
+            key, if found
+    """
+
+    uid = None
+
+    if 'VALIDSIG' in gpg_info:
+        key_id = gpg_info['VALIDSIG'][0]
+        timestamp = gpg_info['VALIDSIG'][2]
+
+        for key in gpg_info.uidkeys:
+            if key in gpg_info:
+                uid = gpg_info[key][-1]
+                break
+
+    elif 'ERRSIG' in gpg_info:
+        key_id = gpg_info['ERRSIG'][0]
+        timestamp = gpg_info['ERRSIG'][4]
+    else:
+        raise ValueError('Cannot find signature in gpg_info '
+                         'object. Keys: %s' % gpg_info.keys())
+
+    dt = datetime.datetime.utcfromtimestamp(int(timestamp))
+    dt = dt.replace(tzinfo=datetime.timezone.utc)
+
+    ret = {
+        'committer_date': dt,
+        'committer_offset': 0,
+        'committer_keyid': key_id,
+    }
+
+    ret.update(converters.uid_to_person(uid, 'committer'))
+
+    return ret
+
+
+def get_package_metadata(dsc_path, extracted_path, log=None):
+    """Get the package metadata from the source package at dsc_path,
+    extracted in extracted_path.
+
+    Args:
+        dsc_path: the path to the package's dsc file
+        extracted_path: the path where the package got extracted
+        log: a logging.Logger object
+
+    Returns: a dict with the following keys
+        history: list of (package_name, package_version) tuples parsed from
+            the package changelog
+        source_files: information about all the files in the source package
+        author_*, author_date, author_offset: changelog author and date
+        committer_*: the dsc signature information from
+            get_gpg_info_signature
+
+    """
+    ret = {}
+
+    changelog_path = os.path.join(extracted_path, b'debian/changelog')
+    with open(changelog_path, 'rb') as changelog:
+        try:
+            parsed_changelog = Changelog(changelog)
+        except UnicodeDecodeError:
+            if log:
+                log.warning('Unknown encoding for changelog %s,'
+                            ' falling back to iso' %
+                            changelog_path.decode('utf-8'))
+
+            # need to reset as Changelog scrolls to the end of the file
+            changelog.seek(0)
+            parsed_changelog = Changelog(changelog, encoding='iso-8859-15')
+
+    ret['history'] = [(block.package, block.version)
+                      for block in parsed_changelog]
+
+    ret.update(converters.uid_to_person(parsed_changelog.author, 'author'))
+    date = parse_date(parsed_changelog.date)
+    ret['author_date'] = date
+    ret['author_offset'] = int(date.utcoffset().total_seconds()) // 60
+
+    with open(dsc_path, 'rb') as dsc:
+        parsed_dsc = Dsc(dsc)
+
+    source_files = [get_file_info(dsc_path)]
+
+    dsc_dir = os.path.dirname(dsc_path)
+    for file in parsed_dsc['files']:
+        file_path = os.path.join(dsc_dir, file['name'])
+        file_info = get_file_info(file_path)
+        source_files.append(file_info)
+
+    ret['source_files'] = source_files
+
+    gpg_info = parsed_dsc.get_gpg_info(keyrings=DEBIAN_KEYRINGS)
+
+    ret.update(get_gpg_info_signature(gpg_info))
+
+    return ret
+
+
+def process_source_package(package, log=None):
+    """Process a source package into its constituent components.
+
+    The source package will be decompressed in a temporary directory.
+
+    Args:
+        package: a dict with the following keys:
+            name: source package name
+            version: source package version
+            dsc: the full path of the package's DSC file.
+        log: a logging.Logger object
+
+    Returns:
+        A tuple with two elements:
+        package: the original package dict augmented with the following keys:
+            metadata: the metadata from get_package_metadata
+            basedir: the temporary directory in which the package was
+                decompressed
+            directory: the sha1_git of the root directory of the package
+        objects: a dictionary of the parsed directories and files, both
+            indexed by id
+
+    Raises:
+        - FileNotFoundError if the dsc file does not exist.
+    """
+
+    if not os.path.exists(package['dsc']):
+        raise FileNotFoundError('%s does not exist' % package['dsc'])
+
+    basedir = tempfile.mkdtemp()
+    debdir = os.path.join(basedir, '%s_%s' % (package['name'],
+                                              package['version']))
+
+    # the swh.loader.dir internals legitimately want bytes for paths
+    debdir = debdir.encode('utf-8')
+
+    extract_src_pkg(package['dsc'], debdir, log=log)
+
+    parsed_objects = walk_and_compute_sha1_from_directory(debdir)
+    root_tree = parsed_objects[ROOT_TREE_KEY][0]
+
+    package = package.copy()
+    package['basedir'] = basedir
+    package['directory'] = root_tree['sha1_git']
+    package['metadata'] = get_package_metadata(package['dsc'], debdir, log=log)
+
+    return package, converters.dedup_objects(parsed_objects)
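
Putting the extraction together, a minimal sketch of processing a single package; the dsc path is invented and must point to a real source package, and the temporary directory recorded in 'basedir' is left for the caller (normally remove_tempdirs via try_flush_partial) to clean up:

    from swh.loader.debian import loader

    package = {
        'name': 'hello',
        'version': '2.10-1',
        'dsc': '/home/user/tmp/source-packages/hello_2.10-1/hello_2.10-1.dsc',
    }

    pkg, objects = loader.process_source_package(package)
    print(len(objects['content']), 'contents,', len(objects['directory']), 'directories')
    print('root directory id:', pkg['directory'])
    print('extracted under:', pkg['basedir'])
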
+
+
+def process_source_packages(packages, log=None):
+    """Execute process_source_package, but for lists of packages.
+
+    Args:
+        packages: an iterable of packages as expected by
+            process_source_package
+        log: a logging.Logger object
+
+    Returns: a generator of partial results.
+
+    Partial results have the following keys:
+        objects: the accumulator for merge_objects. This member can be
+            mutated to clear the pending contents and directories.
+        tempdirs: the temporary directories processed so far. This list can
+            be flushed if temporary directories are removed on the fly.
+        packages: the list of packages processed so far.
+
+    """
+
+    objects = {
+        'content': {},
+        'directory': {},
+        'content_seen': set(),
+        'directory_seen': set(),
+    }
+
+    ret_packages = []
+    tempdirs = []
+
+    for package in packages:
+        ret_package, package_objs = process_source_package(package, log=log)
+        ret_packages.append(ret_package)
+        converters.merge_objects(objects, package_objs)
+        tempdirs.append(ret_package['basedir'])
+
+        yield {
+            'objects': objects,
+            'packages': ret_packages,
+            'tempdirs': tempdirs,
+        }
+
+
+def flush_content(storage, partial_result, content_max_length_one, log=None):
+    """Flush the contents from a partial_result to storage
+
+    Args:
+        storage: an instance of swh.storage.Storage
+        partial_result: a partial result as yielded by process_source_packages
+        content_max_length_one: the maximum length of a persisted content
+        log: a logging.Logger object
+
+    This function mutates partial_result to empty the content dict
+    """
+    contents = partial_result['objects']['content']
+
+    missing_ids = storage.content_missing(contents.values(),
+                                          key_hash='sha1_git')
+
+    if missing_ids:
+        full_contents = (
+            converters.shallow_content_to_content(contents[i],
+                                                  content_max_length_one)
+            for i in missing_ids)
+
+        storage.content_add(full_contents)
+
+    partial_result['objects']['content'] = {}
+
+
+def flush_directory(storage, partial_result, log=None):
+    """Flush the directories from a partial_result to storage
+
+    Args:
+        storage: an instance of swh.storage.Storage
+        partial_result: a partial result as yielded by process_source_packages
+        log: a logging.Logger object
+
+    This function mutates partial_result to empty the directory dict
+    """
+    storage.directory_add(partial_result['objects']['directory'].values())
+    partial_result['objects']['directory'] = {}
+
+
+def remove_tempdirs(partial_result, log=None):
+    """Remove the temporary files for the packages listed"""
+    for tempdir in partial_result['tempdirs']:
+        if os.path.isdir(tempdir):
+            shutil.rmtree(tempdir)
+
+    # Use the slice trick to empty the list in-place
+    partial_result['tempdirs'][:] = []
+
+
+def try_flush_partial(storage, partial_result,
+                      content_packet_size=10000,
+                      content_packet_length=1024 * 1024 * 1024 * 1024,
+                      content_max_length_one=100 * 1024 * 1024,
+                      directory_packet_size=25000, force=False, log=None):
+    """Conditionally flush the partial result to storage.
+
+    Args:
+        storage: an instance of swh.storage.Storage
+        partial_result: a partial result as yielded by process_source_packages
+        content_packet_size: the number of contents that triggers a flush
+        content_packet_length: the cumulated size of contents that triggers a
+            flush
+        content_max_length_one: the maximum length of a persisted content
+        directory_packet_size: the number of directories that triggers a flush
+        force: force a flush regardless of packet sizes
+        log: a logging.Logger object
+    """
+    n_content = len(partial_result['objects']['content'])
+    n_directory = len(partial_result['objects']['directory'])
+
+    if force:
+        # Don't compute the length if we don't care
+        len_contents = 0
+    else:
+        len_contents = sum(
+            content['length']
+            for content in partial_result['objects']['content'].values()
+            if content['length'] <= content_max_length_one
+        )
+
+    # Flush both contents and directories at once to be able to clear
+    # tempfiles while we work
+    if force or n_content >= content_packet_size or \
+       len_contents >= content_packet_length or \
+       n_directory >= directory_packet_size:
+        flush_content(storage, partial_result, content_max_length_one, log=log)
+        flush_directory(storage, partial_result, log=log)
+        remove_tempdirs(partial_result, log=log)