diff --git a/swh/web/browse/views/utils/snapshot_context.py b/swh/web/browse/snapshot_context.py similarity index 74% rename from swh/web/browse/views/utils/snapshot_context.py rename to swh/web/browse/snapshot_context.py index f620f7e4..a8cb018a 100644 --- a/swh/web/browse/views/utils/snapshot_context.py +++ b/swh/web/browse/snapshot_context.py @@ -1,1065 +1,1390 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -# Utility module implementing Django views for browsing the archive -# in a snapshot context. -# Its purpose is to factorize code for the views reachable from the -# /origin/.* and /snapshot/.* endpoints. +# Utility module for browsing the archive in a snapshot context. +from collections import defaultdict + + +from django.core.cache import cache from django.shortcuts import render from django.template.defaultfilters import filesizeformat from django.utils.html import escape import sentry_sdk -from swh.model.identifiers import snapshot_identifier +from swh.model.identifiers import persistent_identifier, snapshot_identifier from swh.web.browse.utils import ( - get_snapshot_context, get_directory_entries, gen_directory_link, gen_revision_link, request_content, gen_content_link, prepare_content_for_display, content_display_max_size, format_log_entries, gen_revision_log_link, gen_release_link, get_readme_to_display, get_swh_persistent_ids, gen_snapshot_link, - process_snapshot_branches, ) from swh.web.common import service, highlightjs from swh.web.common.exc import handle_view_exception, NotFoundExc +from swh.web.common.origin_visits import get_origin_visit from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, swh_object_icons, ) +from swh.web.config import get_config + _empty_snapshot_id = snapshot_identifier({"branches": {}}) def _get_branch(branches, branch_name, snapshot_id): """ Utility function to get a specific branch from a branches list. Its purpose is to get the default HEAD branch as some software origin (e.g those with svn type) does not have it. In that latter case, check if there is a master branch instead and returns it. """ filtered_branches = [b for b in branches if b["name"] == branch_name] if filtered_branches: return filtered_branches[0] elif branch_name == "HEAD": filtered_branches = [b for b in branches if b["name"].endswith("master")] if filtered_branches: return filtered_branches[0] elif branches: return branches[0] else: # case where a large branches list has been truncated snp = service.lookup_snapshot( snapshot_id, branches_from=branch_name, branches_count=1, target_types=["revision", "alias"], ) snp_branch, _ = process_snapshot_branches(snp) if snp_branch and snp_branch[0]["name"] == branch_name: branches.append(snp_branch[0]) return snp_branch[0] def _get_release(releases, release_name, snapshot_id): """ Utility function to get a specific release from a releases list. Returns None if the release can not be found in the list. """ filtered_releases = [r for r in releases if r["name"] == release_name] if filtered_releases: return filtered_releases[0] else: # case where a large branches list has been truncated for branch_name in (release_name, f"refs/tags/{release_name}"): snp = service.lookup_snapshot( snapshot_id, branches_from=branch_name, branches_count=1, target_types=["release"], ) _, snp_release = process_snapshot_branches(snp) if snp_release and snp_release[0]["name"] == release_name: releases.append(snp_release[0]) return snp_release[0] def _branch_not_found( branch_type, branch, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id ): """ Utility function to raise an exception when a specified branch/release can not be found. """ if branch_type == "branch": branch_type = "Branch" branch_type_plural = "branches" target_type = "revision" else: branch_type = "Release" branch_type_plural = "releases" target_type = "release" if snapshot_id and snapshot_sizes[target_type] == 0: msg = "Snapshot with id %s has an empty list" " of %s!" % ( snapshot_id, branch_type_plural, ) elif snapshot_id: msg = "%s %s for snapshot with id %s" " not found!" % ( branch_type, branch, snapshot_id, ) elif visit_id and snapshot_sizes[target_type] == 0: msg = ( "Origin with url %s" " for visit with id %s has an empty list" " of %s!" % (origin_info["url"], visit_id, branch_type_plural) ) elif visit_id: msg = ( "%s %s associated to visit with" " id %s for origin with url %s" " not found!" % (branch_type, branch, visit_id, origin_info["url"]) ) elif snapshot_sizes[target_type] == 0: msg = ( "Origin with url %s" " for visit with timestamp %s has an empty list" " of %s!" % (origin_info["url"], timestamp, branch_type_plural) ) else: msg = ( "%s %s associated to visit with" " timestamp %s for origin with " "url %s not found!" % (branch_type, branch, timestamp, origin_info["url"]) ) raise NotFoundExc(escape(msg)) +def process_snapshot_branches(snapshot): + """ + Process a dictionary describing snapshot branches: extract those + targeting revisions and releases, put them in two different lists, + then sort those lists in lexicographical order of the branches' names. + + Args: + snapshot_branches (dict): A dict describing the branches of a snapshot + as returned for instance by + :func:`swh.web.common.service.lookup_snapshot` + + Returns: + tuple: A tuple whose first member is the sorted list of branches + targeting revisions and second member the sorted list of branches + targeting releases + """ + snapshot_branches = snapshot["branches"] + branches = {} + branch_aliases = {} + releases = {} + revision_to_branch = defaultdict(set) + revision_to_release = defaultdict(set) + release_to_branch = defaultdict(set) + for branch_name, target in snapshot_branches.items(): + if not target: + # FIXME: display branches with an unknown target anyway + continue + target_id = target["target"] + target_type = target["target_type"] + if target_type == "revision": + branches[branch_name] = { + "name": branch_name, + "revision": target_id, + } + revision_to_branch[target_id].add(branch_name) + elif target_type == "release": + release_to_branch[target_id].add(branch_name) + elif target_type == "alias": + branch_aliases[branch_name] = target_id + # FIXME: handle pointers to other object types + + def _enrich_release_branch(branch, release): + releases[branch] = { + "name": release["name"], + "branch_name": branch, + "date": format_utc_iso_date(release["date"]), + "id": release["id"], + "message": release["message"], + "target_type": release["target_type"], + "target": release["target"], + } + + def _enrich_revision_branch(branch, revision): + branches[branch].update( + { + "revision": revision["id"], + "directory": revision["directory"], + "date": format_utc_iso_date(revision["date"]), + "message": revision["message"], + } + ) + + releases_info = service.lookup_release_multiple(release_to_branch.keys()) + for release in releases_info: + branches_to_update = release_to_branch[release["id"]] + for branch in branches_to_update: + _enrich_release_branch(branch, release) + if release["target_type"] == "revision": + revision_to_release[release["target"]].update(branches_to_update) + + revisions = service.lookup_revision_multiple( + set(revision_to_branch.keys()) | set(revision_to_release.keys()) + ) + + for revision in revisions: + if not revision: + continue + for branch in revision_to_branch[revision["id"]]: + _enrich_revision_branch(branch, revision) + for release in revision_to_release[revision["id"]]: + releases[release]["directory"] = revision["directory"] + + for branch_alias, branch_target in branch_aliases.items(): + if branch_target in branches: + branches[branch_alias] = dict(branches[branch_target]) + else: + snp = service.lookup_snapshot( + snapshot["id"], branches_from=branch_target, branches_count=1 + ) + if snp and branch_target in snp["branches"]: + + if snp["branches"][branch_target] is None: + continue + + target_type = snp["branches"][branch_target]["target_type"] + target = snp["branches"][branch_target]["target"] + if target_type == "revision": + branches[branch_alias] = snp["branches"][branch_target] + revision = service.lookup_revision(target) + _enrich_revision_branch(branch_alias, revision) + elif target_type == "release": + release = service.lookup_release(target) + _enrich_release_branch(branch_alias, release) + + if branch_alias in branches: + branches[branch_alias]["name"] = branch_alias + + ret_branches = list(sorted(branches.values(), key=lambda b: b["name"])) + ret_releases = list(sorted(releases.values(), key=lambda b: b["name"])) + + return ret_branches, ret_releases + + +def get_snapshot_content(snapshot_id): + """Returns the lists of branches and releases + associated to a swh snapshot. + That list is put in cache in order to speedup the navigation + in the swh-web/browse ui. + + .. warning:: At most 1000 branches contained in the snapshot + will be returned for performance reasons. + + Args: + snapshot_id (str): hexadecimal representation of the snapshot + identifier + + Returns: + A tuple with two members. The first one is a list of dict describing + the snapshot branches. The second one is a list of dict describing the + snapshot releases. + + Raises: + NotFoundExc if the snapshot does not exist + """ + cache_entry_id = "swh_snapshot_%s" % snapshot_id + cache_entry = cache.get(cache_entry_id) + + if cache_entry: + return cache_entry["branches"], cache_entry["releases"] + + branches = [] + releases = [] + + snapshot_content_max_size = get_config()["snapshot_content_max_size"] + + if snapshot_id: + snapshot = service.lookup_snapshot( + snapshot_id, branches_count=snapshot_content_max_size + ) + branches, releases = process_snapshot_branches(snapshot) + + cache.set(cache_entry_id, {"branches": branches, "releases": releases,}) + + return branches, releases + + +def get_origin_visit_snapshot( + origin_info, visit_ts=None, visit_id=None, snapshot_id=None +): + """Returns the lists of branches and releases + associated to a swh origin for a given visit. + The visit is expressed by a timestamp. In the latter case, + the closest visit from the provided timestamp will be used. + If no visit parameter is provided, it returns the list of branches + found for the latest visit. + That list is put in cache in order to speedup the navigation + in the swh-web/browse ui. + + .. warning:: At most 1000 branches contained in the snapshot + will be returned for performance reasons. + + Args: + origin_info (dict): a dict filled with origin information + (id, url, type) + visit_ts (int or str): an ISO date string or Unix timestamp to parse + visit_id (int): optional visit id for disambiguation in case + several visits have the same timestamp + + Returns: + A tuple with two members. The first one is a list of dict describing + the origin branches for the given visit. + The second one is a list of dict describing the origin releases + for the given visit. + + Raises: + NotFoundExc if the origin or its visit are not found + """ + + visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id) + + return get_snapshot_content(visit_info["snapshot"]) + + +def get_snapshot_context( + snapshot_id=None, origin_url=None, timestamp=None, visit_id=None +): + """ + Utility function to compute relevant information when navigating + the archive in a snapshot context. The snapshot is either + referenced by its id or it will be retrieved from an origin visit. + + Args: + snapshot_id (str): hexadecimal representation of a snapshot identifier, + all other parameters will be ignored if it is provided + origin_url (str): the origin_url + (e.g. https://github.com/(user)/(repo)/) + timestamp (str): a datetime string for retrieving the closest + visit of the origin + visit_id (int): optional visit id for disambiguation in case + of several visits with the same timestamp + + Returns: + A dict with the following entries: + * origin_info: dict containing origin information + * visit_info: dict containing visit information + * branches: the list of branches for the origin found + during the visit + * releases: the list of releases for the origin found + during the visit + * origin_browse_url: the url to browse the origin + * origin_branches_url: the url to browse the origin branches + * origin_releases_url': the url to browse the origin releases + * origin_visit_url: the url to browse the snapshot of the origin + found during the visit + * url_args: dict containing url arguments to use when browsing in + the context of the origin and its visit + + Raises: + swh.web.common.exc.NotFoundExc: if no snapshot is found for the visit + of an origin. + """ + origin_info = None + visit_info = None + url_args = None + query_params = {} + branches = [] + releases = [] + browse_url = None + visit_url = None + branches_url = None + releases_url = None + swh_type = "snapshot" + if origin_url: + swh_type = "origin" + origin_info = service.lookup_origin({"url": origin_url}) + + visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id) + fmt_date = format_utc_iso_date(visit_info["date"]) + visit_info["fmt_date"] = fmt_date + snapshot_id = visit_info["snapshot"] + + if not snapshot_id: + raise NotFoundExc( + "No snapshot associated to the visit of origin " + "%s on %s" % (escape(origin_url), fmt_date) + ) + + # provided timestamp is not necessarily equals to the one + # of the retrieved visit, so get the exact one in order + # use it in the urls generated below + if timestamp: + timestamp = visit_info["date"] + + branches, releases = get_origin_visit_snapshot( + origin_info, timestamp, visit_id, snapshot_id + ) + + url_args = {"origin_url": origin_info["url"]} + + query_params = {"visit_id": visit_id} + + browse_url = reverse("browse-origin-visits", url_args=url_args) + + if timestamp: + url_args["timestamp"] = format_utc_iso_date(timestamp, "%Y-%m-%dT%H:%M:%S") + visit_url = reverse( + "browse-origin-directory", url_args=url_args, query_params=query_params + ) + visit_info["url"] = visit_url + + branches_url = reverse( + "browse-origin-branches", url_args=url_args, query_params=query_params + ) + + releases_url = reverse( + "browse-origin-releases", url_args=url_args, query_params=query_params + ) + elif snapshot_id: + branches, releases = get_snapshot_content(snapshot_id) + url_args = {"snapshot_id": snapshot_id} + browse_url = reverse("browse-snapshot", url_args=url_args) + branches_url = reverse("browse-snapshot-branches", url_args=url_args) + + releases_url = reverse("browse-snapshot-releases", url_args=url_args) + + releases = list(reversed(releases)) + + snapshot_sizes = service.lookup_snapshot_sizes(snapshot_id) + + is_empty = sum(snapshot_sizes.values()) == 0 + + swh_snp_id = persistent_identifier("snapshot", snapshot_id) + + return { + "swh_type": swh_type, + "swh_object_id": swh_snp_id, + "snapshot_id": snapshot_id, + "snapshot_sizes": snapshot_sizes, + "is_empty": is_empty, + "origin_info": origin_info, + "visit_info": visit_info, + "branches": branches, + "releases": releases, + "branch": None, + "release": None, + "browse_url": browse_url, + "branches_url": branches_url, + "releases_url": releases_url, + "url_args": url_args, + "query_params": query_params, + } + + def _process_snapshot_request( request, snapshot_id=None, origin_url=None, timestamp=None, path=None, browse_context="directory", ): """ Utility function to perform common input request processing for snapshot context views. """ visit_id = request.GET.get("visit_id", None) snapshot_context = get_snapshot_context( snapshot_id, origin_url, timestamp, visit_id ) swh_type = snapshot_context["swh_type"] origin_info = snapshot_context["origin_info"] branches = snapshot_context["branches"] releases = snapshot_context["releases"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] if snapshot_context["visit_info"]: timestamp = format_utc_iso_date( snapshot_context["visit_info"]["date"], "%Y-%m-%dT%H:%M:%SZ" ) snapshot_context["timestamp"] = format_utc_iso_date( snapshot_context["visit_info"]["date"] ) browse_view_name = "browse-" + swh_type + "-" + browse_context root_sha1_git = None revision_id = request.GET.get("revision", None) release_name = request.GET.get("release", None) release_id = None branch_name = None snapshot_sizes = snapshot_context["snapshot_sizes"] snapshot_total_size = sum(snapshot_sizes.values()) if snapshot_total_size and revision_id: revision = service.lookup_revision(revision_id) root_sha1_git = revision["directory"] branches.append( { "name": revision_id, "revision": revision_id, "directory": root_sha1_git, "url": None, } ) branch_name = revision_id query_params["revision"] = revision_id elif snapshot_total_size and release_name: release = _get_release(releases, release_name, snapshot_context["snapshot_id"]) try: root_sha1_git = release["directory"] revision_id = release["target"] release_id = release["id"] query_params["release"] = release_name except Exception as exc: sentry_sdk.capture_exception(exc) _branch_not_found( "release", release_name, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id, ) elif snapshot_total_size: branch_name = request.GET.get("branch", None) if branch_name: query_params["branch"] = branch_name branch = _get_branch( branches, branch_name or "HEAD", snapshot_context["snapshot_id"] ) try: branch_name = branch["name"] revision_id = branch["revision"] root_sha1_git = branch["directory"] except Exception as exc: sentry_sdk.capture_exception(exc) _branch_not_found( "branch", branch_name, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id, ) for b in branches: branch_url_args = dict(url_args) branch_query_params = dict(query_params) if "release" in branch_query_params: del branch_query_params["release"] branch_query_params["branch"] = b["name"] if path: b["path"] = path branch_url_args["path"] = path b["url"] = reverse( browse_view_name, url_args=branch_url_args, query_params=branch_query_params ) for r in releases: release_url_args = dict(url_args) release_query_params = dict(query_params) if "branch" in release_query_params: del release_query_params["branch"] release_query_params["release"] = r["name"] if path: r["path"] = path release_url_args["path"] = path r["url"] = reverse( browse_view_name, url_args=release_url_args, query_params=release_query_params, ) snapshot_context["query_params"] = query_params snapshot_context["root_sha1_git"] = root_sha1_git snapshot_context["revision_id"] = revision_id snapshot_context["branch"] = branch_name snapshot_context["release"] = release_name snapshot_context["release_id"] = release_id return snapshot_context def browse_snapshot_directory( request, snapshot_id=None, origin_url=None, timestamp=None, path=None ): """ Django view implementation for browsing a directory in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp, path, browse_context="directory", ) root_sha1_git = snapshot_context["root_sha1_git"] sha1_git = root_sha1_git if root_sha1_git and path: dir_info = service.lookup_directory_with_path(root_sha1_git, path) sha1_git = dir_info["target"] dirs = [] files = [] if sha1_git: dirs, files = get_directory_entries(sha1_git) except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context["swh_type"] origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] revision_id = snapshot_context["revision_id"] snapshot_id = snapshot_context["snapshot_id"] path_info = gen_path_info(path) browse_view_name = "browse-" + swh_type + "-directory" breadcrumbs = [] if root_sha1_git: breadcrumbs.append( { "name": root_sha1_git[:7], "url": reverse( browse_view_name, url_args=url_args, query_params=query_params ), } ) for pi in path_info: bc_url_args = dict(url_args) bc_url_args["path"] = pi["path"] breadcrumbs.append( { "name": pi["name"], "url": reverse( browse_view_name, url_args=bc_url_args, query_params=query_params ), } ) path = "" if path is None else (path + "/") for d in dirs: if d["type"] == "rev": d["url"] = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: bc_url_args = dict(url_args) bc_url_args["path"] = path + d["name"] d["url"] = reverse( browse_view_name, url_args=bc_url_args, query_params=query_params ) sum_file_sizes = 0 readmes = {} browse_view_name = "browse-" + swh_type + "-content" for f in files: bc_url_args = dict(url_args) bc_url_args["path"] = path + f["name"] f["url"] = reverse( browse_view_name, url_args=bc_url_args, query_params=query_params ) if f["length"] is not None: sum_file_sizes += f["length"] f["length"] = filesizeformat(f["length"]) if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) browse_view_name = "browse-" + swh_type + "-log" history_url = None if snapshot_id != _empty_snapshot_id: history_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) nb_files = None nb_dirs = None dir_path = None if root_sha1_git: nb_files = len(files) nb_dirs = len(dirs) sum_file_sizes = filesizeformat(sum_file_sizes) dir_path = "/" + path browse_dir_link = gen_directory_link(sha1_git) browse_rev_link = gen_revision_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) dir_metadata = { "directory": sha1_git, "context-independent directory": browse_dir_link, "number of regular files": nb_files, "number of subdirectories": nb_dirs, "sum of regular file sizes": sum_file_sizes, "path": dir_path, "revision": revision_id, "context-independent revision": browse_rev_link, "snapshot": snapshot_id, "context-independent snapshot": browse_snp_link, } if origin_info: dir_metadata["origin url"] = origin_info["url"] dir_metadata["origin visit date"] = format_utc_iso_date(visit_info["date"]) dir_metadata["origin visit type"] = visit_info["type"] vault_cooking = { "directory_context": True, "directory_id": sha1_git, "revision_context": True, "revision_id": revision_id, } swh_objects = [ {"type": "directory", "id": sha1_git}, {"type": "revision", "id": revision_id}, {"type": "snapshot", "id": snapshot_id}, ] release_id = snapshot_context["release_id"] if release_id: swh_objects.append({"type": "release", "id": release_id}) browse_rel_link = gen_release_link(release_id) dir_metadata["release"] = release_id dir_metadata["context-independent release"] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/" context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Directory - %s - %s - %s" % ( dir_path, snapshot_context["branch"], context_found, ) top_right_link = None if not snapshot_context["is_empty"]: top_right_link = { "url": history_url, "icon": swh_object_icons["revisions history"], "text": "History", } return render( request, "browse/directory.html", { "heading": heading, "swh_object_name": "Directory", "swh_object_metadata": dir_metadata, "dirs": dirs, "files": files, "breadcrumbs": breadcrumbs if root_sha1_git else [], "top_right_link": top_right_link, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "snapshot_context": snapshot_context, "vault_cooking": vault_cooking, "show_actions_menu": True, "swh_ids": swh_ids, }, ) def browse_snapshot_content( request, snapshot_id=None, origin_url=None, timestamp=None, path=None, selected_language=None, ): """ Django view implementation for browsing a content in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp, path, browse_context="content" ) root_sha1_git = snapshot_context["root_sha1_git"] sha1_git = None query_string = None content_data = None directory_id = None split_path = path.split("/") filename = split_path[-1] filepath = path[: -len(filename)] if root_sha1_git: content_info = service.lookup_directory_with_path(root_sha1_git, path) sha1_git = content_info["target"] query_string = "sha1_git:" + sha1_git content_data = request_content(query_string, raise_if_unavailable=False) if filepath: dir_info = service.lookup_directory_with_path(root_sha1_git, filepath) directory_id = dir_info["target"] else: directory_id = root_sha1_git except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context["swh_type"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] revision_id = snapshot_context["revision_id"] origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] snapshot_id = snapshot_context["snapshot_id"] content = None language = None mimetype = None if content_data and content_data["raw_data"] is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and "text/" in mimetype: available_languages = highlightjs.get_supported_languages() browse_view_name = "browse-" + swh_type + "-directory" breadcrumbs = [] path_info = gen_path_info(filepath) if root_sha1_git: breadcrumbs.append( { "name": root_sha1_git[:7], "url": reverse( browse_view_name, url_args=url_args, query_params=query_params ), } ) for pi in path_info: bc_url_args = dict(url_args) bc_url_args["path"] = pi["path"] breadcrumbs.append( { "name": pi["name"], "url": reverse( browse_view_name, url_args=bc_url_args, query_params=query_params ), } ) breadcrumbs.append({"name": filename, "url": None}) browse_content_link = gen_content_link(sha1_git) content_raw_url = None if query_string: content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params={"filename": filename}, ) browse_rev_link = gen_revision_link(revision_id) browse_dir_link = gen_directory_link(directory_id) content_metadata = { "context-independent content": browse_content_link, "path": None, "filename": None, "directory": directory_id, "context-independent directory": browse_dir_link, "revision": revision_id, "context-independent revision": browse_rev_link, "snapshot": snapshot_id, } cnt_sha1_git = None content_size = None error_code = 200 error_description = "" error_message = "" if content_data: for checksum in content_data["checksums"].keys(): content_metadata[checksum] = content_data["checksums"][checksum] content_metadata["mimetype"] = content_data["mimetype"] content_metadata["encoding"] = content_data["encoding"] content_metadata["size"] = filesizeformat(content_data["length"]) content_metadata["language"] = content_data["language"] content_metadata["licenses"] = content_data["licenses"] content_metadata["path"] = "/" + filepath content_metadata["filename"] = filename cnt_sha1_git = content_data["checksums"]["sha1_git"] content_size = content_data["length"] error_code = content_data["error_code"] error_message = content_data["error_message"] error_description = content_data["error_description"] if origin_info: content_metadata["origin url"] = origin_info["url"] content_metadata["origin visit date"] = format_utc_iso_date(visit_info["date"]) content_metadata["origin visit type"] = visit_info["type"] browse_snapshot_link = gen_snapshot_link(snapshot_id) content_metadata["context-independent snapshot"] = browse_snapshot_link swh_objects = [ {"type": "content", "id": cnt_sha1_git}, {"type": "directory", "id": directory_id}, {"type": "revision", "id": revision_id}, {"type": "snapshot", "id": snapshot_id}, ] release_id = snapshot_context["release_id"] if release_id: swh_objects.append({"type": "release", "id": release_id}) browse_rel_link = gen_release_link(release_id) content_metadata["release"] = release_id content_metadata["context-independent release"] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) content_path = "/".join([bc["name"] for bc in breadcrumbs]) context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Content - %s - %s - %s" % ( content_path, snapshot_context["branch"], context_found, ) top_right_link = None if not snapshot_context["is_empty"]: top_right_link = { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", } return render( request, "browse/content.html", { "heading": heading, "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content, "content_size": content_size, "max_content_size": content_display_max_size, "mimetype": mimetype, "language": language, "available_languages": available_languages, "breadcrumbs": breadcrumbs if root_sha1_git else [], "top_right_link": top_right_link, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": True, "swh_ids": swh_ids, "error_code": error_code, "error_message": error_message, "error_description": error_description, }, status=error_code, ) PER_PAGE = 100 def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None): """ Django view implementation for browsing a revision history in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp, browse_context="log" ) revision_id = snapshot_context["revision_id"] per_page = int(request.GET.get("per_page", PER_PAGE)) offset = int(request.GET.get("offset", 0)) revs_ordering = request.GET.get("revs_ordering", "committer_date") session_key = "rev_%s_log_ordering_%s" % (revision_id, revs_ordering) rev_log_session = request.session.get(session_key, None) rev_log = [] revs_walker_state = None if rev_log_session: rev_log = rev_log_session["rev_log"] revs_walker_state = rev_log_session["revs_walker_state"] if len(rev_log) < offset + per_page: revs_walker = service.get_revisions_walker( revs_ordering, revision_id, max_revs=offset + per_page + 1, state=revs_walker_state, ) rev_log += [rev["id"] for rev in revs_walker] revs_walker_state = revs_walker.export_state() revs = rev_log[offset : offset + per_page] revision_log = service.lookup_revision_multiple(revs) request.session[session_key] = { "rev_log": rev_log, "revs_walker_state": revs_walker_state, } except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context["swh_type"] origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] snapshot_id = snapshot_context["snapshot_id"] query_params["per_page"] = per_page revs_ordering = request.GET.get("revs_ordering", "") query_params["revs_ordering"] = revs_ordering browse_view_name = "browse-" + swh_type + "-log" prev_log_url = None if len(rev_log) > offset + per_page: query_params["offset"] = offset + per_page prev_log_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) next_log_url = None if offset != 0: query_params["offset"] = offset - per_page next_log_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) revision_log_data = format_log_entries(revision_log, per_page, snapshot_context) browse_rev_link = gen_revision_link(revision_id) browse_log_link = gen_revision_log_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) revision_metadata = { "context-independent revision": browse_rev_link, "context-independent revision history": browse_log_link, "context-independent snapshot": browse_snp_link, "snapshot": snapshot_id, } if origin_info: revision_metadata["origin url"] = origin_info["url"] revision_metadata["origin visit date"] = format_utc_iso_date(visit_info["date"]) revision_metadata["origin visit type"] = visit_info["type"] swh_objects = [ {"type": "revision", "id": revision_id}, {"type": "snapshot", "id": snapshot_id}, ] release_id = snapshot_context["release_id"] if release_id: swh_objects.append({"type": "release", "id": release_id}) browse_rel_link = gen_release_link(release_id) revision_metadata["release"] = release_id revision_metadata["context-independent release"] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Revision history - %s - %s" % (snapshot_context["branch"], context_found) return render( request, "browse/revision-log.html", { "heading": heading, "swh_object_name": "Revisions history", "swh_object_metadata": revision_metadata, "revision_log": revision_log_data, "revs_ordering": revs_ordering, "next_log_url": next_log_url, "prev_log_url": prev_log_url, "breadcrumbs": None, "top_right_link": None, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": True, "swh_ids": swh_ids, }, ) def browse_snapshot_branches( request, snapshot_id=None, origin_url=None, timestamp=None ): """ Django view implementation for browsing a list of branches in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp ) branches_bc = request.GET.get("branches_breadcrumbs", "") branches_bc = branches_bc.split(",") if branches_bc else [] branches_from = branches_bc[-1] if branches_bc else "" swh_type = snapshot_context["swh_type"] origin_info = snapshot_context["origin_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] browse_view_name = "browse-" + swh_type + "-directory" snapshot = service.lookup_snapshot( snapshot_context["snapshot_id"], branches_from, PER_PAGE + 1, target_types=["revision", "alias"], ) displayed_branches, _ = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for branch in displayed_branches: if snapshot_id: revision_url = reverse( "browse-revision", url_args={"sha1_git": branch["revision"]}, query_params={"snapshot_id": snapshot_id}, ) else: revision_url = reverse( "browse-revision", url_args={"sha1_git": branch["revision"]}, query_params={"origin": origin_info["url"]}, ) query_params["branch"] = branch["name"] directory_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) del query_params["branch"] branch["revision_url"] = revision_url branch["directory_url"] = directory_url browse_view_name = "browse-" + swh_type + "-branches" prev_branches_url = None next_branches_url = None if branches_bc: query_params_prev = dict(query_params) query_params_prev["branches_breadcrumbs"] = ",".join(branches_bc[:-1]) prev_branches_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_prev ) elif branches_from: prev_branches_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) if snapshot["next_branch"] is not None: query_params_next = dict(query_params) next_branch = displayed_branches[-1]["name"] del displayed_branches[-1] branches_bc.append(next_branch) query_params_next["branches_breadcrumbs"] = ",".join(branches_bc) next_branches_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_next ) heading = "Branches - " if origin_info: heading += "origin: %s" % origin_info["url"] else: heading += "snapshot: %s" % snapshot_id return render( request, "browse/branches.html", { "heading": heading, "swh_object_name": "Branches", "swh_object_metadata": {}, "top_right_link": None, "displayed_branches": displayed_branches, "prev_branches_url": prev_branches_url, "next_branches_url": next_branches_url, "snapshot_context": snapshot_context, }, ) def browse_snapshot_releases( request, snapshot_id=None, origin_url=None, timestamp=None ): """ Django view implementation for browsing a list of releases in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp ) rel_bc = request.GET.get("releases_breadcrumbs", "") rel_bc = rel_bc.split(",") if rel_bc else [] rel_from = rel_bc[-1] if rel_bc else "" swh_type = snapshot_context["swh_type"] origin_info = snapshot_context["origin_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] snapshot = service.lookup_snapshot( snapshot_context["snapshot_id"], rel_from, PER_PAGE + 1, target_types=["release", "alias"], ) _, displayed_releases = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for release in displayed_releases: if snapshot_id: query_params_tgt = {"snapshot_id": snapshot_id} else: query_params_tgt = {"origin": origin_info["url"]} release_url = reverse( "browse-release", url_args={"sha1_git": release["id"]}, query_params=query_params_tgt, ) target_url = "" if release["target_type"] == "revision": target_url = reverse( "browse-revision", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "directory": target_url = reverse( "browse-directory", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "content": target_url = reverse( "browse-content", url_args={"query_string": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "release": target_url = reverse( "browse-release", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) release["release_url"] = release_url release["target_url"] = target_url browse_view_name = "browse-" + swh_type + "-releases" prev_releases_url = None next_releases_url = None if rel_bc: query_params_prev = dict(query_params) query_params_prev["releases_breadcrumbs"] = ",".join(rel_bc[:-1]) prev_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_prev ) elif rel_from: prev_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) if snapshot["next_branch"] is not None: query_params_next = dict(query_params) next_rel = displayed_releases[-1]["branch_name"] del displayed_releases[-1] rel_bc.append(next_rel) query_params_next["releases_breadcrumbs"] = ",".join(rel_bc) next_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_next ) heading = "Releases - " if origin_info: heading += "origin: %s" % origin_info["url"] else: heading += "snapshot: %s" % snapshot_id return render( request, "browse/releases.html", { "heading": heading, "top_panel_visible": False, "top_panel_collapsible": False, "swh_object_name": "Releases", "swh_object_metadata": {}, "top_right_link": None, "displayed_releases": displayed_releases, "prev_releases_url": prev_releases_url, "next_releases_url": next_releases_url, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": False, }, ) diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py index 052ddca9..357242cd 100644 --- a/swh/web/browse/utils.py +++ b/swh/web/browse/utils.py @@ -1,1137 +1,811 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import magic import stat import textwrap -from collections import defaultdict from threading import Lock from django.core.cache import cache from django.utils.safestring import mark_safe from django.utils.html import escape import sentry_sdk -from swh.model.identifiers import persistent_identifier from swh.web.common import highlightjs, service -from swh.web.common.exc import NotFoundExc, http_status_code_message +from swh.web.common.exc import http_status_code_message from swh.web.common.identifiers import get_swh_persistent_id -from swh.web.common.origin_visits import get_origin_visit from swh.web.common.utils import ( reverse, format_utc_iso_date, swh_object_icons, rst_to_html, ) from swh.web.config import get_config def get_directory_entries(sha1_git): """Function that retrieves the content of a directory from the archive. The directories entries are first sorted in lexicographical order. Sub-directories and regular files are then extracted. Args: sha1_git: sha1_git identifier of the directory Returns: A tuple whose first member corresponds to the sub-directories list and second member the regular files list Raises: NotFoundExc if the directory is not found """ cache_entry_id = "directory_entries_%s" % sha1_git cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry entries = list(service.lookup_directory(sha1_git)) for e in entries: e["perms"] = stat.filemode(e["perms"]) if e["type"] == "rev": # modify dir entry name to explicitly show it points # to a revision e["name"] = "%s @ %s" % (e["name"], e["target"][:7]) dirs = [e for e in entries if e["type"] in ("dir", "rev")] files = [e for e in entries if e["type"] == "file"] dirs = sorted(dirs, key=lambda d: d["name"]) files = sorted(files, key=lambda f: f["name"]) cache.set(cache_entry_id, (dirs, files)) return dirs, files _lock = Lock() def get_mimetype_and_encoding_for_content(content): """Function that returns the mime type and the encoding associated to a content buffer using the magic module under the hood. Args: content (bytes): a content buffer Returns: A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'), associated to the provided content. """ # https://pypi.org/project/python-magic/ # packaged as python3-magic in debian buster if hasattr(magic, "from_buffer"): m = magic.Magic(mime=True, mime_encoding=True) mime_encoding = m.from_buffer(content) mime_type, encoding = mime_encoding.split(";") encoding = encoding.replace(" charset=", "") # https://pypi.org/project/file-magic/ # packaged as python3-magic in debian stretch else: # TODO: Remove that code when production environment is upgraded # to debian buster # calls to the file-magic API are not thread-safe so they must # be protected with a Lock to guarantee they will succeed _lock.acquire() magic_result = magic.detect_from_content(content) _lock.release() mime_type = magic_result.mime_type encoding = magic_result.encoding return mime_type, encoding # maximum authorized content size in bytes for HTML display # with code highlighting content_display_max_size = get_config()["content_display_max_size"] -snapshot_content_max_size = get_config()["snapshot_content_max_size"] - def _re_encode_content(mimetype, encoding, content_data): # encode textual content to utf-8 if needed if mimetype.startswith("text/"): # probably a malformed UTF-8 content, re-encode it # by replacing invalid chars with a substitution one if encoding == "unknown-8bit": content_data = content_data.decode("utf-8", "replace").encode("utf-8") elif encoding not in ["utf-8", "binary"]: content_data = content_data.decode(encoding, "replace").encode("utf-8") elif mimetype.startswith("application/octet-stream"): # file may detect a text content as binary # so try to decode it for display encodings = ["us-ascii", "utf-8"] encodings += ["iso-8859-%s" % i for i in range(1, 17)] for enc in encodings: try: content_data = content_data.decode(enc).encode("utf-8") except Exception as exc: sentry_sdk.capture_exception(exc) else: # ensure display in content view encoding = enc mimetype = "text/plain" break return mimetype, encoding, content_data def request_content( query_string, max_size=content_display_max_size, raise_if_unavailable=True, re_encode=True, ): """Function that retrieves a content from the archive. Raw bytes content is first retrieved, then the content mime type. If the mime type is not stored in the archive, it will be computed using Python magic module. Args: query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value max_size: the maximum size for a content to retrieve (default to 1MB, no size limit if None) Returns: A tuple whose first member corresponds to the content raw bytes and second member the content mime type Raises: NotFoundExc if the content is not found """ content_data = service.lookup_content(query_string) filetype = None language = None license = None # requests to the indexer db may fail so properly handle # those cases in order to avoid content display errors try: filetype = service.lookup_content_filetype(query_string) language = service.lookup_content_language(query_string) license = service.lookup_content_license(query_string) except Exception as exc: sentry_sdk.capture_exception(exc) mimetype = "unknown" encoding = "unknown" if filetype: mimetype = filetype["mimetype"] encoding = filetype["encoding"] # workaround when encountering corrupted data due to implicit # conversion from bytea to text in the indexer db (see T818) # TODO: Remove that code when all data have been correctly converted if mimetype.startswith("\\"): filetype = None content_data["error_code"] = 200 content_data["error_message"] = "" content_data["error_description"] = "" if not max_size or content_data["length"] < max_size: try: content_raw = service.lookup_content_raw(query_string) except Exception as exc: if raise_if_unavailable: raise exc else: sentry_sdk.capture_exception(exc) content_data["raw_data"] = None content_data["error_code"] = 404 content_data["error_description"] = ( "The bytes of the content are currently not available " "in the archive." ) content_data["error_message"] = http_status_code_message[ content_data["error_code"] ] else: content_data["raw_data"] = content_raw["data"] if not filetype: mimetype, encoding = get_mimetype_and_encoding_for_content( content_data["raw_data"] ) if re_encode: mimetype, encoding, raw_data = _re_encode_content( mimetype, encoding, content_data["raw_data"] ) content_data["raw_data"] = raw_data else: content_data["raw_data"] = None content_data["mimetype"] = mimetype content_data["encoding"] = encoding if language: content_data["language"] = language["lang"] else: content_data["language"] = "not detected" if license: content_data["licenses"] = ", ".join(license["facts"][0]["licenses"]) else: content_data["licenses"] = "not detected" return content_data _browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def prepare_content_for_display(content_data, mime_type, path): """Function that prepares a content for HTML display. The function tries to associate a programming language to a content in order to perform syntax highlighting client-side using highlightjs. The language is determined using either the content filename or its mime type. If the mime type corresponds to an image format supported by web browsers, the content will be encoded in base64 for displaying the image. Args: content_data (bytes): raw bytes of the content mime_type (string): mime type of the content path (string): path of the content including filename Returns: A dict containing the content bytes (possibly different from the one provided as parameter if it is an image) under the key 'content_data and the corresponding highlightjs language class under the key 'language'. """ language = highlightjs.get_hljs_language_from_filename(path) if not language: language = highlightjs.get_hljs_language_from_mime_type(mime_type) if not language: language = "nohighlight" elif mime_type.startswith("application/"): mime_type = mime_type.replace("application/", "text/") if mime_type.startswith("image/"): if mime_type in _browsers_supported_image_mimes: content_data = base64.b64encode(content_data).decode("ascii") else: content_data = None if mime_type.startswith("image/svg"): mime_type = "image/svg+xml" if mime_type.startswith("text/"): content_data = content_data.decode("utf-8", errors="replace") return {"content_data": content_data, "language": language, "mimetype": mime_type} -def process_snapshot_branches(snapshot): - """ - Process a dictionary describing snapshot branches: extract those - targeting revisions and releases, put them in two different lists, - then sort those lists in lexicographical order of the branches' names. - - Args: - snapshot_branches (dict): A dict describing the branches of a snapshot - as returned for instance by - :func:`swh.web.common.service.lookup_snapshot` - - Returns: - tuple: A tuple whose first member is the sorted list of branches - targeting revisions and second member the sorted list of branches - targeting releases - """ - snapshot_branches = snapshot["branches"] - branches = {} - branch_aliases = {} - releases = {} - revision_to_branch = defaultdict(set) - revision_to_release = defaultdict(set) - release_to_branch = defaultdict(set) - for branch_name, target in snapshot_branches.items(): - if not target: - # FIXME: display branches with an unknown target anyway - continue - target_id = target["target"] - target_type = target["target_type"] - if target_type == "revision": - branches[branch_name] = { - "name": branch_name, - "revision": target_id, - } - revision_to_branch[target_id].add(branch_name) - elif target_type == "release": - release_to_branch[target_id].add(branch_name) - elif target_type == "alias": - branch_aliases[branch_name] = target_id - # FIXME: handle pointers to other object types - - def _enrich_release_branch(branch, release): - releases[branch] = { - "name": release["name"], - "branch_name": branch, - "date": format_utc_iso_date(release["date"]), - "id": release["id"], - "message": release["message"], - "target_type": release["target_type"], - "target": release["target"], - } - - def _enrich_revision_branch(branch, revision): - branches[branch].update( - { - "revision": revision["id"], - "directory": revision["directory"], - "date": format_utc_iso_date(revision["date"]), - "message": revision["message"], - } - ) - - releases_info = service.lookup_release_multiple(release_to_branch.keys()) - for release in releases_info: - branches_to_update = release_to_branch[release["id"]] - for branch in branches_to_update: - _enrich_release_branch(branch, release) - if release["target_type"] == "revision": - revision_to_release[release["target"]].update(branches_to_update) - - revisions = service.lookup_revision_multiple( - set(revision_to_branch.keys()) | set(revision_to_release.keys()) - ) - - for revision in revisions: - if not revision: - continue - for branch in revision_to_branch[revision["id"]]: - _enrich_revision_branch(branch, revision) - for release in revision_to_release[revision["id"]]: - releases[release]["directory"] = revision["directory"] - - for branch_alias, branch_target in branch_aliases.items(): - if branch_target in branches: - branches[branch_alias] = dict(branches[branch_target]) - else: - snp = service.lookup_snapshot( - snapshot["id"], branches_from=branch_target, branches_count=1 - ) - if snp and branch_target in snp["branches"]: - - if snp["branches"][branch_target] is None: - continue - - target_type = snp["branches"][branch_target]["target_type"] - target = snp["branches"][branch_target]["target"] - if target_type == "revision": - branches[branch_alias] = snp["branches"][branch_target] - revision = service.lookup_revision(target) - _enrich_revision_branch(branch_alias, revision) - elif target_type == "release": - release = service.lookup_release(target) - _enrich_release_branch(branch_alias, release) - - if branch_alias in branches: - branches[branch_alias]["name"] = branch_alias - - ret_branches = list(sorted(branches.values(), key=lambda b: b["name"])) - ret_releases = list(sorted(releases.values(), key=lambda b: b["name"])) - - return ret_branches, ret_releases - - -def get_snapshot_content(snapshot_id): - """Returns the lists of branches and releases - associated to a swh snapshot. - That list is put in cache in order to speedup the navigation - in the swh-web/browse ui. - - .. warning:: At most 1000 branches contained in the snapshot - will be returned for performance reasons. - - Args: - snapshot_id (str): hexadecimal representation of the snapshot - identifier - - Returns: - A tuple with two members. The first one is a list of dict describing - the snapshot branches. The second one is a list of dict describing the - snapshot releases. - - Raises: - NotFoundExc if the snapshot does not exist - """ - cache_entry_id = "swh_snapshot_%s" % snapshot_id - cache_entry = cache.get(cache_entry_id) - - if cache_entry: - return cache_entry["branches"], cache_entry["releases"] - - branches = [] - releases = [] - - if snapshot_id: - snapshot = service.lookup_snapshot( - snapshot_id, branches_count=snapshot_content_max_size - ) - branches, releases = process_snapshot_branches(snapshot) - - cache.set(cache_entry_id, {"branches": branches, "releases": releases,}) - - return branches, releases - - -def get_origin_visit_snapshot( - origin_info, visit_ts=None, visit_id=None, snapshot_id=None -): - """Returns the lists of branches and releases - associated to a swh origin for a given visit. - The visit is expressed by a timestamp. In the latter case, - the closest visit from the provided timestamp will be used. - If no visit parameter is provided, it returns the list of branches - found for the latest visit. - That list is put in cache in order to speedup the navigation - in the swh-web/browse ui. - - .. warning:: At most 1000 branches contained in the snapshot - will be returned for performance reasons. - - Args: - origin_info (dict): a dict filled with origin information - (id, url, type) - visit_ts (int or str): an ISO date string or Unix timestamp to parse - visit_id (int): optional visit id for disambiguation in case - several visits have the same timestamp - - Returns: - A tuple with two members. The first one is a list of dict describing - the origin branches for the given visit. - The second one is a list of dict describing the origin releases - for the given visit. - - Raises: - NotFoundExc if the origin or its visit are not found - """ - - visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id) - - return get_snapshot_content(visit_info["snapshot"]) - - def gen_link(url, link_text=None, link_attrs=None): """ Utility function for generating an HTML link to insert in Django templates. Args: url (str): an url link_text (str): optional text for the produced link, if not provided the url will be used link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ attrs = " " if link_attrs: for k, v in link_attrs.items(): attrs += '%s="%s" ' % (k, v) if not link_text: link_text = url link = '%s' % (attrs, escape(url), escape(link_text)) return mark_safe(link) def _snapshot_context_query_params(snapshot_context): query_params = None if snapshot_context and snapshot_context["origin_info"]: origin_info = snapshot_context["origin_info"] query_params = {"origin": origin_info["url"]} if "timestamp" in snapshot_context["url_args"]: query_params["timestamp"] = snapshot_context["url_args"]["timestamp"] if "visit_id" in snapshot_context["query_params"]: query_params["visit_id"] = snapshot_context["query_params"]["visit_id"] elif snapshot_context: query_params = {"snapshot_id": snapshot_context["snapshot_id"]} return query_params def gen_revision_url(revision_id, snapshot_context=None): """ Utility function for generating an url to a revision. Args: revision_id (str): a revision id snapshot_context (dict): if provided, generate snapshot-dependent browsing url Returns: str: The url to browse the revision """ query_params = _snapshot_context_query_params(snapshot_context) return reverse( "browse-revision", url_args={"sha1_git": revision_id}, query_params=query_params ) def gen_revision_link( revision_id, shorten_id=False, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a revision HTML view to insert in Django templates. Args: revision_id (str): a revision id shorten_id (boolean): whether to shorten the revision id to 7 characters for the link text snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: str: An HTML link in the form 'revision_id' """ if not revision_id: return None revision_url = gen_revision_url(revision_id, snapshot_context) if shorten_id: return gen_link(revision_url, revision_id[:7], link_attrs) else: if not link_text: link_text = revision_id return gen_link(revision_url, link_text, link_attrs) def gen_directory_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a directory HTML view to insert in Django templates. Args: sha1_git (str): directory identifier link_text (str): optional text for the generated link (the directory id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) directory_url = reverse( "browse-directory", url_args={"sha1_git": sha1_git}, query_params=query_params ) if not link_text: link_text = sha1_git return gen_link(directory_url, link_text, link_attrs) def gen_snapshot_link( snapshot_id, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a snapshot HTML view to insert in Django templates. Args: snapshot_id (str): snapshot identifier link_text (str): optional text for the generated link (the snapshot id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) snapshot_url = reverse( "browse-snapshot", url_args={"snapshot_id": snapshot_id}, query_params=query_params, ) if not link_text: link_text = snapshot_id return gen_link(snapshot_url, link_text, link_attrs) def gen_content_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a content HTML view to insert in Django templates. Args: sha1_git (str): content identifier link_text (str): optional text for the generated link (the content sha1_git will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) content_url = reverse( "browse-content", url_args={"query_string": "sha1_git:" + sha1_git}, query_params=query_params, ) if not link_text: link_text = sha1_git return gen_link(content_url, link_text, link_attrs) def get_revision_log_url(revision_id, snapshot_context=None): """ Utility function for getting the URL for a revision log HTML view (possibly in the context of an origin). Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link Returns: The revision log view URL """ query_params = {"revision": revision_id} if snapshot_context and snapshot_context["origin_info"]: origin_info = snapshot_context["origin_info"] url_args = {"origin_url": origin_info["url"]} if "timestamp" in snapshot_context["url_args"]: url_args["timestamp"] = snapshot_context["url_args"]["timestamp"] if "visit_id" in snapshot_context["query_params"]: query_params["visit_id"] = snapshot_context["query_params"]["visit_id"] revision_log_url = reverse( "browse-origin-log", url_args=url_args, query_params=query_params ) elif snapshot_context: url_args = {"snapshot_id": snapshot_context["snapshot_id"]} revision_log_url = reverse( "browse-snapshot-log", url_args=url_args, query_params=query_params ) else: revision_log_url = reverse( "browse-revision-log", url_args={"sha1_git": revision_id} ) return revision_log_url def gen_revision_log_link( revision_id, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a revision log HTML view (possibly in the context of an origin) to insert in Django templates. Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text to use for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not revision_id: return None revision_log_url = get_revision_log_url(revision_id, snapshot_context) if not link_text: link_text = revision_id return gen_link(revision_log_url, link_text, link_attrs) def gen_person_mail_link(person, link_text=None): """ Utility function for generating a mail link to a person to insert in Django templates. Args: person (dict): dictionary containing person data (*name*, *email*, *fullname*) link_text (str): optional text to use for the generated mail link (the person name will be used by default) Returns: str: A mail link to the person or the person name if no email is present in person data """ person_name = person["name"] or person["fullname"] or "None" if link_text is None: link_text = person_name person_email = person["email"] if person["email"] else None if person_email is None and "@" in person_name and " " not in person_name: person_email = person_name if person_email: return gen_link(url="mailto:%s" % person_email, link_text=link_text) else: return person_name def gen_release_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a release HTML view to insert in Django templates. Args: sha1_git (str): release identifier link_text (str): optional text for the generated link (the release id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) release_url = reverse( "browse-release", url_args={"sha1_git": sha1_git}, query_params=query_params ) if not link_text: link_text = sha1_git return gen_link(release_url, link_text, link_attrs) def format_log_entries(revision_log, per_page, snapshot_context=None): """ Utility functions that process raw revision log data for HTML display. Its purpose is to: * add links to relevant browse views * format date in human readable format * truncate the message log Args: revision_log (list): raw revision log as returned by the swh-web api per_page (int): number of log entries per page snapshot_context (dict): if provided, generate snapshot-dependent browsing link """ revision_log_data = [] for i, rev in enumerate(revision_log): if i == per_page: break author_name = "None" author_fullname = "None" committer_fullname = "None" if rev["author"]: author_name = gen_person_mail_link(rev["author"]) author_fullname = rev["author"]["fullname"] if rev["committer"]: committer_fullname = rev["committer"]["fullname"] author_date = format_utc_iso_date(rev["date"]) committer_date = format_utc_iso_date(rev["committer_date"]) tooltip = "revision %s\n" % rev["id"] tooltip += "author: %s\n" % author_fullname tooltip += "author date: %s\n" % author_date tooltip += "committer: %s\n" % committer_fullname tooltip += "committer date: %s\n\n" % committer_date if rev["message"]: tooltip += textwrap.indent(rev["message"], " " * 4) revision_log_data.append( { "author": author_name, "id": rev["id"][:7], "message": rev["message"], "date": author_date, "commit_date": committer_date, "url": gen_revision_url(rev["id"], snapshot_context), "tooltip": tooltip, } ) return revision_log_data -def get_snapshot_context( - snapshot_id=None, origin_url=None, timestamp=None, visit_id=None -): - """ - Utility function to compute relevant information when navigating - the archive in a snapshot context. The snapshot is either - referenced by its id or it will be retrieved from an origin visit. - - Args: - snapshot_id (str): hexadecimal representation of a snapshot identifier, - all other parameters will be ignored if it is provided - origin_url (str): the origin_url - (e.g. https://github.com/(user)/(repo)/) - timestamp (str): a datetime string for retrieving the closest - visit of the origin - visit_id (int): optional visit id for disambiguation in case - of several visits with the same timestamp - - Returns: - A dict with the following entries: - * origin_info: dict containing origin information - * visit_info: dict containing visit information - * branches: the list of branches for the origin found - during the visit - * releases: the list of releases for the origin found - during the visit - * origin_browse_url: the url to browse the origin - * origin_branches_url: the url to browse the origin branches - * origin_releases_url': the url to browse the origin releases - * origin_visit_url: the url to browse the snapshot of the origin - found during the visit - * url_args: dict containing url arguments to use when browsing in - the context of the origin and its visit - - Raises: - swh.web.common.exc.NotFoundExc: if no snapshot is found for the visit - of an origin. - """ - origin_info = None - visit_info = None - url_args = None - query_params = {} - branches = [] - releases = [] - browse_url = None - visit_url = None - branches_url = None - releases_url = None - swh_type = "snapshot" - if origin_url: - swh_type = "origin" - origin_info = service.lookup_origin({"url": origin_url}) - - visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id) - fmt_date = format_utc_iso_date(visit_info["date"]) - visit_info["fmt_date"] = fmt_date - snapshot_id = visit_info["snapshot"] - - if not snapshot_id: - raise NotFoundExc( - "No snapshot associated to the visit of origin " - "%s on %s" % (escape(origin_url), fmt_date) - ) - - # provided timestamp is not necessarily equals to the one - # of the retrieved visit, so get the exact one in order - # use it in the urls generated below - if timestamp: - timestamp = visit_info["date"] - - branches, releases = get_origin_visit_snapshot( - origin_info, timestamp, visit_id, snapshot_id - ) - - url_args = {"origin_url": origin_info["url"]} - - query_params = {"visit_id": visit_id} - - browse_url = reverse("browse-origin-visits", url_args=url_args) - - if timestamp: - url_args["timestamp"] = format_utc_iso_date(timestamp, "%Y-%m-%dT%H:%M:%S") - visit_url = reverse( - "browse-origin-directory", url_args=url_args, query_params=query_params - ) - visit_info["url"] = visit_url - - branches_url = reverse( - "browse-origin-branches", url_args=url_args, query_params=query_params - ) - - releases_url = reverse( - "browse-origin-releases", url_args=url_args, query_params=query_params - ) - elif snapshot_id: - branches, releases = get_snapshot_content(snapshot_id) - url_args = {"snapshot_id": snapshot_id} - browse_url = reverse("browse-snapshot", url_args=url_args) - branches_url = reverse("browse-snapshot-branches", url_args=url_args) - - releases_url = reverse("browse-snapshot-releases", url_args=url_args) - - releases = list(reversed(releases)) - - snapshot_sizes = service.lookup_snapshot_sizes(snapshot_id) - - is_empty = sum(snapshot_sizes.values()) == 0 - - swh_snp_id = persistent_identifier("snapshot", snapshot_id) - - return { - "swh_type": swh_type, - "swh_object_id": swh_snp_id, - "snapshot_id": snapshot_id, - "snapshot_sizes": snapshot_sizes, - "is_empty": is_empty, - "origin_info": origin_info, - "visit_info": visit_info, - "branches": branches, - "releases": releases, - "branch": None, - "release": None, - "browse_url": browse_url, - "branches_url": branches_url, - "releases_url": releases_url, - "url_args": url_args, - "query_params": query_params, - } - - # list of common readme names ordered by preference # (lower indices have higher priority) _common_readme_names = [ "readme.markdown", "readme.md", "readme.rst", "readme.txt", "readme", ] def get_readme_to_display(readmes): """ Process a list of readme files found in a directory in order to find the adequate one to display. Args: readmes: a list of dict where keys are readme file names and values are readme sha1s Returns: A tuple (readme_name, readme_sha1) """ readme_name = None readme_url = None readme_sha1 = None readme_html = None lc_readmes = {k.lower(): {"orig_name": k, "sha1": v} for k, v in readmes.items()} # look for readme names according to the preference order # defined by the _common_readme_names list for common_readme_name in _common_readme_names: if common_readme_name in lc_readmes: readme_name = lc_readmes[common_readme_name]["orig_name"] readme_sha1 = lc_readmes[common_readme_name]["sha1"] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) break # otherwise pick the first readme like file if any if not readme_name and len(readmes.items()) > 0: readme_name = next(iter(readmes)) readme_sha1 = readmes[readme_name] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) # convert rst README to html server side as there is # no viable solution to perform that task client side if readme_name and readme_name.endswith(".rst"): cache_entry_id = "readme_%s" % readme_sha1 cache_entry = cache.get(cache_entry_id) if cache_entry: readme_html = cache_entry else: try: rst_doc = request_content(readme_sha1) readme_html = rst_to_html(rst_doc["raw_data"]) cache.set(cache_entry_id, readme_html) except Exception as exc: sentry_sdk.capture_exception(exc) readme_html = "Readme bytes are not available" return readme_name, readme_url, readme_html def get_swh_persistent_ids(swh_objects, snapshot_context=None): """ Returns a list of dict containing info related to persistent identifiers of swh objects. Args: swh_objects (list): a list of dict with the following keys: * type: swh object type (content/directory/release/revision/snapshot) * id: swh object id snapshot_context (dict): optional parameter describing the snapshot in which the object has been found Returns: list: a list of dict with the following keys: * object_type: the swh object type (content/directory/release/revision/snapshot) * object_icon: the swh object icon to use in HTML views * swh_id: the computed swh object persistent identifier * swh_id_url: the url resolving the persistent identifier * show_options: boolean indicating if the persistent id options must be displayed in persistent ids HTML view """ swh_ids = [] for swh_object in swh_objects: if not swh_object["id"]: continue swh_id = get_swh_persistent_id(swh_object["type"], swh_object["id"]) show_options = swh_object["type"] == "content" or ( snapshot_context and snapshot_context["origin_info"] is not None ) object_icon = swh_object_icons[swh_object["type"]] swh_ids.append( { "object_type": swh_object["type"], "object_id": swh_object["id"], "object_icon": object_icon, "swh_id": swh_id, "swh_id_url": reverse("browse-swh-id", url_args={"swh_id": swh_id}), "show_options": show_options, } ) return swh_ids diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py index 191e2d31..67535b22 100644 --- a/swh/web/browse/views/content.py +++ b/swh/web/browse/views/content.py @@ -1,354 +1,354 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import difflib import json from distutils.util import strtobool from django.http import HttpResponse from django.shortcuts import render from django.template.defaultfilters import filesizeformat import sentry_sdk from swh.model.hashutil import hash_to_hex -from swh.web.common import query, service, highlightjs -from swh.web.common.utils import reverse, gen_path_info, swh_object_icons -from swh.web.common.exc import NotFoundExc, handle_view_exception +from swh.web.browse.browseurls import browse_route +from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( request_content, prepare_content_for_display, content_display_max_size, - get_snapshot_context, get_swh_persistent_ids, gen_link, gen_directory_link, ) -from swh.web.browse.browseurls import browse_route +from swh.web.common import query, service, highlightjs +from swh.web.common.exc import NotFoundExc, handle_view_exception +from swh.web.common.utils import reverse, gen_path_info, swh_object_icons @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/raw/", view_name="browse-content-raw", checksum_args=["query_string"], ) def content_raw(request, query_string): """Django view that produces a raw display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/raw/` """ try: re_encode = bool(strtobool(request.GET.get("re_encode", "false"))) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, max_size=None, re_encode=re_encode) except Exception as exc: return handle_view_exception(request, exc) filename = request.GET.get("filename", None) if not filename: filename = "%s_%s" % (algo, checksum) if ( content_data["mimetype"].startswith("text/") or content_data["mimetype"] == "inode/x-empty" ): response = HttpResponse(content_data["raw_data"], content_type="text/plain") response["Content-disposition"] = "filename=%s" % filename else: response = HttpResponse( content_data["raw_data"], content_type="application/octet-stream" ) response["Content-disposition"] = "attachment; filename=%s" % filename return response _auto_diff_size_limit = 20000 @browse_route( r"content/(?P.*)/diff/(?P.*)", view_name="diff-contents", ) def _contents_diff(request, from_query_string, to_query_string): """ Browse endpoint used to compute unified diffs between two contents. Diffs are generated only if the two contents are textual. By default, diffs whose size are greater than 20 kB will not be generated. To force the generation of large diffs, the 'force' boolean query parameter must be used. Args: request: input django http request from_query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value identifying the first content to_query_string: same as above for identifying the second content Returns: A JSON object containing the unified diff. """ diff_data = {} content_from = None content_to = None content_from_size = 0 content_to_size = 0 content_from_lines = [] content_to_lines = [] force = request.GET.get("force", "false") path = request.GET.get("path", None) language = "nohighlight" force = bool(strtobool(force)) if from_query_string == to_query_string: diff_str = "File renamed without changes" else: try: text_diff = True if from_query_string: content_from = request_content(from_query_string, max_size=None) content_from_display_data = prepare_content_for_display( content_from["raw_data"], content_from["mimetype"], path ) language = content_from_display_data["language"] content_from_size = content_from["length"] if not ( content_from["mimetype"].startswith("text/") or content_from["mimetype"] == "inode/x-empty" ): text_diff = False if text_diff and to_query_string: content_to = request_content(to_query_string, max_size=None) content_to_display_data = prepare_content_for_display( content_to["raw_data"], content_to["mimetype"], path ) language = content_to_display_data["language"] content_to_size = content_to["length"] if not ( content_to["mimetype"].startswith("text/") or content_to["mimetype"] == "inode/x-empty" ): text_diff = False diff_size = abs(content_to_size - content_from_size) if not text_diff: diff_str = "Diffs are not generated for non textual content" language = "nohighlight" elif not force and diff_size > _auto_diff_size_limit: diff_str = "Large diffs are not automatically computed" language = "nohighlight" else: if content_from: content_from_lines = ( content_from["raw_data"].decode("utf-8").splitlines(True) ) if content_from_lines and content_from_lines[-1][-1] != "\n": content_from_lines[-1] += "[swh-no-nl-marker]\n" if content_to: content_to_lines = ( content_to["raw_data"].decode("utf-8").splitlines(True) ) if content_to_lines and content_to_lines[-1][-1] != "\n": content_to_lines[-1] += "[swh-no-nl-marker]\n" diff_lines = difflib.unified_diff(content_from_lines, content_to_lines) diff_str = "".join(list(diff_lines)[2:]) except Exception as exc: sentry_sdk.capture_exception(exc) diff_str = str(exc) diff_data["diff_str"] = diff_str diff_data["language"] = language diff_data_json = json.dumps(diff_data, separators=(",", ": ")) return HttpResponse(diff_data_json, content_type="application/json") @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/", view_name="browse-content", checksum_args=["query_string"], ) def content_display(request, query_string): """Django view that produces an HTML display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/` """ try: algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, raise_if_unavailable=False) origin_url = request.GET.get("origin_url", None) selected_language = request.GET.get("language", None) if not origin_url: origin_url = request.GET.get("origin", None) snapshot_context = None if origin_url: try: snapshot_context = get_snapshot_context(origin_url=origin_url) except NotFoundExc: raw_cnt_url = reverse( "browse-content", url_args={"query_string": query_string} ) error_message = ( "The Software Heritage archive has a content " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the content " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_cnt_url)) ) raise NotFoundExc(error_message) if snapshot_context: snapshot_context["visit_info"] = None except Exception as exc: return handle_view_exception(request, exc) path = request.GET.get("path", None) content = None language = None mimetype = None if content_data["raw_data"] is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and "text/" in mimetype: available_languages = highlightjs.get_supported_languages() root_dir = None filename = None path_info = None directory_id = None directory_url = None query_params = {"origin": origin_url} breadcrumbs = [] if path: split_path = path.split("/") root_dir = split_path[0] filename = split_path[-1] if root_dir != path: path = path.replace(root_dir + "/", "") path = path[: -len(filename)] path_info = gen_path_info(path) dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": root_dir[:7], "url": dir_url}) for pi in path_info: dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir, "path": pi["path"]}, query_params=query_params, ) breadcrumbs.append({"name": pi["name"], "url": dir_url}) breadcrumbs.append({"name": filename, "url": None}) if path and root_dir != path: try: dir_info = service.lookup_directory_with_path(root_dir, path) directory_id = dir_info["target"] except Exception as exc: return handle_view_exception(request, exc) elif root_dir != path: directory_id = root_dir if directory_id: directory_url = gen_directory_link(directory_id) query_params = {"filename": filename} content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params=query_params, ) content_metadata = { "sha1": content_data["checksums"]["sha1"], "sha1_git": content_data["checksums"]["sha1_git"], "sha256": content_data["checksums"]["sha256"], "blake2s256": content_data["checksums"]["blake2s256"], "mimetype": content_data["mimetype"], "encoding": content_data["encoding"], "size": filesizeformat(content_data["length"]), "language": content_data["language"], "licenses": content_data["licenses"], "filename": filename, "directory": directory_id, "context-independent directory": directory_url, } if filename: content_metadata["filename"] = filename sha1_git = content_data["checksums"]["sha1_git"] swh_ids = get_swh_persistent_ids([{"type": "content", "id": sha1_git}]) heading = "Content - %s" % sha1_git if breadcrumbs: content_path = "/".join([bc["name"] for bc in breadcrumbs]) heading += " - %s" % content_path return render( request, "browse/content.html", { "heading": heading, "swh_object_id": swh_ids[0]["swh_id"], "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content, "content_size": content_data["length"], "max_content_size": content_display_max_size, "mimetype": mimetype, "language": language, "available_languages": available_languages, "breadcrumbs": breadcrumbs, "top_right_link": { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", }, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": True, "swh_ids": swh_ids, "error_code": content_data["error_code"], "error_message": content_data["error_message"], "error_description": content_data["error_description"], }, status=content_data["error_code"], ) diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py index 31e3caad..090ec8b1 100644 --- a/swh/web/browse/views/directory.py +++ b/swh/web/browse/views/directory.py @@ -1,211 +1,211 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from django.http import HttpResponse from django.shortcuts import render, redirect from django.template.defaultfilters import filesizeformat import sentry_sdk -from swh.web.common import service -from swh.web.common.utils import reverse, gen_path_info -from swh.web.common.exc import handle_view_exception, NotFoundExc + +from swh.web.browse.browseurls import browse_route +from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( get_directory_entries, - get_snapshot_context, get_readme_to_display, get_swh_persistent_ids, gen_link, ) - -from swh.web.browse.browseurls import browse_route +from swh.web.common import service +from swh.web.common.exc import handle_view_exception, NotFoundExc +from swh.web.common.utils import reverse, gen_path_info @browse_route( r"directory/(?P[0-9a-f]+)/", r"directory/(?P[0-9a-f]+)/(?P.+)/", view_name="browse-directory", checksum_args=["sha1_git"], ) def directory_browse(request, sha1_git, path=None): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is :http:get:`/browse/directory/(sha1_git)/[(path)/]` """ root_sha1_git = sha1_git try: if path: dir_info = service.lookup_directory_with_path(sha1_git, path) sha1_git = dir_info["target"] dirs, files = get_directory_entries(sha1_git) origin_url = request.GET.get("origin_url", None) if not origin_url: origin_url = request.GET.get("origin", None) snapshot_context = None if origin_url: try: snapshot_context = get_snapshot_context(origin_url=origin_url) except NotFoundExc: raw_dir_url = reverse( "browse-directory", url_args={"sha1_git": sha1_git} ) error_message = ( "The Software Heritage archive has a directory " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the directory " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_dir_url)) ) raise NotFoundExc(error_message) if snapshot_context: snapshot_context["visit_info"] = None except Exception as exc: return handle_view_exception(request, exc) path_info = gen_path_info(path) query_params = {"origin": origin_url} breadcrumbs = [] breadcrumbs.append( { "name": root_sha1_git[:7], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, query_params=query_params, ), } ) for pi in path_info: breadcrumbs.append( { "name": pi["name"], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git, "path": pi["path"]}, query_params=query_params, ), } ) path = "" if path is None else (path + "/") for d in dirs: if d["type"] == "rev": d["url"] = reverse( "browse-revision", url_args={"sha1_git": d["target"]}, query_params=query_params, ) else: d["url"] = reverse( "browse-directory", url_args={"sha1_git": root_sha1_git, "path": path + d["name"]}, query_params=query_params, ) sum_file_sizes = 0 readmes = {} for f in files: query_string = "sha1_git:" + f["target"] f["url"] = reverse( "browse-content", url_args={"query_string": query_string}, query_params={ "path": root_sha1_git + "/" + path + f["name"], "origin": origin_url, }, ) if f["length"] is not None: sum_file_sizes += f["length"] f["length"] = filesizeformat(f["length"]) if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) sum_file_sizes = filesizeformat(sum_file_sizes) dir_metadata = { "directory": sha1_git, "number of regular files": len(files), "number of subdirectories": len(dirs), "sum of regular file sizes": sum_file_sizes, } vault_cooking = { "directory_context": True, "directory_id": sha1_git, "revision_context": False, "revision_id": None, } swh_objects = [{"type": "directory", "id": sha1_git}] swh_ids = get_swh_persistent_ids( swh_objects=swh_objects, snapshot_context=snapshot_context ) heading = "Directory - %s" % sha1_git if breadcrumbs: dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/" heading += " - %s" % dir_path return render( request, "browse/directory.html", { "heading": heading, "swh_object_id": swh_ids[0]["swh_id"], "swh_object_name": "Directory", "swh_object_metadata": dir_metadata, "dirs": dirs, "files": files, "breadcrumbs": breadcrumbs, "top_right_link": None, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "snapshot_context": snapshot_context, "vault_cooking": vault_cooking, "show_actions_menu": True, "swh_ids": swh_ids, }, ) @browse_route( r"directory/resolve/content-path/(?P[0-9a-f]+)/(?P.+)/", view_name="browse-directory-resolve-content-path", checksum_args=["sha1_git"], ) def _directory_resolve_content_path(request, sha1_git, path): """ Internal endpoint redirecting to data url for a specific file path relative to a root directory. """ try: path = os.path.normpath(path) if not path.startswith("../"): dir_info = service.lookup_directory_with_path(sha1_git, path) if dir_info["type"] == "file": sha1 = dir_info["checksums"]["sha1"] data_url = reverse( "browse-content-raw", url_args={"query_string": sha1} ) return redirect(data_url) except Exception as exc: sentry_sdk.capture_exception(exc) return HttpResponse(status=404) diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py index 15f0007b..dadbd7ae 100644 --- a/swh/web/browse/views/origin.py +++ b/swh/web/browse/views/origin.py @@ -1,189 +1,189 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.shortcuts import render, redirect -from swh.web.common import service -from swh.web.common.origin_visits import get_origin_visits -from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp -from swh.web.common.exc import handle_view_exception -from swh.web.browse.utils import get_snapshot_context -from swh.web.browse.browseurls import browse_route -from .utils.snapshot_context import ( +from swh.web.browse.browseurls import browse_route +from swh.web.browse.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases, + get_snapshot_context, ) +from swh.web.common import service +from swh.web.common.exc import handle_view_exception +from swh.web.common.origin_visits import get_origin_visits +from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp @browse_route( r"origin/(?P.+)/visit/(?P.+)/directory/", - r"origin/(?P.+)/visit/(?P.+)" "/directory/(?P.+)/", + r"origin/(?P.+)/visit/(?P.+)/directory/(?P.+)/", r"origin/(?P.+)/directory/", r"origin/(?P.+)/directory/(?P.+)/", view_name="browse-origin-directory", ) def origin_directory_browse(request, origin_url, timestamp=None, path=None): """Django view for browsing the content of a directory associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/(origin_url)/directory/[(path)/]` * :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/directory/[(path)/]` """ return browse_snapshot_directory( request, origin_url=origin_url, timestamp=timestamp, path=path ) @browse_route( - r"origin/(?P.+)/visit/(?P.+)" "/content/(?P.+)/", + r"origin/(?P.+)/visit/(?P.+)/content/(?P.+)/", r"origin/(?P.+)/content/(?P.+)/", view_name="browse-origin-content", ) def origin_content_browse(request, origin_url, path=None, timestamp=None): """Django view that produces an HTML display of a content associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/(origin_url)/content/(path)/` * :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/content/(path)/` """ language = request.GET.get("language", None) return browse_snapshot_content( request, origin_url=origin_url, timestamp=timestamp, path=path, selected_language=language, ) PER_PAGE = 20 @browse_route( r"origin/(?P.+)/visit/(?P.+)/log/", r"origin/(?P.+)/log/", view_name="browse-origin-log", ) def origin_log_browse(request, origin_url, timestamp=None): """Django view that produces an HTML display of revisions history (aka the commit log) associated to a software origin. The url scheme that points to it is the following: * :http:get:`/browse/origin/(origin_url)/log/` * :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/log/` """ return browse_snapshot_log(request, origin_url=origin_url, timestamp=timestamp) @browse_route( r"origin/(?P.+)/visit/(?P.+)/branches/", r"origin/(?P.+)/branches/", view_name="browse-origin-branches", ) def origin_branches_browse(request, origin_url, timestamp=None): """Django view that produces an HTML display of the list of branches associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/(origin_url)/branches/` * :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/branches/` """ return browse_snapshot_branches(request, origin_url=origin_url, timestamp=timestamp) @browse_route( r"origin/(?P.+)/visit/(?P.+)/releases/", r"origin/(?P.+)/releases/", view_name="browse-origin-releases", ) def origin_releases_browse(request, origin_url, timestamp=None): """Django view that produces an HTML display of the list of releases associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/(origin_url)/releases/` * :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/releases/` """ return browse_snapshot_releases(request, origin_url=origin_url, timestamp=timestamp) @browse_route(r"origin/(?P.+)/visits/", view_name="browse-origin-visits") def origin_visits_browse(request, origin_url): """Django view that produces an HTML display of visits reporting for a given origin. The url that points to it is :http:get:`/browse/origin/(origin_url)/visits/`. """ try: origin_info = service.lookup_origin({"url": origin_url}) origin_visits = get_origin_visits(origin_info) snapshot_context = get_snapshot_context(origin_url=origin_url) except Exception as exc: return handle_view_exception(request, exc) for i, visit in enumerate(origin_visits): url_date = format_utc_iso_date(visit["date"], "%Y-%m-%dT%H:%M:%SZ") visit["fmt_date"] = format_utc_iso_date(visit["date"]) query_params = {} if i < len(origin_visits) - 1: if visit["date"] == origin_visits[i + 1]["date"]: query_params = {"visit_id": visit["visit"]} if i > 0: if visit["date"] == origin_visits[i - 1]["date"]: query_params = {"visit_id": visit["visit"]} snapshot = visit["snapshot"] if visit["snapshot"] else "" visit["browse_url"] = reverse( "browse-origin-directory", url_args={"origin_url": origin_url, "timestamp": url_date}, query_params=query_params, ) if not snapshot: visit["snapshot"] = "" visit["date"] = parse_timestamp(visit["date"]).timestamp() heading = "Origin visits - %s" % origin_url return render( request, "browse/origin-visits.html", { "heading": heading, "swh_object_name": "Visits", "swh_object_metadata": origin_info, "origin_visits": origin_visits, "origin_info": origin_info, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": False, }, ) @browse_route(r"origin/(?P.+)/", view_name="browse-origin") def origin_browse(request, origin_url): """Django view that redirects to the display of the latest archived snapshot for a given software origin. """ last_snapshot_url = reverse( "browse-origin-directory", url_args={"origin_url": origin_url} ) return redirect(last_snapshot_url) diff --git a/swh/web/browse/views/release.py b/swh/web/browse/views/release.py index 01697a9c..5d135d22 100644 --- a/swh/web/browse/views/release.py +++ b/swh/web/browse/views/release.py @@ -1,235 +1,235 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.shortcuts import render import sentry_sdk -from swh.web.common import service -from swh.web.common.utils import reverse, format_utc_iso_date -from swh.web.common.exc import NotFoundExc, handle_view_exception from swh.web.browse.browseurls import browse_route +from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( gen_revision_link, - get_snapshot_context, gen_link, gen_snapshot_link, get_swh_persistent_ids, gen_directory_link, gen_content_link, gen_release_link, gen_person_mail_link, ) +from swh.web.common import service +from swh.web.common.exc import NotFoundExc, handle_view_exception +from swh.web.common.utils import reverse, format_utc_iso_date @browse_route( r"release/(?P[0-9a-f]+)/", view_name="browse-release", checksum_args=["sha1_git"], ) def release_browse(request, sha1_git): """ Django view that produces an HTML display of a release identified by its id. The url that points to it is :http:get:`/browse/release/(sha1_git)/`. """ try: release = service.lookup_release(sha1_git) snapshot_context = None origin_info = None snapshot_id = request.GET.get("snapshot_id", None) origin_url = request.GET.get("origin_url", None) if not origin_url: origin_url = request.GET.get("origin", None) timestamp = request.GET.get("timestamp", None) visit_id = request.GET.get("visit_id", None) if origin_url: try: snapshot_context = get_snapshot_context( snapshot_id, origin_url, timestamp, visit_id ) except NotFoundExc: raw_rel_url = reverse("browse-release", url_args={"sha1_git": sha1_git}) error_message = ( "The Software Heritage archive has a release " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the release " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_rel_url)) ) raise NotFoundExc(error_message) origin_info = snapshot_context["origin_info"] elif snapshot_id: snapshot_context = get_snapshot_context(snapshot_id) except Exception as exc: return handle_view_exception(request, exc) release_data = {} release_data["author"] = "None" if release["author"]: release_data["author"] = gen_person_mail_link(release["author"]) release_data["date"] = format_utc_iso_date(release["date"]) release_data["release"] = sha1_git release_data["name"] = release["name"] release_data["synthetic"] = release["synthetic"] release_data["target"] = release["target"] release_data["target type"] = release["target_type"] if snapshot_context: if release["target_type"] == "revision": release_data["context-independent target"] = gen_revision_link( release["target"] ) elif release["target_type"] == "content": release_data["context-independent target"] = gen_content_link( release["target"] ) elif release["target_type"] == "directory": release_data["context-independent target"] = gen_directory_link( release["target"] ) elif release["target_type"] == "release": release_data["context-independent target"] = gen_release_link( release["target"] ) release_note_lines = [] if release["message"]: release_note_lines = release["message"].split("\n") vault_cooking = None rev_directory = None target_link = None if release["target_type"] == "revision": target_link = gen_revision_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) try: revision = service.lookup_revision(release["target"]) rev_directory = revision["directory"] vault_cooking = { "directory_context": True, "directory_id": rev_directory, "revision_context": True, "revision_id": release["target"], } except Exception as exc: sentry_sdk.capture_exception(exc) elif release["target_type"] == "directory": target_link = gen_directory_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) try: # check directory exists service.lookup_directory(release["target"]) vault_cooking = { "directory_context": True, "directory_id": release["target"], "revision_context": False, "revision_id": None, } except Exception as exc: sentry_sdk.capture_exception(exc) elif release["target_type"] == "content": target_link = gen_content_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) elif release["target_type"] == "release": target_link = gen_release_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) rev_directory_url = None if rev_directory is not None: if origin_info: rev_directory_url = reverse( "browse-origin-directory", url_args={"origin_url": origin_info["url"]}, query_params={"release": release["name"]}, ) elif snapshot_id: rev_directory_url = reverse( "browse-snapshot-directory", url_args={"snapshot_id": snapshot_id}, query_params={"release": release["name"]}, ) else: rev_directory_url = reverse( "browse-directory", url_args={"sha1_git": rev_directory} ) directory_link = None if rev_directory_url is not None: directory_link = gen_link(rev_directory_url, rev_directory) release["directory_link"] = directory_link release["target_link"] = target_link if snapshot_context: release_data["snapshot"] = snapshot_context["snapshot_id"] if origin_info: release_data["context-independent release"] = gen_release_link(release["id"]) release_data["origin url"] = gen_link(origin_info["url"], origin_info["url"]) browse_snapshot_link = gen_snapshot_link(snapshot_context["snapshot_id"]) release_data["context-independent snapshot"] = browse_snapshot_link swh_objects = [{"type": "release", "id": sha1_git}] if snapshot_context: snapshot_id = snapshot_context["snapshot_id"] if snapshot_id: swh_objects.append({"type": "snapshot", "id": snapshot_id}) swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) note_header = "None" if len(release_note_lines) > 0: note_header = release_note_lines[0] release["note_header"] = note_header release["note_body"] = "\n".join(release_note_lines[1:]) heading = "Release - %s" % release["name"] if snapshot_context: context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading += " - %s" % context_found return render( request, "browse/release.html", { "heading": heading, "swh_object_id": swh_ids[0]["swh_id"], "swh_object_name": "Release", "swh_object_metadata": release_data, "release": release, "snapshot_context": snapshot_context, "show_actions_menu": True, "breadcrumbs": None, "vault_cooking": vault_cooking, "top_right_link": None, "swh_ids": swh_ids, }, ) diff --git a/swh/web/browse/views/revision.py b/swh/web/browse/views/revision.py index d72df374..8c89423e 100644 --- a/swh/web/browse/views/revision.py +++ b/swh/web/browse/views/revision.py @@ -1,593 +1,593 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import json import textwrap from django.http import HttpResponse from django.shortcuts import render from django.template.defaultfilters import filesizeformat from django.utils.html import escape from django.utils.safestring import mark_safe from swh.model.identifiers import persistent_identifier -from swh.web.common import service -from swh.web.common.utils import ( - reverse, - format_utc_iso_date, - gen_path_info, - swh_object_icons, -) -from swh.web.common.exc import NotFoundExc, handle_view_exception from swh.web.browse.browseurls import browse_route +from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( gen_link, gen_revision_link, gen_revision_url, - get_snapshot_context, get_revision_log_url, get_directory_entries, gen_directory_link, request_content, prepare_content_for_display, content_display_max_size, gen_snapshot_link, get_readme_to_display, get_swh_persistent_ids, format_log_entries, gen_person_mail_link, ) +from swh.web.common import service +from swh.web.common.exc import NotFoundExc, handle_view_exception +from swh.web.common.utils import ( + reverse, + format_utc_iso_date, + gen_path_info, + swh_object_icons, +) def _gen_content_url(revision, query_string, path, snapshot_context): if snapshot_context: url_args = snapshot_context["url_args"] url_args["path"] = path query_params = snapshot_context["query_params"] query_params["revision"] = revision["id"] content_url = reverse( "browse-origin-content", url_args=url_args, query_params=query_params ) else: content_path = "%s/%s" % (revision["directory"], path) content_url = reverse( "browse-content", url_args={"query_string": query_string}, query_params={"path": content_path}, ) return content_url def _gen_diff_link(idx, diff_anchor, link_text): if idx < _max_displayed_file_diffs: return gen_link(diff_anchor, link_text) else: return link_text # TODO: put in conf _max_displayed_file_diffs = 1000 def _gen_revision_changes_list(revision, changes, snapshot_context): """ Returns a HTML string describing the file changes introduced in a revision. As this string will be displayed in the browse revision view, links to adequate file diffs are also generated. Args: revision (str): hexadecimal representation of a revision identifier changes (list): list of file changes in the revision snapshot_context (dict): optional origin context used to reverse the content urls Returns: A string to insert in a revision HTML view. """ changes_msg = [] for i, change in enumerate(changes): hasher = hashlib.sha1() from_query_string = "" to_query_string = "" diff_id = "diff-" if change["from"]: from_query_string = "sha1_git:" + change["from"]["target"] diff_id += change["from"]["target"] + "-" + change["from_path"] diff_id += "-" if change["to"]: to_query_string = "sha1_git:" + change["to"]["target"] diff_id += change["to"]["target"] + change["to_path"] change["path"] = change["to_path"] or change["from_path"] url_args = { "from_query_string": from_query_string, "to_query_string": to_query_string, } query_params = {"path": change["path"]} change["diff_url"] = reverse( "diff-contents", url_args=url_args, query_params=query_params ) hasher.update(diff_id.encode("utf-8")) diff_id = hasher.hexdigest() change["id"] = diff_id panel_diff_link = "#panel_" + diff_id if change["type"] == "modify": change["content_url"] = _gen_content_url( revision, to_query_string, change["to_path"], snapshot_context ) changes_msg.append( "modified: %s" % _gen_diff_link(i, panel_diff_link, change["to_path"]) ) elif change["type"] == "insert": change["content_url"] = _gen_content_url( revision, to_query_string, change["to_path"], snapshot_context ) changes_msg.append( "new file: %s" % _gen_diff_link(i, panel_diff_link, change["to_path"]) ) elif change["type"] == "delete": parent = service.lookup_revision(revision["parents"][0]) change["content_url"] = _gen_content_url( parent, from_query_string, change["from_path"], snapshot_context ) changes_msg.append( "deleted: %s" % _gen_diff_link(i, panel_diff_link, change["from_path"]) ) elif change["type"] == "rename": change["content_url"] = _gen_content_url( revision, to_query_string, change["to_path"], snapshot_context ) link_text = change["from_path"] + " → " + change["to_path"] changes_msg.append( "renamed: %s" % _gen_diff_link(i, panel_diff_link, link_text) ) if not changes: changes_msg.append("No changes") return mark_safe("\n".join(changes_msg)) @browse_route( r"revision/(?P[0-9a-f]+)/diff/", view_name="diff-revision", checksum_args=["sha1_git"], ) def _revision_diff(request, sha1_git): """ Browse internal endpoint to compute revision diff """ try: revision = service.lookup_revision(sha1_git) snapshot_context = None origin_url = request.GET.get("origin_url", None) if not origin_url: origin_url = request.GET.get("origin", None) timestamp = request.GET.get("timestamp", None) visit_id = request.GET.get("visit_id", None) if origin_url: snapshot_context = get_snapshot_context( origin_url=origin_url, timestamp=timestamp, visit_id=visit_id ) except Exception as exc: return handle_view_exception(request, exc) changes = service.diff_revision(sha1_git) changes_msg = _gen_revision_changes_list(revision, changes, snapshot_context) diff_data = { "total_nb_changes": len(changes), "changes": changes[:_max_displayed_file_diffs], "changes_msg": changes_msg, } diff_data_json = json.dumps(diff_data, separators=(",", ": ")) return HttpResponse(diff_data_json, content_type="application/json") NB_LOG_ENTRIES = 100 @browse_route( r"revision/(?P[0-9a-f]+)/log/", view_name="browse-revision-log", checksum_args=["sha1_git"], ) def revision_log_browse(request, sha1_git): """ Django view that produces an HTML display of the history log for a revision identified by its id. The url that points to it is :http:get:`/browse/revision/(sha1_git)/log/` """ try: per_page = int(request.GET.get("per_page", NB_LOG_ENTRIES)) offset = int(request.GET.get("offset", 0)) revs_ordering = request.GET.get("revs_ordering", "committer_date") session_key = "rev_%s_log_ordering_%s" % (sha1_git, revs_ordering) rev_log_session = request.session.get(session_key, None) rev_log = [] revs_walker_state = None if rev_log_session: rev_log = rev_log_session["rev_log"] revs_walker_state = rev_log_session["revs_walker_state"] if len(rev_log) < offset + per_page: revs_walker = service.get_revisions_walker( revs_ordering, sha1_git, max_revs=offset + per_page + 1, state=revs_walker_state, ) rev_log += [rev["id"] for rev in revs_walker] revs_walker_state = revs_walker.export_state() revs = rev_log[offset : offset + per_page] revision_log = service.lookup_revision_multiple(revs) request.session[session_key] = { "rev_log": rev_log, "revs_walker_state": revs_walker_state, } except Exception as exc: return handle_view_exception(request, exc) revs_ordering = request.GET.get("revs_ordering", "") prev_log_url = None if len(rev_log) > offset + per_page: prev_log_url = reverse( "browse-revision-log", url_args={"sha1_git": sha1_git}, query_params={ "per_page": per_page, "offset": offset + per_page, "revs_ordering": revs_ordering, }, ) next_log_url = None if offset != 0: next_log_url = reverse( "browse-revision-log", url_args={"sha1_git": sha1_git}, query_params={ "per_page": per_page, "offset": offset - per_page, "revs_ordering": revs_ordering, }, ) revision_log_data = format_log_entries(revision_log, per_page) swh_rev_id = persistent_identifier("revision", sha1_git) return render( request, "browse/revision-log.html", { "heading": "Revision history", "swh_object_id": swh_rev_id, "swh_object_name": "Revisions history", "swh_object_metadata": None, "revision_log": revision_log_data, "revs_ordering": revs_ordering, "next_log_url": next_log_url, "prev_log_url": prev_log_url, "breadcrumbs": None, "top_right_link": None, "snapshot_context": None, "vault_cooking": None, "show_actions_menu": True, "swh_ids": None, }, ) @browse_route( r"revision/(?P[0-9a-f]+)/", r"revision/(?P[0-9a-f]+)/(?P.+)/", view_name="browse-revision", checksum_args=["sha1_git"], ) def revision_browse(request, sha1_git, extra_path=None): """ Django view that produces an HTML display of a revision identified by its id. The url that points to it is :http:get:`/browse/revision/(sha1_git)/`. """ try: revision = service.lookup_revision(sha1_git) origin_info = None snapshot_context = None origin_url = request.GET.get("origin_url", None) if not origin_url: origin_url = request.GET.get("origin", None) timestamp = request.GET.get("timestamp", None) visit_id = request.GET.get("visit_id", None) snapshot_id = request.GET.get("snapshot_id", None) path = request.GET.get("path", None) dir_id = None dirs, files = None, None content_data = None if origin_url: try: snapshot_context = get_snapshot_context( origin_url=origin_url, timestamp=timestamp, visit_id=visit_id ) except NotFoundExc: raw_rev_url = reverse( "browse-revision", url_args={"sha1_git": sha1_git} ) error_message = ( "The Software Heritage archive has a revision " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the revision " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_rev_url)) ) raise NotFoundExc(error_message) origin_info = snapshot_context["origin_info"] snapshot_id = snapshot_context["snapshot_id"] elif snapshot_id: snapshot_context = get_snapshot_context(snapshot_id) if path: file_info = service.lookup_directory_with_path(revision["directory"], path) if file_info["type"] == "dir": dir_id = file_info["target"] else: query_string = "sha1_git:" + file_info["target"] content_data = request_content(query_string, raise_if_unavailable=False) else: dir_id = revision["directory"] if dir_id: path = "" if path is None else (path + "/") dirs, files = get_directory_entries(dir_id) except Exception as exc: return handle_view_exception(request, exc) revision_data = {} revision_data["author"] = "None" if revision["author"]: author_link = gen_person_mail_link(revision["author"]) revision_data["author"] = author_link revision_data["committer"] = "None" if revision["committer"]: committer_link = gen_person_mail_link(revision["committer"]) revision_data["committer"] = committer_link revision_data["committer date"] = format_utc_iso_date(revision["committer_date"]) revision_data["date"] = format_utc_iso_date(revision["date"]) revision_data["directory"] = revision["directory"] if snapshot_context: revision_data["snapshot"] = snapshot_id browse_snapshot_link = gen_snapshot_link(snapshot_id) revision_data["context-independent snapshot"] = browse_snapshot_link revision_data["context-independent directory"] = gen_directory_link( revision["directory"] ) revision_data["revision"] = sha1_git revision_data["merge"] = revision["merge"] revision_data["metadata"] = escape( json.dumps( revision["metadata"], sort_keys=True, indent=4, separators=(",", ": ") ) ) if origin_info: revision_data["origin url"] = gen_link(origin_info["url"], origin_info["url"]) revision_data["context-independent revision"] = gen_revision_link(sha1_git) parents = "" for p in revision["parents"]: parent_link = gen_revision_link( p, link_text=None, link_attrs=None, snapshot_context=snapshot_context ) parents += parent_link + "
" revision_data["parents"] = mark_safe(parents) revision_data["synthetic"] = revision["synthetic"] revision_data["type"] = revision["type"] message_lines = ["None"] if revision["message"]: message_lines = revision["message"].split("\n") parents = [] for p in revision["parents"]: parent_url = gen_revision_url(p, snapshot_context) parents.append({"id": p, "url": parent_url}) path_info = gen_path_info(path) query_params = { "snapshot_id": snapshot_id, "origin": origin_url, "timestamp": timestamp, "visit_id": visit_id, } breadcrumbs = [] breadcrumbs.append( { "name": revision["directory"][:7], "url": reverse( "browse-revision", url_args={"sha1_git": sha1_git}, query_params=query_params, ), } ) for pi in path_info: query_params["path"] = pi["path"] breadcrumbs.append( { "name": pi["name"], "url": reverse( "browse-revision", url_args={"sha1_git": sha1_git}, query_params=query_params, ), } ) vault_cooking = { "directory_context": False, "directory_id": None, "revision_context": True, "revision_id": sha1_git, } swh_objects = [{"type": "revision", "id": sha1_git}] content = None content_size = None mimetype = None language = None readme_name = None readme_url = None readme_html = None readmes = {} error_code = 200 error_message = "" error_description = "" if content_data: breadcrumbs[-1]["url"] = None content_size = content_data["length"] mimetype = content_data["mimetype"] if content_data["raw_data"]: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] query_params = {} if path: filename = path_info[-1]["name"] query_params["filename"] = path_info[-1]["name"] revision_data["filename"] = filename top_right_link = { "url": reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params=query_params, ), "icon": swh_object_icons["content"], "text": "Raw File", } swh_objects.append({"type": "content", "id": file_info["target"]}) error_code = content_data["error_code"] error_message = content_data["error_message"] error_description = content_data["error_description"] else: for d in dirs: if d["type"] == "rev": d["url"] = reverse( "browse-revision", url_args={"sha1_git": d["target"]} ) else: query_params["path"] = path + d["name"] d["url"] = reverse( "browse-revision", url_args={"sha1_git": sha1_git}, query_params=query_params, ) for f in files: query_params["path"] = path + f["name"] f["url"] = reverse( "browse-revision", url_args={"sha1_git": sha1_git}, query_params=query_params, ) if f["length"] is not None: f["length"] = filesizeformat(f["length"]) if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) top_right_link = { "url": get_revision_log_url(sha1_git, snapshot_context), "icon": swh_object_icons["revisions history"], "text": "History", } vault_cooking["directory_context"] = True vault_cooking["directory_id"] = dir_id swh_objects.append({"type": "directory", "id": dir_id}) diff_revision_url = reverse( "diff-revision", url_args={"sha1_git": sha1_git}, query_params={ "origin": origin_url, "timestamp": timestamp, "visit_id": visit_id, }, ) if snapshot_id: swh_objects.append({"type": "snapshot", "id": snapshot_id}) swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) heading = "Revision - %s - %s" % ( sha1_git[:7], textwrap.shorten(message_lines[0], width=70), ) if snapshot_context: context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading += " - %s" % context_found return render( request, "browse/revision.html", { "heading": heading, "swh_object_id": swh_ids[0]["swh_id"], "swh_object_name": "Revision", "swh_object_metadata": revision_data, "message_header": message_lines[0], "message_body": "\n".join(message_lines[1:]), "parents": parents, "snapshot_context": snapshot_context, "dirs": dirs, "files": files, "content": content, "content_size": content_size, "max_content_size": content_display_max_size, "mimetype": mimetype, "language": language, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "breadcrumbs": breadcrumbs, "top_right_link": top_right_link, "vault_cooking": vault_cooking, "diff_revision_url": diff_revision_url, "show_actions_menu": True, "swh_ids": swh_ids, "error_code": error_code, "error_message": error_message, "error_description": error_description, }, status=error_code, ) diff --git a/swh/web/browse/views/snapshot.py b/swh/web/browse/views/snapshot.py index 282a4bfc..e9489cb6 100644 --- a/swh/web/browse/views/snapshot.py +++ b/swh/web/browse/views/snapshot.py @@ -1,120 +1,120 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.shortcuts import redirect from swh.web.browse.browseurls import browse_route from swh.web.common.utils import reverse -from .utils.snapshot_context import ( +from swh.web.browse.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases, ) @browse_route( r"snapshot/(?P[0-9a-f]+)/", view_name="browse-snapshot", checksum_args=["snapshot_id"], ) def snapshot_browse(request, snapshot_id): """Django view for browsing the content of a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/` """ browse_snapshot_url = reverse( "browse-snapshot-directory", url_args={"snapshot_id": snapshot_id}, query_params=request.GET, ) return redirect(browse_snapshot_url) @browse_route( r"snapshot/(?P[0-9a-f]+)/directory/", r"snapshot/(?P[0-9a-f]+)/directory/(?P.+)/", view_name="browse-snapshot-directory", checksum_args=["snapshot_id"], ) def snapshot_directory_browse(request, snapshot_id, path=None): """Django view for browsing the content of a directory collected in a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/directory/[(path)/]` """ origin_url = request.GET.get("origin_url", None) if not origin_url: origin_url = request.GET.get("origin", None) return browse_snapshot_directory( request, snapshot_id=snapshot_id, path=path, origin_url=origin_url ) @browse_route( r"snapshot/(?P[0-9a-f]+)/content/(?P.+)/", view_name="browse-snapshot-content", checksum_args=["snapshot_id"], ) def snapshot_content_browse(request, snapshot_id, path): """Django view that produces an HTML display of a content collected in a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/content/(path)/` """ language = request.GET.get("language", None) return browse_snapshot_content( request, snapshot_id=snapshot_id, path=path, selected_language=language ) @browse_route( r"snapshot/(?P[0-9a-f]+)/log/", view_name="browse-snapshot-log", checksum_args=["snapshot_id"], ) def snapshot_log_browse(request, snapshot_id): """Django view that produces an HTML display of revisions history (aka the commit log) collected in a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/log/` """ return browse_snapshot_log(request, snapshot_id=snapshot_id) @browse_route( r"snapshot/(?P[0-9a-f]+)/branches/", view_name="browse-snapshot-branches", checksum_args=["snapshot_id"], ) def snapshot_branches_browse(request, snapshot_id): """Django view that produces an HTML display of the list of releases collected in a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/branches/` """ return browse_snapshot_branches(request, snapshot_id=snapshot_id) @browse_route( r"snapshot/(?P[0-9a-f]+)/releases/", view_name="browse-snapshot-releases", checksum_args=["snapshot_id"], ) def snapshot_releases_browse(request, snapshot_id): """Django view that produces an HTML display of the list of releases collected in a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/releases/` """ return browse_snapshot_releases(request, snapshot_id=snapshot_id) diff --git a/swh/web/browse/views/utils/__init__.py b/swh/web/browse/views/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_snapshot_context.py similarity index 52% copy from swh/web/tests/browse/test_utils.py copy to swh/web/tests/browse/test_snapshot_context.py index 587497a9..a14d2ed2 100644 --- a/swh/web/tests/browse/test_utils.py +++ b/swh/web/tests/browse/test_snapshot_context.py @@ -1,131 +1,65 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given -from swh.web.browse import utils -from swh.web.common.utils import reverse, format_utc_iso_date +from swh.web.browse.snapshot_context import get_origin_visit_snapshot +from swh.web.common.utils import format_utc_iso_date from swh.web.tests.strategies import origin_with_multiple_visits -def test_get_mimetype_and_encoding_for_content(): - text = b"Hello world!" - assert utils.get_mimetype_and_encoding_for_content(text) == ( - "text/plain", - "us-ascii", - ) - - @given(origin_with_multiple_visits()) def test_get_origin_visit_snapshot_simple(archive_data, origin): visits = archive_data.origin_visit_get(origin["url"]) for visit in visits: snapshot = archive_data.snapshot_get(visit["snapshot"]) branches = [] releases = [] def _process_branch_data(branch, branch_data): if branch_data["target_type"] == "revision": rev_data = archive_data.revision_get(branch_data["target"]) branches.append( { "name": branch, "revision": branch_data["target"], "directory": rev_data["directory"], "date": format_utc_iso_date(rev_data["date"]), "message": rev_data["message"], } ) elif branch_data["target_type"] == "release": rel_data = archive_data.release_get(branch_data["target"]) rev_data = archive_data.revision_get(rel_data["target"]) releases.append( { "name": rel_data["name"], "branch_name": branch, "date": format_utc_iso_date(rel_data["date"]), "id": rel_data["id"], "message": rel_data["message"], "target_type": rel_data["target_type"], "target": rel_data["target"], "directory": rev_data["directory"], } ) for branch in sorted(snapshot["branches"].keys()): branch_data = snapshot["branches"][branch] if branch_data["target_type"] == "alias": target_data = snapshot["branches"][branch_data["target"]] _process_branch_data(branch, target_data) else: _process_branch_data(branch, branch_data) assert branches and releases, "Incomplete test data." - origin_visit_branches = utils.get_origin_visit_snapshot( + origin_visit_branches = get_origin_visit_snapshot( origin, visit_id=visit["visit"] ) assert origin_visit_branches == (branches, releases) - - -def test_gen_link(): - assert ( - utils.gen_link("https://www.softwareheritage.org/", "swh") - == 'swh' - ) - - -def test_gen_revision_link(): - revision_id = "28a0bc4120d38a394499382ba21d6965a67a3703" - revision_url = reverse("browse-revision", url_args={"sha1_git": revision_id}) - - assert utils.gen_revision_link( - revision_id, link_text=None, link_attrs=None - ) == '%s' % (revision_url, revision_id) - assert utils.gen_revision_link( - revision_id, shorten_id=True, link_attrs=None - ) == '%s' % (revision_url, revision_id[:7]) - - -def test_gen_person_mail_link(): - person_full = { - "name": "John Doe", - "email": "john.doe@swh.org", - "fullname": "John Doe ", - } - - assert utils.gen_person_mail_link(person_full) == '%s' % ( - person_full["email"], - person_full["name"], - ) - - link_text = "Mail" - assert utils.gen_person_mail_link( - person_full, link_text=link_text - ) == '%s' % (person_full["email"], link_text) - - person_partial_email = {"name": None, "email": None, "fullname": "john.doe@swh.org"} - - assert utils.gen_person_mail_link( - person_partial_email - ) == '%s' % ( - person_partial_email["fullname"], - person_partial_email["fullname"], - ) - - person_partial = { - "name": None, - "email": None, - "fullname": "John Doe ", - } - - assert utils.gen_person_mail_link(person_partial) == person_partial["fullname"] - - person_none = {"name": None, "email": None, "fullname": None} - - assert utils.gen_person_mail_link(person_none) == "None" diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_utils.py index 587497a9..2f00a493 100644 --- a/swh/web/tests/browse/test_utils.py +++ b/swh/web/tests/browse/test_utils.py @@ -1,131 +1,75 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from hypothesis import given - -from swh.web.browse import utils -from swh.web.common.utils import reverse, format_utc_iso_date -from swh.web.tests.strategies import origin_with_multiple_visits +from swh.web.browse.utils import ( + get_mimetype_and_encoding_for_content, + gen_link, + gen_revision_link, + gen_person_mail_link, +) +from swh.web.common.utils import reverse def test_get_mimetype_and_encoding_for_content(): text = b"Hello world!" - assert utils.get_mimetype_and_encoding_for_content(text) == ( - "text/plain", - "us-ascii", - ) - - -@given(origin_with_multiple_visits()) -def test_get_origin_visit_snapshot_simple(archive_data, origin): - visits = archive_data.origin_visit_get(origin["url"]) - - for visit in visits: - - snapshot = archive_data.snapshot_get(visit["snapshot"]) - branches = [] - releases = [] - - def _process_branch_data(branch, branch_data): - if branch_data["target_type"] == "revision": - rev_data = archive_data.revision_get(branch_data["target"]) - branches.append( - { - "name": branch, - "revision": branch_data["target"], - "directory": rev_data["directory"], - "date": format_utc_iso_date(rev_data["date"]), - "message": rev_data["message"], - } - ) - elif branch_data["target_type"] == "release": - rel_data = archive_data.release_get(branch_data["target"]) - rev_data = archive_data.revision_get(rel_data["target"]) - releases.append( - { - "name": rel_data["name"], - "branch_name": branch, - "date": format_utc_iso_date(rel_data["date"]), - "id": rel_data["id"], - "message": rel_data["message"], - "target_type": rel_data["target_type"], - "target": rel_data["target"], - "directory": rev_data["directory"], - } - ) - - for branch in sorted(snapshot["branches"].keys()): - branch_data = snapshot["branches"][branch] - if branch_data["target_type"] == "alias": - target_data = snapshot["branches"][branch_data["target"]] - _process_branch_data(branch, target_data) - else: - _process_branch_data(branch, branch_data) - - assert branches and releases, "Incomplete test data." - - origin_visit_branches = utils.get_origin_visit_snapshot( - origin, visit_id=visit["visit"] - ) - - assert origin_visit_branches == (branches, releases) + assert get_mimetype_and_encoding_for_content(text) == ("text/plain", "us-ascii",) def test_gen_link(): assert ( - utils.gen_link("https://www.softwareheritage.org/", "swh") + gen_link("https://www.softwareheritage.org/", "swh") == 'swh' ) def test_gen_revision_link(): revision_id = "28a0bc4120d38a394499382ba21d6965a67a3703" revision_url = reverse("browse-revision", url_args={"sha1_git": revision_id}) - assert utils.gen_revision_link( + assert gen_revision_link( revision_id, link_text=None, link_attrs=None ) == '%s' % (revision_url, revision_id) - assert utils.gen_revision_link( + assert gen_revision_link( revision_id, shorten_id=True, link_attrs=None ) == '%s' % (revision_url, revision_id[:7]) def test_gen_person_mail_link(): person_full = { "name": "John Doe", "email": "john.doe@swh.org", "fullname": "John Doe ", } - assert utils.gen_person_mail_link(person_full) == '%s' % ( + assert gen_person_mail_link(person_full) == '%s' % ( person_full["email"], person_full["name"], ) link_text = "Mail" - assert utils.gen_person_mail_link( + assert gen_person_mail_link( person_full, link_text=link_text ) == '%s' % (person_full["email"], link_text) person_partial_email = {"name": None, "email": None, "fullname": "john.doe@swh.org"} - assert utils.gen_person_mail_link( + assert gen_person_mail_link( person_partial_email ) == '%s' % ( person_partial_email["fullname"], person_partial_email["fullname"], ) person_partial = { "name": None, "email": None, "fullname": "John Doe ", } - assert utils.gen_person_mail_link(person_partial) == person_partial["fullname"] + assert gen_person_mail_link(person_partial) == person_partial["fullname"] person_none = {"name": None, "email": None, "fullname": None} - assert utils.gen_person_mail_link(person_none) == "None" + assert gen_person_mail_link(person_none) == "None" diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index d7794007..efdf12b4 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,1117 +1,1113 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import string -import swh.web.browse.utils - from django.utils.html import escape from hypothesis import given from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot -from swh.web.browse.utils import process_snapshot_branches +from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import get_swh_persistent_id from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, parse_timestamp, ) +from swh.web.config import get_config from swh.web.tests.data import get_content, random_sha1 from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, new_snapshot, visit_dates, revisions, origin_with_releases, release as existing_release, ) @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse("browse-origin-visits", url_args={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/origin-visits.html") url = reverse("browse-origin-visits", url_args={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/origin-visits.html") visits = archive_data.origin_visit_get(origin["url"]) for v in visits: vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ") browse_dir_url = reverse( "browse-origin-directory", url_args={"origin_url": origin["url"], "timestamp": vdate}, ) assert_contains(resp, browse_dir_url) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev["directory"]) dir_files = [e for e in dir_content if e["type"] == "file"] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { "branches": branches, "releases": releases, "root_dir_sha1": head_rev["directory"], "content": get_content(dir_file["checksums"]["sha1"]), "visit": origin_visits[visit_idx], } tdata = _get_archive_data(-1) _origin_content_view_test_helper( client, origin, origin_visits, tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], ) _origin_content_view_test_helper( client, origin, origin_visits, tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], timestamp=tdata["visit"]["date"], ) visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_content_view_test_helper( client, origin, origin_visits, tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], timestamp=visit_unix_ts, ) tdata = _get_archive_data(0) _origin_content_view_test_helper( client, origin, origin_visits, tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], visit_id=tdata["visit"]["visit"], ) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit["date"]).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) origin = dict(origin) del origin["type"] _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] subdirs = [ e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir" ] branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit["date"]).timestamp() visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir["target"]) subdir_path = subdir["name"] _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit["date"], ) origin = dict(origin) del origin["type"] _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts, ) _origin_directory_view_test_helper( client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit["date"], ) @given(origin()) def test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content) origin = dict(origin) origin["type"] = None _origin_branches_test_helper(client, origin, snapshot_content) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content) origin = dict(origin) origin["type"] = None _origin_releases_test_helper(client, origin, snapshot_content) @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3), ) def test_origin_snapshot_null_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict["branches"].keys()): if i == 0: snp_dict["branches"][branch] = None else: snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i - 1]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git") archive_data.origin_visit_update( new_origin["url"], visit.visit, status="partial", snapshot=snp_dict["id"] ) url = reverse("browse-origin-directory", url_args={"origin_url": new_origin["url"]}) rv = client.get(url) assert rv.status_code == 200 @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4), ) def test_origin_snapshot_invalid_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict["branches"].keys()): snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git") archive_data.origin_visit_update( new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"] ) url = reverse( "browse-origin-directory", url_args={"origin_url": new_origin["url"]}, query_params={"branch": "invalid_branch"}, ) rv = client.get(url) assert rv.status_code == 404 def test_origin_request_errors(client, archive_data, mocker): - mock_snapshot_service = mocker.patch( - "swh.web.browse.views.utils.snapshot_context.service" - ) + mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service") mock_origin_service = mocker.patch("swh.web.browse.views.origin.service") mock_utils_service = mocker.patch("swh.web.browse.utils.service") mock_get_origin_visit_snapshot = mocker.patch( - "swh.web.browse.utils.get_origin_visit_snapshot" + "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_request_content = mocker.patch( - "swh.web.browse.views.utils.snapshot_context.request_content" + "swh.web.browse.snapshot_context.request_content" ) mock_origin_service.lookup_origin.side_effect = NotFoundExc("origin not found") url = reverse("browse-origin-visits", url_args={"origin_url": "bar"}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "origin not found", status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = { "type": "foo", "url": "bar", "id": 457, } mock_get_origin_visits.return_value = [] url = reverse("browse-origin-directory", url_args={"origin_url": "bar"}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{"visit": 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc("visit not found") url = reverse( "browse-origin-directory", url_args={"origin_url": "bar"}, query_params={"visit_id": 2}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Visit.*not found", resp.content.decode("utf-8")) mock_get_origin_visits.return_value = [ { "date": "2015-09-26T09:30:52.373449+00:00", "metadata": {}, "origin": 457, "snapshot": "bdaf9ac436488a8c6cda927a0f44e172934d3f65", "status": "full", "visit": 1, } ] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ( [ { "directory": "ae59ceecf46367e8e4ad800e231fc76adc3afffb", "name": "HEAD", "revision": "7bc08e1aa0b08cb23e18715a32aa38517ad34672", "date": "04 May 2017, 13:27 UTC", "message": "", } ], [], ) - mock_utils_service.lookup_snapshot_sizes.return_value = { + mock_snapshot_service.lookup_snapshot_sizes.return_value = { "revision": 1, "release": 0, } mock_lookup_directory = mock_utils_service.lookup_directory mock_lookup_directory.side_effect = NotFoundExc("Directory not found") url = reverse("browse-origin-directory", url_args={"origin_url": "bar"}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "Directory not found", status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = { "type": "foo", "url": "bar", "id": 457, } mock_get_origin_visits.return_value = [] url = reverse( "browse-origin-content", url_args={"origin_url": "bar", "path": "foo"} ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{"visit": 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc("visit not found") url = reverse( "browse-origin-content", url_args={"origin_url": "bar", "path": "foo"}, query_params={"visit_id": 2}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Visit.*not found", resp.content.decode("utf-8")) mock_get_origin_visits.return_value = [ { "date": "2015-09-26T09:30:52.373449+00:00", "metadata": {}, "origin": 457, "snapshot": "bdaf9ac436488a8c6cda927a0f44e172934d3f65", "status": "full", "type": "git", "visit": 1, } ] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ([], []) - mock_utils_service.lookup_snapshot_sizes.return_value = { + mock_snapshot_service.lookup_snapshot_sizes.return_value = { "revision": 0, "release": 0, } - mock_utils_service.lookup_origin.return_value = { + mock_snapshot_service.lookup_origin.return_value = { "type": "foo", "url": "bar", "id": 457, } url = reverse( "browse-origin-content", url_args={"origin_url": "bar", "path": "baz"} ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/content.html") assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) mock_get_origin_visit_snapshot.return_value = ( [ { "directory": "ae59ceecf46367e8e4ad800e231fc76adc3afffb", "name": "HEAD", "revision": "7bc08e1aa0b08cb23e18715a32aa38517ad34672", "date": "04 May 2017, 13:27 UTC", "message": "", } ], [], ) - mock_utils_service.lookup_snapshot_sizes.return_value = { + mock_snapshot_service.lookup_snapshot_sizes.return_value = { "revision": 1, "release": 0, } mock_snapshot_service.lookup_directory_with_path.return_value = { "target": "5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1" } mock_request_content.side_effect = NotFoundExc("Content not found") url = reverse( "browse-origin-content", url_args={"origin_url": "bar", "path": "baz"} ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "Content not found", status_code=404) mock_get_snapshot_context = mocker.patch( - "swh.web.browse.views.utils.snapshot_context.get_snapshot_context" + "swh.web.browse.snapshot_context.get_snapshot_context" ) mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") url = reverse("browse-origin-directory", url_args={"origin_url": "bar"}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "Snapshot not found", status_code=404) def test_origin_empty_snapshot(client, mocker): - mock_utils_service = mocker.patch("swh.web.browse.utils.service") + mock_utils_service = mocker.patch("swh.web.browse.snapshot_context.service") mock_get_origin_visit_snapshot = mocker.patch( - "swh.web.browse.utils.get_origin_visit_snapshot" + "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [ { "date": "2015-09-26T09:30:52.373449+00:00", "metadata": {}, "origin": 457, "snapshot": "bdaf9ac436488a8c6cda927a0f44e172934d3f65", "status": "full", "type": "git", "visit": 1, } ] mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_sizes.return_value = { "revision": 0, "release": 0, } mock_utils_service.lookup_origin.return_value = { "id": 457, "url": "https://github.com/foo/bar", } url = reverse("browse-origin-directory", url_args={"origin_url": "bar"}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") resp_content = resp.content.decode("utf-8") assert re.search("snapshot.*is empty", resp_content) assert not re.search("swh-tr-link", resp_content) @given(origin_with_releases()) def test_origin_release_browse(client, archive_data, origin): - # for swh.web.browse.utils.get_snapshot_content to only return one branch - snapshot_max_size = swh.web.browse.utils.snapshot_content_max_size - swh.web.browse.utils.snapshot_content_max_size = 1 + # for swh.web.browse.snapshot_context.get_snapshot_content to only return one branch + config = get_config() + snapshot_max_size = int(config["snapshot_content_max_size"]) + config["snapshot_content_max_size"] = 1 try: snapshot = archive_data.snapshot_get_latest(origin["url"]) release = [ b for b in snapshot["branches"].values() if b["target_type"] == "release" ][-1] release_data = archive_data.release_get(release["target"]) url = reverse( "browse-origin-directory", url_args={"origin_url": origin["url"]}, query_params={"release": release_data["name"]}, ) resp = client.get(url) assert resp.status_code == 200 assert_contains(resp, release_data["name"]) assert_contains(resp, release["target"]) finally: - swh.web.browse.utils.snapshot_content_max_size = snapshot_max_size + config["snapshot_content_max_size"] = snapshot_max_size @given(origin_with_releases()) def test_origin_release_browse_not_found(client, archive_data, origin): invalid_release_name = "swh-foo-bar" url = reverse( "browse-origin-directory", url_args={"origin_url": origin["url"]}, query_params={"release": invalid_release_name}, ) resp = client.get(url) assert resp.status_code == 404 assert re.search( f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8") ) def _origin_content_view_test_helper( client, origin_info, origin_visits, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None, ): content_path = "/".join(content["path"].split("/")[1:]) url_args = {"origin_url": origin_info["url"], "path": content_path} if not visit_id: visit_id = origin_visits[-1]["visit"] query_params = {} if timestamp: url_args["timestamp"] = timestamp if visit_id: query_params["visit_id"] = visit_id url = reverse("browse-origin-content", url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/content.html") assert type(content["data"]) == str assert_contains(resp, '' % content["hljs_language"]) assert_contains(resp, escape(content["data"])) split_path = content_path.split("/") filename = split_path[-1] path = content_path.replace(filename, "")[:-1] path_info = gen_path_info(path) del url_args["path"] if timestamp: url_args["timestamp"] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%S" ) root_dir_url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) assert_contains(resp, '
  • ', count=len(path_info) + 1) assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: url_args["path"] = p["path"] dir_url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) assert_contains(resp, '%s' % (dir_url, p["name"])) assert_contains(resp, "
  • %s
  • " % filename) query_string = "sha1_git:" + content["sha1_git"] url_raw = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params={"filename": filename}, ) assert_contains(resp, url_raw) if "args" in url_args: del url_args["path"] origin_branches_url = reverse( "browse-origin-branches", url_args=url_args, query_params=query_params ) assert_contains( resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches)), ) origin_releases_url = reverse( "browse-origin-releases", url_args=url_args, query_params=query_params ) assert_contains( resp, 'Releases (%s)' % (origin_releases_url, len(origin_releases)), ) assert_contains(resp, '
  • ', count=len(origin_branches)) url_args["path"] = content_path for branch in origin_branches: query_params["branch"] = branch["name"] root_dir_branch_url = reverse( "browse-origin-content", url_args=url_args, query_params=query_params ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: query_params["release"] = release["name"] root_dir_release_url = reverse( "browse-origin-content", url_args=url_args, query_params=query_params ) assert_contains(resp, '' % root_dir_release_url) url = reverse("browse-origin-content", url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/content.html") swh_cnt_id = get_swh_persistent_id("content", content["sha1_git"]) swh_cnt_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, "swh-take-new-snapshot") def _origin_directory_view_test_helper( client, origin_info, origin_visits, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, path=None, ): dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")] files = [e for e in directory_entries if e["type"] == "file"] if not visit_id: visit_id = origin_visits[-1]["visit"] url_args = {"origin_url": origin_info["url"]} query_params = {} if timestamp: url_args["timestamp"] = timestamp else: query_params["visit_id"] = visit_id if path: url_args["path"] = path url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") assert_contains(resp, '', count=len(dirs)) assert_contains(resp, '', count=len(files)) if timestamp: url_args["timestamp"] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%S" ) for d in dirs: if d["type"] == "rev": dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: dir_path = d["name"] if path: dir_path = "%s/%s" % (path, d["name"]) dir_url_args = dict(url_args) dir_url_args["path"] = dir_path dir_url = reverse( "browse-origin-directory", url_args=dir_url_args, query_params=query_params, ) assert_contains(resp, dir_url) for f in files: file_path = f["name"] if path: file_path = "%s/%s" % (path, f["name"]) file_url_args = dict(url_args) file_url_args["path"] = file_path file_url = reverse( "browse-origin-content", url_args=file_url_args, query_params=query_params ) assert_contains(resp, file_url) if "path" in url_args: del url_args["path"] root_dir_branch_url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split("/")) + 1 assert_contains(resp, '
  • ', count=nb_bc_paths) assert_contains( resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7]) ) origin_branches_url = reverse( "browse-origin-branches", url_args=url_args, query_params=query_params ) assert_contains( resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches)), ) origin_releases_url = reverse( "browse-origin-releases", url_args=url_args, query_params=query_params ) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains( resp, 'Releases (%s)' % (origin_releases_url, nb_releases) ) if path: url_args["path"] = path assert_contains(resp, '
  • ', count=len(origin_branches)) for branch in origin_branches: query_params["branch"] = branch["name"] root_dir_branch_url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: query_params["release"] = release["name"] root_dir_release_url = reverse( "browse-origin-directory", url_args=url_args, query_params=query_params ) assert_contains(resp, '' % root_dir_release_url) assert_contains(resp, "vault-cook-directory") assert_contains(resp, "vault-cook-revision") swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"]) swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, "swh-take-new-snapshot") def _origin_branches_test_helper(client, origin_info, origin_snapshot): url_args = {"origin_url": origin_info["url"]} url = reverse("browse-origin-branches", url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/branches.html") origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", url_args=url_args) assert_contains( resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches)), ) origin_releases_url = reverse("browse-origin-releases", url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains( resp, 'Releases (%s)' % (origin_releases_url, nb_releases) ) assert_contains(resp, '' % escape(browse_branch_url)) browse_revision_url = reverse( "browse-revision", url_args={"sha1_git": branch["revision"]}, query_params={"origin": origin_info["url"]}, ) assert_contains(resp, '' % escape(browse_revision_url)) def _origin_releases_test_helper(client, origin_info, origin_snapshot): url_args = {"origin_url": origin_info["url"]} url = reverse("browse-origin-releases", url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/releases.html") origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", url_args=url_args) assert_contains( resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches)), ) origin_releases_url = reverse("browse-origin-releases", url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains( resp, 'Releases (%s)' % (origin_releases_url, nb_releases) ) assert_contains(resp, '' % escape(browse_release_url)) assert_contains(resp, '' % escape(browse_revision_url)) @given( new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release() ) def test_origin_branches_pagination_with_alias( client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release ): """ When a snapshot contains a branch or a release alias, pagination links in the branches / releases view should be displayed. """ - mocker.patch( - "swh.web.browse.views.utils.snapshot_context.PER_PAGE", len(revisions) / 2 - ) + mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2) snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())} for i in range(len(revisions)): branch = "".join(random.choices(string.ascii_lowercase, k=8)) snp_dict["branches"][branch.encode()] = { "target_type": "revision", "target": hash_to_bytes(revisions[i]), } release = "".join(random.choices(string.ascii_lowercase, k=8)) snp_dict["branches"][b"RELEASE_ALIAS"] = { "target_type": "alias", "target": release.encode(), } snp_dict["branches"][release.encode()] = { "target_type": "release", "target": hash_to_bytes(existing_release), } new_origin = archive_data.origin_add([new_origin])[0] archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git") archive_data.origin_visit_update( new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"] ) url = reverse("browse-origin-branches", url_args={"origin_url": new_origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/branches.html") assert_contains(resp, '