diff --git a/swh/loader/debian/converters.py b/swh/loader/debian/converters.py index 27ee405..29a6b27 100644 --- a/swh/loader/debian/converters.py +++ b/swh/loader/debian/converters.py @@ -1,291 +1,148 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict import copy import datetime import email.utils -import os +import logging from swh.model import identifiers -from swh.model.git import GitType -from swh.loader.core.converters import tree_to_directory + +log = logging.getLogger(__name__) ROBOT_AUTHOR = { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', + 'fullname': b'Software Heritage ', } -def blob_to_shallow_content(obj, other_objs): - """Convert a blob as sent by swh.walker.dir to a blob ready to be sent - (without contents) - - Args: - obj: The blob object returned by swh.walker.dir - other_objs: unused. - - Returns: - A "shallow_content", i.e., a content, without the data, which saves - memory space. - - """ - ret = obj.copy() - if 'length' not in ret: - ret['length'] = os.lstat(obj['path']).st_size - - ret['perms'] = obj['perms'].value - ret['type'] = obj['type'].value - - return ret - - -def shallow_content_to_content(obj, content_max_length_one): - """Add the necessary data to the shallow_content created by the previous - function - - Args: - obj: shallow_content dict as returned by blob_to_shallow_content - content_max_length_one: length limit of a persisted content - - Returns: - A content suitable for persistence in swh.storage - - """ - - content = obj.copy() - - if content['length'] > content_max_length_one: - content['status'] = 'absent' - content['reason'] = 'Content too large' - elif 'data' not in content: - content['status'] = 'visible' - content['data'] = open(content['path'], 'rb').read() - - del content['path'] - - return content - - -def package_to_revision(package, log=None): +def package_metadata_to_revision(package, directory, metadata): """Convert a package dictionary to a revision suitable for storage. 
Args: package: a dictionary with the following keys: - metadata: the metadata for the package, containing:: package_info: changelog pgp_signature - directory Returns: A revision suitable for persistence in swh.storage """ - metadata = package['metadata'] message = 'Synthetic revision for Debian source package %s version %s' % ( package['name'], package['version']) def prepare(obj): if isinstance(obj, list): return [prepare(item) for item in obj] elif isinstance(obj, dict): return {k: prepare(v) for k, v in obj.items()} elif isinstance(obj, datetime.datetime): return obj.isoformat() elif isinstance(obj, bytes): return obj.decode('utf-8') else: return copy.deepcopy(obj) - signature = metadata['package_info']['pgp_signature'] - - committer_date = None - if signature: - committer_date = signature['date'] - else: - if log: - log.info('No PGP signature on package %s_%s' % - (package['name'], package['version']), - extra={ - 'swh_type': 'deb_missing_signature', - 'swh_name': package['name'], - 'swh_version': str(package['version']), - }) - committer_date = metadata['package_info']['changelog']['date'] + author = metadata['package_info']['changelog']['person'] + date = metadata['package_info']['changelog']['date'] ret = { - 'author': metadata['package_info']['changelog']['person'], - 'date': metadata['package_info']['changelog']['date'], - 'committer': ROBOT_AUTHOR, - 'committer_date': committer_date, + 'author': author, + 'date': date, + 'committer': author, + 'committer_date': date, 'type': 'dsc', - 'directory': package['directory'], + 'directory': directory.hash, 'message': message.encode('utf-8'), 'synthetic': True, 'parents': [], 'metadata': prepare(metadata), } rev_id = bytes.fromhex(identifiers.revision_identifier(ret)) ret['id'] = rev_id return ret def package_to_release(package): """Convert a package dictionary to a revision suitable for storage. Args: package: a dictionary with the following keys: - metadata: the metadata for the package, containing:: package_info changelog person date - revision Returns: A revision suitable for persistence in swh.storage """ package_info = package['metadata']['package_info'] message = 'Synthetic release for Debian source package %s version %s' % ( package['name'], package['version']) ret = { 'author': package_info['changelog']['person'], 'date': package_info['changelog']['date'], 'target': package['revision']['id'], 'target_type': 'revision', 'message': message.encode('utf-8'), 'synthetic': True, 'name': str(package['version']), } rev_id = bytes.fromhex(identifiers.release_identifier(ret)) ret['id'] = rev_id return ret -def dedup_objects(objects, remove_duplicates=True): - """Deduplicate the objects from dictionary `objects`. - - Args: - objects: a dictionary of objects indexed by path - remove_duplicates: if True, remove the duplicate objects from the - filesystem - - Returns: - dictionary indexed by object type, of dictionaries indexed by - object id of deduplicated objects. 
- - """ - converter_map = { - GitType.TREE: tree_to_directory, - GitType.BLOB: blob_to_shallow_content, - } - - type_map = { - GitType.TREE: 'directory', - GitType.BLOB: 'content', - } - - ret = defaultdict(dict) - for members in objects.values(): - for member in members: - conv = converter_map[member['type']](member, objects) - ret_type = type_map[member['type']] - ret_key = conv.get('sha1_git') or conv['id'] - if ret_key not in ret[ret_type]: - ret[ret_type][ret_key] = conv - elif remove_duplicates and 'path' in conv: - # Nuke duplicate files - os.unlink(conv['path']) - - return ret - - -def merge_objects(accumulator, updates, remove_duplicates=True): - """Merge the objects from `updates` in `accumulator`. - - This function mutates accumulator. It is designed so that the - "content" and "directory" members of accumulator can be flushed - periodically, for instance to send the data to the database in - chunks. "content_seen" and "directory_seen" contain all the ids of - the objects that have been seen so far. - - Args: - accumulator (dict): dictionary to accumulate several updates in, with - keys: - - - content (dict) - - directory (dict) - - content_seen (set) - - directory_seen (set) - - updates: the objects to add to accumulator (has two keys, content and - directory) - remove_duplicates: if True, removes the objects from updates that have - already be seen in accumulator. - - Returns: - None (accumulator is mutated) - - """ - - for key in ['content', 'directory']: - seen_key = key + '_seen' - cur_updates = updates[key] - to_update = accumulator[key] - seen = accumulator[seen_key] - for update_key in cur_updates: - if update_key not in seen: - to_update[update_key] = cur_updates[update_key] - seen.add(update_key) - elif remove_duplicates and key == 'content': - # Nuke the files that haven't changed since a previous run... - os.unlink(cur_updates[update_key]['path']) - - def uid_to_person(uid, encode=True): """Convert an uid to a person suitable for insertion. 
Args: uid: an uid of the form "Name " encode: whether to convert the output to bytes or not Returns: dict: a dictionary with the following keys: - name: the name associated to the uid - email: the mail associated to the uid """ ret = { 'name': '', 'email': '', + 'fullname': uid, } name, mail = email.utils.parseaddr(uid) if name and email: ret['name'] = name ret['email'] = mail else: ret['name'] = uid if encode: - for key in ('name', 'email'): + for key in list(ret): ret[key] = ret[key].encode('utf-8') return ret diff --git a/swh/loader/debian/listers/__init__.py b/swh/loader/debian/listers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/swh/loader/debian/listers/snapshot.py b/swh/loader/debian/listers/snapshot.py deleted file mode 100644 index 0742862..0000000 --- a/swh/loader/debian/listers/snapshot.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/python3 -# -*- encoding: utf-8 -*- -# -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import defaultdict -import os - -from deb822 import Dsc -from debian.debian_support import Version -import psycopg2 - - -class SnapshotDebianOrg: - """Methods to use the snapshot.debian.org mirror""" - - def __init__( - self, - connstr='service=snapshot', - basedir='/srv/storage/space/mirrors/snapshot.debian.org', - ): - self.db = psycopg2.connect(connstr) - self.basedir = basedir - - def _hash_to_path(self, hash): - """Convert a hash to a file path on disk""" - depth = 2 - fragments = [hash[2*i:2*(i+1)] for i in range(depth)] - dirname = os.path.join(self.basedir, 'files', *fragments) - - return os.path.join(dirname, hash) - - def list_packages(self, count=1, previous_name=''): - """List `count` source package names present in the snapshot.d.o db - starting with previous_name (excluded)""" - - package_names_query = """ - select distinct(name) - from srcpkg - where name > %s - order by name - limit %s - """ - - with self.db.cursor() as cur: - cur.execute(package_names_query, (previous_name, count)) - return [name for (name,) in cur] - - def list_package_files(self, names): - """Retrieve the file metadata for all the versions of the - given source packages. 
- """ - - files_query = """ - select srcpkg.srcpkg_id as src_id, srcpkg.name as src_name, - srcpkg.version as src_version, file.hash, file.name - from srcpkg - left join file_srcpkg_mapping using (srcpkg_id) - left join file using (hash) - where srcpkg.name in %s - """ - - res = {} - - db_names = tuple(names) - - with self.db.cursor() as cur: - cur.execute(files_query, (db_names,)) - for srcpkg_id, srcpkg_name, srcpkg_version, hash, name in cur: - if srcpkg_id not in res: - res[srcpkg_id] = { - 'id': srcpkg_id, - 'name': srcpkg_name, - 'version': Version(srcpkg_version), - 'files': [], - } - if hash and name: - res[srcpkg_id]['files'].append({ - 'hash': hash, - 'name': name, - }) - - return res - - def list_files_by_name(self, files): - """List the files by name""" - files_query = """ - select distinct name, hash - from file - where name in %s - """ - - ret = defaultdict(list) - if not files: - return ret - - with self.db.cursor() as cur: - cur.execute(files_query, (tuple(files),)) - for name, hash in cur: - ret[name].append(hash) - - return ret - - def link_files_to_dirs(self, files, log=None): - """Symlink the files from the snapshot storage to their destination directory - - Args: - files: iterable of {hash, name, destdir} dictionaries - - Raises: - FileNotFoundError: if a hashed file doesn't exist at the source - - """ - - for file in files: - src = self._hash_to_path(file['hash']) - dst = os.path.join(file['destdir'], file['name']) - if not os.path.exists(src): - raise FileNotFoundError("File %s does not exist" % src) - if not os.path.exists(dst): - os.symlink(src, dst) - - def prepare_origins(self, package_names, storage, log=None): - """Prepare the origins for the given packages. - - Args: - package_names: a list of source package names - storage: an instance of swh.storage.Storage - - Returns: - dict: a name -> origin dictionary where origin is itself a dict - with the following keys: - - - id: id of the origin - - type: deb - - url: the snapshot.debian.org URL for the package - """ - ret = {} - for name in package_names: - origin = { - 'type': 'deb', - 'url': 'http://snapshot.debian.org/package/%s/' % name, - } - origin['id'] = storage.origin_add_one(origin) - ret[name] = origin - - return ret - - def prepare_packages(self, packages, basedir, log=None): - """Prepare all the source packages from `packages` for processing. - - - Step 1: for each version of each package, create a directory - basedir/package_version - - Step 2: link all the files for each package version - to basedir/package_version/ using link_files_to_dirs - - Step 3: parse all the dsc files and retrieve the remaining files - - Args: - packages: a list of source package names - basedir: the base directory for file copies - log: a logging.Logger object - - Returns: - dict: an id -> source_package mapping, where each source_package is - a dict with the following keys: - - - id: the id of the source package in snapshot.debian.org - - name: the source package name - - version: the version of the source package - - files: a list of the files the source package uses (with hash and - name) - - dsc: the full path to the package's dsc file. 
- - """ - - length = len(packages) - log.info('Preparing %s source package%s from snapshot.debian.org: %s' % - (length, 's' if length else '', ', '.join(packages)), - extra={ - 'swh_type': 'deb_snapshot_prepare_packages', - 'swh_count': length, - 'swh_names': packages, - }) - - src_packages = self.list_package_files(packages) - n_packages = len(src_packages) - - log.info('Found %s package version%s on snapshot.debian.org' % - (n_packages, 's' if n_packages else ''), - extra={ - 'swh_type': 'deb_snapshot_count_package_versions', - 'swh_count': length, - }) - - ret = {} - - pkgs_with_really_missing_files = defaultdict(list) - - files = [] - for id, pkg in src_packages.items(): - srcpkg_name = pkg['name'] - srcpkg_version = str(pkg['version']) - srcpkg_files = pkg['files'] - - dirname = os.path.join(basedir, '%s_%s' % (srcpkg_name, - srcpkg_version)) - os.makedirs(dirname, exist_ok=True) - - if ':' in srcpkg_version: - dsc_version = srcpkg_version.split(':', 1)[1] - else: - dsc_version = srcpkg_version - intended_dsc = '%s_%s.dsc' % (srcpkg_name, dsc_version) - - for file in srcpkg_files: - file = file.copy() - file['destdir'] = dirname - files.append(file) - - ret_pkg = pkg.copy() - ret_pkg['dsc'] = os.path.join(dirname, intended_dsc) - ret[id] = ret_pkg - - self.link_files_to_dirs(files, log) - - for id, pkg in ret.items(): - if not os.path.exists(pkg['dsc']): - intended_dsc = os.path.basename(pkg['dsc']) - pkgs_with_really_missing_files[id].append(intended_dsc) - - missing_files = [] - for id, pkg in ret.items(): - if id in pkgs_with_really_missing_files: - continue - destdir = os.path.dirname(pkg['dsc']) - with open(pkg['dsc'], 'rb') as fh: - dsc = Dsc(fh) - for file in dsc['Files']: - if not os.path.isfile(os.path.join(destdir, file['name'])): - missing_files.append((destdir, file, id)) - - missing_file_names = set(f[1]['name'] for f in missing_files) - retrieved_files = self.list_files_by_name(missing_file_names) - - missing_files_to_copy = [] - - for destdir, file, id in missing_files: - filename = file['name'] - missing_hashes = retrieved_files[filename] - if len(missing_hashes) != 1: - pkgs_with_really_missing_files[id].append(filename) - continue - missing_file = file.copy() - missing_file['hash'] = missing_hashes[0] - missing_file['destdir'] = destdir - missing_files_to_copy.append(missing_file) - - self.link_files_to_dirs(missing_files_to_copy, log) - - for pkg_id, filenames in pkgs_with_really_missing_files.items(): - pkg = ret[pkg_id] - del ret[pkg_id] - if log: - log.warn('Missing files in package %s_%s: %s' % - (pkg['name'], pkg['version'], ', '.join(filenames)), - extra={ - 'swh_type': 'deb_snapshot_missing_files', - 'swh_id': pkg['id'], - 'swh_name': pkg['name'], - 'swh_version': str(pkg['version']), - 'swh_missing_files': filenames, - }) - - return ret diff --git a/swh/loader/debian/loader.py b/swh/loader/debian/loader.py index 43071ad..e7218ec 100644 --- a/swh/loader/debian/loader.py +++ b/swh/loader/debian/loader.py @@ -1,656 +1,471 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime +import copy +import hashlib +import logging import os import re import subprocess -import shutil import tempfile -import traceback -import uuid from dateutil.parser import parse as parse_date from debian.changelog import Changelog from debian.deb822 import Dsc +import requests +from 
sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from swh.loader.core.loader import SWHLoader +from swh.storage.schemata.distribution import Package from swh.model import hashutil -from swh.model.git import ( - walk_and_compute_sha1_from_directory, ROOT_TREE_KEY) +from swh.model.from_disk import Directory +from swh.model.identifiers import identifier_to_bytes from . import converters UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*') -class PackageExtractionFailed(Exception): +log = logging.getLogger(__name__) + + +class DebianLoaderException(Exception): + pass + + +class PackageDownloadFailed(DebianLoaderException): """Raise this exception when a package extraction failed""" pass -def extract_src_pkg(dsc_path, destdir, log=None): +class PackageExtractionFailed(DebianLoaderException): + """Raise this exception when a package extraction failed""" + pass + + +def _debian_to_hashlib(hashname): + """Convert Debian hash names to hashlib-compatible names""" + return { + 'md5sum': 'md5', + }.get(hashname, hashname) + + +def download_package(package): + """Fetch a source package in a temporary directory and check the checksums + for all files""" + + tempdir = tempfile.TemporaryDirectory( + prefix='swh.loader.debian.%s.' % package['name'] + ) + + for filename, fileinfo in copy.deepcopy(package['files']).items(): + uri = fileinfo.pop('uri') + hashes = { + hashname: hashlib.new(_debian_to_hashlib(hashname)) + for hashname in fileinfo + if hashname not in ('name', 'size') + } + + r = requests.get(uri, stream=True) + if r.status_code != 200: + raise PackageDownloadFailed( + 'Download of %s returned status_code %s: %s' % + (uri, r.status_code, r.text) + ) + + size = 0 + with open(os.path.join(tempdir.name, filename), 'wb') as f: + for chunk in r.iter_content(chunk_size=1024 * 1024): + size += len(chunk) + f.write(chunk) + for hash in hashes.values(): + hash.update(chunk) + + downloadinfo = { + 'name': filename, + 'size': size, + } + for hashname, hash in hashes.items(): + downloadinfo[hashname] = hash.hexdigest() + + if fileinfo != downloadinfo: + raise PackageDownloadFailed( + 'Checksums mismatch: fetched %s, expected %s' % + (downloadinfo, fileinfo) + ) + + return tempdir + + +def extract_package(package, tempdir): """Extract a Debian source package to a given directory Note that after extraction the target directory will be the root of the extracted package, rather than containing it. 
Args: - dsc_path: path to .dsc file - destdir: directory where to extract the package - log: a logging.Logger object + package (dict): package information dictionary + tempdir (str): directory where the package files are stored Returns: - None + tuple: path to the dsc (str) and extraction directory (str) """ - if log: - log.debug('extract Debian source package %s in %s' % - (dsc_path, destdir.decode('utf-8')), extra={ - 'swh_type': 'deb_extract', - 'swh_dsc': dsc_path, - 'swh_destdir': destdir.decode('utf-8'), - }) + dsc_name = None + for filename in package['files']: + if filename.endswith('.dsc'): + if dsc_name: + raise PackageExtractionFailed( + 'Package %s_%s references several dsc files' % + (package['name'], package['version']) + ) + dsc_name = filename + + dsc_path = os.path.join(tempdir.name, dsc_name) + destdir = os.path.join(tempdir.name, 'extracted') + logfile = os.path.join(tempdir.name, 'extract.log') - destdir_tmp = b''.join([destdir, b'.tmp']) - logfile = b''.join([destdir, b'.log']) + log.debug('extract Debian source package %s in %s' % + (dsc_path, destdir), extra={ + 'swh_type': 'deb_extract', + 'swh_dsc': dsc_path, + 'swh_destdir': destdir, + }) cmd = ['dpkg-source', '--no-copy', '--no-check', '--ignore-bad-version', '-x', dsc_path, - destdir_tmp] + destdir] try: with open(logfile, 'w') as stdout: subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - if log: - data = open(logfile, 'r').read() - log.warn('extracting Debian package %s failed: %s' % - (dsc_path, data), - extra={ - 'swh_type': 'deb_extract_failed', - 'swh_dsc': dsc_path, - 'swh_log': data, - }) - raise PackageExtractionFailed() + except subprocess.CalledProcessError as e: + logdata = open(logfile, 'r').read() + raise PackageExtractionFailed('dpkg-source exited with code %s: %s' % + (e.returncode, logdata)) from None - os.rename(destdir_tmp, destdir) + return dsc_path, destdir def get_file_info(filepath): """Retrieve the original file information from the file at filepath. Args: filepath: the path to the original file Returns: dict: information about the original file, in a dictionary with the following keys - name: the file name - sha1, sha1_git, sha256: original file hashes - length: original file length """ name = os.path.basename(filepath) if isinstance(name, bytes): name = name.decode('utf-8') ret = { 'name': name, } - hashes = hashutil.hash_file(filepath) + hashes = hashutil.hash_path(filepath) for hash in hashes: + if hash == 'length': + ret[hash] = hashes[hash] + continue ret[hash] = hashutil.hash_to_hex(hashes[hash]) - ret['length'] = os.lstat(filepath).st_size - return ret -def get_gpg_info_signature(gpg_info): - """Extract the signature date from a deb822.GpgInfo object - - Args: - gpg_info: a deb822.GpgInfo object - - Returns: - dict: a dictionary with the following keys: - - - signature_date: a timezone-aware datetime.DateTime object with the - date the signature was made - - signature_keyid: the keyid of the signing key - - signature_uid: the uid of the signing key, if found - """ - - uid = None - - if 'VALIDSIG' in gpg_info: - key_id = gpg_info['VALIDSIG'][0] - timestamp = gpg_info['VALIDSIG'][2] - - for key in gpg_info.uidkeys: - if key in gpg_info: - uid = gpg_info[key][-1] - break - - elif 'ERRSIG' in gpg_info: - key_id = gpg_info['ERRSIG'][0] - timestamp = gpg_info['ERRSIG'][4] - else: - raise ValueError('Cannot find signature in gpg_info ' - 'object. 
Keys: %s' % gpg_info.keys()) - - dt = datetime.datetime.utcfromtimestamp(int(timestamp)) - dt = dt.replace(tzinfo=datetime.timezone.utc) - - ret = { - 'date': dt, - 'keyid': key_id, - } - - ret['person'] = converters.uid_to_person(uid, encode=False) - - return ret - - -def get_package_metadata(package, extracted_path, keyrings, log=None): +def get_package_metadata(package, dsc_path, extracted_path): """Get the package metadata from the source package at dsc_path, extracted in extracted_path. Args: package: the package dict (with a dsc_path key) + dsc_path: path to the package's dsc file extracted_path: the path where the package got extracted - keyrings: a list of keyrings to use for gpg actions - log: a logging.Logger object Returns: dict: a dictionary with the following keys: - history: list of (package_name, package_version) tuples parsed from the package changelog - source_files: information about all the files in the source package """ ret = {} - # Parse the dsc file to retrieve all the original artifact files - dsc_path = package['dsc'] with open(dsc_path, 'rb') as dsc: parsed_dsc = Dsc(dsc) source_files = [get_file_info(dsc_path)] dsc_dir = os.path.dirname(dsc_path) - for file in parsed_dsc['files']: - file_path = os.path.join(dsc_dir, file['name']) + for filename in package['files']: + file_path = os.path.join(dsc_dir, filename) file_info = get_file_info(file_path) source_files.append(file_info) ret['original_artifact'] = source_files # Parse the changelog to retrieve the rest of the package information - changelog_path = os.path.join(extracted_path, b'debian/changelog') + changelog_path = os.path.join(extracted_path, 'debian/changelog') with open(changelog_path, 'rb') as changelog: try: parsed_changelog = Changelog(changelog) except UnicodeDecodeError: - if log: - log.warn('Unknown encoding for changelog %s,' - ' falling back to iso' % - changelog_path.decode('utf-8'), extra={ - 'swh_type': 'deb_changelog_encoding', - 'swh_name': package['name'], - 'swh_version': str(package['version']), - 'swh_changelog': changelog_path.decode('utf-8'), - }) + log.warn('Unknown encoding for changelog %s,' + ' falling back to iso' % + changelog_path.decode('utf-8'), extra={ + 'swh_type': 'deb_changelog_encoding', + 'swh_name': package['name'], + 'swh_version': str(package['version']), + 'swh_changelog': changelog_path.decode('utf-8'), + }) # need to reset as Changelog scrolls to the end of the file changelog.seek(0) parsed_changelog = Changelog(changelog, encoding='iso-8859-15') package_info = { 'name': package['name'], 'version': str(package['version']), - 'lister_metadata': { - 'lister': 'snapshot.debian.org', - 'id': package['id'], - }, 'changelog': { 'person': converters.uid_to_person(parsed_changelog.author), 'date': parse_date(parsed_changelog.date), 'history': [(block.package, str(block.version)) for block in parsed_changelog][1:], } } - try: - gpg_info = parsed_dsc.get_gpg_info(keyrings=keyrings) - package_info['pgp_signature'] = get_gpg_info_signature(gpg_info) - except ValueError: - if log: - log.info('Could not get PGP signature on package %s_%s' % - (package['name'], package['version']), - extra={ - 'swh_type': 'deb_missing_signature', - 'swh_name': package['name'], - 'swh_version': str(package['version']), - }) - package_info['pgp_signature'] = None - maintainers = [ converters.uid_to_person(parsed_dsc['Maintainer'], encode=False), ] maintainers.extend( converters.uid_to_person(person, encode=False) for person in UPLOADERS_SPLIT.split(parsed_dsc.get('Uploaders', '')) ) 
package_info['maintainers'] = maintainers ret['package_info'] = package_info return ret -def process_source_package(package, keyrings, basedir=None, log=None): +def process_package(package): """Process a source package into its constituent components. The source package will be decompressed in a temporary directory. Args: package (dict): a dict with the following keys: - name: source package name - version: source package version - dsc: the full path of the package's DSC file. - keyrings: a list of keyrings to use for gpg actions - basedir: the base directory where the package gets extracted - log: a logging.Logger object - Returns: tuple: A tuple with two elements: - package: the original package dict augmented with the following keys: - metadata: the metadata from get_package_metadata - - basedir: the temporary directory in which the package was - decompressed - directory: the sha1_git of the root directory of the package - objects: a dictionary of the parsed directories and files, both indexed by id Raises: FileNotFoundError: if the dsc file does not exist PackageExtractionFailed: if package extraction failed """ - if log: - log.info("Processing package %s_%s" % - (package['name'], str(package['version'])), - extra={ - 'swh_type': 'deb_process_start', - 'swh_name': package['name'], - 'swh_version': str(package['version']), - }) - - if not os.path.exists(package['dsc']): - raise FileNotFoundError('%s does not exist' % package['dsc']) - - basedir = tempfile.mkdtemp(dir=basedir) - debdir = os.path.join(basedir, '%s_%s' % (package['name'], - package['version'])) - - # the swh.loader.dir internals legitimately want bytes for paths - debdir = debdir.encode('utf-8') - - extract_src_pkg(package['dsc'], debdir, log=log) + log.info("Processing package %s_%s" % + (package['name'], str(package['version'])), + extra={ + 'swh_type': 'deb_process_start', + 'swh_name': package['name'], + 'swh_version': str(package['version']), + }) - parsed_objects = walk_and_compute_sha1_from_directory(debdir) - root_tree = parsed_objects[ROOT_TREE_KEY][0] + tempdir = download_package(package) + dsc, debdir = extract_package(package, tempdir) - package = package.copy() - package['basedir'] = basedir - package['directory'] = root_tree['sha1_git'] - package['metadata'] = get_package_metadata(package, debdir, keyrings, - log=log) + directory = Directory.from_disk(path=os.fsencode(debdir), save_path=True) + metadata = get_package_metadata(package, dsc, debdir) - return package, converters.dedup_objects(parsed_objects) + return directory, metadata, tempdir -def process_source_packages(packages, keyrings, basedir=None, log=None): - """Execute process_source_package, but for lists of packages. +class DebianLoader(SWHLoader): + """A loader for Debian packages""" - Args: - packages: an iterable of packages as expected by process_source_package - keyrings: a list of keyrings to use for gpg actions - basedir: the base directory where packages are extracted - log: a logging.Logger object - - Returns: - generator: a generator of partial results. - - Partial results have the following keys: - - - objects: the accumulator for merge_objects. This member can be - mutated to clear the pending contents and directories. - - tempdirs: the temporary directories processed so far. This list can - be flushed if temporary directories are removed on the fly. - - packages: the list of packages processed so far. 
- - """ - - objects = { - 'content': {}, - 'directory': {}, - 'content_seen': set(), - 'directory_seen': set(), + CONFIG_BASE_FILENAME = 'loader/debian' + ADDITIONAL_CONFIG = { + 'lister_db_url': ('str', 'postgresql:///lister-debian'), } - ret_packages = [] - tempdirs = [] - - n_pkg = len(packages) - for i, package in enumerate(packages): - try: - if log: - log.info( - "Processing package %s/%s" % (i+1, n_pkg), - extra={ - 'swh_type': 'deb_process_progress', - 'swh_counter': i+1, - 'swh_total': n_pkg, - }) - ret_package, package_objs = process_source_package( - package, keyrings, basedir=basedir, log=log) - except PackageExtractionFailed: - continue - except Exception as e: - if log: - e_type = e.__class__.__name__ - e_exc = traceback.format_exception( - e.__class__, - e, - e.__traceback__, - ) - log.warn("Could not process package %s_%s: %s" % - (package['name'], str(package['version']), e_exc), - extra={ - 'swh_type': 'deb_process_failed', - 'swh_name': package['name'], - 'swh_version': str(package['version']), - 'swh_exception_type': e_type, - 'swh_exception': e_exc, - }) - continue - ret_packages.append(ret_package) - converters.merge_objects(objects, package_objs) - tempdirs.append(ret_package['basedir']) - - yield { - 'objects': objects, - 'packages': ret_packages, - 'tempdirs': tempdirs, + def __init__(self, config=None): + super().__init__(logging_class=None, config=config) + self.db_engine = create_engine(self.config['lister_db_url']) + self.mk_session = sessionmaker(bind=self.db_engine) + self.db_session = self.mk_session() + + def load(self, *, origin, date, packages): + return super().load(origin=origin, date=date, packages=packages) + + def prepare(self, *, origin, date, packages): + self.origin = origin + self.visit_date = date + self.packages = packages + + # Deduplicate branches according to equivalent files + branches_files = {} + branches_revs = {} + equiv_branch = {} + for branch, package in packages.items(): + for eq_branch, files in branches_files.items(): + if package['files'] == files: + equiv_branch[branch] = eq_branch + if (not branches_revs[eq_branch] + and package['revision_id']): + branches_revs[eq_branch] = identifier_to_bytes( + package['revision_id'] + ) + break + else: + # No match: new entry + equiv_branch[branch] = branch + branches_files[branch] = package['files'] + if package['revision_id']: + branches_revs[branch] = identifier_to_bytes( + package['revision_id'] + ) + else: + branches_revs[branch] = None + + self.equivs = { + 'branches': equiv_branch, + 'revisions': branches_revs, } + self.versions_to_load = [ + (branch, self.packages[branch]) + for branch in sorted(branches_revs) + if not branches_revs[branch] + ] -def flush_content(storage, partial_result, content_max_length_one, log=None): - """Flush the contents from a partial_result to storage - - Args: - storage: an instance of swh.storage.Storage - partial_result: a partial result as yielded by process_source_packages - content_max_length_one: the maximum length of a persisted content - log: a logging.Logger object - - Note: - This function mutates partial_result to empty the content dict - """ - contents = partial_result['objects']['content'] - - missing_ids = list( - storage.content_missing(list(contents.values()), key_hash='sha1_git') - ) - - log_id = str(uuid.uuid4()) - num_contents = len(contents) - num_new_contents = len(missing_ids) - if log: - log.debug("Sending %d contents (%d new)" % (num_contents, - num_new_contents), - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 
'content', - 'swh_num': num_contents, - 'swh_num_new': num_new_contents, - 'swh_id': log_id, - }) - - if missing_ids: - full_contents = ( - converters.shallow_content_to_content(contents[i], - content_max_length_one) - for i in missing_ids) - - storage.content_add(full_contents) - - if log: - log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - partial_result['objects']['content'] = {} - - -def flush_directory(storage, partial_result, log=None): - """Flush the directories from a partial_result to storage - - Args: - storage: an instance of swh.storage.Storage - partial_result: a partial result as yielded by process_source_packages - log: a logging.Logger object + self.version_idx = 0 + self.done = self.version_idx >= len(self.versions_to_load) - Note: - This function mutates partial_result to empty the directory dict - """ - directories = partial_result['objects']['directory'].values() + self.current_data = {} + self.tempdirs = [] + self.partial = False - log_id = str(uuid.uuid4()) - num_dir = len(directories) - if log: - log.debug("Sending %d directories" % num_dir, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_dir, - 'swh_id': log_id, - }) + def get_origin(self): + return self.origin - storage.directory_add(list(directories)) + def fetch_data(self): + if self.done: + return False - if log: - log.debug("Done sending %d directories" % num_dir, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_dir, - 'swh_id': log_id, - }) - - partial_result['objects']['directory'] = {} - - -def flush_revision(storage, partial_result, log=None): - """Flush the revisions from a partial_result to storage - - Args: - storage: an instance of swh.storage.Storage - partial_result: a partial result as yielded by process_source_packages - log: a logging.Logger object - - Returns: - The package objects augmented with a revision argument - """ - packages = [package.copy() for package in partial_result['packages']] - - log_id = str(uuid.uuid4()) - num_rev = len(packages) - if log: - log.debug("Sending %d revisions" % num_rev, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_rev, - 'swh_id': log_id, - }) - - revisions = [] - for package in packages: - revision = converters.package_to_revision(package, log=log) - revisions.append(revision) - package['revision'] = revision - - storage.revision_add(revisions) - - if log: - log.debug("Done sending %d revisions" % num_rev, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_rev, - 'swh_id': log_id, - }) - - return packages - - -def flush_release(storage, packages, log=None): - """Flush the revisions from a partial_result to storage - - Args: - storage: an instance of swh.storage.Storage - packages: a list of packages as returned by flush_revision - log: a logging.Logger object - - Returns: - The package objects augmented with a release argument - """ - - log_id = str(uuid.uuid4()) - num_rel = len(packages) - if log: - log.debug("Sending %d releases" % num_rel, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_rel, - 'swh_id': log_id, - }) - - releases = [] - for package in packages: - release = converters.package_to_release(package) - releases.append(release) - package['release'] = release - - 
storage.release_add(releases) - - if log: - log.debug("Done sending %d releases" % num_rel, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_rel, - 'swh_id': log_id, - }) - - return packages - - -def flush_occurrences(storage, packages, default_occurrences, log=None): - """Flush the occurrences from a partial_result to storage - - Args: - storage: an instance of swh.storage.Storage - packages: a list of packages as returned by flush_release - default_occurrences: a list of occurrences with default values - log: a logging.Logger object - - Returns: - The written occurrence objects - - """ - occurrences = [] - - for package in packages: - for default_occurrence in default_occurrences: - occurrence = default_occurrence.copy() - occurrence['revision'] = package['revision']['id'] - occurrence['branch'] = str(package['version']) - occurrence['origin'] = package['origin_id'] - occurrences.append(occurrence) - - storage.occurrence_add(occurrences) - - return occurrences - - -def remove_tempdirs(partial_result, log=None): - """Remove the temporary files for the packages listed""" - for tempdir in partial_result['tempdirs']: - if os.path.isdir(tempdir): - shutil.rmtree(tempdir) - - # Use the slice trick to empty the list in-place - partial_result['tempdirs'][:] = [] + branch, package = self.versions_to_load[self.version_idx] + self.version_idx += 1 + try: + directory, metadata, tempdir = process_package(package) + self.tempdirs.append(tempdir) + self.current_data = directory.collect() + revision = converters.package_metadata_to_revision( + package, directory, metadata + ) + + self.current_data['revision'] = { + revision['id']: revision, + } + + self.equivs['revisions'][branch] = revision['id'] + + except DebianLoaderException as e: + log.exception('Package %s_%s failed to load' % + (package['name'], package['version'])) + self.partial = True + + self.done = self.version_idx >= len(self.versions_to_load) + return not self.done + + def store_data(self): + self.maybe_load_contents( + self.current_data.get('content', {}).values()) + self.maybe_load_directories( + self.current_data.get('directory', {}).values()) + self.maybe_load_revisions( + self.current_data.get('revision', {}).values()) + self.current_data = {} + + if self.done: + self.flush() + self.update_packages() + self.generate_and_load_snapshot() + + def update_packages(self): + for branch in self.packages: + package = self.packages[branch] + if package['revision_id']: + continue + rev = self.equivs['revisions'][self.equivs['branches'][branch]] + if not rev: + continue + + db_package = self.db_session.query(Package)\ + .filter(Package.id == package['id'])\ + .one() + db_package.revision_id = rev + + self.db_session.commit() + + def generate_and_load_snapshot(self): + """Create a SWH archive "snapshot" of the package being loaded, and send it to + the archive. 
+ + + """ + occurrences = [] + for branch in self.packages: + rev = self.equivs['revisions'][self.equivs['branches'][branch]] + if not rev: + self.partial = True + rev = b'\x00' * 20 + + occurrences.append({ + 'target': rev, + 'target_type': 'revision', + 'origin': self.origin_id, + 'visit': self.visit, + 'branch': branch.encode('utf-8'), + }) + self.maybe_load_occurrences(occurrences) + + def load_status(self): + status = 'eventful' if self.versions_to_load else 'uneventful' + + return { + 'status': status if not self.partial else 'failed', + } -def try_flush_partial(storage, partial_result, - content_packet_size=10000, - content_packet_length=1024 * 1024 * 1024 * 1024, - content_max_length_one=100 * 1024 * 1024, - directory_packet_size=25000, force=False, log=None): - """Conditionally flush the partial result to storage. + def visit_status(self): + return 'partial' if self.partial else 'full' - Args: - storage: an instance of swh.storage.Storage - partial_result: a partial result as yielded by process_source_packages - content_packet_size: the number of contents that triggers a flush - content_packet_length: the cumulated size of contents that triggers a - flush - content_max_length_one: the maximum length of a persisted content - directory_packet_size: the number of directories that triggers a flush - force: force a flush regardless of packet sizes - log: a logging.Logger object - """ - n_content = len(partial_result['objects']['content']) - n_directory = len(partial_result['objects']['directory']) - - if force: - # Don't compute the length if we don't care - len_contents = 0 - else: - len_contents = sum( - content['length'] - for content in partial_result['objects']['content'].values() - if content['length'] <= content_max_length_one - ) - - # Flush both contents and directories at once to be able to clear - # tempfiles while we work - if force or n_content >= content_packet_size or \ - len_contents >= content_packet_length or \ - n_directory >= directory_packet_size: - flush_content(storage, partial_result, content_max_length_one, log=log) - flush_directory(storage, partial_result, log=log) - remove_tempdirs(partial_result, log=log) + def cleanup(self): + for d in self.tempdirs: + d.cleanup() diff --git a/swh/loader/debian/tasks.py b/swh/loader/debian/tasks.py index b16544d..b635c70 100644 --- a/swh/loader/debian/tasks.py +++ b/swh/loader/debian/tasks.py @@ -1,151 +1,17 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import glob -import os -import shutil -import tempfile -import traceback -import dateutil - -from swh.core.config import load_named_config from swh.scheduler.task import Task -from swh.storage import get_storage - -from .listers.snapshot import SnapshotDebianOrg -from .loader import ( - process_source_packages, try_flush_partial, flush_occurrences, - flush_release, flush_revision) - -DEFAULT_CONFIG = { - 'snapshot_connstr': ('str', 'service=snapshot'), - 'snapshot_basedir': ('str', '/home/ndandrim/tmp/snapshot.d.o'), - 'storage_class': ('str', 'local_storage'), - 'storage_args': ('list[str]', [ - 'dbname=softwareheritage-dev', - '/tmp/swh-loader-debian/objects', - ]), - 'content_packet_size': ('int', 10000), - 'content_packet_length': ('int', 1024 ** 3), - 'content_max_length_one': ('int', 100 * 1024**2), - 
'directory_packet_size': ('int', 25000), - 'keyrings': ('list[str]', glob.glob('/usr/share/keyrings/*')), -} +from .loader import DebianLoader -class LoadSnapshotPackages(Task): +class LoadDebianPackage(Task): task_queue = 'swh_loader_debian' - @property - def config(self): - if not hasattr(self, '__config'): - self.__config = load_named_config( - 'loader/debian.ini', - DEFAULT_CONFIG, - ) - return self.__config - - def run_task(self, *package_names): - """Load the history of the given package from snapshot.debian.org""" - - config = self.config - - snapshot = SnapshotDebianOrg( - connstr=config['snapshot_connstr'], - basedir=config['snapshot_basedir'], - ) - - storage = get_storage( - config['storage_class'], - config['storage_args'], - ) - - swh_authority_dt = open( - os.path.join(config['snapshot_basedir'], 'TIMESTAMP') - ).read() - - swh_authority = { - 'authority': '5f4d4c51-498a-4e28-88b3-b3e4e8396cba', - 'validity': dateutil.parser.parse(swh_authority_dt), - } - - tmpdir = tempfile.mkdtemp() - os.makedirs(os.path.join(tmpdir, 'source')) - - pkgs = snapshot.prepare_packages( - package_names, - os.path.join(tmpdir, 'source'), - log=self.log, - ) - origins = snapshot.prepare_origins(package_names, storage) - - closed = False - fetch_histories = {} - for origin in origins.values(): - id = origin['id'] - fetch_histories[id] = storage.fetch_history_start(id) - - try: - sorted_pkgs = [] - for p in pkgs.values(): - p['origin_id'] = origins[p['name']]['id'] - sorted_pkgs.append(p) - - sorted_pkgs.sort(key=lambda p: (p['name'], p['version'])) - - partial = {} - for partial in process_source_packages( - sorted_pkgs, - config['keyrings'], - tmpdir, - log=self.log, - ): - - try_flush_partial( - storage, partial, - content_packet_size=config['content_packet_size'], - content_packet_length=config['content_packet_length'], - content_max_length_one=config['content_max_length_one'], - directory_packet_size=config['directory_packet_size'], - log=self.log, - ) - - if partial: - try_flush_partial( - storage, partial, - content_packet_size=config['content_packet_size'], - content_packet_length=config['content_packet_length'], - content_max_length_one=config['content_max_length_one'], - directory_packet_size=config['directory_packet_size'], - force=True, - log=self.log, - ) - - packages = flush_revision(storage, partial, log=self.log) - - packages_w_revs = flush_release( - storage, - packages, - log=self.log - ) - - flush_occurrences(storage, packages_w_revs, [swh_authority], - log=self.log) - - for fh in fetch_histories.values(): - storage.fetch_history_end(fh, {'status': True}) - closed = True - finally: - shutil.rmtree(tmpdir) - if not closed: - data = { - 'status': False, - 'stderr': traceback.format_exc(), - } - - for fh in fetch_histories.values(): - storage.fetch_history_end(fh, data) + def run_task(self, *, origin, date, packages): + loader = DebianLoader() + return loader.load(origin=origin, date=date, packages=packages)
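
Illustrative sketch (not part of the patch): the checksum verification that download_package() performs above, shown as a standalone helper. Hash names come straight from the package file metadata, Debian's 'md5sum' is mapped to hashlib's 'md5', the download is streamed in chunks, and the recomputed size and digests are compared to the expected values. The helper name fetch_and_verify and its signature are assumptions made for this example only; they do not appear in the patch.

import hashlib
import os

import requests


def _debian_to_hashlib(hashname):
    # Debian metadata calls it 'md5sum'; hashlib knows it as 'md5'.
    return {'md5sum': 'md5'}.get(hashname, hashname)


def fetch_and_verify(uri, expected, destdir):
    """Download uri into destdir and check its size and checksums against
    the expected dict ({'name': ..., 'size': ..., '<hashname>': hexdigest})."""
    hashes = {
        name: hashlib.new(_debian_to_hashlib(name))
        for name in expected
        if name not in ('name', 'size')
    }

    r = requests.get(uri, stream=True)
    r.raise_for_status()

    size = 0
    with open(os.path.join(destdir, expected['name']), 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            size += len(chunk)
            f.write(chunk)
            for h in hashes.values():
                h.update(chunk)

    computed = {'name': expected['name'], 'size': size}
    computed.update((name, h.hexdigest()) for name, h in hashes.items())

    if computed != expected:
        raise ValueError('Checksum mismatch: computed %s, expected %s' %
                         (computed, expected))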
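
A second sketch, also not part of the patch: how the new DebianLoader could be driven end to end, with a hypothetical packages payload whose shape is inferred from prepare() and download_package() (one entry per branch; each entry carries the lister id, source package name and version, an optional known revision_id, and a files mapping of filename to uri, size and checksums). Every concrete value below (branch name, ids, URI, sizes, digests, date) is a placeholder, and running it requires the usual loader/storage configuration and the lister database to be in place.

import datetime

from swh.loader.debian.loader import DebianLoader

origin = {
    'type': 'deb',
    'url': 'http://snapshot.debian.org/package/hello/',  # placeholder
}

packages = {
    # One entry per branch. prepare() deduplicates branches that reference
    # the same set of files, and skips entries whose revision_id is already
    # known from the lister database.
    'stretch/main/2.10-1': {
        'id': 42,                # lister database id (placeholder)
        'name': 'hello',
        'version': '2.10-1',
        'revision_id': None,     # not loaded yet
        'files': {
            'hello_2.10-1.dsc': {
                'uri': 'http://deb.debian.org/pool/main/h/hello/hello_2.10-1.dsc',  # placeholder
                'name': 'hello_2.10-1.dsc',
                'size': 1323,          # placeholder
                'md5sum': '0' * 32,    # placeholder
                'sha256': '0' * 64,    # placeholder
            },
            # ... plus the upstream and Debian tarballs, in the same format
        },
    },
}

loader = DebianLoader()
# Visit date: shown here as a datetime; the exact type expected is not
# pinned down by the patch itself.
loader.load(origin=origin,
            date=datetime.datetime.now(tz=datetime.timezone.utc),
            packages=packages)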