Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/origin_head.py
# Copyright (C) 2018-2022 The Software Heritage developers | # Copyright (C) 2018-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
from typing import Dict, List, Optional, Tuple, Union | from typing import List, Optional, Tuple, Union | ||||
from swh.model.model import SnapshotBranch, TargetType | from swh.model.model import Snapshot, SnapshotBranch, TargetType | ||||
from swh.model.swhids import CoreSWHID, ObjectType | from swh.model.swhids import CoreSWHID, ObjectType | ||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
from swh.storage.interface import PartialBranches, StorageInterface | |||||
def get_head_swhid(storage, origin_url: str) -> Optional[CoreSWHID]: | def get_head_swhid(storage: StorageInterface, origin_url: str) -> Optional[CoreSWHID]: | ||||
"""Returns the SWHID of the head revision or release of an origin""" | """Returns the SWHID of the head revision or release of an origin""" | ||||
visit_status = origin_get_latest_visit_status( | visit_status = origin_get_latest_visit_status( | ||||
storage, origin_url, allowed_statuses=["full"], require_snapshot=True | storage, origin_url, allowed_statuses=["full"], require_snapshot=True | ||||
) | ) | ||||
if not visit_status: | if not visit_status: | ||||
return None | return None | ||||
assert visit_status.snapshot is not None | assert visit_status.snapshot is not None | ||||
if visit_status.type == "ftp": | |||||
# We need to fetch all branches in order to find the largest one | |||||
snapshot = snapshot_get_all_branches(storage, visit_status.snapshot) | snapshot = snapshot_get_all_branches(storage, visit_status.snapshot) | ||||
if snapshot is None: | if snapshot is None: | ||||
return None | return None | ||||
return _try_get_ftp_head(storage, snapshot) | |||||
if visit_status.type == "ftp": | |||||
return _try_get_ftp_head(dict(snapshot.branches)) | |||||
else: | else: | ||||
return _try_get_head_generic(dict(snapshot.branches)) | # Peak into the snapshot, without fetching too many refs. | ||||
# If the snapshot is small, this gets all of it in a single request. | |||||
# If the snapshot is large, we will query specific branches as we need them. | |||||
partial_branches = storage.snapshot_get_branches( | |||||
visit_status.snapshot, branches_count=100 | |||||
) | |||||
if partial_branches is None: | |||||
# Snapshot does not exist | |||||
return None | |||||
return _try_get_head_generic(storage, partial_branches) | |||||
_archive_filename_re = re.compile( | _archive_filename_re = re.compile( | ||||
rb"^" | rb"^" | ||||
rb"(?P<pkgname>.*)[-_]" | rb"(?P<pkgname>.*)[-_]" | ||||
rb"(?P<version>[0-9]+(\.[0-9])*)" | rb"(?P<version>[0-9]+(\.[0-9])*)" | ||||
rb"(?P<preversion>[-+][a-zA-Z0-9.~]+?)?" | rb"(?P<preversion>[-+][a-zA-Z0-9.~]+?)?" | ||||
rb"(?P<extension>(\.[a-zA-Z0-9]+)+)" | rb"(?P<extension>(\.[a-zA-Z0-9]+)+)" | ||||
Show All 34 Lines | else: | ||||
version.append(1) | version.append(1) | ||||
version.append(preversion[1:]) | version.append(preversion[1:]) | ||||
else: | else: | ||||
assert False, res.group("preversion") | assert False, res.group("preversion") | ||||
return tuple(version) | return tuple(version) | ||||
def _try_get_ftp_head( | def _try_get_ftp_head( | ||||
branches: Dict[bytes, Optional[SnapshotBranch]] | storage: StorageInterface, snapshot: Snapshot | ||||
) -> Optional[CoreSWHID]: | ) -> Optional[CoreSWHID]: | ||||
archive_names = list(branches) | archive_names = list(snapshot.branches) | ||||
max_archive_name = max(archive_names, key=_parse_version) | max_archive_name = max(archive_names, key=_parse_version) | ||||
return _try_resolve_target(branches, max_archive_name) | return _try_resolve_target( | ||||
storage, | |||||
{"id": snapshot.id, "branches": dict(snapshot.branches), "next_branch": None}, | |||||
branch_name=max_archive_name, | |||||
) | |||||
def _try_get_head_generic( | def _try_get_head_generic( | ||||
branches: Dict[bytes, Optional[SnapshotBranch]] | storage: StorageInterface, partial_branches: PartialBranches | ||||
) -> Optional[CoreSWHID]: | ) -> Optional[CoreSWHID]: | ||||
# Works on 'deposit', 'pypi', and VCSs. | # Works on 'deposit', 'pypi', and VCSs. | ||||
return _try_resolve_target(branches, b"HEAD") or _try_resolve_target( | return _try_resolve_target( | ||||
branches, b"master" | storage, partial_branches, branch_name=b"HEAD" | ||||
) or _try_resolve_target(storage, partial_branches, branch_name=b"master") | |||||
def _get_branch( | |||||
storage: StorageInterface, partial_branches: PartialBranches, branch_name: bytes | |||||
) -> Optional[SnapshotBranch]: | |||||
"""Given a ``branch_name``, gets it from ``partial_branches`` if present, | |||||
and fetches it from the storage otherwise.""" | |||||
if branch_name in partial_branches["branches"]: | |||||
return partial_branches["branches"][branch_name] | |||||
elif partial_branches["next_branch"] is not None: | |||||
# Branch is not in `partial_branches`, and `partial_branches` indeed partial | |||||
res = storage.snapshot_get_branches( | |||||
partial_branches["id"], branches_from=branch_name, branches_count=1 | |||||
) | ) | ||||
assert res is not None, "Snapshot does not exist anymore" | |||||
return res["branches"].get(branch_name) | |||||
else: | |||||
# Branch is not in `partial_branches`, but `partial_branches` is the full | |||||
# list of branches, which means it is a dangling reference. | |||||
return None | |||||
def _try_resolve_target( | def _try_resolve_target( | ||||
branches: Dict[bytes, Optional[SnapshotBranch]], branch_name: bytes | storage: StorageInterface, partial_branches: PartialBranches, branch_name: bytes | ||||
) -> Optional[CoreSWHID]: | ) -> Optional[CoreSWHID]: | ||||
try: | try: | ||||
branch = branches[branch_name] | branch = _get_branch(storage, partial_branches, branch_name) | ||||
if branch is None: | if branch is None: | ||||
return None | return None | ||||
while branch.target_type == TargetType.ALIAS: | while branch.target_type == TargetType.ALIAS: | ||||
branch = branches[branch.target] | branch = _get_branch(storage, partial_branches, branch.target) | ||||
if branch is None: | if branch is None: | ||||
return None | return None | ||||
if branch.target_type == TargetType.REVISION: | if branch.target_type == TargetType.REVISION: | ||||
return CoreSWHID(object_type=ObjectType.REVISION, object_id=branch.target) | return CoreSWHID(object_type=ObjectType.REVISION, object_id=branch.target) | ||||
elif branch.target_type == TargetType.CONTENT: | elif branch.target_type == TargetType.CONTENT: | ||||
return None # TODO | return None # TODO | ||||
elif branch.target_type == TargetType.DIRECTORY: | elif branch.target_type == TargetType.DIRECTORY: | ||||
return None # TODO | return None # TODO | ||||
elif branch.target_type == TargetType.RELEASE: | elif branch.target_type == TargetType.RELEASE: | ||||
return CoreSWHID(object_type=ObjectType.RELEASE, object_id=branch.target) | return CoreSWHID(object_type=ObjectType.RELEASE, object_id=branch.target) | ||||
else: | else: | ||||
assert False, branch | assert False, branch | ||||
except KeyError: | except KeyError: | ||||
return None | return None |