Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/origin_head.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | |||||
import re | import re | ||||
from typing import Any, Dict, List, Tuple, Union | from typing import Dict, List, Optional, Tuple, Union | ||||
import click | |||||
from swh.indexer.indexer import OriginIndexer | |||||
from swh.model.model import SnapshotBranch, TargetType | from swh.model.model import SnapshotBranch, TargetType | ||||
from swh.model.swhids import CoreSWHID, ObjectType | |||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
def get_head_swhid(storage, origin_url: str) -> Optional[CoreSWHID]:
    """Returns the SWHID of the head revision or release of an origin.

    Looks up the latest "full" visit with a snapshot for ``origin_url``,
    loads all branches of that snapshot, and resolves the head branch
    (FTP origins get version-aware archive-name resolution; everything
    else goes through the generic HEAD/master lookup).

    Returns None when the origin has no suitable visit, the snapshot
    cannot be loaded, or no head branch can be resolved.
    """
    last_full = origin_get_latest_visit_status(
        storage, origin_url, allowed_statuses=["full"], require_snapshot=True
    )
    if not last_full:
        # No completed visit with a snapshot: nothing to resolve.
        return None
    assert last_full.snapshot is not None
    snapshot = snapshot_get_all_branches(storage, last_full.snapshot)
    if snapshot is None:
        return None
    branches = dict(snapshot.branches)
    if last_full.type == "ftp":
        return _try_get_ftp_head(branches)
    return _try_get_head_generic(branches)
# Tarballs | |||||
_archive_filename_re = re.compile( | _archive_filename_re = re.compile( | ||||
rb"^" | rb"^" | ||||
rb"(?P<pkgname>.*)[-_]" | rb"(?P<pkgname>.*)[-_]" | ||||
rb"(?P<version>[0-9]+(\.[0-9])*)" | rb"(?P<version>[0-9]+(\.[0-9])*)" | ||||
rb"(?P<preversion>[-+][a-zA-Z0-9.~]+?)?" | rb"(?P<preversion>[-+][a-zA-Z0-9.~]+?)?" | ||||
rb"(?P<extension>(\.[a-zA-Z0-9]+)+)" | rb"(?P<extension>(\.[a-zA-Z0-9]+)+)" | ||||
rb"$" | rb"$" | ||||
) | ) | ||||
@classmethod | |||||
def _parse_version(cls: Any, filename: bytes) -> Tuple[Union[float, int], ...]: | def _parse_version(filename: bytes) -> Tuple[Union[float, int, str], ...]: | ||||
"""Extracts the release version from an archive filename, | """Extracts the release version from an archive filename, | ||||
to get an ordering whose maximum is likely to be the last | to get an ordering whose maximum is likely to be the last | ||||
version of the software | version of the software | ||||
>>> OriginHeadIndexer._parse_version(b'foo') | >>> _parse_version(b'foo') | ||||
(-inf,) | (-inf,) | ||||
>>> OriginHeadIndexer._parse_version(b'foo.tar.gz') | >>> _parse_version(b'foo.tar.gz') | ||||
(-inf,) | (-inf,) | ||||
>>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz') | >>> _parse_version(b'gnu-hello-0.0.1.tar.gz') | ||||
(0, 0, 1, 0) | (0, 0, 1, 0) | ||||
>>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') | >>> _parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') | ||||
(0, 0, 1, -1, 'beta2') | (0, 0, 1, -1, 'beta2') | ||||
>>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') | >>> _parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') | ||||
(0, 0, 1, 1, 'foobar') | (0, 0, 1, 1, 'foobar') | ||||
""" | """ | ||||
res = cls._archive_filename_re.match(filename) | res = _archive_filename_re.match(filename) | ||||
if res is None: | if res is None: | ||||
return (float("-infinity"),) | return (float("-infinity"),) | ||||
version = [int(n) for n in res.group("version").decode().split(".")] | version: List[Union[float, int, str]] = [ | ||||
int(n) for n in res.group("version").decode().split(".") | |||||
] | |||||
if res.group("preversion") is None: | if res.group("preversion") is None: | ||||
version.append(0) | version.append(0) | ||||
else: | else: | ||||
preversion = res.group("preversion").decode() | preversion = res.group("preversion").decode() | ||||
if preversion.startswith("-"): | if preversion.startswith("-"): | ||||
version.append(-1) | version.append(-1) | ||||
version.append(preversion[1:]) | version.append(preversion[1:]) | ||||
elif preversion.startswith("+"): | elif preversion.startswith("+"): | ||||
version.append(1) | version.append(1) | ||||
version.append(preversion[1:]) | version.append(preversion[1:]) | ||||
else: | else: | ||||
assert False, res.group("preversion") | assert False, res.group("preversion") | ||||
return tuple(version) | return tuple(version) | ||||
def _try_get_ftp_head(
    branches: Dict[bytes, Optional[SnapshotBranch]]
) -> Optional[CoreSWHID]:
    """Picks the branch whose (archive) name parses to the highest version
    and resolves it to a SWHID.

    Raises ValueError when ``branches`` is empty (``max`` on an empty
    iterable).
    """
    newest_archive = max(branches, key=_parse_version)
    return _try_resolve_target(branches, newest_archive)
# Generic
def _try_get_head_generic(
    branches: Dict[bytes, Optional[SnapshotBranch]]
) -> Optional[CoreSWHID]:
    """Resolves the head branch by conventional name.

    Works on 'deposit', 'pypi', and VCSs: tries b"HEAD" first, then
    b"master", returning the first branch that resolves.
    """
    for candidate in (b"HEAD", b"master"):
        swhid = _try_resolve_target(branches, candidate)
        if swhid is not None:
            return swhid
    return None
def _try_resolve_target(
    branches: Dict[bytes, Optional[SnapshotBranch]], branch_name: bytes
) -> Optional[CoreSWHID]:
    """Follows aliases from ``branch_name`` and returns the SWHID of the
    revision or release it ultimately points to.

    Returns None for missing/dangling branches, for content/directory
    targets (not handled yet), and when an alias chain leaves the
    snapshot (KeyError). NOTE(review): an alias cycle would loop forever
    here — same as the original; confirm snapshots cannot contain one.
    """
    try:
        node = branches[branch_name]
        # Chase alias branches until we reach a concrete target.
        while node is not None and node.target_type == TargetType.ALIAS:
            node = branches[node.target]
        if node is None:
            return None
        kind = node.target_type
        if kind == TargetType.REVISION:
            return CoreSWHID(object_type=ObjectType.REVISION, object_id=node.target)
        if kind == TargetType.RELEASE:
            return CoreSWHID(object_type=ObjectType.RELEASE, object_id=node.target)
        if kind in (TargetType.CONTENT, TargetType.DIRECTORY):
            return None  # TODO
        assert False, node
    except KeyError:
        # Alias pointing outside the snapshot's branch set.
        return None
@click.command()
@click.option(
    "--origins", "-i", help='Origins to lookup, in the "type+url" format', multiple=True
)
def main(origins: List[str]) -> None:
    """Command-line entry point: runs the origin-head indexer over the
    given origins."""
    indexer = OriginHeadIndexer()
    indexer.run(origins)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()