Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/loader/package/debian.py b/swh/loader/package/debian.py
index b472505..cd21630 100644
--- a/swh/loader/package/debian.py
+++ b/swh/loader/package/debian.py
@@ -1,356 +1,395 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import email.utils
import iso8601
import logging
import re
import subprocess
from dateutil.parser import parse as parse_date
from debian.changelog import Changelog
from debian.deb822 import Dsc
from os import path
from typing import (
Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
)
from swh.loader.package.loader import PackageLoader
from swh.loader.package.utils import download, release_name
logger = logging.getLogger(__name__)
UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*')
class DebianLoader(PackageLoader):
    """Load debian origins into swh archive.

    """
    visit_type = 'debian'

    def __init__(self, url: str, date: str, packages: Mapping[str, Any]):
        """Debian Loader implementation.

        Args:
            url: Origin url (e.g. deb://Debian/packages/cicero)
            date: Ignored
            packages: versioned packages and associated artifacts, example::

                {
                    'stretch/contrib/0.7.2-3': {
                        'name': 'cicero',
                        'version': '0.7.2-3',
                        'files': {
                            'cicero_0.7.2-3.diff.gz': {
                                'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce',
                                'name': 'cicero_0.7.2-3.diff.gz',
                                'sha256': 'f039c9642fe15c75bed5254315e2a29f...',
                                'size': 3964,
                                'uri': 'http://d.d.o/cicero_0.7.2-3.diff.gz',
                            },
                            'cicero_0.7.2-3.dsc': {
                                'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a',
                                'name': 'cicero_0.7.2-3.dsc',
                                'sha256': '35b7f1048010c67adfd8d70e4961aefb...',
                                'size': 1864,
                                'uri': 'http://d.d.o/cicero_0.7.2-3.dsc',
                            },
                            'cicero_0.7.2.orig.tar.gz': {
                                'md5sum': '4353dede07c5728319ba7f5595a7230a',
                                'name': 'cicero_0.7.2.orig.tar.gz',
                                'sha256': '63f40f2436ea9f67b44e2d4bd669dbab...',
                                'size': 96527,
                                'uri': 'http://d.d.o/cicero_0.7.2.orig.tar.gz',
                            }
                        },
                    },
                    # ...
                }

        """
        super().__init__(url=url)
        self.packages = packages

    def get_versions(self) -> Sequence[str]:
        """Returns the keys of the packages input (e.g.
        stretch/contrib/0.7.2-3, etc...)

        """
        # materialize the keys view so the declared Sequence contract holds
        return list(self.packages.keys())

    def get_default_version(self) -> Optional[str]:
        """No default version for debian so no HEAD alias in snapshot.

        """
        # annotated Optional[str]: None means "do not create a HEAD alias"
        return None

    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        """Yield the (branch name, package info) pair for one version.

        """
        meta = self.packages[version]
        p_info = meta.copy()
        p_info['raw'] = meta  # keep the pristine metadata for storage
        yield release_name(version), p_info

    def resolve_revision_from(
            self, known_package_artifacts: Dict, artifact_metadata: Dict) \
            -> Optional[bytes]:
        """Return the id of an already loaded revision whose artifact set
        matches `artifact_metadata` (same file names and sha256), or None.

        """
        artifacts_to_fetch = artifact_metadata['files']
        logger.debug('k_p_artifacts: %s', known_package_artifacts)
        logger.debug('artifacts_to_fetch: %s', artifacts_to_fetch)
        for rev_id, known_artifacts in known_package_artifacts.items():
            logger.debug('Revision: %s', rev_id)
            logger.debug('Associated known_artifacts: %s', known_artifacts)
            known_artifacts = known_artifacts['extrinsic']['raw']['files']
            rev_found = True
            for a_name, k_artifact in known_artifacts.items():
                artifact_to_fetch = artifacts_to_fetch.get(a_name)
                logger.debug('artifact_to_fetch: %s', artifact_to_fetch)
                if artifact_to_fetch is None:
                    # as soon as we do not see an artifact, we consider we need
                    # to check the other revision.  Break here: the sha256
                    # comparison below would crash on the None value.
                    rev_found = False
                    break
                if k_artifact['sha256'] != artifact_to_fetch['sha256']:
                    # Hash is different, we consider we need to check the other
                    # revisions
                    rev_found = False
                    break
            if rev_found:
                logger.debug('Existing revision %s found for new artifacts.',
                             rev_id)
                return rev_id
        # if we pass here, we did not find any known artifacts
        logger.debug('No existing revision found for the new artifacts.')
        return None

    def download_package(self, p_info: Mapping[str, Any],
                         tmpdir: str) -> List[Tuple[str, Dict]]:
        """Contrary to other package loaders (1 package, 1 artifact),
        `a_metadata` represents the package's datafiles set to fetch:
        - <package-version>.orig.tar.gz
        - <package-version>.dsc
        - <package-version>.diff.gz

        This is delegated to the `download_package` function.

        """
        all_hashes = download_package(p_info, tmpdir)
        logger.debug('all_hashes: %s', all_hashes)
        res = []
        for hashes in all_hashes.values():
            res.append((tmpdir, hashes))
        logger.debug('res: %s', res)
        return res

    def uncompress(self, dl_artifacts: List[Tuple[str, Dict]],
                   dest: str) -> str:
        """Extract the debian source package artifacts into `dest`.

        """
        logger.debug('dl_artifacts: %s', dl_artifacts)
        return extract_package(dl_artifacts, dest=dest)

    def build_revision(self, a_metadata: Mapping[str, Any],
                       uncompressed_path: str) -> Dict:
        """Build the synthetic revision dict from the extrinsic metadata
        (`a_metadata`) and the intrinsic metadata parsed out of the
        extracted package (dsc + debian/changelog).

        """
        dsc_url, dsc_name = dsc_information(a_metadata)
        dsc_path = path.join(path.dirname(uncompressed_path), dsc_name)
        i_metadata = get_package_metadata(
            a_metadata, dsc_path, uncompressed_path)

        logger.debug('i_metadata: %s', i_metadata)
        logger.debug('a_metadata: %s', a_metadata)

        msg = 'Synthetic revision for Debian source package %s version %s' % (
            a_metadata['name'], a_metadata['version'])

        date = iso8601.parse_date(i_metadata['changelog']['date'])
        author = prepare_person(i_metadata['changelog']['person'])

        # inspired from swh.loader.debian.converters.package_metadata_to_revision  # noqa
        return {
            'type': 'dsc',
            'message': msg.encode('utf-8'),
            'author': author,
            'date': date,
            'committer': author,
            'committer_date': date,
            'parents': [],
            'metadata': {
                'intrinsic': {
                    'tool': 'dsc',
                    'raw': i_metadata,
                },
                'extrinsic': {
                    'provider': dsc_url,
                    'when': self.visit_date.isoformat(),
                    'raw': a_metadata,
                },
            }
        }
def uid_to_person(uid: str) -> Mapping[str, str]:
    """Convert an uid to a person suitable for insertion.

    Args:
        uid: an uid of the form "Name <email@ddress>"

    Returns:
        a dictionary with the following keys:

        - name: the name associated to the uid
        - email: the mail associated to the uid
        - fullname: the actual uid input

    """
    logger.debug('uid: %s', uid)
    ret = {
        'name': '',
        'email': '',
        'fullname': uid,
    }
    name, mail = email.utils.parseaddr(uid)
    # Bug fix: test the parsed address `mail`, not the imported `email`
    # module, which is always truthy.
    if name and mail:
        ret['name'] = name
        ret['email'] = mail
    else:
        ret['name'] = uid
    return ret
def prepare_person(person: Mapping[str, str]) -> Mapping[str, bytes]:
    """Prepare person for swh serialization...

    Args:
        person: A person dict with str values

    Returns:
        A person dict ready for storage (all values utf-8 encoded)

    """
    return {key: value.encode('utf-8') for key, value in person.items()}
def download_package(
        package: Mapping[str, Any], tmpdir: Any) -> Mapping[str, Any]:
    """Fetch a source package in a temporary directory and check the checksums
    for all files.

    Args:
        package: Dict defining the set of files representing a debian package
        tmpdir: Where to download and extract the files to ingest

    Returns:
        Dict of swh hashes per filename key

    """
    all_hashes = {}
    for filename, fileinfo in package['files'].items():
        logger.debug('fileinfo: %s', fileinfo)
        # only the sha256 is provided by the lister; download() verifies it
        expected_hashes = {'sha256': fileinfo['sha256']}
        logger.debug('extrinsic_hashes(%s): %s', filename, expected_hashes)
        _, hashes = download(fileinfo['uri'], dest=tmpdir, filename=filename,
                             hashes=expected_hashes)
        all_hashes[filename] = hashes

    logger.debug('all_hashes: %s', all_hashes)
    return all_hashes
def dsc_information(package: Mapping[str, Any]) -> Tuple[str, str]:
    """Retrieve dsc information from a package.

    Args:
        package: Package metadata information

    Returns:
        Tuple of dsc file's uri, dsc's full disk path

    """
    dsc_url = dsc_name = None
    for filename, fileinfo in package['files'].items():
        if not filename.endswith('.dsc'):
            continue
        if dsc_name is not None:
            # a well-formed source package has exactly one .dsc file
            raise ValueError(
                'Package %s_%s references several dsc files' %
                (package['name'], package['version'])
            )
        dsc_url, dsc_name = fileinfo['uri'], filename
    return dsc_url, dsc_name
def extract_package(dl_artifacts: List[Tuple[str, Dict]], dest: str) -> str:
    """Extract a Debian source package to a given directory.

    Note that after extraction the target directory will be the root of the
    extracted package, rather than containing it.

    Args:
        dl_artifacts: list of (directory, hashes-dict) downloaded artifacts
        dest: directory where the package files are stored

    Returns:
        Package extraction directory

    Raises:
        ValueError: if no .dsc artifact is present, or if dpkg-source fails

    """
    a_path = dl_artifacts[0][0]
    logger.debug('dl_artifacts: %s', dl_artifacts)
    dsc_name = None
    for _, hashes in dl_artifacts:
        logger.debug('hashes: %s', hashes)
        filename = hashes['filename']
        if filename.endswith('.dsc'):
            dsc_name = filename
            break
    if dsc_name is None:
        # previously fell through to an UnboundLocalError below; fail with an
        # explicit message instead
        raise ValueError('No dsc file found among %s' % (dl_artifacts,))

    dsc_path = path.join(a_path, dsc_name)
    destdir = path.join(dest, 'extracted')
    logfile = path.join(dest, 'extract.log')
    logger.debug('extract Debian source package %s in %s' %
                 (dsc_path, destdir), extra={
                     'swh_type': 'deb_extract',
                     'swh_dsc': dsc_path,
                     'swh_destdir': destdir,
                 })

    cmd = ['dpkg-source',
           '--no-copy', '--no-check',
           '--ignore-bad-version',
           '-x', dsc_path,
           destdir]

    try:
        with open(logfile, 'w') as stdout:
            subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # use a context manager: the previous open().read() leaked the handle
        with open(logfile, 'r') as logf:
            logdata = logf.read()
        raise ValueError('dpkg-source exited with code %s: %s' %
                         (e.returncode, logdata)) from None

    return destdir
def get_package_metadata(package: Mapping[str, Any], dsc_path: str,
                         extracted_path: str) -> Mapping[str, Any]:
    """Get the package metadata from the source package at dsc_path,
    extracted in extracted_path.

    Args:
        package: the package dict (with a dsc_path key)
        dsc_path: path to the package's dsc file
        extracted_path: the path where the package got extracted

    Returns:
        dict: a dictionary with the following keys:

        - history: list of (package_name, package_version) tuples parsed from
          the package changelog

    """
    with open(dsc_path, 'rb') as dsc:
        parsed_dsc = Dsc(dsc)

    # Parse the changelog to retrieve the rest of the package information
    changelog_path = path.join(extracted_path, 'debian/changelog')
    with open(changelog_path, 'rb') as changelog:
        try:
            parsed_changelog = Changelog(changelog)
        except UnicodeDecodeError:
            # Bug fix: changelog_path is already a str in Python 3; the
            # previous `.decode('utf-8')` calls raised AttributeError here.
            logger.warning('Unknown encoding for changelog %s,'
                           ' falling back to iso' %
                           changelog_path, extra={
                               'swh_type': 'deb_changelog_encoding',
                               'swh_name': package['name'],
                               'swh_version': str(package['version']),
                               'swh_changelog': changelog_path,
                           })

            # need to reset as Changelog scrolls to the end of the file
            changelog.seek(0)
            parsed_changelog = Changelog(changelog, encoding='iso-8859-15')

    package_info = {
        'name': package['name'],
        'version': str(package['version']),
        'changelog': {
            'person': uid_to_person(parsed_changelog.author),
            'date': parse_date(parsed_changelog.date).isoformat(),
            # skip the first block: it describes the version being loaded
            'history': [(block.package, str(block.version))
                        for block in parsed_changelog][1:],
        }
    }

    maintainers = [
        uid_to_person(parsed_dsc['Maintainer']),
    ]
    maintainers.extend(
        uid_to_person(person)
        for person in UPLOADERS_SPLIT.split(parsed_dsc.get('Uploaders', ''))
    )
    package_info['maintainers'] = maintainers

    return package_info
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 4d7c78c..19c6474 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,389 +1,389 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import tempfile
import os
from typing import (
Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
)
from swh.core.tarball import uncompress
from swh.core.config import SWHConfig
from swh.model.from_disk import Directory
from swh.model.identifiers import (
revision_identifier, snapshot_identifier, identifier_to_bytes
)
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.loader.core.converters import content_for_storage
from swh.loader.package.utils import download
logger = logging.getLogger(__name__)
# Not implemented yet:
# - clean up disk routines from previous killed workers (when OOMkilled)
# -> separation of concern would like this to be abstracted from the code
# -> experience tells us it's complicated to do as such (T903, T964, T982,
# etc...)
#
# - model: swh.model.merkle.from_disk should output swh.model.model.* objects
# to avoid this layer's conversion routine call
# -> Take this up within swh.model's current implementation
class PackageLoader:
    """Base class for package loaders.

    Subclasses override the hooks `get_versions`, `get_package_info` and
    `build_revision` (and optionally `get_default_version`,
    `download_package`, `uncompress`, `resolve_revision_from`); `load`
    drives the whole visit: it fetches and extracts artifacts, builds
    contents/directories/revisions plus a final snapshot, and records the
    origin visit into storage.

    """
    # Origin visit type (str) set by the loader
    visit_type = ''

    def __init__(self, url):
        """Loader's constructor. This raises exception if the minimal required
           configuration is missing (cf. fn:`check` method).

        Args:
            url (str): Origin url to load data from

        """
        # This expects to use the environment variable SWH_CONFIG_FILENAME
        self.config = SWHConfig.parse_config_file()
        self._check_configuration()
        self.storage = get_storage(**self.config['storage'])
        self.url = url
        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)

    def _check_configuration(self):
        """Checks the minimal configuration required is set for the loader.

        If some required configuration is missing, exception detailing the
        issue is raised.

        """
        if 'storage' not in self.config:
            raise ValueError(
                'Misconfiguration, at least the storage key should be set')

    def get_versions(self) -> Sequence[str]:
        """Return the list of all published package versions.

        Returns:
            Sequence of published versions

        """
        return []

    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        """Given a release version of a package, retrieve the associated
           package information for such version.

        Args:
            version: Package version

        Returns:
            (branch name, package metadata)

        """
        yield from {}

    def build_revision(
            self, a_metadata: Dict, i_metadata: Dict) -> Dict:
        """Build the revision dict from the archive metadata (extrinsic
        artifact metadata) and the intrinsic metadata.

        Returns:
            SWH data dict

        """
        return {}

    def get_default_version(self) -> str:
        """Retrieve the latest release version

        Returns:
            Latest version

        """
        return ''

    def last_snapshot(self) -> Optional[Dict]:
        """Retrieve the last snapshot of the origin, or None when the origin
        has no prior visit with a snapshot.

        """
        visit = self.storage.origin_visit_get_latest(
            self.url, require_snapshot=True)
        if visit:
            return snapshot_get_all_branches(
                self.storage, visit['snapshot']['id'])

    def known_artifacts(self, snapshot: Dict) -> Dict:
        """Retrieve the known releases/artifact for the origin.

        Args
            snapshot: snapshot for the visit

        Returns:
            Dict of keys revision id (bytes), values a metadata Dict.

        """
        if not snapshot or 'branches' not in snapshot:
            return {}

        # retrieve only revisions (e.g the alias we do not want here)
        revs = [rev['target']
                for rev in snapshot['branches'].values()
                if rev and rev['target_type'] == 'revision']
        known_revisions = self.storage.revision_get(revs)
        ret = {}
        for revision in known_revisions:
            if not revision:  # revision_get can return None
                continue
            ret[revision['id']] = revision['metadata']
        return ret

    def resolve_revision_from(
            self, known_artifacts: Dict, artifact_metadata: Dict) \
            -> Optional[bytes]:
        """Resolve the revision from a snapshot and an artifact metadata dict.

        If the artifact has already been downloaded, this will return the
        existing revision targeting that uncompressed artifact directory.
        Otherwise, this returns None.

        Args:
            known_artifacts: Dict mapping revision id to revision metadata
            artifact_metadata: Information dict

        Returns:
            None or revision identifier

        """
        return None

    def download_package(self, p_info: Mapping[str, Any],
                         tmpdir: str) -> List[Tuple[str, Dict]]:
        """Download artifacts for a specific package. All downloads happen in
        in the tmpdir folder.

        Default implementation expects the artifacts package info to be
        about one artifact per package.

        Note that most implementation have 1 artifact per package. But some
        implementation have multiple artifacts per package (debian), some have
        none, the package is the artifact (gnu).

        Args:
            p_info: Information on the package artifact to download
                (url, filename, etc...)
            tmpdir: Location to retrieve such artifacts

        Returns:
            List of (path, computed hashes)

        """
        a_uri = p_info['url']
        filename = p_info.get('filename')
        return [download(a_uri, dest=tmpdir, filename=filename)]

    def uncompress(self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]],
                   dest: str) -> str:
        """Uncompress the artifact(s) in the destination folder dest.

        Optionally, this could need to use the p_info dict for some more
        information (debian).

        """
        uncompressed_path = os.path.join(dest, 'src')
        for a_path, _ in dl_artifacts:
            uncompress(a_path, dest=uncompressed_path)
        return uncompressed_path

    def load(self) -> Dict:
        """Load for a specific origin the associated contents.

        for each package version of the origin

        1. Fetch the files for one package version By default, this can be
           implemented as a simple HTTP request. Loaders with more specific
           requirements can override this, e.g.: the PyPI loader checks the
           integrity of the downloaded files; the Debian loader has to download
           and check several files for one package version.

        2. Extract the downloaded files By default, this would be a universal
           archive/tarball extraction.

           Loaders for specific formats can override this method (for instance,
           the Debian loader uses dpkg-source -x).

        3. Convert the extracted directory to a set of Software Heritage
           objects Using swh.model.from_disk.

        4. Extract the metadata from the unpacked directories This would only
           be applicable for "smart" loaders like npm (parsing the
           package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing
           debian/changelog and debian/control).

           On "minimal-metadata" sources such as the GNU archive, the lister
           should provide the minimal set of metadata needed to populate the
           revision/release objects (authors, dates) as an argument to the
           task.

        5. Generate the revision/release objects for the given version. From
           the data generated at steps 3 and 4.

        end for each

        6. Generate and load the snapshot for the visit

        Using the revisions/releases collected at step 5., and the branch
        information from step 0., generate a snapshot and load it into the
        Software Heritage archive

        """
        status_load = 'uneventful'  # either: eventful, uneventful, failed
        status_visit = 'full'  # either: partial, full
        tmp_revisions = {}  # type: Dict[str, List]
        snapshot = None

        try:
            # Prepare origin and origin_visit
            origin = {'url': self.url}
            self.storage.origin_add([origin])
            # NOTE(review): if origin_add or origin_visit_add raises,
            # `visit_id` is still unbound when the `finally` clause below
            # runs — confirm whether that path can happen in practice.
            visit_id = self.storage.origin_visit_add(
                origin=self.url,
                date=self.visit_date,
                type=self.visit_type)['visit']
            last_snapshot = self.last_snapshot()
            logger.debug('last snapshot: %s', last_snapshot)
            known_artifacts = self.known_artifacts(last_snapshot)
            logger.debug('known artifacts: %s', known_artifacts)

            # Retrieve the default release version (the "latest" one)
            default_version = self.get_default_version()
            logger.debug('default version: %s', default_version)

            for version in self.get_versions():  # for each
                logger.debug('version: %s', version)
                tmp_revisions[version] = []
                # `p_` stands for `package_`
                for branch_name, p_info in self.get_package_info(version):
                    logger.debug('package_info: %s', p_info)
                    revision_id = self.resolve_revision_from(
                        known_artifacts, p_info['raw'])
                    if revision_id is None:
                        with tempfile.TemporaryDirectory() as tmpdir:
                            try:
                                dl_artifacts = self.download_package(
                                    p_info, tmpdir)
                            except Exception:
                                logger.exception('Unable to retrieve %s',
                                                 p_info)
                                # keep loading the other packages
                                status_visit = 'partial'
                                continue

                            uncompressed_path = self.uncompress(
                                dl_artifacts, dest=tmpdir)
                            logger.debug('uncompressed_path: %s',
                                         uncompressed_path)

                            directory = Directory.from_disk(
                                path=uncompressed_path.encode('utf-8'),
                                data=True)  # noqa
                            # FIXME: Try not to load the full raw content in
                            # memory
                            objects = directory.collect()

                            contents = objects['content'].values()
                            logger.debug('Number of contents: %s',
                                         len(contents))

                            self.storage.content_add(
                                map(content_for_storage, contents))

                            status_load = 'eventful'
                            directories = objects['directory'].values()
                            logger.debug('Number of directories: %s',
                                         len(directories))

                            self.storage.directory_add(directories)

                            # FIXME: This should be release. cf. D409
                            revision = self.build_revision(
                                p_info['raw'], uncompressed_path)
                            revision.update({
                                'synthetic': True,
                                'directory': directory.hash,
                            })
                            revision['metadata'].update({
                                'original_artifact': [
                                    hashes for _, hashes in dl_artifacts
                                ],
                            })

                            revision['id'] = revision_id = identifier_to_bytes(
                                revision_identifier(revision))
                            logger.debug('Revision: %s', revision)

                            self.storage.revision_add([revision])

                    tmp_revisions[version].append((branch_name, revision_id))

            logger.debug('tmp_revisions: %s', tmp_revisions)
            # Build and load the snapshot
            branches = {}
            for version, branch_name_revisions in tmp_revisions.items():
                if version == default_version and \
                   len(branch_name_revisions) == 1:
                    # only 1 branch (no ambiguity), we can create an alias
                    # branch 'HEAD'
                    branch_name, _ = branch_name_revisions[0]
                    # except for some corner case (deposit)
                    if branch_name != 'HEAD':
                        branches[b'HEAD'] = {
                            'target_type': 'alias',
                            'target': branch_name.encode('utf-8'),
                        }

                for branch_name, target in branch_name_revisions:
                    branch_name = branch_name.encode('utf-8')
                    branches[branch_name] = {
                        'target_type': 'revision',
                        'target': target,
                    }

            snapshot = {
                'branches': branches
            }
            logger.debug('snapshot: %s', snapshot)

            snapshot['id'] = identifier_to_bytes(
                snapshot_identifier(snapshot))

            logger.debug('snapshot: %s', snapshot)
            self.storage.snapshot_add([snapshot])
            if hasattr(self.storage, 'flush'):
                self.storage.flush()
        except Exception:
            logger.exception('Fail to load %s' % self.url)
            status_visit = 'partial'
            status_load = 'failed'
        finally:
            self.storage.origin_visit_update(
                origin=self.url, visit_id=visit_id, status=status_visit,
                snapshot=snapshot)
            result = {
                'status': status_load,
            }
            if snapshot:
                result['snapshot_id'] = snapshot['id']
            return result
diff --git a/swh/loader/package/tests/common.py b/swh/loader/package/tests/common.py
index 1c2a9c1..b4430cc 100644
--- a/swh/loader/package/tests/common.py
+++ b/swh/loader/package/tests/common.py
@@ -1,106 +1,113 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from os import path
import logging
from typing import Dict, List, Tuple
from swh.model.hashutil import hash_to_bytes, hash_to_hex
logger = logging.getLogger(__file__)
DATADIR = path.join(path.abspath(path.dirname(__file__)), 'resources')
def decode_target(target):
    """Test helper to ease readability in test

    """
    if not target:
        return target
    kind = target['target_type']
    if kind == 'alias':
        value = target['target'].decode('utf-8')
    else:
        value = hash_to_hex(target['target'])
    return {'target': value, 'target_type': kind}
def check_snapshot(expected_snapshot, storage):
    """Check for snapshot match.

    Provide the hashes as hexadecimal, the conversion is done
    within the method.

    Args:
        expected_snapshot (dict): full snapshot with hex ids
        storage (Storage): expected storage

    """
    snap = storage.snapshot_get(hash_to_bytes(expected_snapshot['id']))
    if snap is None:
        # display known snapshots instead, decoded for readability
        from pprint import pprint
        for _, (known_snap, _) in storage._snapshots.items():
            snapd = known_snap.to_dict()
            snapd['id'] = hash_to_hex(snapd['id'])
            snapd['branches'] = {
                name.decode('utf-8'): decode_target(target)
                for name, target in snapd['branches'].items()
            }
            pprint(snapd)
        raise AssertionError('Snapshot is not found')

    actual_branches = {
        name.decode('utf-8'): decode_target(target)
        for name, target in snap['branches'].items()
    }
    assert expected_snapshot['branches'] == actual_branches
def check_metadata(metadata: Dict, key_path: str, raw_type: str):
    """Given a metadata dict, ensure the associated key_path value is of type
    raw_type.

    Args:
        metadata: Dict to check
        key_path: Path to check
        raw_type: Type to check the path with

    Raises:
        Assertion error in case of mismatch

    """
    data = metadata
    for key in key_path.split('.'):
        try:
            data = data[key]
        except (TypeError, KeyError) as e:
            # KeyError: because path too long
            # TypeError: data is not a dict
            raise AssertionError(e)
    assert isinstance(data, raw_type)
def check_metadata_paths(metadata: Dict, paths: List[Tuple[str, str]]):
    """Given a metadata dict, ensure the keys are of expected types

    Args:
        metadata: Dict to check
        paths: List of (key_path, raw_type) pairs; each pair is checked
            with :func:`check_metadata`

    Raises:
        Assertion error in case of mismatch

    """
    for key_path, raw_type in paths:
        check_metadata(metadata, key_path, raw_type)
diff --git a/swh/loader/package/tests/test_debian.py b/swh/loader/package/tests/test_debian.py
index b453c28..5f0a284 100644
--- a/swh/loader/package/tests/test_debian.py
+++ b/swh/loader/package/tests/test_debian.py
@@ -1,318 +1,308 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import logging
import pytest
from os import path
from swh.loader.package.debian import (
DebianLoader, download_package, dsc_information, uid_to_person,
prepare_person, get_package_metadata, extract_package
)
from swh.loader.package.tests.common import check_snapshot
logger = logging.getLogger(__name__)
PACKAGE_FILES = {
+ 'name': 'cicero',
+ 'version': '0.7.2-3',
'files': {
'cicero_0.7.2-3.diff.gz': {
'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce',
'name': 'cicero_0.7.2-3.diff.gz',
'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c', # noqa
'size': 3964,
'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.diff.gz' # noqa
},
'cicero_0.7.2-3.dsc': {
'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a',
'name': 'cicero_0.7.2-3.dsc',
'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03', # noqa
'size': 1864,
'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.dsc'}, # noqa
'cicero_0.7.2.orig.tar.gz': {
'md5sum': '4353dede07c5728319ba7f5595a7230a',
'name': 'cicero_0.7.2.orig.tar.gz',
'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786', # noqa
'size': 96527,
'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2.orig.tar.gz' # noqa
}
},
- 'id': 23,
- 'name': 'cicero',
- 'revision_id': None,
- 'version': '0.7.2-3'
}
PACKAGE_PER_VERSION = {
'stretch/contrib/0.7.2-3': PACKAGE_FILES
}
def test_debian_first_visit(
        swh_config, requests_mock_datadir):
    """With no prior visit, loading a debian project ends up with 1 snapshot

    """
    loader = DebianLoader(
        url='deb://Debian/packages/cicero',
        date='2019-10-12T05:58:09.165557+00:00',
        packages=PACKAGE_PER_VERSION)

    actual_load_status = loader.load()

    assert actual_load_status['status'] == 'eventful'

    stats = loader.storage.stat_counters()
    assert {
        'content': 42,
        'directory': 2,
        'origin': 1,
        'origin_visit': 1,
        'person': 1,
        'release': 0,
        'revision': 1,  # all artifacts under 1 revision
        'skipped_content': 0,
        'snapshot': 1
    } == stats

    # no HEAD alias branch: DebianLoader.get_default_version returns None
    expected_snapshot = {
        'id': '3b6b66e6ee4e7d903a379a882684a2a50480c0b4',
        'branches': {
            'releases/stretch/contrib/0.7.2-3': {
                'target_type': 'revision',
                'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07',
            }
        },
    }  # different than the previous loader as no release is done

    check_snapshot(expected_snapshot, loader.storage)
def test_debian_first_visit_then_another_visit(
        swh_config, requests_mock_datadir):
    """A second visit on an unchanged origin is uneventful and reuses the
    same snapshot without re-downloading any artifact.

    """
    url = 'deb://Debian/packages/cicero'
    loader = DebianLoader(
        url=url,
        date='2019-10-12T05:58:09.165557+00:00',
        packages=PACKAGE_PER_VERSION)

    actual_load_status = loader.load()

    assert actual_load_status['status'] == 'eventful'
    origin_visit = next(loader.storage.origin_visit_get(url))
    assert origin_visit['status'] == 'full'

    stats = loader.storage.stat_counters()
    assert {
        'content': 42,
        'directory': 2,
        'origin': 1,
        'origin_visit': 1,
        'person': 1,
        'release': 0,
        'revision': 1,  # all artifacts under 1 revision
        'skipped_content': 0,
        'snapshot': 1
    } == stats

    expected_snapshot = {
        'id': '3b6b66e6ee4e7d903a379a882684a2a50480c0b4',
        'branches': {
            'releases/stretch/contrib/0.7.2-3': {
                'target_type': 'revision',
                'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07',
            }
        },
    }  # different than the previous loader as no release is done

    check_snapshot(expected_snapshot, loader.storage)

    # No change in between load
    actual_load_status2 = loader.load()
    assert actual_load_status2['status'] == 'uneventful'
    origin_visit2 = list(loader.storage.origin_visit_get(url))
    assert origin_visit2[-1]['status'] == 'full'

    stats2 = loader.storage.stat_counters()
    assert {
        'content': 42 + 0,
        'directory': 2 + 0,
        'origin': 1,
        'origin_visit': 1 + 1,  # a new visit occurred
        'person': 1,
        'release': 0,
        'revision': 1,
        'skipped_content': 0,
        'snapshot': 1,  # same snapshot across 2 visits
    } == stats2

    urls = [
        m.url for m in requests_mock_datadir.request_history
        if m.url.startswith('http://deb.debian.org')
    ]
    # each artifact url was only hit once: the second visit resolved the
    # existing revision and downloaded nothing
    assert len(urls) == len(set(urls))
def test_uid_to_person():
    """A well-formed uid is split into name and email parts."""
    uid = 'Someone Name <someone@orga.org>'
    expected = {
        'name': 'Someone Name',
        'email': 'someone@orga.org',
        'fullname': uid,
    }
    assert uid_to_person(uid) == expected
def test_prepare_person():
    """Every field of the person dict gets utf-8 encoded."""
    person = {
        'name': 'Someone Name',
        'email': 'someone@orga.org',
        'fullname': 'Someone Name <someone@orga.org>',
    }
    assert prepare_person(person) == {
        'name': b'Someone Name',
        'email': b'someone@orga.org',
        'fullname': b'Someone Name <someone@orga.org>',
    }
def test_download_package(datadir, tmpdir, requests_mock_datadir):
    """download_package fetches all 3 artifacts and computes their swh
    hashes, keyed by filename."""
    tmpdir = str(tmpdir)  # py3.5 work around (LocalPath issue)
    all_hashes = download_package(PACKAGE_FILES, tmpdir)
    assert all_hashes == {
        'cicero_0.7.2-3.diff.gz': {
            'checksums': {
                'blake2s256': '08b1c438e70d2474bab843d826515147fa4a817f8c4baaf3ddfbeb5132183f21',  # noqa
                'sha1': '0815282053f21601b0ec4adf7a8fe47eace3c0bc',
                'sha1_git': '834ac91da3a9da8f23f47004bb456dd5bd16fe49',
                'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c'  # noqa
            },
            'filename': 'cicero_0.7.2-3.diff.gz',
            'length': 3964},
        'cicero_0.7.2-3.dsc': {
            'checksums': {
                'blake2s256': '8c002bead3e35818eaa9d00826f3d141345707c58fb073beaa8abecf4bde45d2',  # noqa
                'sha1': 'abbec4e8efbbc80278236e1dd136831eac08accd',
                'sha1_git': '1f94b2086fa1142c2df6b94092f5c5fa11093a8e',
                'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03'  # noqa
            },
            'filename': 'cicero_0.7.2-3.dsc',
            'length': 1864},
        'cicero_0.7.2.orig.tar.gz': {
            'checksums': {
                'blake2s256': '9809aa8d2e2dad7f34cef72883db42b0456ab7c8f1418a636eebd30ab71a15a6',  # noqa
                'sha1': 'a286efd63fe2c9c9f7bb30255c3d6fcdcf390b43',
                'sha1_git': 'aa0a38978dce86d531b5b0299b4a616b95c64c74',
                'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786'  # noqa
            },
            'filename': 'cicero_0.7.2.orig.tar.gz',
            'length': 96527
        }
    }
def test_dsc_information_ok():
    """dsc_information returns the uri and name of the single .dsc file."""
    fname = 'cicero_0.7.2-3.dsc'
    expected = (
        PACKAGE_FILES['files'][fname]['uri'],
        PACKAGE_FILES['files'][fname]['name'],
    )
    assert dsc_information(PACKAGE_FILES) == expected
def test_dsc_information_not_found():
    """Without any .dsc file, dsc_information returns (None, None)."""
    package_files = copy.deepcopy(PACKAGE_FILES)
    del package_files['files']['cicero_0.7.2-3.dsc']

    assert dsc_information(package_files) == (None, None)
def test_dsc_information_too_many_dsc_entries():
    """Two .dsc entries for one package must raise a ValueError."""
    # craft an extra dsc file
    fname = 'cicero_0.7.2-3.dsc'
    package_files = copy.deepcopy(PACKAGE_FILES)
    package_files['files'][fname.replace('cicero', 'ciceroo')] = \
        package_files['files'][fname]

    expected_msg = 'Package %s_%s references several dsc' % (
        package_files['name'], package_files['version'])
    with pytest.raises(ValueError, match=expected_msg):
        dsc_information(package_files)
def test_get_package_metadata(requests_mock_datadir, datadir, tmp_path):
    """End-to-end: download, extract and parse the cicero package metadata
    (changelog history, maintainers, name and version)."""
    tmp_path = str(tmp_path)  # py3.5 compat.
    package = PACKAGE_FILES

    logger.debug('package: %s', package)

    # download the packages
    all_hashes = download_package(package, tmp_path)

    # Retrieve information from package
    _, dsc_name = dsc_information(package)

    dl_artifacts = [(tmp_path, hashes) for hashes in all_hashes.values()]

    # Extract information from package
    extracted_path = extract_package(dl_artifacts, tmp_path)

    # Retrieve information on package
    dsc_path = path.join(path.dirname(extracted_path), dsc_name)
    actual_package_info = get_package_metadata(
        package, dsc_path, extracted_path)

    logger.debug('actual_package_info: %s', actual_package_info)

    assert actual_package_info == {
        'changelog': {
            'date': '2014-10-19T16:52:35+02:00',
            'history': [
                ('cicero', '0.7.2-2'),
                ('cicero', '0.7.2-1'),
                ('cicero', '0.7-1')
            ],
            'person': {
                'email': 'sthibault@debian.org',
                'fullname': 'Samuel Thibault <sthibault@debian.org>',
                'name': 'Samuel Thibault'
            }
        },
        'maintainers': [
            {
                'email': 'debian-accessibility@lists.debian.org',
                'fullname': 'Debian Accessibility Team '
                '<debian-accessibility@lists.debian.org>',
                'name': 'Debian Accessibility Team'
            },
            {
                'email': 'sthibault@debian.org',
                'fullname': 'Samuel Thibault <sthibault@debian.org>',
                'name': 'Samuel Thibault'
            }
        ],
        'name': 'cicero',
        'version': '0.7.2-3'
    }

File Metadata

Mime Type
text/x-diff
Expires
Mon, Aug 25, 5:58 PM (4 d, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3247027

Event Timeline