diff --git a/swh/web/api/throttling.py b/swh/web/api/throttling.py index 24e7f11b..87ffb456 100644 --- a/swh/web/api/throttling.py +++ b/swh/web/api/throttling.py @@ -1,217 +1,216 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from ipaddress import IPv4Network, IPv6Network, ip_address, ip_network from typing import Callable, List, TypeVar, Union -import sentry_sdk - from django.core.exceptions import ImproperlyConfigured import rest_framework from rest_framework.throttling import ScopedRateThrottle from swh.web.auth.utils import API_SAVE_ORIGIN_PERMISSION +from swh.web.common.exc import sentry_capture_exception from swh.web.config import get_config APIView = TypeVar("APIView", bound="rest_framework.views.APIView") Request = rest_framework.request.Request API_THROTTLING_EXEMPTED_PERM = "swh.web.api.throttling_exempted" class SwhWebRateThrottle(ScopedRateThrottle): """Custom DRF request rate limiter for anonymous users Requests are grouped into scopes. It enables to apply different requests rate limiting based on the scope name but also the input HTTP request types. To associate a scope to requests, one must add a 'throttle_scope' attribute when using a class based view, or call the 'throttle_scope' decorator when using a function based view. By default, requests do not have an associated scope and are not rate limited. Rate limiting can also be configured according to the type of the input HTTP requests for fine grained tuning. For instance, the following YAML configuration section sets a rate of: - 1 per minute for POST requests - 60 per minute for other request types for the 'swh_api' scope while exempting those coming from the 127.0.0.0/8 ip network. .. code-block:: yaml throttling: scopes: swh_api: limiter_rate: default: 60/m POST: 1/m exempted_networks: - 127.0.0.0/8 """ scope = None def __init__(self): super().__init__() self.exempted_networks = None self.num_requests = 0 self.duration = 0 def get_cache_key(self, request, view): # do not handle throttling if user is authenticated if request.user.is_authenticated: return None else: return super().get_cache_key(request, view) def get_exempted_networks( self, scope_name: str ) -> List[Union[IPv4Network, IPv6Network]]: if not self.exempted_networks: scopes = get_config()["throttling"]["scopes"] scope = scopes.get(scope_name) if scope: networks = scope.get("exempted_networks") if networks: self.exempted_networks = [ ip_network(network) for network in networks ] return self.exempted_networks def get_scope(self, view: APIView): if not self.scope: # class based view case return getattr(view, self.scope_attr, None) else: # function based view case return self.scope def allow_request(self, request: Request, view: APIView) -> bool: # class based view case if not self.scope: default_scope = getattr(view, self.scope_attr, None) request_allowed = None if default_scope is not None: # check if there is a specific rate limiting associated # to the request type assert request.method is not None request_scope = f"{default_scope}_{request.method.lower()}" setattr(view, self.scope_attr, request_scope) try: request_allowed = super().allow_request(request, view) # use default rate limiting otherwise except ImproperlyConfigured as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) setattr(view, self.scope_attr, default_scope) if request_allowed is None: request_allowed = super().allow_request(request, view) # function based view case else: default_scope = self.scope # check if there is a specific rate limiting associated # to the request type self.scope = default_scope + "_" + request.method.lower() try: self.rate = self.get_rate() # use default rate limiting otherwise except ImproperlyConfigured: self.scope = default_scope self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) request_allowed = super(ScopedRateThrottle, self).allow_request( request, view ) self.scope = default_scope exempted_networks = self.get_exempted_networks(default_scope) exempted_ip = False if exempted_networks: remote_address = ip_address(self.get_ident(request)) exempted_ip = any( remote_address in network for network in exempted_networks ) request_allowed = exempted_ip or request_allowed # set throttling related data in the request metadata # in order for the ThrottlingHeadersMiddleware to # add X-RateLimit-* headers in the HTTP response if not exempted_ip and hasattr(self, "history"): hit_count = len(self.history) request.META["RateLimit-Limit"] = self.num_requests request.META["RateLimit-Remaining"] = self.num_requests - hit_count wait = self.wait() if wait is not None: request.META["RateLimit-Reset"] = int(self.now + wait) return request_allowed class SwhWebUserRateThrottle(SwhWebRateThrottle): """Custom DRF request rate limiter for authenticated users It has the same behavior than :class:`swh.web.api.throttling.SwhWebRateThrottle` except the number of allowed requests for each throttle scope is increased by a 1Ox factor. """ NUM_REQUESTS_FACTOR = 10 def get_cache_key(self, request, view): # do not handle throttling if user is not authenticated if request.user.is_authenticated: return super(SwhWebRateThrottle, self).get_cache_key(request, view) else: return None def parse_rate(self, rate): # increase number of allowed requests num_requests, duration = super().parse_rate(rate) return (num_requests * self.NUM_REQUESTS_FACTOR, duration) def allow_request(self, request: Request, view: APIView) -> bool: if request.user.is_staff or request.user.has_perm(API_THROTTLING_EXEMPTED_PERM): # no throttling for staff users or users with adequate permission return True scope = self.get_scope(view) if scope == "save_origin" and request.user.has_perm(API_SAVE_ORIGIN_PERMISSION): # no throttling on save origin endpoint for users with adequate permission return True return super().allow_request(request, view) def throttle_scope(scope: str) -> Callable[..., APIView]: """Decorator that allows the throttle scope of a DRF function based view to be set:: @api_view(['GET', ]) @throttle_scope('scope') def view(request): ... """ def decorator(func: APIView) -> APIView: SwhScopeRateThrottle = type( "SwhWebScopeRateThrottle", (SwhWebRateThrottle,), {"scope": scope} ) SwhScopeUserRateThrottle = type( "SwhWebScopeUserRateThrottle", (SwhWebUserRateThrottle,), {"scope": scope}, ) func.throttle_classes = (SwhScopeRateThrottle, SwhScopeUserRateThrottle) return func return decorator diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py index 3eaef3cd..cc992f1c 100644 --- a/swh/web/browse/utils.py +++ b/swh/web/browse/utils.py @@ -1,726 +1,725 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import stat import textwrap from typing import Tuple import chardet import magic -import sentry_sdk from django.utils.html import escape from django.utils.safestring import mark_safe from swh.web.common import archive, highlightjs -from swh.web.common.exc import NotFoundExc +from swh.web.common.exc import NotFoundExc, sentry_capture_exception from swh.web.common.utils import ( browsers_supported_image_mimes, django_cache, format_utc_iso_date, reverse, rst_to_html, ) from swh.web.config import get_config @django_cache() def get_directory_entries(sha1_git): """Function that retrieves the content of a directory from the archive. The directories entries are first sorted in lexicographical order. Sub-directories and regular files are then extracted. Args: sha1_git: sha1_git identifier of the directory Returns: A tuple whose first member corresponds to the sub-directories list and second member the regular files list Raises: NotFoundExc if the directory is not found """ entries = list(archive.lookup_directory(sha1_git)) for e in entries: e["perms"] = stat.filemode(e["perms"]) if e["type"] == "rev": # modify dir entry name to explicitly show it points # to a revision e["name"] = "%s @ %s" % (e["name"], e["target"][:7]) dirs = [e for e in entries if e["type"] in ("dir", "rev")] files = [e for e in entries if e["type"] == "file"] dirs = sorted(dirs, key=lambda d: d["name"]) files = sorted(files, key=lambda f: f["name"]) return dirs, files def get_mimetype_and_encoding_for_content(content): """Function that returns the mime type and the encoding associated to a content buffer using the magic module under the hood. Args: content (bytes): a content buffer Returns: A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'), associated to the provided content. """ m = magic.Magic(mime=True, mime_encoding=True) mime_encoding = m.from_buffer(content) mime_type, encoding = mime_encoding.split(";") encoding = encoding.replace(" charset=", "") return mime_type, encoding # maximum authorized content size in bytes for HTML display # with code highlighting content_display_max_size = get_config()["content_display_max_size"] def re_encode_content( mimetype: str, encoding: str, content_data: bytes ) -> Tuple[str, str, bytes]: """Try to re-encode textual content if it is not encoded to UTF-8 for proper display in the browse Web UI. Args: mimetype: content mimetype as detected by python-magic encoding: content encoding as detected by python-magic content_data: raw content bytes Returns: A tuple with 3 members: content mimetype, content encoding (possibly updated after processing), content raw bytes (possibly reencoded to UTF-8) """ if mimetype.startswith("text/") and encoding not in ("us-ascii", "utf-8"): # first check if chardet detects an encoding with confidence result = chardet.detect(content_data) if result["confidence"] >= 0.9: encoding = result["encoding"] content_data = content_data.decode(encoding).encode("utf-8") elif encoding == "unknown-8bit": # probably a malformed UTF-8 content, re-encode it # by replacing invalid chars with a substitution one content_data = content_data.decode("utf-8", "replace").encode("utf-8") elif encoding not in ["utf-8", "binary"]: content_data = content_data.decode(encoding, "replace").encode("utf-8") elif mimetype.startswith("application/octet-stream"): # file may detect a text content as binary # so try to decode it for display encodings = ["us-ascii", "utf-8"] encodings += ["iso-8859-%s" % i for i in range(1, 17)] for enc in encodings: try: content_data = content_data.decode(enc).encode("utf-8") except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) else: # ensure display in content view encoding = enc mimetype = "text/plain" break return mimetype, encoding, content_data def request_content( query_string, max_size=content_display_max_size, re_encode=True, ): """Function that retrieves a content from the archive. Raw bytes content is first retrieved, then the content mime type. If the mime type is not stored in the archive, it will be computed using Python magic module. Args: query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value max_size: the maximum size for a content to retrieve (default to 1MB, no size limit if None) Returns: A tuple whose first member corresponds to the content raw bytes and second member the content mime type Raises: NotFoundExc if the content is not found """ content_data = archive.lookup_content(query_string) filetype = None language = None # requests to the indexer db may fail so properly handle # those cases in order to avoid content display errors try: filetype = archive.lookup_content_filetype(query_string) language = archive.lookup_content_language(query_string) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) mimetype = "unknown" encoding = "unknown" if filetype: mimetype = filetype["mimetype"] encoding = filetype["encoding"] if not max_size or content_data["length"] < max_size: try: content_raw = archive.lookup_content_raw(query_string) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) raise NotFoundExc( "The bytes of the content are currently not available " "in the archive." ) else: content_data["raw_data"] = content_raw["data"] if not filetype: mimetype, encoding = get_mimetype_and_encoding_for_content( content_data["raw_data"] ) if re_encode: mimetype, encoding, raw_data = re_encode_content( mimetype, encoding, content_data["raw_data"] ) content_data["raw_data"] = raw_data else: content_data["raw_data"] = None content_data["mimetype"] = mimetype content_data["encoding"] = encoding if language: content_data["language"] = language["lang"] else: content_data["language"] = "not detected" return content_data def prepare_content_for_display(content_data, mime_type, path): """Function that prepares a content for HTML display. The function tries to associate a programming language to a content in order to perform syntax highlighting client-side using highlightjs. The language is determined using either the content filename or its mime type. If the mime type corresponds to an image format supported by web browsers, the content will be encoded in base64 for displaying the image. Args: content_data (bytes): raw bytes of the content mime_type (string): mime type of the content path (string): path of the content including filename Returns: A dict containing the content bytes (possibly different from the one provided as parameter if it is an image) under the key 'content_data and the corresponding highlightjs language class under the key 'language'. """ language = None if path: language = highlightjs.get_hljs_language_from_filename(path.split("/")[-1]) if language is None: language = highlightjs.get_hljs_language_from_mime_type(mime_type) if language is None: language = "plaintext" if mime_type.startswith("image/"): if mime_type in browsers_supported_image_mimes: content_data = base64.b64encode(content_data).decode("ascii") if mime_type.startswith("image/svg"): mime_type = "image/svg+xml" if mime_type.startswith("text/") or mime_type.startswith("application/"): content_data = content_data.decode("utf-8", errors="replace") return {"content_data": content_data, "language": language, "mimetype": mime_type} def gen_link(url, link_text=None, link_attrs=None): """ Utility function for generating an HTML link to insert in Django templates. Args: url (str): an url link_text (str): optional text for the produced link, if not provided the url will be used link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ attrs = " " if link_attrs: for k, v in link_attrs.items(): attrs += '%s="%s" ' % (k, v) if not link_text: link_text = url link = '%s' % (attrs, escape(url), escape(link_text)) return mark_safe(link) def _snapshot_context_query_params(snapshot_context): query_params = {} if not snapshot_context: return query_params if snapshot_context and snapshot_context["origin_info"]: origin_info = snapshot_context["origin_info"] snp_query_params = snapshot_context["query_params"] query_params = {"origin_url": origin_info["url"]} if "timestamp" in snp_query_params: query_params["timestamp"] = snp_query_params["timestamp"] if "visit_id" in snp_query_params: query_params["visit_id"] = snp_query_params["visit_id"] if "snapshot" in snp_query_params and "visit_id" not in query_params: query_params["snapshot"] = snp_query_params["snapshot"] elif snapshot_context: query_params = {"snapshot": snapshot_context["snapshot_id"]} if snapshot_context["release"]: query_params["release"] = snapshot_context["release"] elif snapshot_context["branch"] and snapshot_context["branch"] not in ( "HEAD", snapshot_context["revision_id"], ): query_params["branch"] = snapshot_context["branch"] elif snapshot_context["revision_id"]: query_params["revision"] = snapshot_context["revision_id"] return query_params def gen_revision_url(revision_id, snapshot_context=None): """ Utility function for generating an url to a revision. Args: revision_id (str): a revision id snapshot_context (dict): if provided, generate snapshot-dependent browsing url Returns: str: The url to browse the revision """ query_params = _snapshot_context_query_params(snapshot_context) # remove query parameters not needed for a revision view query_params.pop("revision", None) query_params.pop("release", None) return reverse( "browse-revision", url_args={"sha1_git": revision_id}, query_params=query_params ) def gen_revision_link( revision_id, shorten_id=False, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a revision HTML view to insert in Django templates. Args: revision_id (str): a revision id shorten_id (boolean): whether to shorten the revision id to 7 characters for the link text snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: str: An HTML link in the form 'revision_id' """ if not revision_id: return None revision_url = gen_revision_url(revision_id, snapshot_context) if shorten_id: return gen_link(revision_url, revision_id[:7], link_attrs) else: if not link_text: link_text = revision_id return gen_link(revision_url, link_text, link_attrs) def gen_directory_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a directory HTML view to insert in Django templates. Args: sha1_git (str): directory identifier link_text (str): optional text for the generated link (the directory id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) directory_url = reverse( "browse-directory", url_args={"sha1_git": sha1_git}, query_params=query_params ) if not link_text: link_text = sha1_git return gen_link(directory_url, link_text, link_attrs) def gen_snapshot_link( snapshot_id, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a snapshot HTML view to insert in Django templates. Args: snapshot_id (str): snapshot identifier link_text (str): optional text for the generated link (the snapshot id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) snapshot_url = reverse( "browse-snapshot", url_args={"snapshot_id": snapshot_id}, query_params=query_params, ) if not link_text: link_text = snapshot_id return gen_link(snapshot_url, link_text, link_attrs) def gen_content_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a content HTML view to insert in Django templates. Args: sha1_git (str): content identifier link_text (str): optional text for the generated link (the content sha1_git will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) content_url = reverse( "browse-content", url_args={"query_string": "sha1_git:" + sha1_git}, query_params=query_params, ) if not link_text: link_text = sha1_git return gen_link(content_url, link_text, link_attrs) def get_revision_log_url(revision_id, snapshot_context=None): """ Utility function for getting the URL for a revision log HTML view (possibly in the context of an origin). Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link Returns: The revision log view URL """ query_params = {} if snapshot_context: query_params = _snapshot_context_query_params(snapshot_context) query_params["revision"] = revision_id if snapshot_context and snapshot_context["origin_info"]: revision_log_url = reverse("browse-origin-log", query_params=query_params) elif snapshot_context: url_args = {"snapshot_id": snapshot_context["snapshot_id"]} del query_params["snapshot"] revision_log_url = reverse( "browse-snapshot-log", url_args=url_args, query_params=query_params ) else: revision_log_url = reverse( "browse-revision-log", url_args={"sha1_git": revision_id} ) return revision_log_url def gen_revision_log_link( revision_id, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a revision log HTML view (possibly in the context of an origin) to insert in Django templates. Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text to use for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not revision_id: return None revision_log_url = get_revision_log_url(revision_id, snapshot_context) if not link_text: link_text = revision_id return gen_link(revision_log_url, link_text, link_attrs) def gen_person_mail_link(person, link_text=None): """ Utility function for generating a mail link to a person to insert in Django templates. Args: person (dict): dictionary containing person data (*name*, *email*, *fullname*) link_text (str): optional text to use for the generated mail link (the person name will be used by default) Returns: str: A mail link to the person or the person name if no email is present in person data """ person_name = person["name"] or person["fullname"] or "None" if link_text is None: link_text = person_name person_email = person["email"] if person["email"] else None if person_email is None and "@" in person_name and " " not in person_name: person_email = person_name if person_email: return gen_link(url="mailto:%s" % person_email, link_text=link_text) else: return person_name def gen_release_link( sha1_git, snapshot_context=None, link_text="Browse", link_attrs={"class": "btn btn-default btn-sm", "role": "button"}, ): """ Utility function for generating a link to a release HTML view to insert in Django templates. Args: sha1_git (str): release identifier link_text (str): optional text for the generated link (the release id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) release_url = reverse( "browse-release", url_args={"sha1_git": sha1_git}, query_params=query_params ) if not link_text: link_text = sha1_git return gen_link(release_url, link_text, link_attrs) def format_log_entries(revision_log, per_page, snapshot_context=None): """ Utility functions that process raw revision log data for HTML display. Its purpose is to: * add links to relevant browse views * format date in human readable format * truncate the message log Args: revision_log (list): raw revision log as returned by the swh-web api per_page (int): number of log entries per page snapshot_context (dict): if provided, generate snapshot-dependent browsing link """ revision_log_data = [] for i, rev in enumerate(revision_log): if i == per_page: break author_name = "None" author_fullname = "None" committer_fullname = "None" if rev["author"]: author_name = gen_person_mail_link(rev["author"]) author_fullname = rev["author"]["fullname"] if rev["committer"]: committer_fullname = rev["committer"]["fullname"] author_date = format_utc_iso_date(rev["date"]) committer_date = format_utc_iso_date(rev["committer_date"]) tooltip = "revision %s\n" % rev["id"] tooltip += "author: %s\n" % author_fullname tooltip += "author date: %s\n" % author_date tooltip += "committer: %s\n" % committer_fullname tooltip += "committer date: %s\n\n" % committer_date if rev["message"]: tooltip += textwrap.indent(rev["message"], " " * 4) revision_log_data.append( { "author": author_name, "id": rev["id"][:7], "message": rev["message"], "date": author_date, "commit_date": committer_date, "url": gen_revision_url(rev["id"], snapshot_context), "tooltip": tooltip, } ) return revision_log_data # list of common readme names ordered by preference # (lower indices have higher priority) _common_readme_names = [ "readme.markdown", "readme.md", "readme.rst", "readme.txt", "readme", ] def get_readme_to_display(readmes): """ Process a list of readme files found in a directory in order to find the adequate one to display. Args: readmes: a list of dict where keys are readme file names and values are readme sha1s Returns: A tuple (readme_name, readme_sha1) """ readme_name = None readme_url = None readme_sha1 = None readme_html = None lc_readmes = {k.lower(): {"orig_name": k, "sha1": v} for k, v in readmes.items()} # look for readme names according to the preference order # defined by the _common_readme_names list for common_readme_name in _common_readme_names: if common_readme_name in lc_readmes: readme_name = lc_readmes[common_readme_name]["orig_name"] readme_sha1 = lc_readmes[common_readme_name]["sha1"] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) break # otherwise pick the first readme like file if any if not readme_name and len(readmes.items()) > 0: readme_name = next(iter(readmes)) readme_sha1 = readmes[readme_name] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) # convert rst README to html server side as there is # no viable solution to perform that task client side if readme_name and readme_name.endswith(".rst"): @django_cache( catch_exception=True, exception_return_value="Readme bytes are not available", ) def _rst_readme_to_html(readme_sha1): rst_doc = request_content(readme_sha1) return rst_to_html(rst_doc["raw_data"]) readme_html = _rst_readme_to_html(readme_sha1) return readme_name, readme_url, readme_html diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py index b69241e8..7d0995a1 100644 --- a/swh/web/browse/views/content.py +++ b/swh/web/browse/views/content.py @@ -1,452 +1,455 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import difflib from distutils.util import strtobool -import sentry_sdk - from django.http import HttpResponse, JsonResponse from django.shortcuts import redirect, render from swh.model.hashutil import hash_to_hex from swh.model.swhids import ObjectType from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( content_display_max_size, gen_link, prepare_content_for_display, request_content, ) from swh.web.common import archive, highlightjs, query -from swh.web.common.exc import BadInputExc, NotFoundExc, http_status_code_message +from swh.web.common.exc import ( + BadInputExc, + NotFoundExc, + http_status_code_message, + sentry_capture_exception, +) from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import ContentMetadata, SWHObjectInfo from swh.web.common.utils import gen_path_info, reverse, swh_object_icons @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/raw/", view_name="browse-content-raw", checksum_args=["query_string"], ) def content_raw(request, query_string): """Django view that produces a raw display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/raw/` """ re_encode = bool(strtobool(request.GET.get("re_encode", "false"))) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, max_size=None, re_encode=re_encode) filename = request.GET.get("filename", None) if not filename: filename = "%s_%s" % (algo, checksum) if ( content_data["mimetype"].startswith("text/") or content_data["mimetype"] == "inode/x-empty" ): response = HttpResponse(content_data["raw_data"], content_type="text/plain") response["Content-disposition"] = "filename=%s" % filename else: response = HttpResponse( content_data["raw_data"], content_type="application/octet-stream" ) response["Content-disposition"] = "attachment; filename=%s" % filename return response _auto_diff_size_limit = 20000 @browse_route( r"content/(?P.*)/diff/(?P.*)/", view_name="diff-contents", ) def _contents_diff(request, from_query_string, to_query_string): """ Browse endpoint used to compute unified diffs between two contents. Diffs are generated only if the two contents are textual. By default, diffs whose size are greater than 20 kB will not be generated. To force the generation of large diffs, the 'force' boolean query parameter must be used. Args: request: input django http request from_query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value identifying the first content to_query_string: same as above for identifying the second content Returns: A JSON object containing the unified diff. """ diff_data = {} content_from = None content_to = None content_from_size = 0 content_to_size = 0 content_from_lines = [] content_to_lines = [] force = request.GET.get("force", "false") path = request.GET.get("path", None) language = "plaintext" force = bool(strtobool(force)) if from_query_string == to_query_string: diff_str = "File renamed without changes" else: try: text_diff = True if from_query_string: content_from = request_content(from_query_string, max_size=None) content_from_display_data = prepare_content_for_display( content_from["raw_data"], content_from["mimetype"], path ) language = content_from_display_data["language"] content_from_size = content_from["length"] if not ( content_from["mimetype"].startswith("text/") or content_from["mimetype"] == "inode/x-empty" ): text_diff = False if text_diff and to_query_string: content_to = request_content(to_query_string, max_size=None) content_to_display_data = prepare_content_for_display( content_to["raw_data"], content_to["mimetype"], path ) language = content_to_display_data["language"] content_to_size = content_to["length"] if not ( content_to["mimetype"].startswith("text/") or content_to["mimetype"] == "inode/x-empty" ): text_diff = False diff_size = abs(content_to_size - content_from_size) if not text_diff: diff_str = "Diffs are not generated for non textual content" language = "plaintext" elif not force and diff_size > _auto_diff_size_limit: diff_str = "Large diffs are not automatically computed" language = "plaintext" else: if content_from: content_from_lines = ( content_from["raw_data"].decode("utf-8").splitlines(True) ) if content_from_lines and content_from_lines[-1][-1] != "\n": content_from_lines[-1] += "[swh-no-nl-marker]\n" if content_to: content_to_lines = ( content_to["raw_data"].decode("utf-8").splitlines(True) ) if content_to_lines and content_to_lines[-1][-1] != "\n": content_to_lines[-1] += "[swh-no-nl-marker]\n" diff_lines = difflib.unified_diff(content_from_lines, content_to_lines) diff_str = "".join(list(diff_lines)[2:]) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) diff_str = str(exc) diff_data["diff_str"] = diff_str diff_data["language"] = language return JsonResponse(diff_data) def _get_content_from_request(request): path = request.GET.get("path") if path is None: raise BadInputExc("The path query parameter must be provided.") snapshot = request.GET.get("snapshot") or request.GET.get("snapshot_id") origin_url = request.GET.get("origin_url") if snapshot is None and origin_url is None: raise BadInputExc( "The origin_url or snapshot query parameters must be provided." ) visit_id = int(request.GET.get("visit_id", 0)) snapshot_context = get_snapshot_context( snapshot_id=snapshot, origin_url=origin_url, path=path, timestamp=request.GET.get("timestamp"), visit_id=visit_id or None, branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), browse_context="content", ) root_directory = snapshot_context["root_directory"] return archive.lookup_directory_with_path(root_directory, path) @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/", r"content/", view_name="browse-content", checksum_args=["query_string"], ) def content_display(request, query_string=None): """Django view that produces an HTML display of a content identified by its hash value. The URLs that points to it are :http:get:`/browse/content/[(algo_hash):](hash)/` :http:get:`/browse/content/` """ if query_string is None: # this case happens when redirected from origin/content or snapshot/content content = _get_content_from_request(request) return redirect( reverse( "browse-content", url_args={"query_string": f"sha1_git:{content['target']}"}, query_params=request.GET, ), ) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) origin_url = request.GET.get("origin_url") selected_language = request.GET.get("language") if not origin_url: origin_url = request.GET.get("origin") snapshot_id = request.GET.get("snapshot") or request.GET.get("snapshot_id") path = request.GET.get("path") content_data = {} error_info = {"status_code": 200, "description": None} try: content_data = request_content(query_string) except NotFoundExc as e: error_info["status_code"] = 404 error_info["description"] = f"NotFoundExc: {str(e)}" snapshot_context = None if origin_url is not None or snapshot_id is not None: try: visit_id = int(request.GET.get("visit_id", 0)) snapshot_context = get_snapshot_context( origin_url=origin_url, snapshot_id=snapshot_id, timestamp=request.GET.get("timestamp"), visit_id=visit_id or None, branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), path=path, browse_context="content", ) except NotFoundExc as e: if str(e).startswith("Origin"): raw_cnt_url = reverse( "browse-content", url_args={"query_string": query_string} ) error_message = ( "The Software Heritage archive has a content " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the content " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_cnt_url)) ) raise NotFoundExc(error_message) else: raise e content = None language = None mimetype = None if content_data.get("raw_data") is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and "text/" in mimetype: available_languages = highlightjs.get_supported_languages() filename = None path_info = None directory_id = None root_dir = None if snapshot_context: root_dir = snapshot_context.get("root_directory") query_params = snapshot_context["query_params"] if snapshot_context else {} breadcrumbs = [] if path: split_path = path.split("/") root_dir = root_dir or split_path[0] filename = split_path[-1] if root_dir != path: path = path.replace(root_dir + "/", "") path = path[: -len(filename)] path_info = gen_path_info(path) query_params.pop("path", None) dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": root_dir[:7], "url": dir_url}) for pi in path_info: query_params["path"] = pi["path"] dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": pi["name"], "url": dir_url}) breadcrumbs.append({"name": filename, "url": None}) if path and root_dir != path: dir_info = archive.lookup_directory_with_path(root_dir, path) directory_id = dir_info["target"] elif root_dir != path: directory_id = root_dir else: root_dir = None query_params = {"filename": filename} content_checksums = content_data.get("checksums", {}) content_url = reverse( "browse-content", url_args={"query_string": query_string}, ) content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params=query_params, ) content_metadata = ContentMetadata( object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git"), sha1=content_checksums.get("sha1"), sha1_git=content_checksums.get("sha1_git"), sha256=content_checksums.get("sha256"), blake2s256=content_checksums.get("blake2s256"), content_url=content_url, mimetype=content_data.get("mimetype"), encoding=content_data.get("encoding"), size=content_data.get("length", 0), language=content_data.get("language"), root_directory=root_dir, path=f"/{path}" if path else None, filename=filename or "", directory=directory_id, revision=None, release=None, snapshot=None, origin_url=origin_url, ) swh_objects = [] if content_checksums: swh_objects.append( SWHObjectInfo( object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git"), ) ) if directory_id: swh_objects.append( SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory_id) ) if snapshot_context: if snapshot_context["revision_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.REVISION, object_id=snapshot_context["revision_id"], ) ) swh_objects.append( SWHObjectInfo( object_type=ObjectType.SNAPSHOT, object_id=snapshot_context["snapshot_id"], ) ) if snapshot_context["release_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.RELEASE, object_id=snapshot_context["release_id"], ) ) swhids_info = get_swhids_info( swh_objects, snapshot_context, extra_context=content_metadata, ) heading = "Content - %s" % content_checksums.get("sha1_git") if breadcrumbs: content_path = "/".join([bc["name"] for bc in breadcrumbs]) heading += " - %s" % content_path return render( request, "browse/content.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"] if swhids_info else "", "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content, "content_size": content_data.get("length"), "max_content_size": content_display_max_size, "filename": filename, "encoding": content_data.get("encoding"), "mimetype": mimetype, "language": language, "available_languages": available_languages, "breadcrumbs": breadcrumbs, "top_right_link": { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", }, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": True, "swhids_info": swhids_info, "error_code": error_info["status_code"], "error_message": http_status_code_message.get(error_info["status_code"]), "error_description": error_info["description"], }, status=error_info["status_code"], ) diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py index cb598ea6..09d852a1 100644 --- a/swh/web/browse/views/directory.py +++ b/swh/web/browse/views/directory.py @@ -1,292 +1,294 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os -import sentry_sdk - from django.http import HttpResponse from django.shortcuts import redirect, render from swh.model.swhids import ObjectType from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import gen_link, get_directory_entries, get_readme_to_display from swh.web.common import archive -from swh.web.common.exc import NotFoundExc, http_status_code_message +from swh.web.common.exc import ( + NotFoundExc, + http_status_code_message, + sentry_capture_exception, +) from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import DirectoryMetadata, SWHObjectInfo from swh.web.common.utils import gen_path_info, reverse, swh_object_icons def _directory_browse(request, sha1_git, path=None): root_sha1_git = sha1_git error_info = {"status_code": 200, "description": None} if path: try: dir_info = archive.lookup_directory_with_path(sha1_git, path) sha1_git = dir_info["target"] except NotFoundExc as e: error_info["status_code"] = 404 error_info["description"] = f"NotFoundExc: {str(e)}" sha1_git = None dirs, files = [], [] if sha1_git is not None: dirs, files = get_directory_entries(sha1_git) origin_url = request.GET.get("origin_url") if not origin_url: origin_url = request.GET.get("origin") snapshot_id = request.GET.get("snapshot") snapshot_context = None if origin_url is not None or snapshot_id is not None: try: snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), path=path, ) except NotFoundExc as e: if str(e).startswith("Origin"): raw_dir_url = reverse( "browse-directory", url_args={"sha1_git": sha1_git} ) error_message = ( "The Software Heritage archive has a directory " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the directory " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_dir_url)) ) raise NotFoundExc(error_message) else: raise e path_info = gen_path_info(path) query_params = snapshot_context["query_params"] if snapshot_context else {} breadcrumbs = [] breadcrumbs.append( { "name": root_sha1_git[:7], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, query_params={**query_params, "path": None}, ), } ) for pi in path_info: breadcrumbs.append( { "name": pi["name"], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, query_params={ **query_params, "path": pi["path"], }, ), } ) path = "" if path is None else (path + "/") for d in dirs: if d["type"] == "rev": d["url"] = reverse( "browse-revision", url_args={"sha1_git": d["target"]}, query_params=query_params, ) else: d["url"] = reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, query_params={ **query_params, "path": path + d["name"], }, ) sum_file_sizes = 0 readmes = {} for f in files: query_string = "sha1_git:" + f["target"] f["url"] = reverse( "browse-content", url_args={"query_string": query_string}, query_params={ **query_params, "path": root_sha1_git + "/" + path + f["name"], }, ) if f["length"] is not None: sum_file_sizes += f["length"] if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) dir_metadata = DirectoryMetadata( object_type=ObjectType.DIRECTORY, object_id=sha1_git, directory=root_sha1_git, nb_files=len(files), nb_dirs=len(dirs), sum_file_sizes=sum_file_sizes, root_directory=root_sha1_git, path=f"/{path}" if path else None, revision=None, revision_found=None, release=None, snapshot=None, ) vault_cooking = { "directory_context": True, "directory_swhid": f"swh:1:dir:{sha1_git}", "revision_context": False, "revision_swhid": None, } swh_objects = [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=sha1_git)] if snapshot_context: if snapshot_context["revision_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.REVISION, object_id=snapshot_context["revision_id"], ) ) swh_objects.append( SWHObjectInfo( object_type=ObjectType.SNAPSHOT, object_id=snapshot_context["snapshot_id"], ) ) if snapshot_context["release_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.RELEASE, object_id=snapshot_context["release_id"], ) ) swhids_info = get_swhids_info(swh_objects, snapshot_context, dir_metadata) heading = "Directory - %s" % sha1_git if breadcrumbs: dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/" heading += " - %s" % dir_path top_right_link = None if ( snapshot_context is not None and not snapshot_context["is_empty"] and snapshot_context["revision_id"] is not None ): history_url = reverse( "browse-revision-log", url_args={"sha1_git": snapshot_context["revision_id"]}, query_params=query_params, ) top_right_link = { "url": history_url, "icon": swh_object_icons["revisions history"], "text": "History", } return render( request, "browse/directory.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"], "swh_object_name": "Directory", "swh_object_metadata": dir_metadata, "dirs": dirs, "files": files, "breadcrumbs": breadcrumbs, "top_right_link": top_right_link, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "snapshot_context": snapshot_context, "vault_cooking": vault_cooking, "show_actions": True, "swhids_info": swhids_info, "error_code": error_info["status_code"], "error_message": http_status_code_message.get(error_info["status_code"]), "error_description": error_info["description"], }, status=error_info["status_code"], ) @browse_route( r"directory/(?P[0-9a-f]+)/", view_name="browse-directory", checksum_args=["sha1_git"], ) def directory_browse(request, sha1_git): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is :http:get:`/browse/directory/(sha1_git)/` """ return _directory_browse(request, sha1_git, request.GET.get("path")) @browse_route( r"directory/(?P[0-9a-f]+)/(?P.+)/", view_name="browse-directory-legacy", checksum_args=["sha1_git"], ) def directory_browse_legacy(request, sha1_git, path): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is :http:get:`/browse/directory/(sha1_git)/(path)/` """ return _directory_browse(request, sha1_git, path) @browse_route( r"directory/resolve/content-path/(?P[0-9a-f]+)/", view_name="browse-directory-resolve-content-path", checksum_args=["sha1_git"], ) def _directory_resolve_content_path(request, sha1_git): """ Internal endpoint redirecting to data url for a specific file path relative to a root directory. """ try: path = os.path.normpath(request.GET.get("path")) if not path.startswith("../"): dir_info = archive.lookup_directory_with_path(sha1_git, path) if dir_info["type"] == "file": sha1 = dir_info["checksums"]["sha1"] data_url = reverse( "browse-content-raw", url_args={"query_string": sha1} ) return redirect(data_url) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) return HttpResponse(status=404) diff --git a/swh/web/browse/views/release.py b/swh/web/browse/views/release.py index e9faaa94..6f3d9dec 100644 --- a/swh/web/browse/views/release.py +++ b/swh/web/browse/views/release.py @@ -1,245 +1,243 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -import sentry_sdk - from django.shortcuts import render from swh.model.swhids import ObjectType from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( gen_content_link, gen_directory_link, gen_link, gen_person_mail_link, gen_release_link, gen_revision_link, ) from swh.web.common import archive -from swh.web.common.exc import NotFoundExc +from swh.web.common.exc import NotFoundExc, sentry_capture_exception from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import ReleaseMetadata, SWHObjectInfo from swh.web.common.utils import format_utc_iso_date, reverse @browse_route( r"release/(?P[0-9a-f]+)/", view_name="browse-release", checksum_args=["sha1_git"], ) def release_browse(request, sha1_git): """ Django view that produces an HTML display of a release identified by its id. The url that points to it is :http:get:`/browse/release/(sha1_git)/`. """ release = archive.lookup_release(sha1_git) snapshot_context = {} origin_info = None snapshot_id = request.GET.get("snapshot_id") if not snapshot_id: snapshot_id = request.GET.get("snapshot") origin_url = request.GET.get("origin_url") if not origin_url: origin_url = request.GET.get("origin") timestamp = request.GET.get("timestamp") visit_id = int(request.GET.get("visit_id", 0)) if origin_url: try: snapshot_context = get_snapshot_context( snapshot_id, origin_url, timestamp, visit_id or None, release_name=release["name"], ) except NotFoundExc as e: raw_rel_url = reverse("browse-release", url_args={"sha1_git": sha1_git}) error_message = ( "The Software Heritage archive has a release " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the release " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_rel_url)) ) if str(e).startswith("Origin"): raise NotFoundExc(error_message) else: raise e origin_info = snapshot_context["origin_info"] elif snapshot_id: snapshot_context = get_snapshot_context( snapshot_id, release_name=release["name"] ) snapshot_id = snapshot_context.get("snapshot_id", None) release_metadata = ReleaseMetadata( object_type=ObjectType.RELEASE, object_id=sha1_git, release=sha1_git, author=release["author"]["fullname"] if release["author"] else "None", author_url=gen_person_mail_link(release["author"]) if release["author"] else "None", date=format_utc_iso_date(release["date"]), name=release["name"], synthetic=release["synthetic"], target=release["target"], target_type=release["target_type"], snapshot=snapshot_id, origin_url=origin_url, ) release_note_lines = [] if release["message"]: release_note_lines = release["message"].split("\n") swh_objects = [SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=sha1_git)] vault_cooking = None rev_directory = None target_link = None if release["target_type"] == ObjectType.REVISION.name.lower(): target_link = gen_revision_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) try: revision = archive.lookup_revision(release["target"]) rev_directory = revision["directory"] vault_cooking = { "directory_context": True, "directory_swhid": f"swh:1:dir:{rev_directory}", "revision_context": True, "revision_swhid": f"swh:1:rev:{release['target']}", } swh_objects.append( SWHObjectInfo( object_type=ObjectType.REVISION, object_id=release["target"] ) ) swh_objects.append( SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=rev_directory) ) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) elif release["target_type"] == ObjectType.DIRECTORY.name.lower(): target_link = gen_directory_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) try: # check directory exists archive.lookup_directory(release["target"]) vault_cooking = { "directory_context": True, "directory_swhid": f"swh:1:dir:{release['target']}", "revision_context": False, "revision_swhid": None, } swh_objects.append( SWHObjectInfo( object_type=ObjectType.DIRECTORY, object_id=release["target"] ) ) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) elif release["target_type"] == ObjectType.CONTENT.name.lower(): target_link = gen_content_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) swh_objects.append( SWHObjectInfo(object_type=ObjectType.CONTENT, object_id=release["target"]) ) elif release["target_type"] == ObjectType.RELEASE.name.lower(): target_link = gen_release_link( release["target"], snapshot_context=snapshot_context, link_text=None, link_attrs=None, ) rev_directory_url = None if rev_directory is not None: if origin_info: rev_directory_url = reverse( "browse-origin-directory", query_params={ "origin_url": origin_info["url"], "release": release["name"], "snapshot": snapshot_id, }, ) elif snapshot_id: rev_directory_url = reverse( "browse-snapshot-directory", url_args={"snapshot_id": snapshot_id}, query_params={"release": release["name"]}, ) else: rev_directory_url = reverse( "browse-directory", url_args={"sha1_git": rev_directory} ) directory_link = None if rev_directory_url is not None: directory_link = gen_link(rev_directory_url, rev_directory) release["directory_link"] = directory_link release["target_link"] = target_link if snapshot_context: snapshot_id = snapshot_context["snapshot_id"] if snapshot_id: swh_objects.append( SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id) ) swhids_info = get_swhids_info(swh_objects, snapshot_context) note_header = "None" if len(release_note_lines) > 0: note_header = release_note_lines[0] release["note_header"] = note_header release["note_body"] = "\n".join(release_note_lines[1:]) heading = "Release - %s" % release["name"] if snapshot_context: context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading += " - %s" % context_found return render( request, "browse/release.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"], "swh_object_name": "Release", "swh_object_metadata": release_metadata, "release": release, "snapshot_context": snapshot_context, "show_actions": True, "breadcrumbs": None, "vault_cooking": vault_cooking, "top_right_link": None, "swhids_info": swhids_info, }, ) diff --git a/swh/web/common/highlightjs.py b/swh/web/common/highlightjs.py index 7e0c7229..362bfd92 100644 --- a/swh/web/common/highlightjs.py +++ b/swh/web/common/highlightjs.py @@ -1,183 +1,184 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import json from typing import Dict from pygments.lexers import get_all_lexers, get_lexer_for_filename -import sentry_sdk from django.contrib.staticfiles.finders import find +from swh.web.common.exc import sentry_capture_exception + @functools.lru_cache() def _hljs_languages_data(): with open(str(find("json/highlightjs-languages.json")), "r") as hljs_languages_file: return json.load(hljs_languages_file) # set of languages ids that can be highlighted by highlight.js library @functools.lru_cache() def _hljs_languages(): return set(_hljs_languages_data()["languages"]) # languages aliases defined in highlight.js @functools.lru_cache() def _hljs_languages_aliases(): language_aliases = _hljs_languages_data()["languages_aliases"] language_aliases.pop("robots.txt", None) return { **language_aliases, "ml": "ocaml", "bsl": "1c", "ep": "mojolicious", "lc": "livecode", "p": "parser3", "pde": "processing", "rsc": "routeros", "s": "armasm", "sl": "rsl", "4dm": "4d", "kaos": "chaos", "dfy": "dafny", "ejs": "eta", "nev": "never", "m": "octave", "shader": "hlsl", "fx": "hlsl", "prg": "xsharp", "xs": "xsharp", } # dictionary mapping pygment lexers to hljs languages _pygments_lexer_to_hljs_language = {} # type: Dict[str, str] # dictionary mapping mime types to hljs languages _mime_type_to_hljs_language = { "text/x-c": "c", "text/x-c++": "cpp", "text/x-msdos-batch": "dos", "text/x-lisp": "lisp", "text/x-shellscript": "bash", } # dictionary mapping filenames to hljs languages _filename_to_hljs_language = { "cmakelists.txt": "cmake", ".htaccess": "apache", "httpd.conf": "apache", "access.log": "accesslog", "nginx.log": "accesslog", "resolv.conf": "dns", "dockerfile": "docker", "nginx.conf": "nginx", "pf.conf": "pf", "robots.txt": "robots-txt", } # function to fill the above dictionaries def _init_pygments_to_hljs_map(): if len(_pygments_lexer_to_hljs_language) == 0: hljs_languages = _hljs_languages() hljs_languages_aliases = _hljs_languages_aliases() for lexer in get_all_lexers(): lexer_name = lexer[0] lang_aliases = lexer[1] lang_mime_types = lexer[3] lang = None for lang_alias in lang_aliases: if lang_alias in hljs_languages: lang = lang_alias _pygments_lexer_to_hljs_language[lexer_name] = lang_alias break if lang_alias in hljs_languages_aliases: lang = hljs_languages_aliases[lang_alias] _pygments_lexer_to_hljs_language[lexer_name] = lang_alias break if lang: for lang_mime_type in lang_mime_types: if lang_mime_type not in _mime_type_to_hljs_language: _mime_type_to_hljs_language[lang_mime_type] = lang def get_hljs_language_from_filename(filename): """Function that tries to associate a language supported by highlight.js from a filename. Args: filename: input filename Returns: highlight.js language id or None if no correspondence has been found """ _init_pygments_to_hljs_map() if filename: filename_lower = filename.lower() if filename_lower in _filename_to_hljs_language: return _filename_to_hljs_language[filename_lower] if filename_lower in _hljs_languages(): return filename_lower exts = filename_lower.split(".") # check if file extension matches an hljs language # also handle .ext.in cases for ext in reversed(exts[-2:]): if ext in _hljs_languages(): return ext if ext in _hljs_languages_aliases(): return _hljs_languages_aliases()[ext] # otherwise use Pygments language database lexer = None # try to find a Pygment lexer try: lexer = get_lexer_for_filename(filename) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) # if there is a correspondence between the lexer and an hljs # language, return it if lexer and lexer.name in _pygments_lexer_to_hljs_language: return _pygments_lexer_to_hljs_language[lexer.name] # otherwise, try to find a match between the file extensions # associated to the lexer and the hljs language aliases if lexer: exts = [ext.replace("*.", "") for ext in lexer.filenames] for ext in exts: if ext in _hljs_languages_aliases(): return _hljs_languages_aliases()[ext] return None def get_hljs_language_from_mime_type(mime_type): """Function that tries to associate a language supported by highlight.js from a mime type. Args: mime_type: input mime type Returns: highlight.js language id or None if no correspondence has been found """ _init_pygments_to_hljs_map() if mime_type and mime_type in _mime_type_to_hljs_language: return _mime_type_to_hljs_language[mime_type] return None @functools.lru_cache() def get_supported_languages(): """ Return the list of programming languages that can be highlighted using the highlight.js library. Returns: List[str]: the list of supported languages """ return sorted(list(_hljs_languages())) diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py index 1a6d3647..68c91f0f 100644 --- a/swh/web/common/origin_save.py +++ b/swh/web/common/origin_save.py @@ -1,930 +1,934 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from functools import lru_cache from itertools import product import json import logging from typing import Any, Dict, List, Optional, Tuple from prometheus_client import Gauge import requests -import sentry_sdk from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import URLValidator from django.db.models import Q, QuerySet from django.utils.html import escape from swh.scheduler.utils import create_oneshot_task_dict from swh.web.common import archive -from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc +from swh.web.common.exc import ( + BadInputExc, + ForbiddenExc, + NotFoundExc, + sentry_capture_exception, +) from swh.web.common.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_FAILED, SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_RUNNING, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEEDED, VISIT_STATUS_CREATED, VISIT_STATUS_ONGOING, SaveAuthorizedOrigin, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.common.typing import OriginExistenceCheckInfo, SaveOriginRequestInfo from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc from swh.web.config import get_config, scheduler logger = logging.getLogger(__name__) # Number of days in the past to lookup for information MAX_THRESHOLD_DAYS = 30 # Non terminal visit statuses which needs updates NON_TERMINAL_STATUSES = [ VISIT_STATUS_CREATED, VISIT_STATUS_ONGOING, ] def get_origin_save_authorized_urls() -> List[str]: """ Get the list of origin url prefixes authorized to be immediately loaded into the archive (whitelist). Returns: list: The list of authorized origin url prefix """ return [origin.url for origin in SaveAuthorizedOrigin.objects.all()] def get_origin_save_unauthorized_urls() -> List[str]: """ Get the list of origin url prefixes forbidden to be loaded into the archive (blacklist). Returns: list: the list of unauthorized origin url prefix """ return [origin.url for origin in SaveUnauthorizedOrigin.objects.all()] def can_save_origin(origin_url: str, bypass_pending_review: bool = False) -> str: """ Check if a software origin can be saved into the archive. Based on the origin url, the save request will be either: * immediately accepted if the url is whitelisted * rejected if the url is blacklisted * put in pending state for manual review otherwise Args: origin_url (str): the software origin url to check Returns: str: the origin save request status, either **accepted**, **rejected** or **pending** """ # origin url may be blacklisted for url_prefix in get_origin_save_unauthorized_urls(): if origin_url.startswith(url_prefix): return SAVE_REQUEST_REJECTED # if the origin url is in the white list, it can be immediately saved for url_prefix in get_origin_save_authorized_urls(): if origin_url.startswith(url_prefix): return SAVE_REQUEST_ACCEPTED # otherwise, the origin url needs to be manually verified if the user # that submitted it does not have special permission if bypass_pending_review: # mark the origin URL as trusted in that case SaveAuthorizedOrigin.objects.get_or_create(url=origin_url) return SAVE_REQUEST_ACCEPTED else: return SAVE_REQUEST_PENDING # map visit type to scheduler task # TODO: do not hardcode the task name here (T1157) _visit_type_task = { "git": "load-git", "hg": "load-hg", "svn": "load-svn", "cvs": "load-cvs", "bzr": "load-bzr", } _visit_type_task_privileged = { "archives": "load-archive-files", } # map scheduler task status to origin save status _save_task_status = { "next_run_not_scheduled": SAVE_TASK_NOT_YET_SCHEDULED, "next_run_scheduled": SAVE_TASK_SCHEDULED, "completed": SAVE_TASK_SUCCEEDED, "disabled": SAVE_TASK_FAILED, } # map scheduler task_run status to origin save status _save_task_run_status = { "scheduled": SAVE_TASK_SCHEDULED, "started": SAVE_TASK_RUNNING, "eventful": SAVE_TASK_SUCCEEDED, "uneventful": SAVE_TASK_SUCCEEDED, "failed": SAVE_TASK_FAILED, "permfailed": SAVE_TASK_FAILED, "lost": SAVE_TASK_FAILED, } @lru_cache() def get_scheduler_load_task_types() -> List[str]: task_types = scheduler().get_task_types() return [t["type"] for t in task_types if t["type"].startswith("load")] def get_savable_visit_types_dict(privileged_user: bool = False) -> Dict: """Returned the supported task types the user has access to. Args: privileged_user: Flag to determine if all visit types should be returned or not. Default to False to only list unprivileged visit types. Returns: the dict of supported visit types for the user """ if privileged_user: task_types = {**_visit_type_task, **_visit_type_task_privileged} else: task_types = _visit_type_task # filter visit types according to scheduler load task types if available try: load_task_types = get_scheduler_load_task_types() return {k: v for k, v in task_types.items() if v in load_task_types} except Exception: return task_types def get_savable_visit_types(privileged_user: bool = False) -> List[str]: """Return the list of visit types the user can perform save requests on. Args: privileged_user: Flag to determine if all visit types should be returned or not. Default to False to only list unprivileged visit types. Returns: the list of saveable visit types """ return sorted(list(get_savable_visit_types_dict(privileged_user).keys())) def _check_visit_type_savable(visit_type: str, privileged_user: bool = False) -> None: visit_type_tasks = get_savable_visit_types(privileged_user) if visit_type not in visit_type_tasks: allowed_visit_types = ", ".join(visit_type_tasks) raise BadInputExc( f"Visit of type {visit_type} can not be saved! " f"Allowed types are the following: {allowed_visit_types}" ) _validate_url = URLValidator( schemes=["http", "https", "svn", "git", "rsync", "pserver", "ssh", "bzr"] ) def _check_origin_url_valid(origin_url: str) -> None: try: _validate_url(origin_url) except ValidationError: raise BadInputExc( "The provided origin url (%s) is not valid!" % escape(origin_url) ) def origin_exists(origin_url: str) -> OriginExistenceCheckInfo: """Check the origin url for existence. If it exists, extract some more useful information on the origin. """ resp = requests.head(origin_url, allow_redirects=True) exists = resp.ok content_length: Optional[int] = None last_modified: Optional[str] = None if exists: # Also process X-Archive-Orig-* headers in case the URL targets the # Internet Archive. size_ = resp.headers.get( "Content-Length", resp.headers.get("X-Archive-Orig-Content-Length") ) content_length = int(size_) if size_ else None try: date_str = resp.headers.get( "Last-Modified", resp.headers.get("X-Archive-Orig-Last-Modified", "") ) date = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S %Z") last_modified = date.isoformat() except ValueError: # if not provided or not parsable as per the expected format, keep it None pass return OriginExistenceCheckInfo( origin_url=origin_url, exists=exists, last_modified=last_modified, content_length=content_length, ) def _check_origin_exists(url: str) -> OriginExistenceCheckInfo: """Ensure an URL exists, if not raise an explicit message.""" metadata = origin_exists(url) if not metadata["exists"]: raise BadInputExc(f"The provided url ({escape(url)}) does not exist!") return metadata def _get_visit_info_for_save_request( save_request: SaveOriginRequest, ) -> Tuple[Optional[datetime], Optional[str]]: """Retrieve visit information out of a save request Args: save_request: Input save origin request to retrieve information for. Returns: Tuple of (visit date, optional visit status) for such save request origin """ visit_date = None visit_status = None time_now = datetime.now(tz=timezone.utc) time_delta = time_now - save_request.request_date # stop trying to find a visit date one month after save request submission # as those requests to storage are expensive and associated loading task # surely ended up with errors if time_delta.days <= MAX_THRESHOLD_DAYS: origin = save_request.origin_url ovs = archive.origin_visit_find_by_date(origin, save_request.request_date) if ovs: visit_date = parse_iso8601_date_to_utc(ovs["date"]) visit_status = ovs["status"] return visit_date, visit_status def _check_visit_update_status( save_request: SaveOriginRequest, ) -> Tuple[Optional[datetime], Optional[str], Optional[str]]: """Given a save request, determine whether a save request was successful or failed. Args: save_request: Input save origin request to retrieve information for. Returns: Tuple of (optional visit date, optional visit status, optional save task status) for such save request origin """ visit_date, visit_status = _get_visit_info_for_save_request(save_request) loading_task_status = None if visit_date and visit_status in ("full", "partial"): # visit has been performed, mark the saving task as succeeded loading_task_status = SAVE_TASK_SUCCEEDED elif visit_status in ("created", "ongoing"): # visit is currently running loading_task_status = SAVE_TASK_RUNNING elif visit_status in ("not_found", "failed"): loading_task_status = SAVE_TASK_FAILED else: time_now = datetime.now(tz=timezone.utc) time_delta = time_now - save_request.request_date # consider the task as failed if it is still in scheduled state # 30 days after its submission if time_delta.days > MAX_THRESHOLD_DAYS: loading_task_status = SAVE_TASK_FAILED return visit_date, visit_status, loading_task_status def _compute_task_loading_status( task: Optional[Dict[str, Any]] = None, task_run: Optional[Dict[str, Any]] = None, ) -> Optional[str]: loading_task_status: Optional[str] = None # First determine the loading task status out of task information if task: loading_task_status = _save_task_status[task["status"]] if task_run: loading_task_status = _save_task_run_status[task_run["status"]] return loading_task_status def _update_save_request_info( save_request: SaveOriginRequest, task: Optional[Dict[str, Any]] = None, task_run: Optional[Dict[str, Any]] = None, ) -> SaveOriginRequestInfo: """Update save request information out of the visit status and fallback to the task and task_run information if the visit status is missing. Args: save_request: Save request task: Associated scheduler task information about the save request task_run: Most recent run occurrence of the associated task Returns: Summary of the save request information updated. """ must_save = False # To determine the save code now request's final status, the visit date must be set # and the visit status must be a final one. Once they do, the save code now is # definitely done. if ( not save_request.visit_date or not save_request.visit_status or save_request.visit_status in NON_TERMINAL_STATUSES ): visit_date, visit_status, loading_task_status = _check_visit_update_status( save_request ) if not loading_task_status: # fallback when not provided loading_task_status = _compute_task_loading_status(task, task_run) if visit_date != save_request.visit_date: must_save = True save_request.visit_date = visit_date if visit_status != save_request.visit_status: must_save = True save_request.visit_status = visit_status if ( loading_task_status is not None and loading_task_status != save_request.loading_task_status ): must_save = True save_request.loading_task_status = loading_task_status if must_save: save_request.save() return save_request.to_dict() def create_save_origin_request( visit_type: str, origin_url: str, privileged_user: bool = False, user_id: Optional[int] = None, **kwargs, ) -> SaveOriginRequestInfo: """Create a loading task to save a software origin into the archive. This function aims to create a software origin loading task through the use of the swh-scheduler component. First, some checks are performed to see if the visit type and origin url are valid but also if the the save request can be accepted. For the 'archives' visit type, this also ensures the artifacts actually exists. If those checks passed, the loading task is then created. Otherwise, the save request is put in pending or rejected state. All the submitted save requests are logged into the swh-web database to keep track of them. Args: visit_type: the type of visit to perform (e.g. git, hg, svn, archives, ...) origin_url: the url of the origin to save privileged: Whether the user has some more privilege than other (bypass review, access to privileged other visit types) user_id: User identifier (provided when authenticated) kwargs: Optional parameters (e.g. artifact_url, artifact_filename, artifact_version) Raises: BadInputExc: the visit type or origin url is invalid or inexistent ForbiddenExc: the provided origin url is blacklisted Returns: dict: A dict describing the save request with the following keys: * **visit_type**: the type of visit to perform * **origin_url**: the url of the origin * **save_request_date**: the date the request was submitted * **save_request_status**: the request status, either **accepted**, **rejected** or **pending** * **save_task_status**: the origin loading task status, either **not created**, **not yet scheduled**, **scheduled**, **succeed** or **failed** """ visit_type_tasks = get_savable_visit_types_dict(privileged_user) _check_visit_type_savable(visit_type, privileged_user) _check_origin_url_valid(origin_url) # if all checks passed so far, we can try and save the origin save_request_status = can_save_origin(origin_url, privileged_user) task = None # if the origin save request is accepted, create a scheduler # task to load it into the archive if save_request_status == SAVE_REQUEST_ACCEPTED: # create a task with high priority task_kwargs: Dict[str, Any] = { "priority": "high", "url": origin_url, } if visit_type == "archives": # extra arguments for that type are required archives_data = kwargs.get("archives_data", []) if not archives_data: raise BadInputExc( "Artifacts data are missing for the archives visit type." ) artifacts = [] for artifact in archives_data: artifact_url = artifact.get("artifact_url") artifact_version = artifact.get("artifact_version") if not artifact_url or not artifact_version: raise BadInputExc("Missing url or version for an artifact to load.") metadata = _check_origin_exists(artifact_url) artifacts.append( { "url": artifact_url, "version": artifact_version, "time": metadata["last_modified"], "length": metadata["content_length"], } ) task_kwargs = dict(**task_kwargs, artifacts=artifacts, snapshot_append=True) sor = None # get list of previously submitted save requests (most recent first) current_sors = list( SaveOriginRequest.objects.filter( visit_type=visit_type, origin_url=origin_url ).order_by("-request_date") ) can_create_task = False # if no save requests previously submitted, create the scheduler task if not current_sors: can_create_task = True else: # get the latest submitted save request sor = current_sors[0] # if it was in pending state, we need to create the scheduler task # and update the save request info in the database if sor.status == SAVE_REQUEST_PENDING: can_create_task = True # a task has already been created to load the origin elif sor.loading_task_id != -1: # get the scheduler task and its status tasks = scheduler().get_tasks([sor.loading_task_id]) task = tasks[0] if tasks else None task_runs = scheduler().get_task_runs([sor.loading_task_id]) task_run = task_runs[0] if task_runs else None save_request_info = _update_save_request_info(sor, task, task_run) task_status = save_request_info["save_task_status"] # create a new scheduler task only if the previous one has been # already executed if ( task_status == SAVE_TASK_FAILED or task_status == SAVE_TASK_SUCCEEDED ): can_create_task = True sor = None else: can_create_task = False if can_create_task: # effectively create the scheduler task task_dict = create_oneshot_task_dict( visit_type_tasks[visit_type], **task_kwargs ) task = scheduler().create_tasks([task_dict])[0] # pending save request has been accepted if sor: sor.status = SAVE_REQUEST_ACCEPTED sor.loading_task_id = task["id"] sor.save() else: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status, loading_task_id=task["id"], user_ids=f'"{user_id}"' if user_id else None, ) # save request must be manually reviewed for acceptation elif save_request_status == SAVE_REQUEST_PENDING: # check if there is already such a save request already submitted, # no need to add it to the database in that case try: sor = SaveOriginRequest.objects.get( visit_type=visit_type, origin_url=origin_url, status=save_request_status ) user_ids = sor.user_ids if sor.user_ids is not None else "" if user_id is not None and f'"{user_id}"' not in user_ids: # update user ids list sor.user_ids = f'{sor.user_ids},"{user_id}"' sor.save() # if not add it to the database except ObjectDoesNotExist: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status, user_ids=f'"{user_id}"' if user_id else None, ) # origin can not be saved as its url is blacklisted, # log the request to the database anyway else: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status, user_ids=f'"{user_id}"' if user_id else None, ) if save_request_status == SAVE_REQUEST_REJECTED: raise ForbiddenExc( ( 'The "save code now" request has been rejected ' "because the provided origin url is blacklisted." ) ) assert sor is not None return _update_save_request_info(sor, task) def update_save_origin_requests_from_queryset( requests_queryset: QuerySet, ) -> List[SaveOriginRequestInfo]: """Update all save requests from a SaveOriginRequest queryset, update their status in db and return the list of impacted save_requests. Args: requests_queryset: input SaveOriginRequest queryset Returns: list: A list of save origin request info dicts as described in :func:`swh.web.common.origin_save.create_save_origin_request` """ task_ids = [] for sor in requests_queryset: task_ids.append(sor.loading_task_id) save_requests = [] if task_ids: try: tasks = scheduler().get_tasks(task_ids) tasks = {task["id"]: task for task in tasks} task_runs = scheduler().get_task_runs(tasks) task_runs = {task_run["task"]: task_run for task_run in task_runs} except Exception: # allow to avoid mocking api GET responses for /origin/save endpoint when # running cypress tests as scheduler is not available tasks = {} task_runs = {} for sor in requests_queryset: sr_dict = _update_save_request_info( sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id), ) save_requests.append(sr_dict) return save_requests def refresh_save_origin_request_statuses() -> List[SaveOriginRequestInfo]: """Refresh non-terminal save origin requests (SOR) in the backend. Non-terminal SOR are requests whose status is **accepted** and their task status are either **created**, **not yet scheduled**, **scheduled** or **running**. This shall compute this list of SOR, checks their status in the scheduler and optionally elasticsearch for their current status. Then update those in db. Finally, this returns the refreshed information on those SOR. """ pivot_date = datetime.now(tz=timezone.utc) - timedelta(days=MAX_THRESHOLD_DAYS) save_requests = SaveOriginRequest.objects.filter( # Retrieve accepted request statuses (all statuses) Q(status=SAVE_REQUEST_ACCEPTED), # those without the required information we need to update Q(visit_date__isnull=True) | Q(visit_status__isnull=True) | Q(visit_status__in=NON_TERMINAL_STATUSES), # limit results to recent ones (that is roughly 30 days old at best) Q(request_date__gte=pivot_date), ) return ( update_save_origin_requests_from_queryset(save_requests) if save_requests.count() > 0 else [] ) def get_save_origin_requests( visit_type: str, origin_url: str ) -> List[SaveOriginRequestInfo]: """ Get all save requests for a given software origin. Args: visit_type: the type of visit origin_url: the url of the origin Raises: BadInputExc: the visit type or origin url is invalid swh.web.common.exc.NotFoundExc: no save requests can be found for the given origin Returns: list: A list of save origin requests dict as described in :func:`swh.web.common.origin_save.create_save_origin_request` """ _check_visit_type_savable(visit_type) _check_origin_url_valid(origin_url) sors = SaveOriginRequest.objects.filter( visit_type=visit_type, origin_url=origin_url ) if sors.count() == 0: raise NotFoundExc( f"No save requests found for visit of type {visit_type} " f"on origin with url {origin_url}." ) return update_save_origin_requests_from_queryset(sors) def get_save_origin_task_info( save_request_id: int, full_info: bool = True ) -> Dict[str, Any]: """ Get detailed information about an accepted save origin request and its associated loading task. If the associated loading task info is archived and removed from the scheduler database, returns an empty dictionary. Args: save_request_id: identifier of a save origin request full_info: whether to return detailed info for staff users Returns: A dictionary with the following keys: - **type**: loading task type - **arguments**: loading task arguments - **id**: loading task database identifier - **backend_id**: loading task celery identifier - **scheduled**: loading task scheduling date - **ended**: loading task termination date - **status**: loading task execution status - **visit_status**: Actual visit status Depending on the availability of the task logs in the elasticsearch cluster of Software Heritage, the returned dictionary may also contain the following keys: - **name**: associated celery task name - **message**: relevant log message from task execution - **duration**: task execution time (only if it succeeded) - **worker**: name of the worker that executed the task """ try: save_request = SaveOriginRequest.objects.get(id=save_request_id) except ObjectDoesNotExist: return {} task_info: Dict[str, Any] = {} if save_request.note is not None: task_info["note"] = save_request.note try: task = scheduler().get_tasks([save_request.loading_task_id]) except Exception: # to avoid mocking GET responses of /save/task/info/ endpoint when running # cypress tests as scheduler is not available in that case task = None task = task[0] if task else None if task is None: return task_info task_run = scheduler().get_task_runs([task["id"]]) task_run = task_run[0] if task_run else None if task_run is None: return task_info task_info.update(task_run) task_info["type"] = task["type"] task_info["arguments"] = task["arguments"] task_info["id"] = task_run["task"] del task_info["task"] del task_info["metadata"] # Enrich the task info with the loading visit status task_info["visit_status"] = save_request.visit_status es_workers_index_url = get_config()["es_workers_index_url"] if not es_workers_index_url: return task_info es_workers_index_url += "/_search" if save_request.visit_date: min_ts = save_request.visit_date max_ts = min_ts + timedelta(days=7) else: min_ts = save_request.request_date max_ts = min_ts + timedelta(days=MAX_THRESHOLD_DAYS) min_ts_unix = int(min_ts.timestamp()) * 1000 max_ts_unix = int(max_ts.timestamp()) * 1000 save_task_status = _save_task_status[task["status"]] priority = "3" if save_task_status == SAVE_TASK_FAILED else "6" query = { "bool": { "must": [ {"match_phrase": {"syslog.priority": {"query": priority}}}, { "match_phrase": { "journald.custom.swh_task_id": {"query": task_run["backend_id"]} } }, { "range": { "@timestamp": { "gte": min_ts_unix, "lte": max_ts_unix, "format": "epoch_millis", } } }, ] } } try: response = requests.post( es_workers_index_url, json={"query": query, "sort": ["@timestamp"]}, timeout=30, ) results = json.loads(response.text) if results["hits"]["total"]["value"] >= 1: task_run_info = results["hits"]["hits"][-1]["_source"] journald_custom = task_run_info.get("journald", {}).get("custom", {}) task_info["duration"] = journald_custom.get( "swh_logging_args_runtime", "not available" ) task_info["message"] = task_run_info.get("message", "not available") task_info["name"] = journald_custom.get("swh_task_name", "not available") task_info["worker"] = task_run_info.get("host", {}).get("hostname") except Exception as exc: logger.warning("Request to Elasticsearch failed\n%s", exc) - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) if not full_info: for field in ("id", "backend_id", "worker"): # remove some staff only fields task_info.pop(field, None) if "message" in task_run and "Loading failure" in task_run["message"]: # hide traceback for non staff users, only display exception message_lines = task_info["message"].split("\n") message = "" for line in message_lines: if line.startswith("Traceback"): break message += f"{line}\n" message += message_lines[-1] task_info["message"] = message return task_info SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" _submitted_save_requests_gauge = Gauge( name=SUBMITTED_SAVE_REQUESTS_METRIC, documentation="Number of submitted origin save requests", labelnames=["status", "visit_type"], registry=SWH_WEB_METRICS_REGISTRY, ) ACCEPTED_SAVE_REQUESTS_METRIC = "swh_web_accepted_save_requests" _accepted_save_requests_gauge = Gauge( name=ACCEPTED_SAVE_REQUESTS_METRIC, documentation="Number of accepted origin save requests", labelnames=["load_task_status", "visit_type"], registry=SWH_WEB_METRICS_REGISTRY, ) # Metric on the delay of save code now request per status and visit_type. This is the # time difference between the save code now is requested and the time it got ingested. ACCEPTED_SAVE_REQUESTS_DELAY_METRIC = "swh_web_save_requests_delay_seconds" _accepted_save_requests_delay_gauge = Gauge( name=ACCEPTED_SAVE_REQUESTS_DELAY_METRIC, documentation="Save Requests Duration", labelnames=["load_task_status", "visit_type"], registry=SWH_WEB_METRICS_REGISTRY, ) def compute_save_requests_metrics() -> None: """Compute Prometheus metrics related to origin save requests: - Number of submitted origin save requests - Number of accepted origin save requests - Save Code Now requests delay between request time and actual time of ingestion """ request_statuses = ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, ) load_task_statuses = ( SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING, ) # for metrics, we want access to all visit types visit_types = get_savable_visit_types(privileged_user=True) labels_set = product(request_statuses, visit_types) for labels in labels_set: _submitted_save_requests_gauge.labels(*labels).set(0) labels_set = product(load_task_statuses, visit_types) for labels in labels_set: _accepted_save_requests_gauge.labels(*labels).set(0) duration_load_task_statuses = ( SAVE_TASK_FAILED, SAVE_TASK_SUCCEEDED, ) for labels in product(duration_load_task_statuses, visit_types): _accepted_save_requests_delay_gauge.labels(*labels).set(0) for sor in SaveOriginRequest.objects.all(): if sor.status == SAVE_REQUEST_ACCEPTED: _accepted_save_requests_gauge.labels( load_task_status=sor.loading_task_status, visit_type=sor.visit_type, ).inc() _submitted_save_requests_gauge.labels( status=sor.status, visit_type=sor.visit_type ).inc() if ( sor.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED) and sor.visit_date is not None and sor.request_date is not None ): delay = sor.visit_date.timestamp() - sor.request_date.timestamp() _accepted_save_requests_delay_gauge.labels( load_task_status=sor.loading_task_status, visit_type=sor.visit_type, ).inc(delay) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index d8bb0fcf..41f5d934 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,528 +1,527 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Optional import urllib.parse from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry import requests from requests.auth import HTTPBasicAuth -import sentry_sdk from django.core.cache import cache from django.core.cache.backends.base import DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) -from swh.web.common.exc import BadInputExc +from swh.web.common.exc import BadInputExc, sentry_capture_exception from swh.web.common.typing import QueryParameters from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "FEATURES": get_config()["features"], "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: - sentry_sdk.capture_exception(exc) if catch_exception: + sentry_capture_exception(exc) return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) def has_add_forge_now_permission(user) -> bool: """Is a user considered an add-forge-now moderator? Returns True if a user is staff or has add forge now moderator permission """ return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION) diff --git a/swh/web/inbound_email/management/commands/process_inbound_email.py b/swh/web/inbound_email/management/commands/process_inbound_email.py index 8d49670a..fd7445c3 100644 --- a/swh/web/inbound_email/management/commands/process_inbound_email.py +++ b/swh/web/inbound_email/management/commands/process_inbound_email.py @@ -1,73 +1,72 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import email import email.message import email.policy import logging import sys from typing import Callable -import sentry_sdk - from django.core.management.base import BaseCommand +from swh.web.common.exc import sentry_capture_exception from swh.web.inbound_email import signals logger = logging.getLogger(__name__) class Command(BaseCommand): help = "Process a new inbound email" def handle(self, *args, **options): raw_message = sys.stdin.buffer.read() try: message = email.message_from_bytes(raw_message, policy=email.policy.default) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) self.handle_failed_message(raw_message) # XXX make sure having logging doesn't make postfix unhappy logger.exception("Could not convert email from bytes") return responses = signals.email_received.send_robust( sender=self.__class__, message=message ) handled = False for receiver, response in responses: if isinstance(response, Exception): - sentry_sdk.capture_exception(response) + sentry_capture_exception(response) self.handle_failing_receiver(message, receiver) logger.error( "Receiver produced the following exception", exc_info=response ) elif response is signals.EmailProcessingStatus.FAILED: self.handle_failing_receiver(message, receiver) elif response is signals.EmailProcessingStatus.PROCESSED: handled = True if not handled: self.handle_unhandled_message(message) def handle_failed_message(self, raw_message: bytes): # TODO: forward email as attachment for inspection logger.error("Failed message: %s", raw_message.decode("ascii", "replace")) def handle_failing_receiver( self, message: email.message.EmailMessage, receiver: Callable ): # TODO: forward email for inspection logger.error( "Failed receiver %s:%s; message: %s", receiver.__module__, receiver.__qualname__, str(message), ) def handle_unhandled_message(self, message: email.message.EmailMessage): # TODO: pass email through to a fallback alias? logger.error("Unhandled message: %s", str(message)) diff --git a/swh/web/misc/urls.py b/swh/web/misc/urls.py index 8f81f17a..03e22969 100644 --- a/swh/web/misc/urls.py +++ b/swh/web/misc/urls.py @@ -1,104 +1,104 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import requests -import sentry_sdk from django.conf.urls import include, url from django.contrib.staticfiles import finders from django.http import JsonResponse from django.shortcuts import render from swh.web.common import archive +from swh.web.common.exc import sentry_capture_exception from swh.web.config import get_config from swh.web.misc.metrics import prometheus_metrics def _jslicenses(request): jslicenses_file = finders.find("jssources/jslicenses.json") jslicenses_data = json.load(open(jslicenses_file)) jslicenses_data = sorted( jslicenses_data.items(), key=lambda item: item[0].split("/")[-1] ) return render(request, "misc/jslicenses.html", {"jslicenses_data": jslicenses_data}) def _stat_counters(request): stat_counters = archive.stat_counters() url = get_config()["history_counters_url"] stat_counters_history = {} try: response = requests.get(url, timeout=5) stat_counters_history = json.loads(response.text) except Exception as exc: - sentry_sdk.capture_exception(exc) + sentry_capture_exception(exc) counters = { "stat_counters": stat_counters, "stat_counters_history": stat_counters_history, } return JsonResponse(counters) urlpatterns = [ url(r"^", include("swh.web.misc.coverage")), url(r"^jslicenses/$", _jslicenses, name="jslicenses"), url(r"^", include("swh.web.misc.origin_save")), url(r"^stat_counters/$", _stat_counters, name="stat-counters"), url(r"^", include("swh.web.misc.badges")), url(r"^metrics/prometheus/$", prometheus_metrics, name="metrics-prometheus"), url(r"^", include("swh.web.misc.iframe")), url(r"^", include("swh.web.misc.fundraising")), ] # when running end to end tests through cypress, declare some extra # endpoints to provide input data for some of those tests if get_config()["e2e_tests_mode"]: from swh.web.tests.views import ( get_content_code_data_all_exts, get_content_code_data_all_filenames, get_content_code_data_by_ext, get_content_code_data_by_filename, get_content_other_data_by_ext, ) urlpatterns.append( url( r"^tests/data/content/code/extension/(?P.+)/$", get_content_code_data_by_ext, name="tests-content-code-extension", ) ) urlpatterns.append( url( r"^tests/data/content/other/extension/(?P.+)/$", get_content_other_data_by_ext, name="tests-content-other-extension", ) ) urlpatterns.append( url( r"^tests/data/content/code/extensions/$", get_content_code_data_all_exts, name="tests-content-code-extensions", ) ) urlpatterns.append( url( r"^tests/data/content/code/filename/(?P.+)/$", get_content_code_data_by_filename, name="tests-content-code-filename", ) ) urlpatterns.append( url( r"^tests/data/content/code/filenames/$", get_content_code_data_all_filenames, name="tests-content-code-filenames", ) )