Changeset View
Changeset View
Standalone View
Standalone View
swh/web/browse/snapshot_context.py
Show All 26 Lines | from swh.web.browse.utils import ( | ||||
gen_revision_url, | gen_revision_url, | ||||
gen_snapshot_link, | gen_snapshot_link, | ||||
get_directory_entries, | get_directory_entries, | ||||
get_readme_to_display, | get_readme_to_display, | ||||
prepare_content_for_display, | prepare_content_for_display, | ||||
request_content, | request_content, | ||||
) | ) | ||||
from swh.web.common import archive, highlightjs | from swh.web.common import archive, highlightjs | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc, handle_view_exception | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.identifiers import get_swhids_info | from swh.web.common.identifiers import get_swhids_info | ||||
from swh.web.common.origin_visits import get_origin_visit | from swh.web.common.origin_visits import get_origin_visit | ||||
from swh.web.common.typing import ( | from swh.web.common.typing import ( | ||||
ContentMetadata, | ContentMetadata, | ||||
DirectoryMetadata, | DirectoryMetadata, | ||||
OriginInfo, | OriginInfo, | ||||
SnapshotBranchInfo, | SnapshotBranchInfo, | ||||
SnapshotContext, | SnapshotContext, | ||||
▲ Show 20 Lines • Show All 621 Lines • ▼ Show 20 Lines | |||||
def browse_snapshot_directory( | def browse_snapshot_directory( | ||||
request, snapshot_id=None, origin_url=None, timestamp=None, path=None | request, snapshot_id=None, origin_url=None, timestamp=None, path=None | ||||
): | ): | ||||
""" | """ | ||||
Django view implementation for browsing a directory in a snapshot context. | Django view implementation for browsing a directory in a snapshot context. | ||||
""" | """ | ||||
try: | |||||
_check_origin_url(snapshot_id, origin_url) | _check_origin_url(snapshot_id, origin_url) | ||||
snapshot_context = get_snapshot_context( | snapshot_context = get_snapshot_context( | ||||
snapshot_id=snapshot_id, | snapshot_id=snapshot_id, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
timestamp=timestamp, | timestamp=timestamp, | ||||
visit_id=request.GET.get("visit_id"), | visit_id=request.GET.get("visit_id"), | ||||
path=path, | path=path, | ||||
browse_context="directory", | browse_context="directory", | ||||
branch_name=request.GET.get("branch"), | branch_name=request.GET.get("branch"), | ||||
release_name=request.GET.get("release"), | release_name=request.GET.get("release"), | ||||
revision_id=request.GET.get("revision"), | revision_id=request.GET.get("revision"), | ||||
) | ) | ||||
root_directory = snapshot_context["root_directory"] | root_directory = snapshot_context["root_directory"] | ||||
sha1_git = root_directory | sha1_git = root_directory | ||||
if root_directory and path: | if root_directory and path: | ||||
dir_info = archive.lookup_directory_with_path(root_directory, path) | dir_info = archive.lookup_directory_with_path(root_directory, path) | ||||
sha1_git = dir_info["target"] | sha1_git = dir_info["target"] | ||||
dirs = [] | dirs = [] | ||||
files = [] | files = [] | ||||
if sha1_git: | if sha1_git: | ||||
dirs, files = get_directory_entries(sha1_git) | dirs, files = get_directory_entries(sha1_git) | ||||
except Exception as exc: | |||||
return handle_view_exception(request, exc) | |||||
origin_info = snapshot_context["origin_info"] | origin_info = snapshot_context["origin_info"] | ||||
visit_info = snapshot_context["visit_info"] | visit_info = snapshot_context["visit_info"] | ||||
url_args = snapshot_context["url_args"] | url_args = snapshot_context["url_args"] | ||||
query_params = dict(snapshot_context["query_params"]) | query_params = dict(snapshot_context["query_params"]) | ||||
revision_id = snapshot_context["revision_id"] | revision_id = snapshot_context["revision_id"] | ||||
snapshot_id = snapshot_context["snapshot_id"] | snapshot_id = snapshot_context["snapshot_id"] | ||||
if origin_info: | if origin_info: | ||||
▲ Show 20 Lines • Show All 164 Lines • ▼ Show 20 Lines | def browse_snapshot_content( | ||||
origin_url=None, | origin_url=None, | ||||
timestamp=None, | timestamp=None, | ||||
path=None, | path=None, | ||||
selected_language=None, | selected_language=None, | ||||
): | ): | ||||
""" | """ | ||||
Django view implementation for browsing a content in a snapshot context. | Django view implementation for browsing a content in a snapshot context. | ||||
""" | """ | ||||
try: | |||||
_check_origin_url(snapshot_id, origin_url) | _check_origin_url(snapshot_id, origin_url) | ||||
if path is None: | if path is None: | ||||
raise BadInputExc("The path of a content must be given as query parameter.") | raise BadInputExc("The path of a content must be given as query parameter.") | ||||
snapshot_context = get_snapshot_context( | snapshot_context = get_snapshot_context( | ||||
snapshot_id=snapshot_id, | snapshot_id=snapshot_id, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
timestamp=timestamp, | timestamp=timestamp, | ||||
visit_id=request.GET.get("visit_id"), | visit_id=request.GET.get("visit_id"), | ||||
path=path, | path=path, | ||||
browse_context="content", | browse_context="content", | ||||
branch_name=request.GET.get("branch"), | branch_name=request.GET.get("branch"), | ||||
release_name=request.GET.get("release"), | release_name=request.GET.get("release"), | ||||
revision_id=request.GET.get("revision"), | revision_id=request.GET.get("revision"), | ||||
) | ) | ||||
root_directory = snapshot_context["root_directory"] | root_directory = snapshot_context["root_directory"] | ||||
sha1_git = None | sha1_git = None | ||||
query_string = None | query_string = None | ||||
content_data = {} | content_data = {} | ||||
directory_id = None | directory_id = None | ||||
split_path = path.split("/") | split_path = path.split("/") | ||||
filename = split_path[-1] | filename = split_path[-1] | ||||
filepath = path[: -len(filename)] | filepath = path[: -len(filename)] | ||||
if root_directory: | if root_directory: | ||||
content_info = archive.lookup_directory_with_path(root_directory, path) | content_info = archive.lookup_directory_with_path(root_directory, path) | ||||
sha1_git = content_info["target"] | sha1_git = content_info["target"] | ||||
query_string = "sha1_git:" + sha1_git | query_string = "sha1_git:" + sha1_git | ||||
content_data = request_content(query_string, raise_if_unavailable=False) | content_data = request_content(query_string, raise_if_unavailable=False) | ||||
if filepath: | if filepath: | ||||
dir_info = archive.lookup_directory_with_path(root_directory, filepath) | dir_info = archive.lookup_directory_with_path(root_directory, filepath) | ||||
directory_id = dir_info["target"] | directory_id = dir_info["target"] | ||||
else: | else: | ||||
directory_id = root_directory | directory_id = root_directory | ||||
except Exception as exc: | |||||
return handle_view_exception(request, exc) | |||||
revision_id = snapshot_context["revision_id"] | revision_id = snapshot_context["revision_id"] | ||||
origin_info = snapshot_context["origin_info"] | origin_info = snapshot_context["origin_info"] | ||||
visit_info = snapshot_context["visit_info"] | visit_info = snapshot_context["visit_info"] | ||||
snapshot_id = snapshot_context["snapshot_id"] | snapshot_id = snapshot_context["snapshot_id"] | ||||
if content_data.get("raw_data") is not None: | if content_data.get("raw_data") is not None: | ||||
content_display_data = prepare_content_for_display( | content_display_data = prepare_content_for_display( | ||||
content_data["raw_data"], content_data["mimetype"], path | content_data["raw_data"], content_data["mimetype"], path | ||||
▲ Show 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | |||||
PER_PAGE = 100 | PER_PAGE = 100 | ||||
def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None): | def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None): | ||||
""" | """ | ||||
Django view implementation for browsing a revision history in a | Django view implementation for browsing a revision history in a | ||||
snapshot context. | snapshot context. | ||||
""" | """ | ||||
try: | |||||
_check_origin_url(snapshot_id, origin_url) | _check_origin_url(snapshot_id, origin_url) | ||||
snapshot_context = get_snapshot_context( | snapshot_context = get_snapshot_context( | ||||
snapshot_id=snapshot_id, | snapshot_id=snapshot_id, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
timestamp=timestamp, | timestamp=timestamp, | ||||
visit_id=request.GET.get("visit_id"), | visit_id=request.GET.get("visit_id"), | ||||
browse_context="log", | browse_context="log", | ||||
branch_name=request.GET.get("branch"), | branch_name=request.GET.get("branch"), | ||||
release_name=request.GET.get("release"), | release_name=request.GET.get("release"), | ||||
revision_id=request.GET.get("revision"), | revision_id=request.GET.get("revision"), | ||||
) | ) | ||||
revision_id = snapshot_context["revision_id"] | revision_id = snapshot_context["revision_id"] | ||||
per_page = int(request.GET.get("per_page", PER_PAGE)) | per_page = int(request.GET.get("per_page", PER_PAGE)) | ||||
offset = int(request.GET.get("offset", 0)) | offset = int(request.GET.get("offset", 0)) | ||||
revs_ordering = request.GET.get("revs_ordering", "committer_date") | revs_ordering = request.GET.get("revs_ordering", "committer_date") | ||||
session_key = "rev_%s_log_ordering_%s" % (revision_id, revs_ordering) | session_key = "rev_%s_log_ordering_%s" % (revision_id, revs_ordering) | ||||
rev_log_session = request.session.get(session_key, None) | rev_log_session = request.session.get(session_key, None) | ||||
rev_log = [] | rev_log = [] | ||||
revs_walker_state = None | revs_walker_state = None | ||||
if rev_log_session: | if rev_log_session: | ||||
rev_log = rev_log_session["rev_log"] | rev_log = rev_log_session["rev_log"] | ||||
revs_walker_state = rev_log_session["revs_walker_state"] | revs_walker_state = rev_log_session["revs_walker_state"] | ||||
if len(rev_log) < offset + per_page: | if len(rev_log) < offset + per_page: | ||||
revs_walker = archive.get_revisions_walker( | revs_walker = archive.get_revisions_walker( | ||||
revs_ordering, | revs_ordering, | ||||
revision_id, | revision_id, | ||||
max_revs=offset + per_page + 1, | max_revs=offset + per_page + 1, | ||||
state=revs_walker_state, | state=revs_walker_state, | ||||
) | ) | ||||
rev_log += [rev["id"] for rev in revs_walker] | rev_log += [rev["id"] for rev in revs_walker] | ||||
revs_walker_state = revs_walker.export_state() | revs_walker_state = revs_walker.export_state() | ||||
revs = rev_log[offset : offset + per_page] | revs = rev_log[offset : offset + per_page] | ||||
revision_log = archive.lookup_revision_multiple(revs) | revision_log = archive.lookup_revision_multiple(revs) | ||||
request.session[session_key] = { | request.session[session_key] = { | ||||
"rev_log": rev_log, | "rev_log": rev_log, | ||||
"revs_walker_state": revs_walker_state, | "revs_walker_state": revs_walker_state, | ||||
} | } | ||||
except Exception as exc: | |||||
return handle_view_exception(request, exc) | |||||
origin_info = snapshot_context["origin_info"] | origin_info = snapshot_context["origin_info"] | ||||
visit_info = snapshot_context["visit_info"] | visit_info = snapshot_context["visit_info"] | ||||
url_args = snapshot_context["url_args"] | url_args = snapshot_context["url_args"] | ||||
query_params = snapshot_context["query_params"] | query_params = snapshot_context["query_params"] | ||||
snapshot_id = snapshot_context["snapshot_id"] | snapshot_id = snapshot_context["snapshot_id"] | ||||
query_params["per_page"] = per_page | query_params["per_page"] = per_page | ||||
revs_ordering = request.GET.get("revs_ordering", "") | revs_ordering = request.GET.get("revs_ordering", "") | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | |||||
def browse_snapshot_branches( | def browse_snapshot_branches( | ||||
request, snapshot_id=None, origin_url=None, timestamp=None | request, snapshot_id=None, origin_url=None, timestamp=None | ||||
): | ): | ||||
""" | """ | ||||
Django view implementation for browsing a list of branches in a snapshot | Django view implementation for browsing a list of branches in a snapshot | ||||
context. | context. | ||||
""" | """ | ||||
try: | |||||
_check_origin_url(snapshot_id, origin_url) | _check_origin_url(snapshot_id, origin_url) | ||||
snapshot_context = get_snapshot_context( | snapshot_context = get_snapshot_context( | ||||
snapshot_id=snapshot_id, | snapshot_id=snapshot_id, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
timestamp=timestamp, | timestamp=timestamp, | ||||
visit_id=request.GET.get("visit_id"), | visit_id=request.GET.get("visit_id"), | ||||
) | ) | ||||
branches_bc = request.GET.get("branches_breadcrumbs", "") | branches_bc = request.GET.get("branches_breadcrumbs", "") | ||||
branches_bc = branches_bc.split(",") if branches_bc else [] | branches_bc = branches_bc.split(",") if branches_bc else [] | ||||
branches_from = branches_bc[-1] if branches_bc else "" | branches_from = branches_bc[-1] if branches_bc else "" | ||||
origin_info = snapshot_context["origin_info"] | origin_info = snapshot_context["origin_info"] | ||||
url_args = snapshot_context["url_args"] | url_args = snapshot_context["url_args"] | ||||
query_params = snapshot_context["query_params"] | query_params = snapshot_context["query_params"] | ||||
if origin_info: | if origin_info: | ||||
browse_view_name = "browse-origin-directory" | browse_view_name = "browse-origin-directory" | ||||
else: | else: | ||||
browse_view_name = "browse-snapshot-directory" | browse_view_name = "browse-snapshot-directory" | ||||
snapshot = archive.lookup_snapshot( | snapshot = archive.lookup_snapshot( | ||||
snapshot_context["snapshot_id"], | snapshot_context["snapshot_id"], | ||||
branches_from, | branches_from, | ||||
PER_PAGE + 1, | PER_PAGE + 1, | ||||
target_types=["revision", "alias"], | target_types=["revision", "alias"], | ||||
) | ) | ||||
displayed_branches, _ = process_snapshot_branches(snapshot) | displayed_branches, _ = process_snapshot_branches(snapshot) | ||||
except Exception as exc: | |||||
return handle_view_exception(request, exc) | |||||
for branch in displayed_branches: | for branch in displayed_branches: | ||||
rev_query_params = {} | rev_query_params = {} | ||||
if origin_info: | if origin_info: | ||||
rev_query_params["origin_url"] = origin_info["url"] | rev_query_params["origin_url"] = origin_info["url"] | ||||
revision_url = reverse( | revision_url = reverse( | ||||
"browse-revision", | "browse-revision", | ||||
url_args={"sha1_git": branch["revision"]}, | url_args={"sha1_git": branch["revision"]}, | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | |||||
def browse_snapshot_releases( | def browse_snapshot_releases( | ||||
request, snapshot_id=None, origin_url=None, timestamp=None | request, snapshot_id=None, origin_url=None, timestamp=None | ||||
): | ): | ||||
""" | """ | ||||
Django view implementation for browsing a list of releases in a snapshot | Django view implementation for browsing a list of releases in a snapshot | ||||
context. | context. | ||||
""" | """ | ||||
try: | |||||
_check_origin_url(snapshot_id, origin_url) | _check_origin_url(snapshot_id, origin_url) | ||||
snapshot_context = get_snapshot_context( | snapshot_context = get_snapshot_context( | ||||
snapshot_id=snapshot_id, | snapshot_id=snapshot_id, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
timestamp=timestamp, | timestamp=timestamp, | ||||
visit_id=request.GET.get("visit_id"), | visit_id=request.GET.get("visit_id"), | ||||
) | ) | ||||
rel_bc = request.GET.get("releases_breadcrumbs", "") | rel_bc = request.GET.get("releases_breadcrumbs", "") | ||||
rel_bc = rel_bc.split(",") if rel_bc else [] | rel_bc = rel_bc.split(",") if rel_bc else [] | ||||
rel_from = rel_bc[-1] if rel_bc else "" | rel_from = rel_bc[-1] if rel_bc else "" | ||||
origin_info = snapshot_context["origin_info"] | origin_info = snapshot_context["origin_info"] | ||||
url_args = snapshot_context["url_args"] | url_args = snapshot_context["url_args"] | ||||
query_params = snapshot_context["query_params"] | query_params = snapshot_context["query_params"] | ||||
snapshot = archive.lookup_snapshot( | snapshot = archive.lookup_snapshot( | ||||
snapshot_context["snapshot_id"], | snapshot_context["snapshot_id"], | ||||
rel_from, | rel_from, | ||||
PER_PAGE + 1, | PER_PAGE + 1, | ||||
target_types=["release", "alias"], | target_types=["release", "alias"], | ||||
) | ) | ||||
_, displayed_releases = process_snapshot_branches(snapshot) | _, displayed_releases = process_snapshot_branches(snapshot) | ||||
except Exception as exc: | |||||
return handle_view_exception(request, exc) | |||||
for release in displayed_releases: | for release in displayed_releases: | ||||
query_params_tgt = {"snapshot": snapshot_id} | query_params_tgt = {"snapshot": snapshot_id} | ||||
if origin_info: | if origin_info: | ||||
query_params_tgt["origin_url"] = origin_info["url"] | query_params_tgt["origin_url"] = origin_info["url"] | ||||
release_url = reverse( | release_url = reverse( | ||||
"browse-release", | "browse-release", | ||||
url_args={"sha1_git": release["id"]}, | url_args={"sha1_git": release["id"]}, | ||||
▲ Show 20 Lines • Show All 86 Lines • Show Last 20 Lines |