diff --git a/swh/loader/package/debian.py b/swh/loader/package/debian.py
index b472505..cd21630 100644
--- a/swh/loader/package/debian.py
+++ b/swh/loader/package/debian.py
@@ -1,356 +1,395 @@
# Copyright (C) 2017-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import email.utils
import iso8601
import logging
import re
import subprocess

from dateutil.parser import parse as parse_date
from debian.changelog import Changelog
from debian.deb822 import Dsc
from os import path
from typing import (
    Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
)

from swh.loader.package.loader import PackageLoader
from swh.loader.package.utils import download, release_name

logger = logging.getLogger(__name__)

UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*')


class DebianLoader(PackageLoader):
    """Load Debian origins into the swh archive.

    """
    visit_type = 'debian'

    def __init__(self, url: str, date: str, packages: Mapping[str, Any]):
+        """Debian Loader implementation.
+
+        Args:
+            url: Origin url (e.g. deb://Debian/packages/cicero)
+            date: Ignored
+            packages: versioned packages and associated artifacts, example::
+
+              {
+                'stretch/contrib/0.7.2-3': {
+                  'name': 'cicero',
+                  'version': '0.7.2-3',
+                  'files': {
+                    'cicero_0.7.2-3.diff.gz': {
+                       'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce',
+                       'name': 'cicero_0.7.2-3.diff.gz',
+                       'sha256': 'f039c9642fe15c75bed5254315e2a29f...',
+                       'size': 3964,
+                       'uri': 'http://d.d.o/cicero_0.7.2-3.diff.gz',
+                    },
+                    'cicero_0.7.2-3.dsc': {
+                       'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a',
+                       'name': 'cicero_0.7.2-3.dsc',
+                       'sha256': '35b7f1048010c67adfd8d70e4961aefb...',
+                       'size': 1864,
+                       'uri': 'http://d.d.o/cicero_0.7.2-3.dsc',
+                    },
+                    'cicero_0.7.2.orig.tar.gz': {
+                       'md5sum': '4353dede07c5728319ba7f5595a7230a',
+                       'name': 'cicero_0.7.2.orig.tar.gz',
+                       'sha256': '63f40f2436ea9f67b44e2d4bd669dbab...',
+                       'size': 96527,
+                       'uri': 'http://d.d.o/cicero_0.7.2.orig.tar.gz',
+                    }
+                  },
+                },
+                # ...
+              }
+
+        """
        super().__init__(url=url)
        self.packages = packages

    def get_versions(self) -> Sequence[str]:
        """Returns the keys of the packages input (e.g.
           stretch/contrib/0.7.2-3, etc...)

        """
        return self.packages.keys()

    def get_default_version(self) -> Optional[str]:
-        """Take the first version as default release
+        """No default version for Debian, so no HEAD alias in the snapshot.

        """
-        return list(self.packages.keys())[0]
+        return None

    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        meta = self.packages[version]
        p_info = meta.copy()
        p_info['raw'] = meta
        yield release_name(version), p_info

    def resolve_revision_from(
            self, known_package_artifacts: Dict, artifact_metadata: Dict) \
            -> Optional[bytes]:
        artifacts_to_fetch = artifact_metadata['files']
        logger.debug('k_p_artifacts: %s', known_package_artifacts)
        logger.debug('artifacts_to_fetch: %s', artifacts_to_fetch)
        for rev_id, known_artifacts in known_package_artifacts.items():
            logger.debug('Revision: %s', rev_id)
            logger.debug('Associated known_artifacts: %s', known_artifacts)
            known_artifacts = known_artifacts['extrinsic']['raw']['files']
            rev_found = True
            for a_name, k_artifact in known_artifacts.items():
                artifact_to_fetch = artifacts_to_fetch.get(a_name)
                logger.debug('artifact_to_fetch: %s', artifact_to_fetch)
                if artifact_to_fetch is None:
                    # as soon as an artifact is missing, this revision cannot
                    # match; move on to the next revision
                    rev_found = False
                    break
                if k_artifact['sha256'] != artifact_to_fetch['sha256']:
                    # hash is different, this revision does not match; move
                    # on to the next revision
                    rev_found = False
                    break
            if rev_found:
                logger.debug('Existing revision %s found for new artifacts.',
                             rev_id)
                return rev_id
-        # if we pass here, we did not find any known artifacts
+        # if we pass here, we did not find any known artifacts
        logger.debug('No existing revision found for the new artifacts.')

    def download_package(self, p_info: Mapping[str, Any],
                         tmpdir: str) -> List[Tuple[str, Dict]]:
        """Contrary to other package loaders (1 package, 1 artifact),
        `p_info` represents the package's set of datafiles to fetch:
        - .orig.tar.gz
        - .dsc
        - .diff.gz

        This is delegated to the `download_package` function.

        """
        all_hashes = download_package(p_info, tmpdir)
        logger.debug('all_hashes: %s', all_hashes)
        res = []
        for hashes in all_hashes.values():
            res.append((tmpdir, hashes))
-            logger.debug('res: %s', res)
+        logger.debug('res: %s', res)
        return res

    def uncompress(self, dl_artifacts: List[Tuple[str, Dict]],
                   dest: str) -> str:
        logger.debug('dl_artifacts: %s', dl_artifacts)
        return extract_package(dl_artifacts, dest=dest)

    def build_revision(self, a_metadata: Mapping[str, Any],
                       uncompressed_path: str) -> Dict:
        dsc_url, dsc_name = dsc_information(a_metadata)
        dsc_path = path.join(path.dirname(uncompressed_path), dsc_name)
        i_metadata = get_package_metadata(
            a_metadata, dsc_path, uncompressed_path)

        logger.debug('i_metadata: %s', i_metadata)
        logger.debug('a_metadata: %s', a_metadata)

        msg = 'Synthetic revision for Debian source package %s version %s' % (
            a_metadata['name'], a_metadata['version'])

        date = iso8601.parse_date(i_metadata['changelog']['date'])
        author = prepare_person(i_metadata['changelog']['person'])

        # inspired from swh.loader.debian.converters.package_metadata_to_revision  # noqa
        return {
            'type': 'dsc',
            'message': msg.encode('utf-8'),
            'author': author,
            'date': date,
            'committer': author,
            'committer_date': date,
            'parents': [],
            'metadata': {
                'intrinsic': {
                    'tool': 'dsc',
                    'raw': i_metadata,
                },
                'extrinsic': {
                    'provider': dsc_url,
                    'when': self.visit_date.isoformat(),
                    'raw': a_metadata,
                },
            }
        }
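
# A minimal usage sketch for the loader above (illustrative, not part of
# this change). It assumes SWH_CONFIG_FILENAME points to a loader
# configuration with a 'storage' entry; the config path and the empty
# 'files' dict are placeholders to be filled in as documented in __init__.
import os

os.environ.setdefault('SWH_CONFIG_FILENAME', '/etc/swh/loader.yml')

loader = DebianLoader(
    url='deb://Debian/packages/cicero',
    date='2019-10-12T05:58:09.165557+00:00',  # currently ignored
    packages={
        'stretch/contrib/0.7.2-3': {
            'name': 'cicero',
            'version': '0.7.2-3',
            'files': {},  # per-artifact entries, shaped as in the docstring
        },
    })
result = loader.load()  # e.g. {'status': 'eventful', 'snapshot_id': ...}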
""" - return list(self.packages.keys())[0] + return None def get_package_info(self, version: str) -> Generator[ Tuple[str, Mapping[str, Any]], None, None]: meta = self.packages[version] p_info = meta.copy() p_info['raw'] = meta yield release_name(version), p_info def resolve_revision_from( self, known_package_artifacts: Dict, artifact_metadata: Dict) \ -> Optional[bytes]: artifacts_to_fetch = artifact_metadata['files'] logger.debug('k_p_artifacts: %s', known_package_artifacts) logger.debug('artifacts_to_fetch: %s', artifacts_to_fetch) for rev_id, known_artifacts in known_package_artifacts.items(): logger.debug('Revision: %s', rev_id) logger.debug('Associated known_artifacts: %s', known_artifacts) known_artifacts = known_artifacts['extrinsic']['raw']['files'] rev_found = True for a_name, k_artifact in known_artifacts.items(): artifact_to_fetch = artifacts_to_fetch.get(a_name) logger.debug('artifact_to_fetch: %s', artifact_to_fetch) if artifact_to_fetch is None: # as soon as we do not see an artifact, we consider we need # to check the other revision rev_found = False if k_artifact['sha256'] != artifact_to_fetch['sha256']: # Hash is different, we consider we need to check the other # revisions rev_found = False if rev_found: logger.debug('Existing revision %s found for new artifacts.', rev_id) return rev_id - # if we pass here, we did not find any known artifacts + # if we pass here, we did not find any known artifacts logger.debug('No existing revision found for the new artifacts.') def download_package(self, p_info: Mapping[str, Any], tmpdir: str) -> [Tuple[str, Dict]]: """Contrary to other package loaders (1 package, 1 artifact), `a_metadata` represents the package's datafiles set to fetch: - .orig.tar.gz - .dsc - .diff.gz This is delegated to the `download_package` function. """ all_hashes = download_package(p_info, tmpdir) logger.debug('all_hashes: %s', all_hashes) res = [] for hashes in all_hashes.values(): res.append((tmpdir, hashes)) - logger.debug('res: %s', res) + logger.debug('res: %s', res) return res def uncompress(self, dl_artifacts: [Tuple[str, Dict]], dest: str) -> str: logger.debug('dl_artifacts: %s', dl_artifacts) return extract_package(dl_artifacts, dest=dest) def build_revision(self, a_metadata: Mapping[str, Any], uncompressed_path: str) -> Dict: dsc_url, dsc_name = dsc_information(a_metadata) dsc_path = path.join(path.dirname(uncompressed_path), dsc_name) i_metadata = get_package_metadata( a_metadata, dsc_path, uncompressed_path) logger.debug('i_metadata: %s', i_metadata) logger.debug('a_metadata: %s', a_metadata) msg = 'Synthetic revision for Debian source package %s version %s' % ( a_metadata['name'], a_metadata['version']) date = iso8601.parse_date(i_metadata['changelog']['date']) author = prepare_person(i_metadata['changelog']['person']) # inspired from swh.loader.debian.converters.package_metadata_to_revision # noqa return { 'type': 'dsc', 'message': msg.encode('utf-8'), 'author': author, 'date': date, 'committer': author, 'committer_date': date, 'parents': [], 'metadata': { 'intrinsic': { 'tool': 'dsc', 'raw': i_metadata, }, 'extrinsic': { 'provider': dsc_url, 'when': self.visit_date.isoformat(), 'raw': a_metadata, }, } } def uid_to_person(uid: str) -> Mapping[str, str]: """Convert an uid to a person suitable for insertion. 
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 4d7c78c..19c6474 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,389 +1,389 @@
# Copyright (C) 2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import logging
import tempfile
import os

from typing import (
    Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
)

from swh.core.tarball import uncompress
from swh.core.config import SWHConfig
from swh.model.from_disk import Directory
from swh.model.identifiers import (
    revision_identifier, snapshot_identifier, identifier_to_bytes
)
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.loader.core.converters import content_for_storage
from swh.loader.package.utils import download

logger = logging.getLogger(__name__)


# Not implemented yet:
# - clean up disk routines from previous killed workers (when OOMkilled)
#   -> separation of concerns suggests this should be abstracted from the
#      code
#   -> experience tells us it's complicated to do as such (T903, T964, T982,
#      etc...)
#
# - model: swh.model.merkle.from_disk should output swh.model.model.* objects
#   to avoid this layer's conversion routine call
#   -> Take this up within swh.model's current implementation


class PackageLoader:
    # Origin visit type (str) set by the loader
    visit_type = ''

    def __init__(self, url):
        """Loader's constructor. This raises an exception if the minimal
           required configuration is missing (cf. the `_check_configuration`
           method).

        Args:
            url (str): Origin url to load data from

        """
        # This expects to use the environment variable SWH_CONFIG_FILENAME
        self.config = SWHConfig.parse_config_file()
        self._check_configuration()
        self.storage = get_storage(**self.config['storage'])
        self.url = url
        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)

    def _check_configuration(self):
        """Checks the minimal configuration required is set for the loader.

        If some required configuration is missing, an exception detailing
        the issue is raised.

        """
        if 'storage' not in self.config:
            raise ValueError(
                'Misconfiguration, at least the storage key should be set')
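
    # For context, a sketch of the minimal configuration the constructor and
    # _check_configuration() expect. SWHConfig.parse_config_file() reads the
    # YAML file named by the SWH_CONFIG_FILENAME environment variable; only
    # the 'storage' key is actually checked here, and the values below are
    # illustrative:
    #
    #   storage:
    #     cls: memory
    #     args: {}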
    def get_versions(self) -> Sequence[str]:
        """Return the list of all published package versions.

        Returns:
            Sequence of published versions

        """
        return []

    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        """Given a release version of a package, retrieve the associated
           package information for such version.

        Args:
            version: Package version

        Returns:
            (branch name, package metadata)

        """
        yield from {}

    def build_revision(
            self, a_metadata: Dict, uncompressed_path: str) -> Dict:
        """Build the revision dict from the archive metadata (extrinsic
        artifact metadata) and the intrinsic metadata found in the
        uncompressed path.

        Returns:
            SWH data dict

        """
        return {}

    def get_default_version(self) -> str:
        """Retrieve the latest release version, if any.

        Returns:
            Latest version

        """
        return ''

    def last_snapshot(self) -> Optional[Dict]:
        """Retrieve the last snapshot

        """
        visit = self.storage.origin_visit_get_latest(
            self.url, require_snapshot=True)
        if visit:
            return snapshot_get_all_branches(
                self.storage, visit['snapshot']['id'])

    def known_artifacts(self, snapshot: Dict) -> Dict[bytes, Dict]:
        """Retrieve the known releases/artifacts for the origin.

        Args:
            snapshot: snapshot for the visit

        Returns:
            Dict of keys revision id (bytes), values a metadata Dict.

        """
        if not snapshot or 'branches' not in snapshot:
            return {}

        # retrieve only revisions (i.e. skip the aliases, which we do not
        # want here)
        revs = [rev['target']
                for rev in snapshot['branches'].values()
                if rev and rev['target_type'] == 'revision']
        known_revisions = self.storage.revision_get(revs)

        ret = {}
        for revision in known_revisions:
            if not revision:  # revision_get can return None
                continue
            ret[revision['id']] = revision['metadata']
        return ret

    def resolve_revision_from(
            self, known_artifacts: Dict, artifact_metadata: Dict) \
            -> Optional[bytes]:
        """Resolve the revision from the known artifacts and an artifact
        metadata dict.

        If the artifact has already been downloaded, this will return the
        existing revision targeting that uncompressed artifact directory.
        Otherwise, this returns None.

        Args:
            known_artifacts: Artifacts already known to the archive, as
                returned by the `known_artifacts` method
            artifact_metadata: Information dict on the artifact to load

        Returns:
            None or revision identifier

        """
        return None
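
    # A hypothetical override sketch (not part of this change): resolving a
    # revision by matching a sha256 against the 'original_artifact' metadata
    # that load() attaches below. The exact metadata layout is an assumption.
    #
    # def resolve_revision_from(self, known_artifacts, artifact_metadata):
    #     sha256 = artifact_metadata.get('sha256')
    #     for rev_id, metadata in known_artifacts.items():
    #         for artifact in (metadata or {}).get('original_artifact', []):
    #             if artifact['checksums']['sha256'] == sha256:
    #                 return rev_id  # already loaded: reuse the revision
    #     return None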
    def download_package(self, p_info: Mapping[str, Any],
                         tmpdir: str) -> List[Tuple[str, Dict]]:
        """Download artifacts for a specific package. All downloads happen
        in the tmpdir folder.

        The default implementation expects the package info to reference one
        artifact per package. Note that most implementations have 1 artifact
        per package; some have multiple artifacts per package (debian), and
        for some there is none, the package being the artifact (gnu).

        Args:
            p_info: Information on the package artifacts to download
                (url, filename, etc...)
            tmpdir: Location to retrieve such artifacts

        Returns:
            List of (path, computed hashes)

        """
        a_uri = p_info['url']
        filename = p_info.get('filename')
        return [download(a_uri, dest=tmpdir, filename=filename)]

    def uncompress(self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]],
                   dest: str) -> str:
        """Uncompress the artifact(s) in the destination folder dest.

        Optionally, this could need to use the p_info dict for some more
        information (debian).

        """
        uncompressed_path = os.path.join(dest, 'src')
        for a_path, _ in dl_artifacts:
            uncompress(a_path, dest=uncompressed_path)
        return uncompressed_path
    def load(self) -> Dict:
        """Load for a specific origin the associated contents.

        for each package version of the origin

        1. Fetch the files for one package version By default, this can be
           implemented as a simple HTTP request. Loaders with more specific
           requirements can override this, e.g.: the PyPI loader checks the
           integrity of the downloaded files; the Debian loader has to
           download and check several files for one package version.

        2. Extract the downloaded files By default, this would be a
           universal archive/tarball extraction.
           Loaders for specific formats can override this method (for
           instance, the Debian loader uses dpkg-source -x).

        3. Convert the extracted directory to a set of Software Heritage
           objects Using swh.model.from_disk.

        4. Extract the metadata from the unpacked directories This would
           only be applicable for "smart" loaders like npm (parsing the
           package.json), PyPI (parsing the PKG-INFO file) or Debian
           (parsing debian/changelog and debian/control).
           On "minimal-metadata" sources such as the GNU archive, the
           lister should provide the minimal set of metadata needed to
           populate the revision/release objects (authors, dates) as an
           argument to the task.

        5. Generate the revision/release objects for the given version.
           From the data generated at steps 3 and 4.

        end for each

        6. Generate and load the snapshot for the visit

        Using the revisions/releases collected at step 5., and the branch
        information from step 1., generate a snapshot and load it into the
        Software Heritage archive

        """
        status_load = 'uneventful'  # either: eventful, uneventful, failed
        status_visit = 'full'       # either: partial, full
        tmp_revisions = {}  # type: Dict[str, List]
        snapshot = None
        try:
            # Prepare origin and origin_visit
            origin = {'url': self.url}
            self.storage.origin_add([origin])
            visit_id = self.storage.origin_visit_add(
                origin=self.url,
                date=self.visit_date,
                type=self.visit_type)['visit']
            last_snapshot = self.last_snapshot()
            logger.debug('last snapshot: %s', last_snapshot)
            known_artifacts = self.known_artifacts(last_snapshot)
            logger.debug('known artifacts: %s', known_artifacts)

            # Retrieve the default release version (the "latest" one)
            default_version = self.get_default_version()
            logger.debug('default version: %s', default_version)

            for version in self.get_versions():  # for each
                logger.debug('version: %s', version)
                tmp_revisions[version] = []
                # `p_` stands for `package_`
                for branch_name, p_info in self.get_package_info(version):
                    logger.debug('package_info: %s', p_info)
                    revision_id = self.resolve_revision_from(
                        known_artifacts, p_info['raw'])
                    if revision_id is None:
                        with tempfile.TemporaryDirectory() as tmpdir:
                            try:
                                dl_artifacts = self.download_package(
                                    p_info, tmpdir)
                            except Exception:
                                logger.exception('Unable to retrieve %s',
                                                 p_info)
                                status_visit = 'partial'
                                continue

                            uncompressed_path = self.uncompress(
                                dl_artifacts, dest=tmpdir)
                            logger.debug('uncompressed_path: %s',
                                         uncompressed_path)

                            directory = Directory.from_disk(
                                path=uncompressed_path.encode('utf-8'),
                                data=True)  # noqa
                            # FIXME: Try not to load the full raw content in
                            # memory
                            objects = directory.collect()

                            contents = objects['content'].values()
                            logger.debug('Number of contents: %s',
                                         len(contents))

                            self.storage.content_add(
                                map(content_for_storage, contents))

                            status_load = 'eventful'
                            directories = objects['directory'].values()
                            logger.debug('Number of directories: %s',
                                         len(directories))

                            self.storage.directory_add(directories)

                            # FIXME: This should be release. cf. D409
                            revision = self.build_revision(
                                p_info['raw'], uncompressed_path)
                            revision.update({
                                'synthetic': True,
                                'directory': directory.hash,
                            })

                            revision['metadata'].update({
                                'original_artifact': [
                                    hashes for _, hashes in dl_artifacts
                                ],
                            })

                            revision['id'] = revision_id = \
                                identifier_to_bytes(
                                    revision_identifier(revision))
                            logger.debug('Revision: %s', revision)
                            self.storage.revision_add([revision])

                    tmp_revisions[version].append((branch_name, revision_id))
                    logger.debug('tmp_revisions: %s', tmp_revisions)

            # Build and load the snapshot
            branches = {}
            for version, branch_name_revisions in tmp_revisions.items():
                if version == default_version and \
                   len(branch_name_revisions) == 1:
                    # only 1 branch (no ambiguity), we can create an alias
                    # branch 'HEAD'
-                    branch_name, target = branch_name_revisions[0]
+                    branch_name, _ = branch_name_revisions[0]
                    # except for some corner case (deposit)
                    if branch_name != 'HEAD':
                        branches[b'HEAD'] = {
                            'target_type': 'alias',
                            'target': branch_name.encode('utf-8'),
                        }

                for branch_name, target in branch_name_revisions:
                    branch_name = branch_name.encode('utf-8')
                    branches[branch_name] = {
                        'target_type': 'revision',
                        'target': target,
                    }

            snapshot = {
                'branches': branches
            }
            logger.debug('snapshot: %s', snapshot)

            snapshot['id'] = identifier_to_bytes(
                snapshot_identifier(snapshot))

            logger.debug('snapshot: %s', snapshot)
            self.storage.snapshot_add([snapshot])

            if hasattr(self.storage, 'flush'):
                self.storage.flush()
        except Exception:
            logger.exception('Fail to load %s' % self.url)
            status_visit = 'partial'
            status_load = 'failed'
        finally:
            self.storage.origin_visit_update(
                origin=self.url, visit_id=visit_id, status=status_visit,
                snapshot=snapshot)
            result = {
                'status': status_load,
            }
            if snapshot:
                result['snapshot_id'] = snapshot['id']
            return result
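
# To make the PackageLoader override surface concrete, a minimal
# hypothetical subclass; the class name, URL handling and metadata keys are
# all illustrative, not part of this change.
from swh.loader.package.loader import PackageLoader


class SingleTarballLoader(PackageLoader):
    """Hypothetical loader for an origin made of a single tarball."""
    visit_type = 'tar'

    def __init__(self, url: str, tarball_url: str):
        super().__init__(url=url)
        self.tarball_url = tarball_url

    def get_versions(self):
        return ['1.0']

    def get_default_version(self):
        return '1.0'

    def get_package_info(self, version):
        # 'url' feeds the default download_package; 'raw' is what
        # resolve_revision_from and build_revision receive
        info = {'url': self.tarball_url,
                'raw': {'url': self.tarball_url, 'version': version}}
        yield 'releases/%s' % version, info

    def build_revision(self, a_metadata, uncompressed_path):
        author = {'name': b'', 'email': b'', 'fullname': b''}
        return {
            'type': 'tar',
            'message': b'Synthetic revision for tarball import',
            'author': author,
            'date': self.visit_date,
            'committer': author,
            'committer_date': self.visit_date,
            'parents': [],
            'metadata': {'extrinsic': {'raw': a_metadata}},
        }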
diff --git a/swh/loader/package/tests/common.py b/swh/loader/package/tests/common.py
index 1c2a9c1..b4430cc 100644
--- a/swh/loader/package/tests/common.py
+++ b/swh/loader/package/tests/common.py
@@ -1,106 +1,113 @@
# Copyright (C) 2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from os import path
import logging

from typing import Dict, List, Tuple

from swh.model.hashutil import hash_to_bytes, hash_to_hex

logger = logging.getLogger(__file__)

DATADIR = path.join(path.abspath(path.dirname(__file__)), 'resources')


def decode_target(target):
    """Test helper to ease readability in tests

    """
    if not target:
        return target
    target_type = target['target_type']

    if target_type == 'alias':
        decoded_target = target['target'].decode('utf-8')
    else:
        decoded_target = hash_to_hex(target['target'])

    return {
        'target': decoded_target,
        'target_type': target_type
    }


def check_snapshot(expected_snapshot, storage):
    """Check for snapshot match.

    Provide the hashes as hexadecimal, the conversion is done within the
    method.

    Args:
        expected_snapshot (dict): full snapshot with hex ids
        storage (Storage): expected storage

    """
    expected_snapshot_id = expected_snapshot['id']
    expected_branches = expected_snapshot['branches']
    snap = storage.snapshot_get(hash_to_bytes(expected_snapshot_id))
    if snap is None:
        # display known snapshots instead
        from pprint import pprint
-        for snap_id, (_snap, branches) in storage._snapshots.items():
-            pprint(_snap.to_dict())
+        for snap_id, (_snap, _) in storage._snapshots.items():
+            snapd = _snap.to_dict()
+            snapd['id'] = hash_to_hex(snapd['id'])
+            branches = {
+                branch.decode('utf-8'): decode_target(target)
+                for branch, target in snapd['branches'].items()
+            }
+            snapd['branches'] = branches
+            pprint(snapd)
        raise AssertionError('Snapshot is not found')

    branches = {
        branch.decode('utf-8'): decode_target(target)
        for branch, target in snap['branches'].items()
    }
    assert expected_branches == branches
def check_metadata(metadata: Dict, key_path: str, raw_type: type):
    """Given a metadata dict, ensure the associated key_path value is of
    type raw_type.

    Args:
        metadata: Dict to check
        key_path: Path to check
        raw_type: Type to check the path with

    Raises:
        Assertion error in case of mismatch

    """
    data = metadata
    keys = key_path.split('.')
    for k in keys:
        try:
            data = data[k]
        except (TypeError, KeyError) as e:
            # KeyError: because path too long
            # TypeError: data is not a dict
            raise AssertionError(e)
    assert isinstance(data, raw_type)


def check_metadata_paths(metadata: Dict, paths: List[Tuple[str, type]]):
    """Given a metadata dict, ensure the keys are of expected types

    Args:
        metadata: Dict to check
        paths: List of (key_path, raw_type) tuples to check

    Raises:
        Assertion error in case of mismatch

    """
    for key_path, raw_type in paths:
        check_metadata(metadata, key_path, raw_type)
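
# Usage sketch for the two helpers above; the metadata dict and key paths
# are illustrative, mirroring the revision metadata built by the loaders.
from swh.loader.package.tests.common import check_metadata_paths

revision_metadata = {
    'intrinsic': {'tool': 'dsc', 'raw': {'name': 'cicero'}},
    'extrinsic': {'provider': 'http://example.org/cicero.dsc', 'raw': {}},
    'original_artifact': [],
}
check_metadata_paths(revision_metadata, paths=[
    ('intrinsic.tool', str),   # dotted path: metadata['intrinsic']['tool']
    ('intrinsic.raw', dict),
    ('extrinsic.provider', str),
    ('original_artifact', list),
])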
diff --git a/swh/loader/package/tests/test_debian.py b/swh/loader/package/tests/test_debian.py
index b453c28..5f0a284 100644
--- a/swh/loader/package/tests/test_debian.py
+++ b/swh/loader/package/tests/test_debian.py
@@ -1,318 +1,308 @@
# Copyright (C) 2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import logging
import pytest

from os import path

from swh.loader.package.debian import (
    DebianLoader, download_package, dsc_information, uid_to_person,
    prepare_person, get_package_metadata, extract_package
)
from swh.loader.package.tests.common import check_snapshot

logger = logging.getLogger(__name__)

PACKAGE_FILES = {
+    'name': 'cicero',
+    'version': '0.7.2-3',
    'files': {
        'cicero_0.7.2-3.diff.gz': {
            'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce',
            'name': 'cicero_0.7.2-3.diff.gz',
            'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c',  # noqa
            'size': 3964,
            'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.diff.gz'  # noqa
        },
        'cicero_0.7.2-3.dsc': {
            'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a',
            'name': 'cicero_0.7.2-3.dsc',
            'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03',  # noqa
            'size': 1864,
            'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.dsc'},  # noqa
        'cicero_0.7.2.orig.tar.gz': {
            'md5sum': '4353dede07c5728319ba7f5595a7230a',
            'name': 'cicero_0.7.2.orig.tar.gz',
            'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786',  # noqa
            'size': 96527,
            'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2.orig.tar.gz'  # noqa
        }
    },
-    'id': 23,
-    'name': 'cicero',
-    'revision_id': None,
-    'version': '0.7.2-3'
}

PACKAGE_PER_VERSION = {
    'stretch/contrib/0.7.2-3': PACKAGE_FILES
}
def test_debian_first_visit(
        swh_config, requests_mock_datadir):
    """With no prior visit, loading a debian package ends up with 1 snapshot

    """
    loader = DebianLoader(
        url='deb://Debian/packages/cicero',
        date='2019-10-12T05:58:09.165557+00:00',
        packages=PACKAGE_PER_VERSION)

    actual_load_status = loader.load()

    assert actual_load_status['status'] == 'eventful'

    stats = loader.storage.stat_counters()
    assert {
        'content': 42,
        'directory': 2,
        'origin': 1,
        'origin_visit': 1,
        'person': 1,
        'release': 0,
        'revision': 1,  # all artifacts under 1 revision
        'skipped_content': 0,
        'snapshot': 1
    } == stats

    expected_snapshot = {
-        'id': 'a59ec49a01ff329dcbbc63fd36a5654143aef240',
+        'id': '3b6b66e6ee4e7d903a379a882684a2a50480c0b4',
        'branches': {
-            'HEAD': {
-                'target_type': 'alias',
-                'target': 'releases/stretch/contrib/0.7.2-3'
-            },
            'releases/stretch/contrib/0.7.2-3': {
                'target_type': 'revision',
                'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07',
            }
        },
    }
    # different than the previous loader as no release is done
    check_snapshot(expected_snapshot, loader.storage)


def test_debian_first_visit_then_another_visit(
        swh_config, requests_mock_datadir):
    """With a prior visit and no change, a second visit is uneventful

    """
    url = 'deb://Debian/packages/cicero'
    loader = DebianLoader(
        url=url,
        date='2019-10-12T05:58:09.165557+00:00',
        packages=PACKAGE_PER_VERSION)

    actual_load_status = loader.load()

    assert actual_load_status['status'] == 'eventful'
    origin_visit = next(loader.storage.origin_visit_get(url))
    assert origin_visit['status'] == 'full'

    stats = loader.storage.stat_counters()
    assert {
        'content': 42,
        'directory': 2,
        'origin': 1,
        'origin_visit': 1,
        'person': 1,
        'release': 0,
        'revision': 1,  # all artifacts under 1 revision
        'skipped_content': 0,
        'snapshot': 1
    } == stats

    expected_snapshot = {
-        'id': 'a59ec49a01ff329dcbbc63fd36a5654143aef240',
+        'id': '3b6b66e6ee4e7d903a379a882684a2a50480c0b4',
        'branches': {
-            'HEAD': {
-                'target_type': 'alias',
-                'target': 'releases/stretch/contrib/0.7.2-3'
-            },
            'releases/stretch/contrib/0.7.2-3': {
                'target_type': 'revision',
                'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07',
            }
        },
    }
    # different than the previous loader as no release is done
    check_snapshot(expected_snapshot, loader.storage)

    # No change in between load
    actual_load_status2 = loader.load()
    assert actual_load_status2['status'] == 'uneventful'
    origin_visit2 = list(loader.storage.origin_visit_get(url))
    assert origin_visit2[-1]['status'] == 'full'

    stats2 = loader.storage.stat_counters()
    assert {
        'content': 42 + 0,
        'directory': 2 + 0,
        'origin': 1,
        'origin_visit': 1 + 1,  # a new visit occurred
        'person': 1,
        'release': 0,
        'revision': 1,
        'skipped_content': 0,
        'snapshot': 1,  # same snapshot across 2 visits
    } == stats2

    urls = [
        m.url for m in requests_mock_datadir.request_history
        if m.url.startswith('http://deb.debian.org')
    ]
    # each package artifact was downloaded only once across the 2 visits
    # (the second visit resolved the existing revision and skipped the
    # downloads), hence no duplicate urls
    assert len(urls) == len(set(urls))


def test_uid_to_person():
    uid = 'Someone Name <someone@orga.org>'
    actual_person = uid_to_person(uid)

    assert actual_person == {
        'name': 'Someone Name',
        'email': 'someone@orga.org',
        'fullname': uid,
    }


def test_prepare_person():
    actual_author = prepare_person({
        'name': 'Someone Name',
        'email': 'someone@orga.org',
        'fullname': 'Someone Name <someone@orga.org>',
    })

    assert actual_author == {
        'name': b'Someone Name',
        'email': b'someone@orga.org',
        'fullname': b'Someone Name <someone@orga.org>',
    }
def test_download_package(datadir, tmpdir, requests_mock_datadir):
    tmpdir = str(tmpdir)  # py3.5 work around (LocalPath issue)
    all_hashes = download_package(PACKAGE_FILES, tmpdir)
    assert all_hashes == {
        'cicero_0.7.2-3.diff.gz': {
            'checksums': {
                'blake2s256': '08b1c438e70d2474bab843d826515147fa4a817f8c4baaf3ddfbeb5132183f21',  # noqa
                'sha1': '0815282053f21601b0ec4adf7a8fe47eace3c0bc',
                'sha1_git': '834ac91da3a9da8f23f47004bb456dd5bd16fe49',
                'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c'  # noqa
            },
            'filename': 'cicero_0.7.2-3.diff.gz',
            'length': 3964},
        'cicero_0.7.2-3.dsc': {
            'checksums': {
                'blake2s256': '8c002bead3e35818eaa9d00826f3d141345707c58fb073beaa8abecf4bde45d2',  # noqa
                'sha1': 'abbec4e8efbbc80278236e1dd136831eac08accd',
                'sha1_git': '1f94b2086fa1142c2df6b94092f5c5fa11093a8e',
                'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03'  # noqa
            },
            'filename': 'cicero_0.7.2-3.dsc',
            'length': 1864},
        'cicero_0.7.2.orig.tar.gz': {
            'checksums': {
                'blake2s256': '9809aa8d2e2dad7f34cef72883db42b0456ab7c8f1418a636eebd30ab71a15a6',  # noqa
                'sha1': 'a286efd63fe2c9c9f7bb30255c3d6fcdcf390b43',
                'sha1_git': 'aa0a38978dce86d531b5b0299b4a616b95c64c74',
                'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786'  # noqa
            },
            'filename': 'cicero_0.7.2.orig.tar.gz',
            'length': 96527
        }
    }


def test_dsc_information_ok():
    fname = 'cicero_0.7.2-3.dsc'
    dsc_url, dsc_name = dsc_information(PACKAGE_FILES)

    assert dsc_url == PACKAGE_FILES['files'][fname]['uri']
    assert dsc_name == PACKAGE_FILES['files'][fname]['name']


def test_dsc_information_not_found():
    fname = 'cicero_0.7.2-3.dsc'
    package_files = copy.deepcopy(PACKAGE_FILES)
    package_files['files'].pop(fname)

    dsc_url, dsc_name = dsc_information(package_files)

    assert dsc_url is None
    assert dsc_name is None


def test_dsc_information_too_many_dsc_entries():
    # craft an extra dsc file
    fname = 'cicero_0.7.2-3.dsc'
    package_files = copy.deepcopy(PACKAGE_FILES)
    data = package_files['files'][fname]
    fname2 = fname.replace('cicero', 'ciceroo')
    package_files['files'][fname2] = data

    with pytest.raises(
            ValueError, match='Package %s_%s references several dsc' % (
                package_files['name'], package_files['version'])):
        dsc_information(package_files)


def test_get_package_metadata(requests_mock_datadir, datadir, tmp_path):
    tmp_path = str(tmp_path)  # py3.5 compat.
    package = PACKAGE_FILES

    logger.debug('package: %s', package)

    # download the packages
    all_hashes = download_package(package, tmp_path)

    # Retrieve information from package
    _, dsc_name = dsc_information(package)

    dl_artifacts = [(tmp_path, hashes) for hashes in all_hashes.values()]

    # Extract information from package
    extracted_path = extract_package(dl_artifacts, tmp_path)

    # Retrieve information on package
    dsc_path = path.join(path.dirname(extracted_path), dsc_name)
    actual_package_info = get_package_metadata(
        package, dsc_path, extracted_path)

    logger.debug('actual_package_info: %s', actual_package_info)

    assert actual_package_info == {
        'changelog': {
            'date': '2014-10-19T16:52:35+02:00',
            'history': [
                ('cicero', '0.7.2-2'),
                ('cicero', '0.7.2-1'),
                ('cicero', '0.7-1')
            ],
            'person': {
                'email': 'sthibault@debian.org',
                'fullname': 'Samuel Thibault <sthibault@debian.org>',
                'name': 'Samuel Thibault'
            }
        },
        'maintainers': [
            {
                'email': 'debian-accessibility@lists.debian.org',
                'fullname': 'Debian Accessibility Team '
                            '<debian-accessibility@lists.debian.org>',
                'name': 'Debian Accessibility Team'
            },
            {
                'email': 'sthibault@debian.org',
                'fullname': 'Samuel Thibault <sthibault@debian.org>',
                'name': 'Samuel Thibault'
            }
        ],
        'name': 'cicero',
        'version': '0.7.2-3'
    }