Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/debian/loader.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import email.utils | import email.utils | ||||
import logging | import logging | ||||
from os import path | from os import path | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple | from typing import ( | ||||
Any, | |||||
Dict, | |||||
FrozenSet, | |||||
Iterator, | |||||
List, | |||||
Mapping, | |||||
Optional, | |||||
Sequence, | |||||
Tuple, | |||||
) | |||||
import attr | import attr | ||||
from dateutil.parser import parse as parse_date | from dateutil.parser import parse as parse_date | ||||
from debian.changelog import Changelog | from debian.changelog import Changelog | ||||
from debian.deb822 import Dsc | from debian.deb822 import Dsc | ||||
from swh.loader.package.loader import BasePackageInfo, PackageLoader | from swh.loader.package.loader import BasePackageInfo, PackageLoader | ||||
from swh.loader.package.utils import download, release_name | from swh.loader.package.utils import download, release_name | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Sha1Git, | Sha1Git, | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
# Logger for this module, keyed on the module's import name.
logger = logging.getLogger(__name__)

# Separator between the entries of a comma-separated list of "Name <email>"
# uids: the lookbehind only accepts a comma that comes after a closing ">",
# so a comma located elsewhere in an entry does not split it.
UPLOADERS_SPLIT = re.compile(r"(?<=\>)\s*,\s*")
@attr.s
class DebianFileMetadata:
    """Description of a single file of a Debian package."""

    md5sum = attr.ib(type=str)
    """MD5 checksum of the file, as a string"""

    name = attr.ib(type=str)
    """Filename"""

    sha256 = attr.ib(type=str)
    """SHA-256 checksum of the file, as a string"""

    size = attr.ib(type=int)
    """Size of the file (presumably in bytes -- to be confirmed)"""

    uri = attr.ib(type=str)
    """URL of this specific file"""
@attr.s
class DebianPackageChangelog:
    """Information taken from the changelog of a Debian source package."""

    person = attr.ib(type=Dict[str, str])
    """A dict with the same fields as model.Person, holding str values
    instead of bytes; 'email' is optional."""

    date = attr.ib(type=str)
    """Date of the changelog entry."""

    history = attr.ib(type=List[Tuple[str, str]])
    """List of (package_name, version) tuples"""
@attr.s
class DebianPackageInfo(BasePackageInfo):
    """Package information for one version of a Debian package."""

    raw = attr.ib(type=Dict[str, Any])
    """The metadata dict this object was built from, kept untouched."""

    files = attr.ib(type=Dict[str, DebianFileMetadata])
    """Metadata of the files (.deb, .dsc, ...) of the package, by file name."""

    name = attr.ib(type=str)
    version = attr.ib(type=str)

    @classmethod
    def from_metadata(cls, a_metadata: Dict[str, Any], url: str) -> "DebianPackageInfo":
        """Alternate constructor from a raw package metadata dict.

        Args:
            a_metadata: dict with mandatory "name" and "version" keys, and an
                optional "files" key mapping each file name to the keyword
                arguments of :class:`DebianFileMetadata`
            url: value of the ``url`` attribute of the new object

        Returns:
            A new instance whose ``raw`` attribute is ``a_metadata`` itself
            and whose ``filename`` is None.
        """
        # a missing "files" key results in an empty mapping
        raw_files = a_metadata.get("files", {})
        files = {
            file_name: DebianFileMetadata(**file_metadata)
            for file_name, file_metadata in raw_files.items()
        }
        return cls(
            url=url,
            filename=None,
            raw=a_metadata,
            files=files,
            name=a_metadata["name"],
            version=a_metadata["version"],
        )
@attr.s
class IntrinsicPackageMetadata:
    """Metadata extracted from a package's .dsc file."""

    name = attr.ib(type=str)
    version = attr.ib(type=str)

    changelog = attr.ib(type=DebianPackageChangelog)
    """Information parsed from the changelog of the package"""

    maintainers = attr.ib(type=List[Dict[str, str]])
    """A list of dicts with the same fields as model.Person, holding str
    values instead of bytes; 'email' is optional."""
class DebianLoader(PackageLoader[DebianPackageInfo]): | class DebianLoader(PackageLoader[DebianPackageInfo]): | ||||
"""Load debian origins into swh archive. | """Load debian origins into swh archive. | ||||
""" | """ | ||||
visit_type = "deb" | visit_type = "deb" | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def get_versions(self) -> Sequence[str]: | ||||
"""Returns the keys of the packages input (e.g. | """Returns the keys of the packages input (e.g. | ||||
stretch/contrib/0.7.2-3, etc...) | stretch/contrib/0.7.2-3, etc...) | ||||
""" | """ | ||||
return list(self.packages.keys()) | return list(self.packages.keys()) | ||||
def get_package_info(self, version: str) -> Iterator[Tuple[str, DebianPackageInfo]]:
    """Yield the single (release name, package info) pair of a version.

    Args:
        version: one of the keys of ``self.packages``

    Yields:
        The release name computed for ``version`` along with the package
        information built from the metadata stored under that key.
    """
    package_metadata = self.packages[version]
    package_info = DebianPackageInfo.from_metadata(package_metadata, url=self.url)
    yield release_name(version), package_info
def resolve_revision_from(
    self, known_package_artifacts: Mapping, p_info: DebianPackageInfo
) -> Optional[bytes]:
    """Look for an already known revision matching the package to load.

    This is delegated to the module-level `resolve_revision_from` function.
    """
    revision_id = resolve_revision_from(known_package_artifacts, p_info)
    return revision_id
def download_package(
    self, p_info: DebianPackageInfo, tmpdir: str
) -> List[Tuple[str, Mapping]]:
    """Fetch every file of the package into `tmpdir`.

    Contrary to other package loaders (1 package, 1 artifact),
    `p_info.files` represents the package's datafiles set to fetch:

    - <package-version>.orig.tar.gz
    - <package-version>.dsc
    - <package-version>.diff.gz

    This is delegated to the module-level `download_package` function.

    Returns:
        One (tmpdir, hashes) tuple per downloaded file
    """
    hashes_per_file = download_package(p_info, tmpdir)
    logger.debug("all_hashes: %s", hashes_per_file)
    downloaded = [(tmpdir, file_hashes) for file_hashes in hashes_per_file.values()]
    logger.debug("res: %s", downloaded)
    return downloaded
def uncompress(
    self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
) -> str:
    """Extract the downloaded artifacts into `dest`.

    This is delegated to the `extract_package` function, whose result is
    returned as is.
    """
    logger.debug("dl_artifacts: %s", dl_artifacts)
    extracted_path = extract_package(dl_artifacts, dest=dest)
    return extracted_path
def build_revision(
    self, p_info: DebianPackageInfo, uncompressed_path: str, directory: Sha1Git
) -> Optional[Revision]:
    """Build the synthetic revision describing a Debian source package.

    Args:
        p_info: information on the package being loaded
        uncompressed_path: path the package got extracted to; the .dsc file
            is looked up in the parent directory of that path
        directory: identifier of the directory the revision points to

    Returns:
        A synthetic revision of type DSC. Author and committer are both the
        person of the parsed changelog, and both dates are the changelog's.

    Raises:
        ValueError: when no .dsc file name could be found for the package
    """
    dsc_url, dsc_name = dsc_information(p_info)
    if not dsc_name:
        raise ValueError("dsc name for url %s should not be None" % dsc_url)

    dsc_path = path.join(path.dirname(uncompressed_path), dsc_name)
    intrinsic_metadata = get_intrinsic_package_metadata(
        p_info, dsc_path, uncompressed_path
    )
    logger.debug("intrinsic_metadata: %s", intrinsic_metadata)
    logger.debug("p_info: %s", p_info)

    message = "Synthetic revision for Debian source package %s version %s" % (
        p_info.name,
        p_info.version,
    )
    changelog = intrinsic_metadata.changelog
    author = prepare_person(changelog.person)
    date = TimestampWithTimezone.from_iso8601(changelog.date)

    # inspired from swh.loader.debian.converters.package_metadata_to_revision  # noqa
    revision_metadata = {
        # what was read from the package itself
        "intrinsic": {"tool": "dsc", "raw": attr.asdict(intrinsic_metadata)},
        # what the loader was given about the package
        "extrinsic": {
            "provider": dsc_url,
            "when": self.visit_date.isoformat(),
            "raw": p_info.raw,
        },
    }
    return Revision(
        type=RevisionType.DSC,
        message=message.encode("utf-8"),
        author=author,
        date=date,
        committer=author,
        committer_date=date,
        parents=(),
        directory=directory,
        synthetic=True,
        metadata=revision_metadata,
    )
def resolve_revision_from(
    known_package_artifacts: Mapping, p_info: DebianPackageInfo
) -> Optional[bytes]:
    """Try to find, among the known package artifacts (resolved from the
    snapshot of the previous visit), a revision built from the very same set
    of files as the package to fetch.

    Args:
        known_package_artifacts: mapping from revision id to the metadata of
            that revision; entries without an "extrinsic" value are ignored
        p_info: information on the package to fetch

    Returns:
        The id of the matching revision (the last one in iteration order when
        several match), None when nothing matches or when the package
        declares no file.
    """
    if not p_info.files:
        return None

    def files_key(data: DebianPackageInfo) -> FrozenSet[Tuple[str, str, int]]:
        # the generator goes straight to frozenset, no intermediate list
        return frozenset(
            (name, meta.sha256, meta.size) for name, meta in data.files.items()
        )

    # what we want to avoid downloading back if we have them already
    wanted_key = files_key(p_info)

    matching_revision_id = None
    for rev_id, known_artifacts in known_package_artifacts.items():
        extrinsic = known_artifacts.get("extrinsic")
        if not extrinsic:
            continue
        known_info = DebianPackageInfo.from_metadata(extrinsic["raw"], url=p_info.url)
        # no early exit: every known artifact is still parsed, and a later
        # match replaces an earlier one
        if files_key(known_info) == wanted_key:
            matching_revision_id = rev_id
    return matching_revision_id
def uid_to_person(uid: str) -> Mapping[str, str]: | def uid_to_person(uid: str) -> Dict[str, str]: | ||||
"""Convert an uid to a person suitable for insertion. | """Convert an uid to a person suitable for insertion. | ||||
Args: | Args: | ||||
uid: an uid of the form "Name <email@ddress>" | uid: an uid of the form "Name <email@ddress>" | ||||
Returns: | Returns: | ||||
a dictionary with the following keys: | a dictionary with the following keys: | ||||
Show All 28 Lines | Returns: | ||||
A person ready for storage | A person ready for storage | ||||
""" | """ | ||||
return Person.from_dict( | return Person.from_dict( | ||||
{key: value.encode("utf-8") for (key, value) in person.items()} | {key: value.encode("utf-8") for (key, value) in person.items()} | ||||
) | ) | ||||
def download_package(p_info: DebianPackageInfo, tmpdir: Any) -> Mapping[str, Any]:
    """Fetch a source package in a temporary directory and check the checksums
    for all files.

    Args:
        p_info: Information on a package
        tmpdir: Where to download and extract the files to ingest

    Returns:
        Dict of swh hashes per filename key

    """
    hashes_by_filename = {}
    for filename, file_metadata in p_info.files.items():
        logger.debug("fileinfo: %s", file_metadata)
        # only the sha256 of the file is handed to `download` as expected hash
        expected_hashes = {"sha256": file_metadata.sha256}
        logger.debug("extrinsic_hashes(%s): %s", filename, expected_hashes)
        _, computed_hashes = download(
            file_metadata.uri, dest=tmpdir, filename=filename, hashes=expected_hashes
        )
        hashes_by_filename[filename] = computed_hashes

    logger.debug("all_hashes: %s", hashes_by_filename)
    return hashes_by_filename
def dsc_information(p_info: DebianPackageInfo) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve dsc information from a package.

    Args:
        p_info: Package metadata information

    Returns:
        Tuple of dsc file's uri, dsc file's name; both are None when the
        package references no dsc file

    Raises:
        ValueError: when the package references more than one dsc file

    """
    dsc_names = [filename for filename in p_info.files if filename.endswith(".dsc")]
    if len(dsc_names) > 1:
        raise ValueError(
            "Package %s_%s references several dsc files."
            % (p_info.name, p_info.version)
        )
    if not dsc_names:
        return None, None

    dsc_name = dsc_names[0]
    return p_info.files[dsc_name].uri, dsc_name
def extract_package(dl_artifacts: List[Tuple[str, Mapping]], dest: str) -> str: | def extract_package(dl_artifacts: List[Tuple[str, Mapping]], dest: str) -> str: | ||||
"""Extract a Debian source package to a given directory. | """Extract a Debian source package to a given directory. | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | except subprocess.CalledProcessError as e: | ||||
logdata = open(logfile, "r").read() | logdata = open(logfile, "r").read() | ||||
raise ValueError( | raise ValueError( | ||||
"dpkg-source exited with code %s: %s" % (e.returncode, logdata) | "dpkg-source exited with code %s: %s" % (e.returncode, logdata) | ||||
) from None | ) from None | ||||
return destdir | return destdir | ||||
def get_intrinsic_package_metadata(
    p_info: DebianPackageInfo, dsc_path: str, extracted_path: str
) -> IntrinsicPackageMetadata:
    """Get the package metadata from the source package at dsc_path,
    extracted in extracted_path.

    Args:
        p_info: the package information
        dsc_path: path to the package's dsc file
        extracted_path: the path where the package got extracted

    Returns:
        IntrinsicPackageMetadata: an object made of

        - name, version: taken from p_info
        - changelog: person and date of the parsed changelog, along with the
          history, i.e. the list of (package_name, package_version) tuples of
          every changelog block but the first one
        - maintainers: the "Maintainer" of the dsc file, followed by one
          person per entry of its "Uploaders" field, if any

    """
    with open(dsc_path, "rb") as dsc:
        parsed_dsc = Dsc(dsc)

    # Parse the changelog to retrieve the rest of the package information
    changelog_path = path.join(extracted_path, "debian/changelog")
    with open(changelog_path, "rb") as changelog_file:
        try:
            parsed_changelog = Changelog(changelog_file)
        except UnicodeDecodeError:
            # the path is handed to the logger as an argument instead of
            # being %-formatted into the message beforehand
            logger.warning(
                "Unknown encoding for changelog %s," " falling back to iso",
                changelog_path,
                extra={
                    "swh_type": "deb_changelog_encoding",
                    "swh_name": p_info.name,
                    "swh_version": str(p_info.version),
                    "swh_changelog": changelog_path,
                },
            )

            # need to reset as Changelog scrolls to the end of the file
            changelog_file.seek(0)
            parsed_changelog = Changelog(changelog_file, encoding="iso-8859-15")

    changelog = DebianPackageChangelog(
        person=uid_to_person(parsed_changelog.author),
        date=parse_date(parsed_changelog.date).isoformat(),
        # the first changelog block is left out of the history
        history=[(block.package, str(block.version)) for block in parsed_changelog][1:],
    )

    maintainers = [
        uid_to_person(parsed_dsc["Maintainer"]),
    ]
    # Splitting an absent or empty "Uploaders" field yields one empty string;
    # blank entries are skipped so that no empty person gets recorded.
    maintainers.extend(
        uid_to_person(person)
        for person in UPLOADERS_SPLIT.split(parsed_dsc.get("Uploaders", ""))
        if person.strip()
    )

    return IntrinsicPackageMetadata(
        name=p_info.name,
        version=str(p_info.version),
        changelog=changelog,
        maintainers=maintainers,
    )
I recall you can drop the extra enclosing brackets.