Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/cran/loader.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from datetime import timezone | from datetime import timezone | ||||
import logging | import logging | ||||
import os | import os | ||||
from os import path | from os import path | ||||
import re | import re | ||||
import string | |||||
from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple | from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple | ||||
import attr | import attr | ||||
import dateutil.parser | import dateutil.parser | ||||
from debian.deb822 import Deb822 | from debian.deb822 import Deb822 | ||||
from swh.loader.package.loader import BasePackageInfo, PackageLoader | from swh.loader.package.loader import BasePackageInfo, PackageLoader | ||||
from swh.loader.package.utils import release_name | from swh.loader.package.utils import release_name | ||||
Show All 12 Lines | |||||
DATE_PATTERN = re.compile(r"^(?P<year>\d{4})-(?P<month>\d{2})$") | DATE_PATTERN = re.compile(r"^(?P<year>\d{4})-(?P<month>\d{2})$") | ||||
@attr.s | @attr.s | ||||
class CRANPackageInfo(BasePackageInfo): | class CRANPackageInfo(BasePackageInfo): | ||||
raw_info = attr.ib(type=Dict[str, Any]) | raw_info = attr.ib(type=Dict[str, Any]) | ||||
version = attr.ib(type=str) | version = attr.ib(type=str) | ||||
ID_KEYS = ["url", "version"] | MANIFEST_FORMAT = string.Template("$version $url") | ||||
@classmethod | @classmethod | ||||
def from_metadata(cls, a_metadata: Dict[str, Any]) -> "CRANPackageInfo": | def from_metadata(cls, a_metadata: Dict[str, Any]) -> "CRANPackageInfo": | ||||
url = a_metadata["url"] | url = a_metadata["url"] | ||||
return CRANPackageInfo( | return CRANPackageInfo( | ||||
url=url, | url=url, | ||||
filename=path.basename(url), | filename=path.basename(url), | ||||
raw_info=a_metadata, | raw_info=a_metadata, | ||||
Show All 32 Lines | def get_default_version(self) -> str: | ||||
return self.artifacts[-1]["version"] | return self.artifacts[-1]["version"] | ||||
def get_package_info(self, version: str) -> Iterator[Tuple[str, CRANPackageInfo]]: | def get_package_info(self, version: str) -> Iterator[Tuple[str, CRANPackageInfo]]: | ||||
for a_metadata in self.artifacts: | for a_metadata in self.artifacts: | ||||
p_info = CRANPackageInfo.from_metadata(a_metadata) | p_info = CRANPackageInfo.from_metadata(a_metadata) | ||||
if version == p_info.version: | if version == p_info.version: | ||||
yield release_name(version), p_info | yield release_name(version), p_info | ||||
def extid_from_known_artifact(self, known_artifact: Dict) -> bytes: | |||||
return CRANPackageInfo.from_metadata(known_artifact).extid() | |||||
def resolve_revision_from( | def resolve_revision_from( | ||||
self, known_artifacts: Mapping[bytes, Mapping], p_info: CRANPackageInfo, | self, known_artifacts: Mapping[bytes, Mapping], p_info: CRANPackageInfo, | ||||
) -> Optional[bytes]: | ) -> Optional[bytes]: | ||||
"""Given known_artifacts per revision, try to determine the revision for | """Given known_artifacts per revision, try to determine the revision for | ||||
artifact_metadata | artifact_metadata | ||||
""" | """ | ||||
new_identity = p_info.artifact_identity() | new_extid = p_info.extid() | ||||
for rev_id, known_artifact_meta in known_artifacts.items(): | for rev_id, known_artifact_meta in known_artifacts.items(): | ||||
logging.debug("known_artifact_meta: %s", known_artifact_meta) | logging.debug("known_artifact_meta: %s", known_artifact_meta) | ||||
known_artifact = known_artifact_meta["extrinsic"]["raw"] | known_artifact = known_artifact_meta["extrinsic"]["raw"] | ||||
known_identity = CRANPackageInfo.from_metadata( | known_extid = self.extid_from_known_artifact(known_artifact) | ||||
known_artifact | if new_extid == known_extid: | ||||
).artifact_identity() | |||||
if new_identity == known_identity: | |||||
return rev_id | return rev_id | ||||
return None | return None | ||||
def build_revision( | def build_revision( | ||||
self, p_info: CRANPackageInfo, uncompressed_path: str, directory: Sha1Git | self, p_info: CRANPackageInfo, uncompressed_path: str, directory: Sha1Git | ||||
) -> Optional[Revision]: | ) -> Optional[Revision]: | ||||
# a_metadata is empty | # a_metadata is empty | ||||
metadata = extract_intrinsic_metadata(uncompressed_path) | metadata = extract_intrinsic_metadata(uncompressed_path) | ||||
▲ Show 20 Lines • Show All 98 Lines • Show Last 20 Lines |