diff --git a/swh/loader/package/debian.py b/swh/loader/package/debian.py index 09cff1b..6a4b8d6 100644 --- a/swh/loader/package/debian.py +++ b/swh/loader/package/debian.py @@ -1,319 +1,320 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import email.utils +import iso8601 import logging import re import subprocess from dateutil.parser import parse as parse_date from debian.changelog import Changelog from debian.deb822 import Dsc from os import path from typing import Any, Dict, Generator, Mapping, Optional, Sequence, Tuple from swh.loader.package.loader import PackageLoader from swh.loader.package.utils import download logger = logging.getLogger(__name__) UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*') class DebianLoader(PackageLoader): """Load debian origins into swh archive. """ visit_type = 'debian' def __init__(self, url: str, date: str, packages: Mapping[str, Any]): super().__init__(url=url) self.packages = packages def get_versions(self) -> Sequence[str]: """Returns the keys of the packages input (e.g. stretch/contrib/0.7.2-3, etc...) """ return self.packages.keys() def get_default_release(self) -> str: """Take the first version as default release """ return list(self.packages.keys())[0] def get_artifacts(self, version: str) -> Generator[ Tuple[Mapping[str, Any], Dict], None, None]: a_metadata = self.packages[version] artifacts_package_info = a_metadata.copy() artifacts_package_info['filename'] = version yield artifacts_package_info, a_metadata def resolve_revision_from( self, known_artifacts: Dict, artifact_metadata: Dict) \ -> Optional[bytes]: logger.debug('known_artifacts: %s' % known_artifacts) logger.debug('artifact_metadata: %s' % artifact_metadata) pass # for now def download_package(self, a_p_info: str, tmpdir: str) -> Tuple[str, Dict]: """Contrary to other package loaders (1 package, 1 artifact), `a_metadata` represents the package's datafiles set to fetch: - .orig.tar.gz - .dsc - .diff.gz This is delegated to the `download_package` function. """ logger.debug('debian: artifactS_package_info: %s', a_p_info) return tmpdir, download_package(a_p_info, tmpdir) def uncompress(self, a_path: str, tmpdir: str, a_metadata: Dict) -> str: return extract_package(a_metadata, tmpdir) def read_intrinsic_metadata(self, a_metadata: Dict, a_uncompressed_path: str) -> Dict: _, dsc_name = dsc_information(a_metadata) dsc_path = path.join(path.dirname(a_uncompressed_path), dsc_name) return get_package_metadata( a_metadata, dsc_path, a_uncompressed_path) def build_revision( self, a_metadata: Dict, i_metadata: Dict) -> Dict: dsc_url, _ = dsc_information(a_metadata) logger.debug('i_metadata: %s', i_metadata) logger.debug('a_metadata: %s', a_metadata) msg = 'Synthetic revision for Debian source package %s version %s' % ( a_metadata['name'], a_metadata['version']) - date = i_metadata['changelog']['date'] + date = iso8601.parse_date(i_metadata['changelog']['date']) author = prepare_person(i_metadata['changelog']['person']) # inspired from swh.loader.debian.converters.package_metadata_to_revision # noqa return { 'type': 'dsc', 'message': msg.encode('utf-8'), 'author': author, 'date': date, 'committer': author, 'committer_date': date, 'parents': [], 'metadata': { 'intrinsic': { 'tool': 'dsc', 'raw': i_metadata, }, 'extrinsic': { 'provider': dsc_url, 'when': self.visit_date.isoformat(), 'raw': a_metadata, }, } } def uid_to_person(uid: str) -> Mapping[str, str]: """Convert an uid to a person suitable for insertion. Args: uid: an uid of the form "Name " Returns: a dictionary with the following keys: - name: the name associated to the uid - email: the mail associated to the uid - fullname: the actual uid input """ logger.debug('uid: %s', uid) ret = { 'name': '', 'email': '', 'fullname': uid, } name, mail = email.utils.parseaddr(uid) if name and email: ret['name'] = name ret['email'] = mail else: ret['name'] = uid return ret def prepare_person(person: Mapping[str, str]) -> Mapping[str, bytes]: """Prepare person for swh serialization... Args: A person dict Returns: A person dict ready for storage """ ret = {} for key, value in person.items(): ret[key] = value.encode('utf-8') return ret def download_package( package: Mapping[str, Any], tmpdir: Any) -> Mapping[str, Any]: """Fetch a source package in a temporary directory and check the checksums for all files. Args: package: Dict defining the set of files representing a debian package tmpdir: Where to download and extract the files to ingest Returns: Dict of swh hashes per filename key """ all_hashes = {} for filename, fileinfo in package['files'].items(): uri = fileinfo['uri'] logger.debug('fileinfo: %s', fileinfo) extrinsic_hashes = {'sha256': fileinfo['sha256']} logger.debug('extrinsic_hashes(%s): %s', filename, extrinsic_hashes) filepath, hashes = download(uri, dest=tmpdir, filename=filename, hashes=extrinsic_hashes) all_hashes[filename] = hashes logger.debug('all_hashes: %s', all_hashes) return all_hashes def dsc_information(package: Mapping[str, Any]) -> Tuple[str, str]: """Retrieve dsc information from a package. Args: package: Package metadata information Returns: Tuple of dsc file's uri, dsc's full disk path """ dsc_name = None dsc_url = None for filename, fileinfo in package['files'].items(): if filename.endswith('.dsc'): if dsc_name: raise ValueError( 'Package %s_%s references several dsc files' % (package['name'], package['version']) ) dsc_url = fileinfo['uri'] dsc_name = filename return dsc_url, dsc_name def extract_package(package: Mapping[str, Any], tmpdir: str) -> str: """Extract a Debian source package to a given directory. Note that after extraction the target directory will be the root of the extracted package, rather than containing it. Args: package (dict): package information dictionary tmpdir (str): directory where the package files are stored Returns: Package extraction directory """ _, dsc_name = dsc_information(package) dsc_path = path.join(tmpdir, dsc_name) destdir = path.join(tmpdir, 'extracted') logfile = path.join(tmpdir, 'extract.log') logger.debug('extract Debian source package %s in %s' % (dsc_path, destdir), extra={ 'swh_type': 'deb_extract', 'swh_dsc': dsc_path, 'swh_destdir': destdir, }) cmd = ['dpkg-source', '--no-copy', '--no-check', '--ignore-bad-version', '-x', dsc_path, destdir] try: with open(logfile, 'w') as stdout: subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: logdata = open(logfile, 'r').read() raise ValueError('dpkg-source exited with code %s: %s' % (e.returncode, logdata)) from None return destdir def get_package_metadata(package: Mapping[str, Any], dsc_path: str, extracted_path: str) -> Mapping[str, Any]: """Get the package metadata from the source package at dsc_path, extracted in extracted_path. Args: package: the package dict (with a dsc_path key) dsc_path: path to the package's dsc file extracted_path: the path where the package got extracted Returns: dict: a dictionary with the following keys: - history: list of (package_name, package_version) tuples parsed from the package changelog """ with open(dsc_path, 'rb') as dsc: parsed_dsc = Dsc(dsc) # Parse the changelog to retrieve the rest of the package information changelog_path = path.join(extracted_path, 'debian/changelog') with open(changelog_path, 'rb') as changelog: try: parsed_changelog = Changelog(changelog) except UnicodeDecodeError: logger.warning('Unknown encoding for changelog %s,' ' falling back to iso' % changelog_path.decode('utf-8'), extra={ 'swh_type': 'deb_changelog_encoding', 'swh_name': package['name'], 'swh_version': str(package['version']), 'swh_changelog': changelog_path.decode('utf-8'), }) # need to reset as Changelog scrolls to the end of the file changelog.seek(0) parsed_changelog = Changelog(changelog, encoding='iso-8859-15') package_info = { 'name': package['name'], 'version': str(package['version']), 'changelog': { 'person': uid_to_person(parsed_changelog.author), - 'date': parse_date(parsed_changelog.date), + 'date': parse_date(parsed_changelog.date).isoformat(), 'history': [(block.package, str(block.version)) for block in parsed_changelog][1:], } } maintainers = [ uid_to_person(parsed_dsc['Maintainer']), ] maintainers.extend( uid_to_person(person) for person in UPLOADERS_SPLIT.split(parsed_dsc.get('Uploaders', '')) ) package_info['maintainers'] = maintainers return package_info diff --git a/swh/loader/package/tests/test_debian.py b/swh/loader/package/tests/test_debian.py index ed3f0e5..c298c1b 100644 --- a/swh/loader/package/tests/test_debian.py +++ b/swh/loader/package/tests/test_debian.py @@ -1,320 +1,316 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import logging import pytest from os import path from swh.loader.package.debian import ( DebianLoader, download_package, dsc_information, uid_to_person, prepare_person, get_package_metadata, extract_package ) from swh.loader.package.tests.common import check_snapshot logger = logging.getLogger(__name__) PACKAGE_FILES = { 'files': { 'cicero_0.7.2-3.diff.gz': { 'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce', 'name': 'cicero_0.7.2-3.diff.gz', 'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c', # noqa 'size': 3964, 'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.diff.gz' # noqa }, 'cicero_0.7.2-3.dsc': { 'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a', 'name': 'cicero_0.7.2-3.dsc', 'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03', # noqa 'size': 1864, 'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2-3.dsc'}, # noqa 'cicero_0.7.2.orig.tar.gz': { 'md5sum': '4353dede07c5728319ba7f5595a7230a', 'name': 'cicero_0.7.2.orig.tar.gz', 'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786', # noqa 'size': 96527, 'uri': 'http://deb.debian.org/debian//pool/contrib/c/cicero/cicero_0.7.2.orig.tar.gz' # noqa } }, 'id': 23, 'name': 'cicero', 'revision_id': None, 'version': '0.7.2-3' } PACKAGE_PER_VERSION = { 'stretch/contrib/0.7.2-3': PACKAGE_FILES } def test_debian_first_visit( swh_config, requests_mock_datadir): """With no prior visit, load a gnu project ends up with 1 snapshot """ loader = DebianLoader( url='deb://Debian/packages/cicero', date='2019-10-12T05:58:09.165557+00:00', packages=PACKAGE_PER_VERSION) actual_load_status = loader.load() assert actual_load_status['status'] == 'eventful' stats = loader.storage.stat_counters() assert { 'content': 42, 'directory': 2, 'origin': 1, 'origin_visit': 1, 'person': 1, 'release': 0, 'revision': 1, # all artifacts under 1 revision 'skipped_content': 0, 'snapshot': 1 } == stats expected_snapshot = { 'id': 'a59ec49a01ff329dcbbc63fd36a5654143aef240', 'branches': { 'HEAD': { 'target_type': 'alias', 'target': 'releases/stretch/contrib/0.7.2-3' }, 'releases/stretch/contrib/0.7.2-3': { 'target_type': 'revision', 'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07', } }, } # different than the previous loader as no release is done check_snapshot(expected_snapshot, loader.storage) def test_debian_first_visit_then_another_visit( swh_config, requests_mock_datadir): """With no prior visit, load a gnu project ends up with 1 snapshot """ url = 'deb://Debian/packages/cicero' loader = DebianLoader( url=url, date='2019-10-12T05:58:09.165557+00:00', packages=PACKAGE_PER_VERSION) actual_load_status = loader.load() assert actual_load_status['status'] == 'eventful' origin_visit = next(loader.storage.origin_visit_get(url)) assert origin_visit['status'] == 'full' stats = loader.storage.stat_counters() assert { 'content': 42, 'directory': 2, 'origin': 1, 'origin_visit': 1, 'person': 1, 'release': 0, 'revision': 1, # all artifacts under 1 revision 'skipped_content': 0, 'snapshot': 1 } == stats expected_snapshot = { 'id': 'a59ec49a01ff329dcbbc63fd36a5654143aef240', 'branches': { 'HEAD': { 'target_type': 'alias', 'target': 'releases/stretch/contrib/0.7.2-3' }, 'releases/stretch/contrib/0.7.2-3': { 'target_type': 'revision', 'target': '2807f5b3f84368b4889a9ae827fe85854ffecf07', } }, } # different than the previous loader as no release is done check_snapshot(expected_snapshot, loader.storage) # No change in between load actual_load_status2 = loader.load() assert actual_load_status2['status'] == 'eventful' origin_visit2 = list(loader.storage.origin_visit_get(url)) assert origin_visit2[-1]['status'] == 'full' stats2 = loader.storage.stat_counters() assert { 'content': 42 + 0, 'directory': 2 + 0, 'origin': 1, 'origin_visit': 1 + 1, # a new visit occurred 'person': 1, 'release': 0, 'revision': 1, 'skipped_content': 0, 'snapshot': 1, # same snapshot across 2 visits } == stats2 urls = [ m.url for m in requests_mock_datadir.request_history if m.url.startswith('http://deb.debian.org') ] # visited each package artifact twice across 2 visits assert len(urls) == 2 * len(set(urls)) def test_uid_to_person(): uid = 'Someone Name ' actual_person = uid_to_person(uid) assert actual_person == { 'name': 'Someone Name', 'email': 'someone@orga.org', 'fullname': uid, } def test_prepare_person(): actual_author = prepare_person({ 'name': 'Someone Name', 'email': 'someone@orga.org', 'fullname': 'Someone Name ', }) assert actual_author == { 'name': b'Someone Name', 'email': b'someone@orga.org', 'fullname': b'Someone Name ', } def test_download_package(datadir, tmpdir, requests_mock_datadir): tmpdir = str(tmpdir) # py3.5 work around (LocalPath issue) all_hashes = download_package(PACKAGE_FILES, tmpdir) assert all_hashes == { 'cicero_0.7.2-3.diff.gz': { 'checksums': { 'blake2s256': '08b1c438e70d2474bab843d826515147fa4a817f8c4baaf3ddfbeb5132183f21', # noqa 'sha1': '0815282053f21601b0ec4adf7a8fe47eace3c0bc', 'sha1_git': '834ac91da3a9da8f23f47004bb456dd5bd16fe49', 'sha256': 'f039c9642fe15c75bed5254315e2a29f9f2700da0e29d9b0729b3ffc46c8971c' # noqa }, 'filename': 'cicero_0.7.2-3.diff.gz', 'length': 3964}, 'cicero_0.7.2-3.dsc': { 'checksums': { 'blake2s256': '8c002bead3e35818eaa9d00826f3d141345707c58fb073beaa8abecf4bde45d2', # noqa 'sha1': 'abbec4e8efbbc80278236e1dd136831eac08accd', 'sha1_git': '1f94b2086fa1142c2df6b94092f5c5fa11093a8e', 'sha256': '35b7f1048010c67adfd8d70e4961aefd8800eb9a83a4d1cc68088da0009d9a03' # noqa }, 'filename': 'cicero_0.7.2-3.dsc', 'length': 1864}, 'cicero_0.7.2.orig.tar.gz': { 'checksums': { 'blake2s256': '9809aa8d2e2dad7f34cef72883db42b0456ab7c8f1418a636eebd30ab71a15a6', # noqa 'sha1': 'a286efd63fe2c9c9f7bb30255c3d6fcdcf390b43', 'sha1_git': 'aa0a38978dce86d531b5b0299b4a616b95c64c74', 'sha256': '63f40f2436ea9f67b44e2d4bd669dbabe90e2635a204526c20e0b3c8ee957786' # noqa }, 'filename': 'cicero_0.7.2.orig.tar.gz', 'length': 96527 } } def test_dsc_information_ok(): fname = 'cicero_0.7.2-3.dsc' dsc_url, dsc_name = dsc_information(PACKAGE_FILES) assert dsc_url == PACKAGE_FILES['files'][fname]['uri'] assert dsc_name == PACKAGE_FILES['files'][fname]['name'] def test_dsc_information_not_found(): fname = 'cicero_0.7.2-3.dsc' package_files = copy.deepcopy(PACKAGE_FILES) package_files['files'].pop(fname) dsc_url, dsc_name = dsc_information(package_files) assert dsc_url is None assert dsc_name is None def test_dsc_information_too_many_dsc_entries(): # craft an extra dsc file fname = 'cicero_0.7.2-3.dsc' package_files = copy.deepcopy(PACKAGE_FILES) data = package_files['files'][fname] fname2 = fname.replace('cicero', 'ciceroo') package_files['files'][fname2] = data with pytest.raises( ValueError, match='Package %s_%s references several dsc' % ( package_files['name'], package_files['version'])): dsc_information(package_files) def test_get_package_metadata(requests_mock_datadir, datadir, tmp_path): tmp_path = str(tmp_path) # py3.5 compat. package = PACKAGE_FILES logger.debug('package: %s', package) # download the packages download_package(package, tmp_path) # Retrieve information from package _, dsc_name = dsc_information(package) # Extract information from package extracted_path = extract_package(package, tmp_path) # Retrieve information on package dsc_path = path.join(path.dirname(extracted_path), dsc_name) actual_package_info = get_package_metadata( package, dsc_path, extracted_path) logger.debug('actual_package_info: %s', actual_package_info) - import datetime - from dateutil.tz import tzoffset - assert actual_package_info == { 'changelog': { - 'date': datetime.datetime( - 2014, 10, 19, 16, 52, 35, tzinfo=tzoffset(None, 7200)), + 'date': '2014-10-19T16:52:35+02:00', 'history': [ ('cicero', '0.7.2-2'), ('cicero', '0.7.2-1'), ('cicero', '0.7-1') ], 'person': { 'email': 'sthibault@debian.org', 'fullname': 'Samuel Thibault ', 'name': 'Samuel Thibault' } }, 'maintainers': [ { 'email': 'debian-accessibility@lists.debian.org', 'fullname': 'Debian Accessibility Team ' '', 'name': 'Debian Accessibility Team' }, { 'email': 'sthibault@debian.org', 'fullname': 'Samuel Thibault ', 'name': 'Samuel Thibault' } ], 'name': 'cicero', 'version': '0.7.2-3' }