diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -10,7 +10,9 @@ import logging from swh.indexer.indexer import OriginIndexer +from swh.model.model import SnapshotBranch, TargetType from swh.storage.algos.origin import origin_get_latest_visit_status +from swh.storage.algos.snapshot import snapshot_get_all_branches class OriginHeadIndexer(OriginIndexer): @@ -39,14 +41,14 @@ if not visit_and_status: return None visit, visit_status = visit_and_status - latest_snapshot = self.storage.snapshot_get(visit_status.snapshot) - if latest_snapshot is None: + snapshot = snapshot_get_all_branches(self.storage, visit_status.snapshot) + if snapshot is None: return None method = getattr( self, "_try_get_%s_head" % visit.type, self._try_get_head_generic ) - rev_id = method(latest_snapshot) + rev_id = method(snapshot.branches) if rev_id is not None: return { "origin_url": origin_url, @@ -68,7 +70,7 @@ ) @classmethod - def _parse_version(cls: Any, filename: str) -> Tuple[Union[float, int], ...]: + def _parse_version(cls: Any, filename: bytes) -> Tuple[Union[float, int], ...]: """Extracts the release version from an archive filename, to get an ordering whose maximum is likely to be the last version of the software @@ -102,45 +104,42 @@ assert False, res.group("preversion") return tuple(version) - def _try_get_ftp_head(self, snapshot: Dict[str, Any]) -> Any: - archive_names = list(snapshot["branches"]) + def _try_get_ftp_head(self, branches: Dict[bytes, SnapshotBranch]) -> Any: + archive_names = list(branches) max_archive_name = max(archive_names, key=self._parse_version) - r = self._try_resolve_target(snapshot["branches"], max_archive_name) + r = self._try_resolve_target(branches, max_archive_name) return r # Generic - def _try_get_head_generic(self, snapshot: Dict[str, Any]) -> Any: + def _try_get_head_generic(self, branches: Dict[bytes, SnapshotBranch]) -> Any: # Works on 'deposit', 'pypi', and VCSs. - try: - branches = snapshot["branches"] - except KeyError: - return None - else: - return self._try_resolve_target( - branches, b"HEAD" - ) or self._try_resolve_target(branches, b"master") + return self._try_resolve_target(branches, b"HEAD") or self._try_resolve_target( + branches, b"master" + ) - def _try_resolve_target(self, branches: Dict, target_name: bytes) -> Any: + def _try_resolve_target( + self, branches: Dict[bytes, SnapshotBranch], branch_name: bytes + ) -> Any: try: - target = branches[target_name] - if target is None: + branch = branches[branch_name] + if branch is None: return None - while target["target_type"] == "alias": - target = branches[target["target"]] - if target is None: + while branch.target_type == TargetType.ALIAS: + branch = branches[branch.target] + if branch is None: return None - if target["target_type"] == "revision": - return target["target"] - elif target["target_type"] == "content": + if branch.target_type == TargetType.REVISION: + return branch.target + elif branch.target_type == TargetType.CONTENT: return None # TODO - elif target["target_type"] == "directory": + elif branch.target_type == TargetType.DIRECTORY: return None # TODO - elif target["target_type"] == "release": + elif branch.target_type == TargetType.RELEASE: return None # TODO else: - assert False + assert False, branch except KeyError: return None