Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/nixguix/loader.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
import requests | import requests | ||||
from typing import Dict, Optional, Any, Mapping | from typing import Dict, Optional, Any, Mapping | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import ( | |||||
from swh.model.model import Sha1Git, Revision, RevisionType | Revision, | ||||
RevisionType, | |||||
TargetType, | |||||
Snapshot, | |||||
BaseModel, | |||||
Sha1Git, | |||||
) | |||||
from swh.loader.package.utils import EMPTY_AUTHOR | from swh.loader.package.utils import EMPTY_AUTHOR | ||||
from swh.loader.package.loader import PackageLoader | from swh.loader.package.loader import PackageLoader | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class NixGuixLoader(PackageLoader): | class NixGuixLoader(PackageLoader): | ||||
"""Load sources from a sources.json file. This loader is used to load | """Load sources from a sources.json file. This loader is used to load | ||||
Show All 31 Lines | class NixGuixLoader(PackageLoader): | ||||
def get_package_info(self, url): | def get_package_info(self, url): | ||||
# TODO: try all mirrors and not only the first one. A source | # TODO: try all mirrors and not only the first one. A source | ||||
# can be fetched from several urls, called mirrors. We | # can be fetched from several urls, called mirrors. We | ||||
# currently only use the first one, but if the first one | # currently only use the first one, but if the first one | ||||
# fails, we should try the second one and so on. | # fails, we should try the second one and so on. | ||||
integrity = self._integrityByUrl[url] | integrity = self._integrityByUrl[url] | ||||
yield url, {"url": url, "raw": {"url": url, "integrity": integrity}} | yield url, {"url": url, "raw": {"url": url, "integrity": integrity}} | ||||
def known_artifacts(self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]: | |||||
"""Almost same implementation as the default one except it filters out the extra | |||||
"evaluation" branch which does not have the right metadata structure. | |||||
""" | |||||
if not snapshot: | |||||
return {} | |||||
# Skip evaluation revision which has no metadata | |||||
revs = [ | |||||
rev.target | |||||
for branch_name, rev in snapshot.branches.items() | |||||
if ( | |||||
rev | |||||
and rev.target_type == TargetType.REVISION | |||||
and branch_name != b"evaluation" | |||||
) | |||||
] | |||||
known_revisions = self.storage.revision_get(revs) | |||||
ret = {} | |||||
for revision in known_revisions: | |||||
if not revision: # revision_get can return None | |||||
continue | |||||
ret[revision["id"]] = revision["metadata"] | |||||
return ret | |||||
def resolve_revision_from( | def resolve_revision_from( | ||||
self, known_artifacts: Dict, artifact_metadata: Dict | self, known_artifacts: Dict, artifact_metadata: Dict | ||||
) -> Optional[bytes]: | ) -> Optional[bytes]: | ||||
for rev_id, known_artifact in known_artifacts.items(): | for rev_id, known_artifact in known_artifacts.items(): | ||||
known_integrity = known_artifact["extrinsic"]["raw"]["integrity"] | known_integrity = known_artifact["extrinsic"]["raw"]["integrity"] | ||||
if artifact_metadata["integrity"] == known_integrity: | if artifact_metadata["integrity"] == known_integrity: | ||||
return rev_id | return rev_id | ||||
▲ Show 20 Lines • Show All 112 Lines • Show Last 20 Lines |