Changeset View
Changeset View
Standalone View
Standalone View
swh/web/browse/snapshot_context.py
Show First 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | |||||
) | ) | ||||
from swh.web.config import get_config | from swh.web.config import get_config | ||||
_empty_snapshot_id = Snapshot(branches={}).id.hex() | _empty_snapshot_id = Snapshot(branches={}).id.hex() | ||||
def _get_branch(branches, branch_name, snapshot_id): | def _get_branch(branches, branch_name, snapshot_id): | ||||
""" | """ | ||||
Utility function to get a specific branch from a branches list. | Utility function to get a specific branch from a snapshot. | ||||
Its purpose is to get the default HEAD branch as some software origin | Returns None if the branch cannot be found. | ||||
(e.g those with svn type) does not have it. In that latter case, check | |||||
if there is a master branch instead and returns it. | |||||
""" | """ | ||||
filtered_branches = [b for b in branches if b["name"] == branch_name] | filtered_branches = [b for b in branches if b["name"] == branch_name] | ||||
if filtered_branches: | if filtered_branches: | ||||
return filtered_branches[0] | return filtered_branches[0] | ||||
elif branch_name == "HEAD": | |||||
filtered_branches = [b for b in branches if b["name"].endswith("master")] | |||||
if filtered_branches: | |||||
return filtered_branches[0] | |||||
elif branches: | |||||
return branches[0] | |||||
else: | else: | ||||
# case where a large branches list has been truncated | # case where a large branches list has been truncated | ||||
snp = archive.lookup_snapshot( | snp = archive.lookup_snapshot( | ||||
snapshot_id, | snapshot_id, | ||||
branches_from=branch_name, | branches_from=branch_name, | ||||
branches_count=1, | branches_count=1, | ||||
target_types=["revision", "alias"], | target_types=["revision", "alias"], | ||||
# pull request branches must be browsable even if they are hidden | # pull request branches must be browsable even if they are hidden | ||||
# by default in branches list | # by default in branches list | ||||
branch_name_exclude_prefix=None, | branch_name_exclude_prefix=None, | ||||
) | ) | ||||
snp_branch, _, _ = process_snapshot_branches(snp) | snp_branch, _, _ = process_snapshot_branches(snp) | ||||
if snp_branch and snp_branch[0]["name"] == branch_name: | if snp_branch and snp_branch[0]["name"] == branch_name: | ||||
branches.append(snp_branch[0]) | branches.append(snp_branch[0]) | ||||
return snp_branch[0] | return snp_branch[0] | ||||
def _get_release(releases, release_name, snapshot_id): | def _get_release(releases, release_name, snapshot_id): | ||||
""" | """ | ||||
Utility function to get a specific release from a releases list. | Utility function to get a specific release from a snapshot. | ||||
Returns None if the release can not be found in the list. | Returns None if the release cannot be found. | ||||
""" | """ | ||||
filtered_releases = [r for r in releases if r["name"] == release_name] | filtered_releases = [r for r in releases if r["name"] == release_name] | ||||
if filtered_releases: | if filtered_releases: | ||||
return filtered_releases[0] | return filtered_releases[0] | ||||
else: | else: | ||||
# case where a large branches list has been truncated | # case where a large branches list has been truncated | ||||
try: | try: | ||||
# git origins have specific branches for releases | # git origins have specific branches for releases | ||||
snp = archive.lookup_snapshot( | snp = archive.lookup_snapshot( | ||||
snapshot_id, | snapshot_id, | ||||
branches_from=f"refs/tags/{release_name}", | branches_from=f"refs/tags/{release_name}", | ||||
branches_count=1, | branches_count=1, | ||||
target_types=["release"], | target_types=["release"], | ||||
) | ) | ||||
except NotFoundExc: | except NotFoundExc: | ||||
snp = archive.lookup_snapshot( | snp = archive.lookup_snapshot( | ||||
snapshot_id, | snapshot_id, | ||||
branches_from=release_name, | branches_from=release_name, | ||||
branches_count=1, | branches_count=1, | ||||
target_types=["release"], | target_types=["release", "alias"], | ||||
) | ) | ||||
_, snp_release, _ = process_snapshot_branches(snp) | _, snp_release, _ = process_snapshot_branches(snp) | ||||
if snp_release and snp_release[0]["name"] == release_name: | if snp_release and snp_release[0]["name"] == release_name: | ||||
releases.append(snp_release[0]) | releases.append(snp_release[0]) | ||||
return snp_release[0] | return snp_release[0] | ||||
def _branch_not_found( | def _branch_not_found( | ||||
▲ Show 20 Lines • Show All 399 Lines • ▼ Show 20 Lines | ) -> SnapshotContext: | ||||
root_directory = None | root_directory = None | ||||
snapshot_total_size = snapshot_sizes["release"] + snapshot_sizes["revision"] | snapshot_total_size = snapshot_sizes["release"] + snapshot_sizes["revision"] | ||||
if path is not None: | if path is not None: | ||||
query_params["path"] = path | query_params["path"] = path | ||||
if snapshot_total_size and revision_id is not None: | if snapshot_total_size and revision_id is not None: | ||||
# browse specific revision for a snapshot requested | |||||
revision = archive.lookup_revision(revision_id) | revision = archive.lookup_revision(revision_id) | ||||
root_directory = revision["directory"] | root_directory = revision["directory"] | ||||
branches.append( | branches.append( | ||||
SnapshotBranchInfo( | SnapshotBranchInfo( | ||||
name=revision_id, | name=revision_id, | ||||
alias=False, | alias=False, | ||||
revision=revision_id, | revision=revision_id, | ||||
directory=root_directory, | directory=root_directory, | ||||
date=revision["date"], | date=revision["date"], | ||||
message=revision["message"], | message=revision["message"], | ||||
url=None, | url=None, | ||||
) | ) | ||||
) | ) | ||||
query_params["revision"] = revision_id | query_params["revision"] = revision_id | ||||
elif snapshot_total_size and release_name: | elif snapshot_total_size and release_name: | ||||
# browse specific release for a snapshot requested | |||||
release = _get_release(releases, release_name, snapshot_id) | release = _get_release(releases, release_name, snapshot_id) | ||||
if release is None: | if release is None: | ||||
_branch_not_found( | _branch_not_found( | ||||
"release", | "release", | ||||
release_name, | release_name, | ||||
snapshot_id, | snapshot_id, | ||||
snapshot_sizes, | snapshot_sizes, | ||||
origin_info, | origin_info, | ||||
timestamp, | timestamp, | ||||
visit_id, | visit_id, | ||||
) | ) | ||||
else: | else: | ||||
root_directory = release["directory"] | if release["target_type"] == "revision": | ||||
revision = archive.lookup_revision(release["target"]) | |||||
root_directory = revision["directory"] | |||||
revision_id = release["target"] | revision_id = release["target"] | ||||
elif release["target_type"] == "directory": | |||||
root_directory = release["target"] | |||||
release_id = release["id"] | release_id = release["id"] | ||||
query_params["release"] = release_name | query_params["release"] = release_name | ||||
elif snapshot_total_size: | elif snapshot_total_size: | ||||
head = aliases.get("HEAD") | |||||
if branch_name: | if branch_name: | ||||
# browse specific branch for a snapshot requested | |||||
query_params["branch"] = branch_name | query_params["branch"] = branch_name | ||||
branch = _get_branch(branches, branch_name or "HEAD", snapshot_id) | branch = _get_branch(branches, branch_name, snapshot_id) | ||||
if branch is None: | if branch is None: | ||||
_branch_not_found( | _branch_not_found( | ||||
"branch", | "branch", | ||||
branch_name, | branch_name, | ||||
snapshot_id, | snapshot_id, | ||||
snapshot_sizes, | snapshot_sizes, | ||||
origin_info, | origin_info, | ||||
timestamp, | timestamp, | ||||
visit_id, | visit_id, | ||||
) | ) | ||||
else: | else: | ||||
branch_name = branch["name"] | branch_name = branch["name"] | ||||
revision_id = branch["revision"] | revision_id = branch["revision"] | ||||
root_directory = branch["directory"] | root_directory = branch["directory"] | ||||
elif head is not None: | |||||
# otherwise, browse branch targeted by the HEAD alias if it exists | |||||
if head["target_type"] == "revision": | |||||
# HEAD alias targets a revision | |||||
head_rev = archive.lookup_revision(head["target"]) | |||||
branch_name = "HEAD" | |||||
revision_id = head_rev["id"] | |||||
root_directory = head_rev["directory"] | |||||
else: | |||||
# HEAD alias targets a release | |||||
vlorentz: are you sure it can't be a directory or content? | |||||
anlambertAuthorUnsubmitted Done Inline ActionsI did not see any HEAD alias pointing to other object types so far. anlambert: I did not see any HEAD alias pointing to other object types so far.
I could raise an exception… | |||||
vlorentzUnsubmitted Not Done Inline ActionsYeah, let's raise an exception, it will save us a few minutes if it ever shows up in Sentry vlorentz: Yeah, let's raise an exception, it will save us a few minutes if it ever shows up in Sentry | |||||
anlambertAuthorUnsubmitted Done Inline ActionsIn fact, we are good here as branch aliases targeting object types different from revision or release are already filtered out by the process_snapshot_branches function. So we can keep that code as it is. anlambert: In fact, we are good here as branch aliases targeting object types different from revision or… | |||||
release_name = archive.lookup_release(head["target"])["name"] | |||||
head_rel = _get_release(releases, release_name, snapshot_id) | |||||
if head_rel["target_type"] == "revision": | |||||
revision = archive.lookup_revision(head_rel["target"]) | |||||
root_directory = revision["directory"] | |||||
revision_id = head_rel["target"] | |||||
elif head_rel["target_type"] == "directory": | |||||
root_directory = head_rel["target"] | |||||
release_id = head_rel["id"] | |||||
elif branches: | |||||
# fallback to browse first branch otherwise | |||||
branch = branches[0] | |||||
branch_name = branch["name"] | |||||
revision_id = branch["revision"] | |||||
root_directory = branch["directory"] | |||||
elif releases: | |||||
Not Done Inline ActionsWouldn't it make sense to use the last one? If releases are named after versions, the last one is more likely the be the most recent, which is the "best" one vlorentz: Wouldn't it make sense to use the last one? If releases are named after versions, the last one… | |||||
Done Inline ActionsIndeed it makes sense, will update. anlambert: Indeed it makes sense, will update. | |||||
# fallback to browse last release otherwise | |||||
release = releases[-1] | |||||
if release["target_type"] == "revision": | |||||
revision = archive.lookup_revision(release["target"]) | |||||
root_directory = revision["directory"] | |||||
revision_id = release["target"] | |||||
elif release["target_type"] == "directory": | |||||
root_directory = release["target"] | |||||
release_id = release["id"] | |||||
release_name = release["name"] | |||||
for b in branches: | for b in branches: | ||||
branch_query_params = dict(query_params) | branch_query_params = dict(query_params) | ||||
branch_query_params.pop("release", None) | branch_query_params.pop("release", None) | ||||
if b["name"] != b["revision"]: | if b["name"] != b["revision"]: | ||||
branch_query_params.pop("revision", None) | branch_query_params.pop("revision", None) | ||||
branch_query_params["branch"] = b["name"] | branch_query_params["branch"] = b["name"] | ||||
b["url"] = reverse( | b["url"] = reverse( | ||||
▲ Show 20 Lines • Show All 907 Lines • Show Last 20 Lines |
are you sure it can't be a directory or content?