diff --git a/swh/web/api/throttling.py b/swh/web/api/throttling.py
index 24e7f11b..87ffb456 100644
--- a/swh/web/api/throttling.py
+++ b/swh/web/api/throttling.py
@@ -1,217 +1,216 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from ipaddress import IPv4Network, IPv6Network, ip_address, ip_network
from typing import Callable, List, TypeVar, Union
-import sentry_sdk
-
from django.core.exceptions import ImproperlyConfigured
import rest_framework
from rest_framework.throttling import ScopedRateThrottle
from swh.web.auth.utils import API_SAVE_ORIGIN_PERMISSION
+from swh.web.common.exc import sentry_capture_exception
from swh.web.config import get_config
APIView = TypeVar("APIView", bound="rest_framework.views.APIView")
Request = rest_framework.request.Request
API_THROTTLING_EXEMPTED_PERM = "swh.web.api.throttling_exempted"
class SwhWebRateThrottle(ScopedRateThrottle):
"""Custom DRF request rate limiter for anonymous users
Requests are grouped into scopes. It enables to apply different
requests rate limiting based on the scope name but also the
input HTTP request types.
To associate a scope to requests, one must add a 'throttle_scope'
attribute when using a class based view, or call the 'throttle_scope'
decorator when using a function based view. By default, requests
do not have an associated scope and are not rate limited.
Rate limiting can also be configured according to the type
of the input HTTP requests for fine grained tuning.
For instance, the following YAML configuration section sets a rate of:
- 1 per minute for POST requests
- 60 per minute for other request types
for the 'swh_api' scope while exempting those coming from the
127.0.0.0/8 ip network.
.. code-block:: yaml
throttling:
scopes:
swh_api:
limiter_rate:
default: 60/m
POST: 1/m
exempted_networks:
- 127.0.0.0/8
"""
scope = None
def __init__(self):
super().__init__()
self.exempted_networks = None
self.num_requests = 0
self.duration = 0
def get_cache_key(self, request, view):
# do not handle throttling if user is authenticated
if request.user.is_authenticated:
return None
else:
return super().get_cache_key(request, view)
def get_exempted_networks(
self, scope_name: str
) -> List[Union[IPv4Network, IPv6Network]]:
if not self.exempted_networks:
scopes = get_config()["throttling"]["scopes"]
scope = scopes.get(scope_name)
if scope:
networks = scope.get("exempted_networks")
if networks:
self.exempted_networks = [
ip_network(network) for network in networks
]
return self.exempted_networks
def get_scope(self, view: APIView):
if not self.scope:
# class based view case
return getattr(view, self.scope_attr, None)
else:
# function based view case
return self.scope
def allow_request(self, request: Request, view: APIView) -> bool:
# class based view case
if not self.scope:
default_scope = getattr(view, self.scope_attr, None)
request_allowed = None
if default_scope is not None:
# check if there is a specific rate limiting associated
# to the request type
assert request.method is not None
request_scope = f"{default_scope}_{request.method.lower()}"
setattr(view, self.scope_attr, request_scope)
try:
request_allowed = super().allow_request(request, view)
# use default rate limiting otherwise
except ImproperlyConfigured as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
setattr(view, self.scope_attr, default_scope)
if request_allowed is None:
request_allowed = super().allow_request(request, view)
# function based view case
else:
default_scope = self.scope
# check if there is a specific rate limiting associated
# to the request type
self.scope = default_scope + "_" + request.method.lower()
try:
self.rate = self.get_rate()
# use default rate limiting otherwise
except ImproperlyConfigured:
self.scope = default_scope
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
request_allowed = super(ScopedRateThrottle, self).allow_request(
request, view
)
self.scope = default_scope
exempted_networks = self.get_exempted_networks(default_scope)
exempted_ip = False
if exempted_networks:
remote_address = ip_address(self.get_ident(request))
exempted_ip = any(
remote_address in network for network in exempted_networks
)
request_allowed = exempted_ip or request_allowed
# set throttling related data in the request metadata
# in order for the ThrottlingHeadersMiddleware to
# add X-RateLimit-* headers in the HTTP response
if not exempted_ip and hasattr(self, "history"):
hit_count = len(self.history)
request.META["RateLimit-Limit"] = self.num_requests
request.META["RateLimit-Remaining"] = self.num_requests - hit_count
wait = self.wait()
if wait is not None:
request.META["RateLimit-Reset"] = int(self.now + wait)
return request_allowed
class SwhWebUserRateThrottle(SwhWebRateThrottle):
"""Custom DRF request rate limiter for authenticated users
It has the same behavior as :class:`swh.web.api.throttling.SwhWebRateThrottle`
except the number of allowed requests for each throttle scope is increased by a
10x factor.
"""
NUM_REQUESTS_FACTOR = 10
def get_cache_key(self, request, view):
# do not handle throttling if user is not authenticated
if request.user.is_authenticated:
return super(SwhWebRateThrottle, self).get_cache_key(request, view)
else:
return None
def parse_rate(self, rate):
# increase number of allowed requests
num_requests, duration = super().parse_rate(rate)
return (num_requests * self.NUM_REQUESTS_FACTOR, duration)
def allow_request(self, request: Request, view: APIView) -> bool:
if request.user.is_staff or request.user.has_perm(API_THROTTLING_EXEMPTED_PERM):
# no throttling for staff users or users with adequate permission
return True
scope = self.get_scope(view)
if scope == "save_origin" and request.user.has_perm(API_SAVE_ORIGIN_PERMISSION):
# no throttling on save origin endpoint for users with adequate permission
return True
return super().allow_request(request, view)
def throttle_scope(scope: str) -> Callable[..., APIView]:
"""Decorator that allows the throttle scope of a DRF
function based view to be set::
@api_view(['GET', ])
@throttle_scope('scope')
def view(request):
...
"""
def decorator(func: APIView) -> APIView:
SwhScopeRateThrottle = type(
"SwhWebScopeRateThrottle", (SwhWebRateThrottle,), {"scope": scope}
)
SwhScopeUserRateThrottle = type(
"SwhWebScopeUserRateThrottle",
(SwhWebUserRateThrottle,),
{"scope": scope},
)
func.throttle_classes = (SwhScopeRateThrottle, SwhScopeUserRateThrottle)
return func
return decorator
diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py
index 3eaef3cd..cc992f1c 100644
--- a/swh/web/browse/utils.py
+++ b/swh/web/browse/utils.py
@@ -1,726 +1,725 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import stat
import textwrap
from typing import Tuple
import chardet
import magic
-import sentry_sdk
from django.utils.html import escape
from django.utils.safestring import mark_safe
from swh.web.common import archive, highlightjs
-from swh.web.common.exc import NotFoundExc
+from swh.web.common.exc import NotFoundExc, sentry_capture_exception
from swh.web.common.utils import (
browsers_supported_image_mimes,
django_cache,
format_utc_iso_date,
reverse,
rst_to_html,
)
from swh.web.config import get_config
@django_cache()
def get_directory_entries(sha1_git):
"""Function that retrieves the content of a directory
from the archive.
The directories entries are first sorted in lexicographical order.
Sub-directories and regular files are then extracted.
Args:
sha1_git: sha1_git identifier of the directory
Returns:
A tuple whose first member corresponds to the sub-directories list
and second member the regular files list
Raises:
NotFoundExc if the directory is not found
"""
entries = list(archive.lookup_directory(sha1_git))
for e in entries:
e["perms"] = stat.filemode(e["perms"])
if e["type"] == "rev":
# modify dir entry name to explicitly show it points
# to a revision
e["name"] = "%s @ %s" % (e["name"], e["target"][:7])
dirs = [e for e in entries if e["type"] in ("dir", "rev")]
files = [e for e in entries if e["type"] == "file"]
dirs = sorted(dirs, key=lambda d: d["name"])
files = sorted(files, key=lambda f: f["name"])
return dirs, files
def get_mimetype_and_encoding_for_content(content):
"""Function that returns the mime type and the encoding associated to
a content buffer using the magic module under the hood.
Args:
content (bytes): a content buffer
Returns:
A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'),
associated to the provided content.
"""
m = magic.Magic(mime=True, mime_encoding=True)
mime_encoding = m.from_buffer(content)
mime_type, encoding = mime_encoding.split(";")
encoding = encoding.replace(" charset=", "")
return mime_type, encoding
# maximum authorized content size in bytes for HTML display
# with code highlighting
content_display_max_size = get_config()["content_display_max_size"]
def re_encode_content(
mimetype: str, encoding: str, content_data: bytes
) -> Tuple[str, str, bytes]:
"""Try to re-encode textual content if it is not encoded to UTF-8
for proper display in the browse Web UI.
Args:
mimetype: content mimetype as detected by python-magic
encoding: content encoding as detected by python-magic
content_data: raw content bytes
Returns:
A tuple with 3 members: content mimetype, content encoding (possibly updated
after processing), content raw bytes (possibly reencoded to UTF-8)
"""
if mimetype.startswith("text/") and encoding not in ("us-ascii", "utf-8"):
# first check if chardet detects an encoding with confidence
result = chardet.detect(content_data)
if result["confidence"] >= 0.9:
encoding = result["encoding"]
content_data = content_data.decode(encoding).encode("utf-8")
elif encoding == "unknown-8bit":
# probably a malformed UTF-8 content, re-encode it
# by replacing invalid chars with a substitution one
content_data = content_data.decode("utf-8", "replace").encode("utf-8")
elif encoding not in ["utf-8", "binary"]:
content_data = content_data.decode(encoding, "replace").encode("utf-8")
elif mimetype.startswith("application/octet-stream"):
# file may detect a text content as binary
# so try to decode it for display
encodings = ["us-ascii", "utf-8"]
encodings += ["iso-8859-%s" % i for i in range(1, 17)]
for enc in encodings:
try:
content_data = content_data.decode(enc).encode("utf-8")
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
else:
# ensure display in content view
encoding = enc
mimetype = "text/plain"
break
return mimetype, encoding, content_data
def request_content(
query_string,
max_size=content_display_max_size,
re_encode=True,
):
"""Function that retrieves a content from the archive.
Raw bytes content is first retrieved, then the content mime type.
If the mime type is not stored in the archive, it will be computed
using Python magic module.
Args:
query_string: a string of the form "[ALGO_HASH:]HASH" where
optional ALGO_HASH can be either ``sha1``, ``sha1_git``,
``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH
the hexadecimal representation of the hash value
max_size: the maximum size for a content to retrieve (default to 1MB,
no size limit if None)
Returns:
A tuple whose first member corresponds to the content raw bytes
and second member the content mime type
Raises:
NotFoundExc if the content is not found
"""
content_data = archive.lookup_content(query_string)
filetype = None
language = None
# requests to the indexer db may fail so properly handle
# those cases in order to avoid content display errors
try:
filetype = archive.lookup_content_filetype(query_string)
language = archive.lookup_content_language(query_string)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
mimetype = "unknown"
encoding = "unknown"
if filetype:
mimetype = filetype["mimetype"]
encoding = filetype["encoding"]
if not max_size or content_data["length"] < max_size:
try:
content_raw = archive.lookup_content_raw(query_string)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
raise NotFoundExc(
"The bytes of the content are currently not available "
"in the archive."
)
else:
content_data["raw_data"] = content_raw["data"]
if not filetype:
mimetype, encoding = get_mimetype_and_encoding_for_content(
content_data["raw_data"]
)
if re_encode:
mimetype, encoding, raw_data = re_encode_content(
mimetype, encoding, content_data["raw_data"]
)
content_data["raw_data"] = raw_data
else:
content_data["raw_data"] = None
content_data["mimetype"] = mimetype
content_data["encoding"] = encoding
if language:
content_data["language"] = language["lang"]
else:
content_data["language"] = "not detected"
return content_data
def prepare_content_for_display(content_data, mime_type, path):
"""Function that prepares a content for HTML display.
The function tries to associate a programming language to a
content in order to perform syntax highlighting client-side
using highlightjs. The language is determined using either
the content filename or its mime type.
If the mime type corresponds to an image format supported
by web browsers, the content will be encoded in base64
for displaying the image.
Args:
content_data (bytes): raw bytes of the content
mime_type (string): mime type of the content
path (string): path of the content including filename
Returns:
A dict containing the content bytes (possibly different from the one
provided as parameter if it is an image) under the key 'content_data'
and the corresponding highlightjs language class under the
key 'language'.
"""
language = None
if path:
language = highlightjs.get_hljs_language_from_filename(path.split("/")[-1])
if language is None:
language = highlightjs.get_hljs_language_from_mime_type(mime_type)
if language is None:
language = "plaintext"
if mime_type.startswith("image/"):
if mime_type in browsers_supported_image_mimes:
content_data = base64.b64encode(content_data).decode("ascii")
if mime_type.startswith("image/svg"):
mime_type = "image/svg+xml"
if mime_type.startswith("text/") or mime_type.startswith("application/"):
content_data = content_data.decode("utf-8", errors="replace")
return {"content_data": content_data, "language": language, "mimetype": mime_type}
def gen_link(url, link_text=None, link_attrs=None):
"""
Utility function for generating an HTML link to insert
in Django templates.
Args:
url (str): an url
link_text (str): optional text for the produced link,
if not provided the url will be used
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form '<a href="url">link_text</a>'
"""
attrs = " "
if link_attrs:
for k, v in link_attrs.items():
attrs += '%s="%s" ' % (k, v)
if not link_text:
link_text = url
link = '<a%shref="%s">%s</a>' % (attrs, escape(url), escape(link_text))
return mark_safe(link)
def _snapshot_context_query_params(snapshot_context):
query_params = {}
if not snapshot_context:
return query_params
if snapshot_context and snapshot_context["origin_info"]:
origin_info = snapshot_context["origin_info"]
snp_query_params = snapshot_context["query_params"]
query_params = {"origin_url": origin_info["url"]}
if "timestamp" in snp_query_params:
query_params["timestamp"] = snp_query_params["timestamp"]
if "visit_id" in snp_query_params:
query_params["visit_id"] = snp_query_params["visit_id"]
if "snapshot" in snp_query_params and "visit_id" not in query_params:
query_params["snapshot"] = snp_query_params["snapshot"]
elif snapshot_context:
query_params = {"snapshot": snapshot_context["snapshot_id"]}
if snapshot_context["release"]:
query_params["release"] = snapshot_context["release"]
elif snapshot_context["branch"] and snapshot_context["branch"] not in (
"HEAD",
snapshot_context["revision_id"],
):
query_params["branch"] = snapshot_context["branch"]
elif snapshot_context["revision_id"]:
query_params["revision"] = snapshot_context["revision_id"]
return query_params
def gen_revision_url(revision_id, snapshot_context=None):
"""
Utility function for generating an url to a revision.
Args:
revision_id (str): a revision id
snapshot_context (dict): if provided, generate snapshot-dependent
browsing url
Returns:
str: The url to browse the revision
"""
query_params = _snapshot_context_query_params(snapshot_context)
# remove query parameters not needed for a revision view
query_params.pop("revision", None)
query_params.pop("release", None)
return reverse(
"browse-revision", url_args={"sha1_git": revision_id}, query_params=query_params
)
def gen_revision_link(
revision_id,
shorten_id=False,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a revision HTML view
to insert in Django templates.
Args:
revision_id (str): a revision id
shorten_id (boolean): whether to shorten the revision id to 7
characters for the link text
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
link_text (str): optional text for the generated link
(the revision id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
str: An HTML link in the form '<a href="revision_url">revision_id</a>'
"""
if not revision_id:
return None
revision_url = gen_revision_url(revision_id, snapshot_context)
if shorten_id:
return gen_link(revision_url, revision_id[:7], link_attrs)
else:
if not link_text:
link_text = revision_id
return gen_link(revision_url, link_text, link_attrs)
def gen_directory_link(
sha1_git,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a directory HTML view
to insert in Django templates.
Args:
sha1_git (str): directory identifier
link_text (str): optional text for the generated link
(the directory id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form '<a href="directory_view_url">link_text</a>'
"""
if not sha1_git:
return None
query_params = _snapshot_context_query_params(snapshot_context)
directory_url = reverse(
"browse-directory", url_args={"sha1_git": sha1_git}, query_params=query_params
)
if not link_text:
link_text = sha1_git
return gen_link(directory_url, link_text, link_attrs)
def gen_snapshot_link(
snapshot_id,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a snapshot HTML view
to insert in Django templates.
Args:
snapshot_id (str): snapshot identifier
link_text (str): optional text for the generated link
(the snapshot id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form '<a href="snapshot_view_url">link_text</a>'
"""
query_params = _snapshot_context_query_params(snapshot_context)
snapshot_url = reverse(
"browse-snapshot",
url_args={"snapshot_id": snapshot_id},
query_params=query_params,
)
if not link_text:
link_text = snapshot_id
return gen_link(snapshot_url, link_text, link_attrs)
def gen_content_link(
sha1_git,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a content HTML view
to insert in Django templates.
Args:
sha1_git (str): content identifier
link_text (str): optional text for the generated link
(the content sha1_git will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form '<a href="content_view_url">link_text</a>'
"""
if not sha1_git:
return None
query_params = _snapshot_context_query_params(snapshot_context)
content_url = reverse(
"browse-content",
url_args={"query_string": "sha1_git:" + sha1_git},
query_params=query_params,
)
if not link_text:
link_text = sha1_git
return gen_link(content_url, link_text, link_attrs)
def get_revision_log_url(revision_id, snapshot_context=None):
"""
Utility function for getting the URL for a revision log HTML view
(possibly in the context of an origin).
Args:
revision_id (str): revision identifier the history heads to
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
Returns:
The revision log view URL
"""
query_params = {}
if snapshot_context:
query_params = _snapshot_context_query_params(snapshot_context)
query_params["revision"] = revision_id
if snapshot_context and snapshot_context["origin_info"]:
revision_log_url = reverse("browse-origin-log", query_params=query_params)
elif snapshot_context:
url_args = {"snapshot_id": snapshot_context["snapshot_id"]}
del query_params["snapshot"]
revision_log_url = reverse(
"browse-snapshot-log", url_args=url_args, query_params=query_params
)
else:
revision_log_url = reverse(
"browse-revision-log", url_args={"sha1_git": revision_id}
)
return revision_log_url
def gen_revision_log_link(
revision_id,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a revision log HTML view
(possibly in the context of an origin) to insert in Django templates.
Args:
revision_id (str): revision identifier the history heads to
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
link_text (str): optional text to use for the generated link
(the revision id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form
'<a href="revision_log_view_url">link_text</a>'
"""
if not revision_id:
return None
revision_log_url = get_revision_log_url(revision_id, snapshot_context)
if not link_text:
link_text = revision_id
return gen_link(revision_log_url, link_text, link_attrs)
def gen_person_mail_link(person, link_text=None):
"""
Utility function for generating a mail link to a person to insert
in Django templates.
Args:
person (dict): dictionary containing person data
(*name*, *email*, *fullname*)
link_text (str): optional text to use for the generated mail link
(the person name will be used by default)
Returns:
str: A mail link to the person or the person name if no email is
present in person data
"""
person_name = person["name"] or person["fullname"] or "None"
if link_text is None:
link_text = person_name
person_email = person["email"] if person["email"] else None
if person_email is None and "@" in person_name and " " not in person_name:
person_email = person_name
if person_email:
return gen_link(url="mailto:%s" % person_email, link_text=link_text)
else:
return person_name
def gen_release_link(
sha1_git,
snapshot_context=None,
link_text="Browse",
link_attrs={"class": "btn btn-default btn-sm", "role": "button"},
):
"""
Utility function for generating a link to a release HTML view
to insert in Django templates.
Args:
sha1_git (str): release identifier
link_text (str): optional text for the generated link
(the release id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form '<a href="release_view_url">link_text</a>'
"""
query_params = _snapshot_context_query_params(snapshot_context)
release_url = reverse(
"browse-release", url_args={"sha1_git": sha1_git}, query_params=query_params
)
if not link_text:
link_text = sha1_git
return gen_link(release_url, link_text, link_attrs)
def format_log_entries(revision_log, per_page, snapshot_context=None):
"""
Utility functions that process raw revision log data for HTML display.
Its purpose is to:
* add links to relevant browse views
* format date in human readable format
* truncate the message log
Args:
revision_log (list): raw revision log as returned by the swh-web api
per_page (int): number of log entries per page
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
"""
revision_log_data = []
for i, rev in enumerate(revision_log):
if i == per_page:
break
author_name = "None"
author_fullname = "None"
committer_fullname = "None"
if rev["author"]:
author_name = gen_person_mail_link(rev["author"])
author_fullname = rev["author"]["fullname"]
if rev["committer"]:
committer_fullname = rev["committer"]["fullname"]
author_date = format_utc_iso_date(rev["date"])
committer_date = format_utc_iso_date(rev["committer_date"])
tooltip = "revision %s\n" % rev["id"]
tooltip += "author: %s\n" % author_fullname
tooltip += "author date: %s\n" % author_date
tooltip += "committer: %s\n" % committer_fullname
tooltip += "committer date: %s\n\n" % committer_date
if rev["message"]:
tooltip += textwrap.indent(rev["message"], " " * 4)
revision_log_data.append(
{
"author": author_name,
"id": rev["id"][:7],
"message": rev["message"],
"date": author_date,
"commit_date": committer_date,
"url": gen_revision_url(rev["id"], snapshot_context),
"tooltip": tooltip,
}
)
return revision_log_data
# list of common readme names ordered by preference
# (lower indices have higher priority)
_common_readme_names = [
"readme.markdown",
"readme.md",
"readme.rst",
"readme.txt",
"readme",
]
def get_readme_to_display(readmes):
"""
Process a list of readme files found in a directory
in order to find the adequate one to display.
Args:
readmes: a list of dict where keys are readme file names and values
are readme sha1s
Returns:
A tuple (readme_name, readme_sha1)
"""
readme_name = None
readme_url = None
readme_sha1 = None
readme_html = None
lc_readmes = {k.lower(): {"orig_name": k, "sha1": v} for k, v in readmes.items()}
# look for readme names according to the preference order
# defined by the _common_readme_names list
for common_readme_name in _common_readme_names:
if common_readme_name in lc_readmes:
readme_name = lc_readmes[common_readme_name]["orig_name"]
readme_sha1 = lc_readmes[common_readme_name]["sha1"]
readme_url = reverse(
"browse-content-raw",
url_args={"query_string": readme_sha1},
query_params={"re_encode": "true"},
)
break
# otherwise pick the first readme like file if any
if not readme_name and len(readmes.items()) > 0:
readme_name = next(iter(readmes))
readme_sha1 = readmes[readme_name]
readme_url = reverse(
"browse-content-raw",
url_args={"query_string": readme_sha1},
query_params={"re_encode": "true"},
)
# convert rst README to html server side as there is
# no viable solution to perform that task client side
if readme_name and readme_name.endswith(".rst"):
@django_cache(
catch_exception=True,
exception_return_value="Readme bytes are not available",
)
def _rst_readme_to_html(readme_sha1):
rst_doc = request_content(readme_sha1)
return rst_to_html(rst_doc["raw_data"])
readme_html = _rst_readme_to_html(readme_sha1)
return readme_name, readme_url, readme_html
diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py
index b69241e8..7d0995a1 100644
--- a/swh/web/browse/views/content.py
+++ b/swh/web/browse/views/content.py
@@ -1,452 +1,455 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import difflib
from distutils.util import strtobool
-import sentry_sdk
-
from django.http import HttpResponse, JsonResponse
from django.shortcuts import redirect, render
from swh.model.hashutil import hash_to_hex
from swh.model.swhids import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import (
content_display_max_size,
gen_link,
prepare_content_for_display,
request_content,
)
from swh.web.common import archive, highlightjs, query
-from swh.web.common.exc import BadInputExc, NotFoundExc, http_status_code_message
+from swh.web.common.exc import (
+ BadInputExc,
+ NotFoundExc,
+ http_status_code_message,
+ sentry_capture_exception,
+)
from swh.web.common.identifiers import get_swhids_info
from swh.web.common.typing import ContentMetadata, SWHObjectInfo
from swh.web.common.utils import gen_path_info, reverse, swh_object_icons
@browse_route(
r"content/(?P<query_string>[0-9a-z_:]*[0-9a-f]+.)/raw/",
view_name="browse-content-raw",
checksum_args=["query_string"],
)
def content_raw(request, query_string):
"""Django view that produces a raw display of a content identified
by its hash value.
The url that points to it is
:http:get:`/browse/content/[(algo_hash):](hash)/raw/`
"""
re_encode = bool(strtobool(request.GET.get("re_encode", "false")))
algo, checksum = query.parse_hash(query_string)
checksum = hash_to_hex(checksum)
content_data = request_content(query_string, max_size=None, re_encode=re_encode)
filename = request.GET.get("filename", None)
if not filename:
filename = "%s_%s" % (algo, checksum)
if (
content_data["mimetype"].startswith("text/")
or content_data["mimetype"] == "inode/x-empty"
):
response = HttpResponse(content_data["raw_data"], content_type="text/plain")
response["Content-disposition"] = "filename=%s" % filename
else:
response = HttpResponse(
content_data["raw_data"], content_type="application/octet-stream"
)
response["Content-disposition"] = "attachment; filename=%s" % filename
return response
_auto_diff_size_limit = 20000
@browse_route(
r"content/(?P<from_query_string>.*)/diff/(?P<to_query_string>.*)/",
view_name="diff-contents",
)
def _contents_diff(request, from_query_string, to_query_string):
"""
Browse endpoint used to compute unified diffs between two contents.
Diffs are generated only if the two contents are textual.
By default, diffs whose size are greater than 20 kB will
not be generated. To force the generation of large diffs,
the 'force' boolean query parameter must be used.
Args:
request: input django http request
from_query_string: a string of the form "[ALGO_HASH:]HASH" where
optional ALGO_HASH can be either ``sha1``, ``sha1_git``,
``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH
the hexadecimal representation of the hash value identifying
the first content
to_query_string: same as above for identifying the second content
Returns:
A JSON object containing the unified diff.
"""
diff_data = {}
content_from = None
content_to = None
content_from_size = 0
content_to_size = 0
content_from_lines = []
content_to_lines = []
force = request.GET.get("force", "false")
path = request.GET.get("path", None)
language = "plaintext"
force = bool(strtobool(force))
if from_query_string == to_query_string:
diff_str = "File renamed without changes"
else:
try:
text_diff = True
if from_query_string:
content_from = request_content(from_query_string, max_size=None)
content_from_display_data = prepare_content_for_display(
content_from["raw_data"], content_from["mimetype"], path
)
language = content_from_display_data["language"]
content_from_size = content_from["length"]
if not (
content_from["mimetype"].startswith("text/")
or content_from["mimetype"] == "inode/x-empty"
):
text_diff = False
if text_diff and to_query_string:
content_to = request_content(to_query_string, max_size=None)
content_to_display_data = prepare_content_for_display(
content_to["raw_data"], content_to["mimetype"], path
)
language = content_to_display_data["language"]
content_to_size = content_to["length"]
if not (
content_to["mimetype"].startswith("text/")
or content_to["mimetype"] == "inode/x-empty"
):
text_diff = False
diff_size = abs(content_to_size - content_from_size)
if not text_diff:
diff_str = "Diffs are not generated for non textual content"
language = "plaintext"
elif not force and diff_size > _auto_diff_size_limit:
diff_str = "Large diffs are not automatically computed"
language = "plaintext"
else:
if content_from:
content_from_lines = (
content_from["raw_data"].decode("utf-8").splitlines(True)
)
if content_from_lines and content_from_lines[-1][-1] != "\n":
content_from_lines[-1] += "[swh-no-nl-marker]\n"
if content_to:
content_to_lines = (
content_to["raw_data"].decode("utf-8").splitlines(True)
)
if content_to_lines and content_to_lines[-1][-1] != "\n":
content_to_lines[-1] += "[swh-no-nl-marker]\n"
diff_lines = difflib.unified_diff(content_from_lines, content_to_lines)
diff_str = "".join(list(diff_lines)[2:])
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
diff_str = str(exc)
diff_data["diff_str"] = diff_str
diff_data["language"] = language
return JsonResponse(diff_data)
def _get_content_from_request(request):
path = request.GET.get("path")
if path is None:
raise BadInputExc("The path query parameter must be provided.")
snapshot = request.GET.get("snapshot") or request.GET.get("snapshot_id")
origin_url = request.GET.get("origin_url")
if snapshot is None and origin_url is None:
raise BadInputExc(
"The origin_url or snapshot query parameters must be provided."
)
visit_id = int(request.GET.get("visit_id", 0))
snapshot_context = get_snapshot_context(
snapshot_id=snapshot,
origin_url=origin_url,
path=path,
timestamp=request.GET.get("timestamp"),
visit_id=visit_id or None,
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
browse_context="content",
)
root_directory = snapshot_context["root_directory"]
return archive.lookup_directory_with_path(root_directory, path)
@browse_route(
r"content/(?P<query_string>[0-9a-z_:]*[0-9a-f]+.*)/",
r"content/",
view_name="browse-content",
checksum_args=["query_string"],
)
def content_display(request, query_string=None):
"""Django view that produces an HTML display of a content identified
by its hash value.
The URLs that points to it are
:http:get:`/browse/content/[(algo_hash):](hash)/`
:http:get:`/browse/content/`
"""
if query_string is None:
# this case happens when redirected from origin/content or snapshot/content
content = _get_content_from_request(request)
return redirect(
reverse(
"browse-content",
url_args={"query_string": f"sha1_git:{content['target']}"},
query_params=request.GET,
),
)
algo, checksum = query.parse_hash(query_string)
checksum = hash_to_hex(checksum)
origin_url = request.GET.get("origin_url")
selected_language = request.GET.get("language")
if not origin_url:
origin_url = request.GET.get("origin")
snapshot_id = request.GET.get("snapshot") or request.GET.get("snapshot_id")
path = request.GET.get("path")
content_data = {}
error_info = {"status_code": 200, "description": None}
try:
content_data = request_content(query_string)
except NotFoundExc as e:
error_info["status_code"] = 404
error_info["description"] = f"NotFoundExc: {str(e)}"
snapshot_context = None
if origin_url is not None or snapshot_id is not None:
try:
visit_id = int(request.GET.get("visit_id", 0))
snapshot_context = get_snapshot_context(
origin_url=origin_url,
snapshot_id=snapshot_id,
timestamp=request.GET.get("timestamp"),
visit_id=visit_id or None,
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
path=path,
browse_context="content",
)
except NotFoundExc as e:
if str(e).startswith("Origin"):
raw_cnt_url = reverse(
"browse-content", url_args={"query_string": query_string}
)
error_message = (
"The Software Heritage archive has a content "
"with the hash you provided but the origin "
"mentioned in your request appears broken: %s. "
"Please check the URL and try again.\n\n"
"Nevertheless, you can still browse the content "
"without origin information: %s"
% (gen_link(origin_url), gen_link(raw_cnt_url))
)
raise NotFoundExc(error_message)
else:
raise e
content = None
language = None
mimetype = None
if content_data.get("raw_data") is not None:
content_display_data = prepare_content_for_display(
content_data["raw_data"], content_data["mimetype"], path
)
content = content_display_data["content_data"]
language = content_display_data["language"]
mimetype = content_display_data["mimetype"]
# Override language with user-selected language
if selected_language is not None:
language = selected_language
available_languages = None
if mimetype and "text/" in mimetype:
available_languages = highlightjs.get_supported_languages()
filename = None
path_info = None
directory_id = None
root_dir = None
if snapshot_context:
root_dir = snapshot_context.get("root_directory")
query_params = snapshot_context["query_params"] if snapshot_context else {}
breadcrumbs = []
if path:
split_path = path.split("/")
root_dir = root_dir or split_path[0]
filename = split_path[-1]
if root_dir != path:
path = path.replace(root_dir + "/", "")
path = path[: -len(filename)]
path_info = gen_path_info(path)
query_params.pop("path", None)
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_dir},
query_params=query_params,
)
breadcrumbs.append({"name": root_dir[:7], "url": dir_url})
for pi in path_info:
query_params["path"] = pi["path"]
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_dir},
query_params=query_params,
)
breadcrumbs.append({"name": pi["name"], "url": dir_url})
breadcrumbs.append({"name": filename, "url": None})
if path and root_dir != path:
dir_info = archive.lookup_directory_with_path(root_dir, path)
directory_id = dir_info["target"]
elif root_dir != path:
directory_id = root_dir
else:
root_dir = None
query_params = {"filename": filename}
content_checksums = content_data.get("checksums", {})
content_url = reverse(
"browse-content",
url_args={"query_string": query_string},
)
content_raw_url = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params=query_params,
)
content_metadata = ContentMetadata(
object_type=ObjectType.CONTENT,
object_id=content_checksums.get("sha1_git"),
sha1=content_checksums.get("sha1"),
sha1_git=content_checksums.get("sha1_git"),
sha256=content_checksums.get("sha256"),
blake2s256=content_checksums.get("blake2s256"),
content_url=content_url,
mimetype=content_data.get("mimetype"),
encoding=content_data.get("encoding"),
size=content_data.get("length", 0),
language=content_data.get("language"),
root_directory=root_dir,
path=f"/{path}" if path else None,
filename=filename or "",
directory=directory_id,
revision=None,
release=None,
snapshot=None,
origin_url=origin_url,
)
swh_objects = []
if content_checksums:
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.CONTENT,
object_id=content_checksums.get("sha1_git"),
)
)
if directory_id:
swh_objects.append(
SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory_id)
)
if snapshot_context:
if snapshot_context["revision_id"]:
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.REVISION,
object_id=snapshot_context["revision_id"],
)
)
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.SNAPSHOT,
object_id=snapshot_context["snapshot_id"],
)
)
if snapshot_context["release_id"]:
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.RELEASE,
object_id=snapshot_context["release_id"],
)
)
swhids_info = get_swhids_info(
swh_objects,
snapshot_context,
extra_context=content_metadata,
)
heading = "Content - %s" % content_checksums.get("sha1_git")
if breadcrumbs:
content_path = "/".join([bc["name"] for bc in breadcrumbs])
heading += " - %s" % content_path
return render(
request,
"browse/content.html",
{
"heading": heading,
"swh_object_id": swhids_info[0]["swhid"] if swhids_info else "",
"swh_object_name": "Content",
"swh_object_metadata": content_metadata,
"content": content,
"content_size": content_data.get("length"),
"max_content_size": content_display_max_size,
"filename": filename,
"encoding": content_data.get("encoding"),
"mimetype": mimetype,
"language": language,
"available_languages": available_languages,
"breadcrumbs": breadcrumbs,
"top_right_link": {
"url": content_raw_url,
"icon": swh_object_icons["content"],
"text": "Raw File",
},
"snapshot_context": snapshot_context,
"vault_cooking": None,
"show_actions": True,
"swhids_info": swhids_info,
"error_code": error_info["status_code"],
"error_message": http_status_code_message.get(error_info["status_code"]),
"error_description": error_info["description"],
},
status=error_info["status_code"],
)
diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py
index cb598ea6..09d852a1 100644
--- a/swh/web/browse/views/directory.py
+++ b/swh/web/browse/views/directory.py
@@ -1,292 +1,294 @@
-# Copyright (C) 2017-2021 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
-import sentry_sdk
-
from django.http import HttpResponse
from django.shortcuts import redirect, render
from swh.model.swhids import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import gen_link, get_directory_entries, get_readme_to_display
from swh.web.common import archive
-from swh.web.common.exc import NotFoundExc, http_status_code_message
+from swh.web.common.exc import (
+ NotFoundExc,
+ http_status_code_message,
+ sentry_capture_exception,
+)
from swh.web.common.identifiers import get_swhids_info
from swh.web.common.typing import DirectoryMetadata, SWHObjectInfo
from swh.web.common.utils import gen_path_info, reverse, swh_object_icons
def _directory_browse(request, sha1_git, path=None):
root_sha1_git = sha1_git
error_info = {"status_code": 200, "description": None}
if path:
try:
dir_info = archive.lookup_directory_with_path(sha1_git, path)
sha1_git = dir_info["target"]
except NotFoundExc as e:
error_info["status_code"] = 404
error_info["description"] = f"NotFoundExc: {str(e)}"
sha1_git = None
dirs, files = [], []
if sha1_git is not None:
dirs, files = get_directory_entries(sha1_git)
origin_url = request.GET.get("origin_url")
if not origin_url:
origin_url = request.GET.get("origin")
snapshot_id = request.GET.get("snapshot")
snapshot_context = None
if origin_url is not None or snapshot_id is not None:
try:
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
path=path,
)
except NotFoundExc as e:
if str(e).startswith("Origin"):
raw_dir_url = reverse(
"browse-directory", url_args={"sha1_git": sha1_git}
)
error_message = (
"The Software Heritage archive has a directory "
"with the hash you provided but the origin "
"mentioned in your request appears broken: %s. "
"Please check the URL and try again.\n\n"
"Nevertheless, you can still browse the directory "
"without origin information: %s"
% (gen_link(origin_url), gen_link(raw_dir_url))
)
raise NotFoundExc(error_message)
else:
raise e
path_info = gen_path_info(path)
query_params = snapshot_context["query_params"] if snapshot_context else {}
breadcrumbs = []
breadcrumbs.append(
{
"name": root_sha1_git[:7],
"url": reverse(
"browse-directory",
url_args={"sha1_git": root_sha1_git},
query_params={**query_params, "path": None},
),
}
)
for pi in path_info:
breadcrumbs.append(
{
"name": pi["name"],
"url": reverse(
"browse-directory",
url_args={"sha1_git": root_sha1_git},
query_params={
**query_params,
"path": pi["path"],
},
),
}
)
path = "" if path is None else (path + "/")
for d in dirs:
if d["type"] == "rev":
d["url"] = reverse(
"browse-revision",
url_args={"sha1_git": d["target"]},
query_params=query_params,
)
else:
d["url"] = reverse(
"browse-directory",
url_args={"sha1_git": root_sha1_git},
query_params={
**query_params,
"path": path + d["name"],
},
)
sum_file_sizes = 0
readmes = {}
for f in files:
query_string = "sha1_git:" + f["target"]
f["url"] = reverse(
"browse-content",
url_args={"query_string": query_string},
query_params={
**query_params,
"path": root_sha1_git + "/" + path + f["name"],
},
)
if f["length"] is not None:
sum_file_sizes += f["length"]
if f["name"].lower().startswith("readme"):
readmes[f["name"]] = f["checksums"]["sha1"]
readme_name, readme_url, readme_html = get_readme_to_display(readmes)
dir_metadata = DirectoryMetadata(
object_type=ObjectType.DIRECTORY,
object_id=sha1_git,
directory=root_sha1_git,
nb_files=len(files),
nb_dirs=len(dirs),
sum_file_sizes=sum_file_sizes,
root_directory=root_sha1_git,
path=f"/{path}" if path else None,
revision=None,
revision_found=None,
release=None,
snapshot=None,
)
vault_cooking = {
"directory_context": True,
"directory_swhid": f"swh:1:dir:{sha1_git}",
"revision_context": False,
"revision_swhid": None,
}
swh_objects = [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=sha1_git)]
if snapshot_context:
if snapshot_context["revision_id"]:
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.REVISION,
object_id=snapshot_context["revision_id"],
)
)
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.SNAPSHOT,
object_id=snapshot_context["snapshot_id"],
)
)
if snapshot_context["release_id"]:
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.RELEASE,
object_id=snapshot_context["release_id"],
)
)
swhids_info = get_swhids_info(swh_objects, snapshot_context, dir_metadata)
heading = "Directory - %s" % sha1_git
if breadcrumbs:
dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/"
heading += " - %s" % dir_path
top_right_link = None
if (
snapshot_context is not None
and not snapshot_context["is_empty"]
and snapshot_context["revision_id"] is not None
):
history_url = reverse(
"browse-revision-log",
url_args={"sha1_git": snapshot_context["revision_id"]},
query_params=query_params,
)
top_right_link = {
"url": history_url,
"icon": swh_object_icons["revisions history"],
"text": "History",
}
return render(
request,
"browse/directory.html",
{
"heading": heading,
"swh_object_id": swhids_info[0]["swhid"],
"swh_object_name": "Directory",
"swh_object_metadata": dir_metadata,
"dirs": dirs,
"files": files,
"breadcrumbs": breadcrumbs,
"top_right_link": top_right_link,
"readme_name": readme_name,
"readme_url": readme_url,
"readme_html": readme_html,
"snapshot_context": snapshot_context,
"vault_cooking": vault_cooking,
"show_actions": True,
"swhids_info": swhids_info,
"error_code": error_info["status_code"],
"error_message": http_status_code_message.get(error_info["status_code"]),
"error_description": error_info["description"],
},
status=error_info["status_code"],
)
@browse_route(
r"directory/(?P<sha1_git>[0-9a-f]+)/",
view_name="browse-directory",
checksum_args=["sha1_git"],
)
def directory_browse(request, sha1_git):
"""Django view for browsing the content of a directory identified
by its sha1_git value.
The url that points to it is
:http:get:`/browse/directory/(sha1_git)/`
"""
return _directory_browse(request, sha1_git, request.GET.get("path"))
@browse_route(
r"directory/(?P<sha1_git>[0-9a-f]+)/(?P<path>.+)/",
view_name="browse-directory-legacy",
checksum_args=["sha1_git"],
)
def directory_browse_legacy(request, sha1_git, path):
"""Django view for browsing the content of a directory identified
by its sha1_git value.
The url that points to it is
:http:get:`/browse/directory/(sha1_git)/(path)/`
"""
return _directory_browse(request, sha1_git, path)
@browse_route(
r"directory/resolve/content-path/(?P<sha1_git>[0-9a-f]+)/",
view_name="browse-directory-resolve-content-path",
checksum_args=["sha1_git"],
)
def _directory_resolve_content_path(request, sha1_git):
"""
Internal endpoint redirecting to data url for a specific file path
relative to a root directory.
"""
try:
path = os.path.normpath(request.GET.get("path"))
if not path.startswith("../"):
dir_info = archive.lookup_directory_with_path(sha1_git, path)
if dir_info["type"] == "file":
sha1 = dir_info["checksums"]["sha1"]
data_url = reverse(
"browse-content-raw", url_args={"query_string": sha1}
)
return redirect(data_url)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
return HttpResponse(status=404)
diff --git a/swh/web/browse/views/release.py b/swh/web/browse/views/release.py
index e9faaa94..6f3d9dec 100644
--- a/swh/web/browse/views/release.py
+++ b/swh/web/browse/views/release.py
@@ -1,245 +1,243 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import sentry_sdk
-
from django.shortcuts import render
from swh.model.swhids import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import (
gen_content_link,
gen_directory_link,
gen_link,
gen_person_mail_link,
gen_release_link,
gen_revision_link,
)
from swh.web.common import archive
-from swh.web.common.exc import NotFoundExc
+from swh.web.common.exc import NotFoundExc, sentry_capture_exception
from swh.web.common.identifiers import get_swhids_info
from swh.web.common.typing import ReleaseMetadata, SWHObjectInfo
from swh.web.common.utils import format_utc_iso_date, reverse
@browse_route(
r"release/(?P<sha1_git>[0-9a-f]+)/",
view_name="browse-release",
checksum_args=["sha1_git"],
)
def release_browse(request, sha1_git):
"""
Django view that produces an HTML display of a release
identified by its id.
The url that points to it is :http:get:`/browse/release/(sha1_git)/`.
"""
release = archive.lookup_release(sha1_git)
snapshot_context = {}
origin_info = None
snapshot_id = request.GET.get("snapshot_id")
if not snapshot_id:
snapshot_id = request.GET.get("snapshot")
origin_url = request.GET.get("origin_url")
if not origin_url:
origin_url = request.GET.get("origin")
timestamp = request.GET.get("timestamp")
visit_id = int(request.GET.get("visit_id", 0))
if origin_url:
try:
snapshot_context = get_snapshot_context(
snapshot_id,
origin_url,
timestamp,
visit_id or None,
release_name=release["name"],
)
except NotFoundExc as e:
raw_rel_url = reverse("browse-release", url_args={"sha1_git": sha1_git})
error_message = (
"The Software Heritage archive has a release "
"with the hash you provided but the origin "
"mentioned in your request appears broken: %s. "
"Please check the URL and try again.\n\n"
"Nevertheless, you can still browse the release "
"without origin information: %s"
% (gen_link(origin_url), gen_link(raw_rel_url))
)
if str(e).startswith("Origin"):
raise NotFoundExc(error_message)
else:
raise e
origin_info = snapshot_context["origin_info"]
elif snapshot_id:
snapshot_context = get_snapshot_context(
snapshot_id, release_name=release["name"]
)
snapshot_id = snapshot_context.get("snapshot_id", None)
release_metadata = ReleaseMetadata(
object_type=ObjectType.RELEASE,
object_id=sha1_git,
release=sha1_git,
author=release["author"]["fullname"] if release["author"] else "None",
author_url=gen_person_mail_link(release["author"])
if release["author"]
else "None",
date=format_utc_iso_date(release["date"]),
name=release["name"],
synthetic=release["synthetic"],
target=release["target"],
target_type=release["target_type"],
snapshot=snapshot_id,
origin_url=origin_url,
)
release_note_lines = []
if release["message"]:
release_note_lines = release["message"].split("\n")
swh_objects = [SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=sha1_git)]
vault_cooking = None
rev_directory = None
target_link = None
if release["target_type"] == ObjectType.REVISION.name.lower():
target_link = gen_revision_link(
release["target"],
snapshot_context=snapshot_context,
link_text=None,
link_attrs=None,
)
try:
revision = archive.lookup_revision(release["target"])
rev_directory = revision["directory"]
vault_cooking = {
"directory_context": True,
"directory_swhid": f"swh:1:dir:{rev_directory}",
"revision_context": True,
"revision_swhid": f"swh:1:rev:{release['target']}",
}
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.REVISION, object_id=release["target"]
)
)
swh_objects.append(
SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=rev_directory)
)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
elif release["target_type"] == ObjectType.DIRECTORY.name.lower():
target_link = gen_directory_link(
release["target"],
snapshot_context=snapshot_context,
link_text=None,
link_attrs=None,
)
try:
# check directory exists
archive.lookup_directory(release["target"])
vault_cooking = {
"directory_context": True,
"directory_swhid": f"swh:1:dir:{release['target']}",
"revision_context": False,
"revision_swhid": None,
}
swh_objects.append(
SWHObjectInfo(
object_type=ObjectType.DIRECTORY, object_id=release["target"]
)
)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
elif release["target_type"] == ObjectType.CONTENT.name.lower():
target_link = gen_content_link(
release["target"],
snapshot_context=snapshot_context,
link_text=None,
link_attrs=None,
)
swh_objects.append(
SWHObjectInfo(object_type=ObjectType.CONTENT, object_id=release["target"])
)
elif release["target_type"] == ObjectType.RELEASE.name.lower():
target_link = gen_release_link(
release["target"],
snapshot_context=snapshot_context,
link_text=None,
link_attrs=None,
)
rev_directory_url = None
if rev_directory is not None:
if origin_info:
rev_directory_url = reverse(
"browse-origin-directory",
query_params={
"origin_url": origin_info["url"],
"release": release["name"],
"snapshot": snapshot_id,
},
)
elif snapshot_id:
rev_directory_url = reverse(
"browse-snapshot-directory",
url_args={"snapshot_id": snapshot_id},
query_params={"release": release["name"]},
)
else:
rev_directory_url = reverse(
"browse-directory", url_args={"sha1_git": rev_directory}
)
directory_link = None
if rev_directory_url is not None:
directory_link = gen_link(rev_directory_url, rev_directory)
release["directory_link"] = directory_link
release["target_link"] = target_link
if snapshot_context:
snapshot_id = snapshot_context["snapshot_id"]
if snapshot_id:
swh_objects.append(
SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id)
)
swhids_info = get_swhids_info(swh_objects, snapshot_context)
note_header = "None"
if len(release_note_lines) > 0:
note_header = release_note_lines[0]
release["note_header"] = note_header
release["note_body"] = "\n".join(release_note_lines[1:])
heading = "Release - %s" % release["name"]
if snapshot_context:
context_found = "snapshot: %s" % snapshot_context["snapshot_id"]
if origin_info:
context_found = "origin: %s" % origin_info["url"]
heading += " - %s" % context_found
return render(
request,
"browse/release.html",
{
"heading": heading,
"swh_object_id": swhids_info[0]["swhid"],
"swh_object_name": "Release",
"swh_object_metadata": release_metadata,
"release": release,
"snapshot_context": snapshot_context,
"show_actions": True,
"breadcrumbs": None,
"vault_cooking": vault_cooking,
"top_right_link": None,
"swhids_info": swhids_info,
},
)
diff --git a/swh/web/common/highlightjs.py b/swh/web/common/highlightjs.py
index 7e0c7229..362bfd92 100644
--- a/swh/web/common/highlightjs.py
+++ b/swh/web/common/highlightjs.py
@@ -1,183 +1,184 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import json
from typing import Dict
from pygments.lexers import get_all_lexers, get_lexer_for_filename
-import sentry_sdk
from django.contrib.staticfiles.finders import find
+from swh.web.common.exc import sentry_capture_exception
+
@functools.lru_cache()
def _hljs_languages_data():
with open(str(find("json/highlightjs-languages.json")), "r") as hljs_languages_file:
return json.load(hljs_languages_file)
# set of languages ids that can be highlighted by highlight.js library
@functools.lru_cache()
def _hljs_languages():
return set(_hljs_languages_data()["languages"])
# languages aliases defined in highlight.js
@functools.lru_cache()
def _hljs_languages_aliases():
language_aliases = _hljs_languages_data()["languages_aliases"]
language_aliases.pop("robots.txt", None)
return {
**language_aliases,
"ml": "ocaml",
"bsl": "1c",
"ep": "mojolicious",
"lc": "livecode",
"p": "parser3",
"pde": "processing",
"rsc": "routeros",
"s": "armasm",
"sl": "rsl",
"4dm": "4d",
"kaos": "chaos",
"dfy": "dafny",
"ejs": "eta",
"nev": "never",
"m": "octave",
"shader": "hlsl",
"fx": "hlsl",
"prg": "xsharp",
"xs": "xsharp",
}
# dictionary mapping pygment lexers to hljs languages
_pygments_lexer_to_hljs_language = {} # type: Dict[str, str]
# dictionary mapping mime types to hljs languages
_mime_type_to_hljs_language = {
"text/x-c": "c",
"text/x-c++": "cpp",
"text/x-msdos-batch": "dos",
"text/x-lisp": "lisp",
"text/x-shellscript": "bash",
}
# dictionary mapping filenames to hljs languages
_filename_to_hljs_language = {
"cmakelists.txt": "cmake",
".htaccess": "apache",
"httpd.conf": "apache",
"access.log": "accesslog",
"nginx.log": "accesslog",
"resolv.conf": "dns",
"dockerfile": "docker",
"nginx.conf": "nginx",
"pf.conf": "pf",
"robots.txt": "robots-txt",
}
# function to fill the above dictionaries
def _init_pygments_to_hljs_map():
if len(_pygments_lexer_to_hljs_language) == 0:
hljs_languages = _hljs_languages()
hljs_languages_aliases = _hljs_languages_aliases()
for lexer in get_all_lexers():
lexer_name = lexer[0]
lang_aliases = lexer[1]
lang_mime_types = lexer[3]
lang = None
for lang_alias in lang_aliases:
if lang_alias in hljs_languages:
lang = lang_alias
_pygments_lexer_to_hljs_language[lexer_name] = lang_alias
break
if lang_alias in hljs_languages_aliases:
lang = hljs_languages_aliases[lang_alias]
_pygments_lexer_to_hljs_language[lexer_name] = lang_alias
break
if lang:
for lang_mime_type in lang_mime_types:
if lang_mime_type not in _mime_type_to_hljs_language:
_mime_type_to_hljs_language[lang_mime_type] = lang
def get_hljs_language_from_filename(filename):
"""Function that tries to associate a language supported by highlight.js
from a filename.
Args:
filename: input filename
Returns:
highlight.js language id or None if no correspondence has been found
"""
_init_pygments_to_hljs_map()
if filename:
filename_lower = filename.lower()
if filename_lower in _filename_to_hljs_language:
return _filename_to_hljs_language[filename_lower]
if filename_lower in _hljs_languages():
return filename_lower
exts = filename_lower.split(".")
# check if file extension matches an hljs language
# also handle .ext.in cases
for ext in reversed(exts[-2:]):
if ext in _hljs_languages():
return ext
if ext in _hljs_languages_aliases():
return _hljs_languages_aliases()[ext]
# otherwise use Pygments language database
lexer = None
# try to find a Pygment lexer
try:
lexer = get_lexer_for_filename(filename)
except Exception as exc:
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
# if there is a correspondence between the lexer and an hljs
# language, return it
if lexer and lexer.name in _pygments_lexer_to_hljs_language:
return _pygments_lexer_to_hljs_language[lexer.name]
# otherwise, try to find a match between the file extensions
# associated to the lexer and the hljs language aliases
if lexer:
exts = [ext.replace("*.", "") for ext in lexer.filenames]
for ext in exts:
if ext in _hljs_languages_aliases():
return _hljs_languages_aliases()[ext]
return None
def get_hljs_language_from_mime_type(mime_type):
"""Function that tries to associate a language supported by highlight.js
from a mime type.
Args:
mime_type: input mime type
Returns:
highlight.js language id or None if no correspondence has been found
"""
_init_pygments_to_hljs_map()
if mime_type and mime_type in _mime_type_to_hljs_language:
return _mime_type_to_hljs_language[mime_type]
return None
@functools.lru_cache()
def get_supported_languages():
"""
Return the list of programming languages that can be highlighted using the
highlight.js library.
Returns:
List[str]: the list of supported languages
"""
return sorted(list(_hljs_languages()))
diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py
index 1a6d3647..68c91f0f 100644
--- a/swh/web/common/origin_save.py
+++ b/swh/web/common/origin_save.py
@@ -1,930 +1,934 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from itertools import product
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
from prometheus_client import Gauge
import requests
-import sentry_sdk
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import URLValidator
from django.db.models import Q, QuerySet
from django.utils.html import escape
from swh.scheduler.utils import create_oneshot_task_dict
from swh.web.common import archive
-from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc
+from swh.web.common.exc import (
+ BadInputExc,
+ ForbiddenExc,
+ NotFoundExc,
+ sentry_capture_exception,
+)
from swh.web.common.models import (
SAVE_REQUEST_ACCEPTED,
SAVE_REQUEST_PENDING,
SAVE_REQUEST_REJECTED,
SAVE_TASK_FAILED,
SAVE_TASK_NOT_CREATED,
SAVE_TASK_NOT_YET_SCHEDULED,
SAVE_TASK_RUNNING,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEEDED,
VISIT_STATUS_CREATED,
VISIT_STATUS_ONGOING,
SaveAuthorizedOrigin,
SaveOriginRequest,
SaveUnauthorizedOrigin,
)
from swh.web.common.typing import OriginExistenceCheckInfo, SaveOriginRequestInfo
from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc
from swh.web.config import get_config, scheduler
logger = logging.getLogger(__name__)
# Number of days in the past to lookup for information
MAX_THRESHOLD_DAYS = 30
# Non terminal visit statuses which needs updates
NON_TERMINAL_STATUSES = [
VISIT_STATUS_CREATED,
VISIT_STATUS_ONGOING,
]
def get_origin_save_authorized_urls() -> List[str]:
"""
Get the list of origin url prefixes authorized to be
immediately loaded into the archive (whitelist).
Returns:
list: The list of authorized origin url prefix
"""
return [origin.url for origin in SaveAuthorizedOrigin.objects.all()]
def get_origin_save_unauthorized_urls() -> List[str]:
"""
Get the list of origin url prefixes forbidden to be
loaded into the archive (blacklist).
Returns:
list: the list of unauthorized origin url prefix
"""
return [origin.url for origin in SaveUnauthorizedOrigin.objects.all()]
def can_save_origin(origin_url: str, bypass_pending_review: bool = False) -> str:
"""
Check if a software origin can be saved into the archive.
Based on the origin url, the save request will be either:
* immediately accepted if the url is whitelisted
* rejected if the url is blacklisted
* put in pending state for manual review otherwise
Args:
origin_url (str): the software origin url to check
Returns:
str: the origin save request status, either **accepted**,
**rejected** or **pending**
"""
# origin url may be blacklisted
for url_prefix in get_origin_save_unauthorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_REJECTED
# if the origin url is in the white list, it can be immediately saved
for url_prefix in get_origin_save_authorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_ACCEPTED
# otherwise, the origin url needs to be manually verified if the user
# that submitted it does not have special permission
if bypass_pending_review:
# mark the origin URL as trusted in that case
SaveAuthorizedOrigin.objects.get_or_create(url=origin_url)
return SAVE_REQUEST_ACCEPTED
else:
return SAVE_REQUEST_PENDING
# map visit type to scheduler task
# TODO: do not hardcode the task name here (T1157)
_visit_type_task = {
"git": "load-git",
"hg": "load-hg",
"svn": "load-svn",
"cvs": "load-cvs",
"bzr": "load-bzr",
}
_visit_type_task_privileged = {
"archives": "load-archive-files",
}
# map scheduler task status to origin save status
_save_task_status = {
"next_run_not_scheduled": SAVE_TASK_NOT_YET_SCHEDULED,
"next_run_scheduled": SAVE_TASK_SCHEDULED,
"completed": SAVE_TASK_SUCCEEDED,
"disabled": SAVE_TASK_FAILED,
}
# map scheduler task_run status to origin save status
_save_task_run_status = {
"scheduled": SAVE_TASK_SCHEDULED,
"started": SAVE_TASK_RUNNING,
"eventful": SAVE_TASK_SUCCEEDED,
"uneventful": SAVE_TASK_SUCCEEDED,
"failed": SAVE_TASK_FAILED,
"permfailed": SAVE_TASK_FAILED,
"lost": SAVE_TASK_FAILED,
}
@lru_cache()
def get_scheduler_load_task_types() -> List[str]:
task_types = scheduler().get_task_types()
return [t["type"] for t in task_types if t["type"].startswith("load")]
def get_savable_visit_types_dict(privileged_user: bool = False) -> Dict:
    """Return the visit types available to the user, mapped to their
    scheduler task names.

    Args:
        privileged_user: when True, also include the privileged-only
            visit types. Defaults to False.

    Returns:
        the dict of supported visit types for the user
    """
    allowed = dict(_visit_type_task)
    if privileged_user:
        allowed.update(_visit_type_task_privileged)
    # restrict to the load task types actually registered in the scheduler;
    # fall back to the static mapping if the scheduler is unreachable
    try:
        registered = get_scheduler_load_task_types()
    except Exception:
        return allowed
    return {
        visit_type: task_name
        for visit_type, task_name in allowed.items()
        if task_name in registered
    }
def get_savable_visit_types(privileged_user: bool = False) -> List[str]:
    """Return the sorted list of visit types the user can perform save
    requests on.

    Args:
        privileged_user: when True, also include the privileged-only
            visit types. Defaults to False.

    Returns:
        the list of saveable visit types
    """
    # iterating the dict yields its keys, so sorted() gives the sorted keys
    return sorted(get_savable_visit_types_dict(privileged_user))
def _check_visit_type_savable(visit_type: str, privileged_user: bool = False) -> None:
    """Raise BadInputExc when ``visit_type`` is not savable by this user."""
    allowed_types = get_savable_visit_types(privileged_user)
    if visit_type in allowed_types:
        return
    raise BadInputExc(
        f"Visit of type {visit_type} can not be saved! "
        f"Allowed types are the following: {', '.join(allowed_types)}"
    )
# URL validator accepting every scheme supported by the save code now loaders
_validate_url = URLValidator(
schemes=["http", "https", "svn", "git", "rsync", "pserver", "ssh", "bzr"]
)
def _check_origin_url_valid(origin_url: str) -> None:
    """Check that an origin URL is well formed.

    Args:
        origin_url: the origin URL to validate

    Raises:
        BadInputExc: the URL does not match any of the supported schemes
    """
    try:
        _validate_url(origin_url)
    except ValidationError:
        # escape the URL as the message may end up rendered in HTML;
        # f-string used for consistency with the rest of the module
        raise BadInputExc(
            f"The provided origin url ({escape(origin_url)}) is not valid!"
        )
def origin_exists(origin_url: str) -> OriginExistenceCheckInfo:
    """Probe an origin URL with a HEAD request and report what was found.

    Redirects are followed; when the URL resolves, the content length and
    last modification date are extracted from the response headers.
    """
    response = requests.head(origin_url, allow_redirects=True)
    content_length: Optional[int] = None
    last_modified: Optional[str] = None
    if response.ok:
        # The X-Archive-Orig-* header variants are processed in case the
        # URL targets the Internet Archive.
        raw_length = response.headers.get(
            "Content-Length", response.headers.get("X-Archive-Orig-Content-Length")
        )
        if raw_length:
            content_length = int(raw_length)
        raw_date = response.headers.get(
            "Last-Modified", response.headers.get("X-Archive-Orig-Last-Modified", "")
        )
        try:
            parsed_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z")
        except ValueError:
            # date missing or not matching the expected format: keep it None
            pass
        else:
            last_modified = parsed_date.isoformat()
    return OriginExistenceCheckInfo(
        origin_url=origin_url,
        exists=response.ok,
        last_modified=last_modified,
        content_length=content_length,
    )
def _check_origin_exists(url: str) -> OriginExistenceCheckInfo:
    """Ensure an URL exists, if not raise an explicit message."""
    check_result = origin_exists(url)
    if check_result["exists"]:
        return check_result
    raise BadInputExc(f"The provided url ({escape(url)}) does not exist!")
def _get_visit_info_for_save_request(
    save_request: SaveOriginRequest,
) -> Tuple[Optional[datetime], Optional[str]]:
    """Look up the origin visit matching a save request.

    Args:
        save_request: Input save origin request to retrieve information for.

    Returns:
        a (visit date, visit status) pair; both values are None when no
        matching visit could be found
    """
    # Stop trying to find a visit one month after the request submission:
    # those storage lookups are expensive and the associated loading task
    # surely ended up with errors by then.
    age = datetime.now(tz=timezone.utc) - save_request.request_date
    if age.days > MAX_THRESHOLD_DAYS:
        return None, None
    visit_info = archive.origin_visit_find_by_date(
        save_request.origin_url, save_request.request_date
    )
    if not visit_info:
        return None, None
    return parse_iso8601_date_to_utc(visit_info["date"]), visit_info["status"]
def _check_visit_update_status(
    save_request: SaveOriginRequest,
) -> Tuple[Optional[datetime], Optional[str], Optional[str]]:
    """Determine whether a save request succeeded or failed from its visit.

    Args:
        save_request: Input save origin request to retrieve information for.

    Returns:
        Tuple of (optional visit date, optional visit status, optional save
        task status) for such save request origin
    """
    visit_date, visit_status = _get_visit_info_for_save_request(save_request)
    loading_task_status: Optional[str] = None
    if visit_date and visit_status in ("full", "partial"):
        # the visit has been performed: the saving task succeeded
        loading_task_status = SAVE_TASK_SUCCEEDED
    elif visit_status in ("created", "ongoing"):
        # the visit is currently running
        loading_task_status = SAVE_TASK_RUNNING
    elif visit_status in ("not_found", "failed"):
        loading_task_status = SAVE_TASK_FAILED
    elif (
        datetime.now(tz=timezone.utc) - save_request.request_date
    ).days > MAX_THRESHOLD_DAYS:
        # still no terminal visit 30 days after submission: consider the
        # loading task failed
        loading_task_status = SAVE_TASK_FAILED
    return visit_date, visit_status, loading_task_status
def _compute_task_loading_status(
    task: Optional[Dict[str, Any]] = None,
    task_run: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Map scheduler task/task-run statuses to a save task status.

    The task run status, when available, takes precedence over the task
    status as it reflects the most recent execution.
    """
    if task_run:
        return _save_task_run_status[task_run["status"]]
    if task:
        return _save_task_status[task["status"]]
    return None
def _update_save_request_info(
    save_request: SaveOriginRequest,
    task: Optional[Dict[str, Any]] = None,
    task_run: Optional[Dict[str, Any]] = None,
) -> SaveOriginRequestInfo:
    """Refresh a save request from its visit status, falling back to the
    scheduler task and task_run information when the visit status is missing.

    Args:
        save_request: Save request
        task: Associated scheduler task information about the save request
        task_run: Most recent run occurrence of the associated task

    Returns:
        Summary of the save request information updated.
    """
    # Once the visit date is set and the visit status is terminal, the save
    # code now request is definitely done: nothing left to refresh.
    request_done = (
        save_request.visit_date
        and save_request.visit_status
        and save_request.visit_status not in NON_TERMINAL_STATUSES
    )
    if not request_done:
        visit_date, visit_status, loading_task_status = _check_visit_update_status(
            save_request
        )
        if not loading_task_status:
            # no visit-based status: fall back to the scheduler information
            loading_task_status = _compute_task_loading_status(task, task_run)
        dirty = False
        if visit_date != save_request.visit_date:
            save_request.visit_date = visit_date
            dirty = True
        if visit_status != save_request.visit_status:
            save_request.visit_status = visit_status
            dirty = True
        if (
            loading_task_status is not None
            and loading_task_status != save_request.loading_task_status
        ):
            save_request.loading_task_status = loading_task_status
            dirty = True
        # hit the database only when something actually changed
        if dirty:
            save_request.save()
    return save_request.to_dict()
def create_save_origin_request(
visit_type: str,
origin_url: str,
privileged_user: bool = False,
user_id: Optional[int] = None,
**kwargs,
) -> SaveOriginRequestInfo:
"""Create a loading task to save a software origin into the archive.
This function aims to create a software origin loading task through the use of the
swh-scheduler component.
First, some checks are performed to see if the visit type and origin url are valid
but also if the the save request can be accepted. For the 'archives' visit type,
this also ensures the artifacts actually exists. If those checks passed, the loading
task is then created. Otherwise, the save request is put in pending or rejected
state.
All the submitted save requests are logged into the swh-web database to keep track
of them.
Args:
visit_type: the type of visit to perform (e.g. git, hg, svn, archives, ...)
origin_url: the url of the origin to save
privileged: Whether the user has some more privilege than other (bypass
review, access to privileged other visit types)
user_id: User identifier (provided when authenticated)
kwargs: Optional parameters (e.g. artifact_url, artifact_filename,
artifact_version)
Raises:
BadInputExc: the visit type or origin url is invalid or inexistent
ForbiddenExc: the provided origin url is blacklisted
Returns:
dict: A dict describing the save request with the following keys:
* **visit_type**: the type of visit to perform
* **origin_url**: the url of the origin
* **save_request_date**: the date the request was submitted
* **save_request_status**: the request status, either **accepted**,
**rejected** or **pending**
* **save_task_status**: the origin loading task status, either
**not created**, **not yet scheduled**, **scheduled**,
**succeed** or **failed**
"""
visit_type_tasks = get_savable_visit_types_dict(privileged_user)
_check_visit_type_savable(visit_type, privileged_user)
_check_origin_url_valid(origin_url)
# if all checks passed so far, we can try and save the origin
save_request_status = can_save_origin(origin_url, privileged_user)
task = None
# if the origin save request is accepted, create a scheduler
# task to load it into the archive
if save_request_status == SAVE_REQUEST_ACCEPTED:
# create a task with high priority
task_kwargs: Dict[str, Any] = {
"priority": "high",
"url": origin_url,
}
if visit_type == "archives":
# extra arguments for that type are required
archives_data = kwargs.get("archives_data", [])
if not archives_data:
raise BadInputExc(
"Artifacts data are missing for the archives visit type."
)
artifacts = []
for artifact in archives_data:
artifact_url = artifact.get("artifact_url")
artifact_version = artifact.get("artifact_version")
if not artifact_url or not artifact_version:
raise BadInputExc("Missing url or version for an artifact to load.")
# each artifact URL must resolve; its size and mtime feed the loader
metadata = _check_origin_exists(artifact_url)
artifacts.append(
{
"url": artifact_url,
"version": artifact_version,
"time": metadata["last_modified"],
"length": metadata["content_length"],
}
)
task_kwargs = dict(**task_kwargs, artifacts=artifacts, snapshot_append=True)
# will reference the save request row being created or updated below
sor = None
# get list of previously submitted save requests (most recent first)
current_sors = list(
SaveOriginRequest.objects.filter(
visit_type=visit_type, origin_url=origin_url
).order_by("-request_date")
)
can_create_task = False
# if no save requests previously submitted, create the scheduler task
if not current_sors:
can_create_task = True
else:
# get the latest submitted save request
sor = current_sors[0]
# if it was in pending state, we need to create the scheduler task
# and update the save request info in the database
if sor.status == SAVE_REQUEST_PENDING:
can_create_task = True
# a task has already been created to load the origin
elif sor.loading_task_id != -1:
# get the scheduler task and its status
tasks = scheduler().get_tasks([sor.loading_task_id])
task = tasks[0] if tasks else None
task_runs = scheduler().get_task_runs([sor.loading_task_id])
task_run = task_runs[0] if task_runs else None
save_request_info = _update_save_request_info(sor, task, task_run)
task_status = save_request_info["save_task_status"]
# create a new scheduler task only if the previous one has been
# already executed
if (
task_status == SAVE_TASK_FAILED
or task_status == SAVE_TASK_SUCCEEDED
):
can_create_task = True
sor = None
else:
can_create_task = False
if can_create_task:
# effectively create the scheduler task
task_dict = create_oneshot_task_dict(
visit_type_tasks[visit_type], **task_kwargs
)
task = scheduler().create_tasks([task_dict])[0]
# pending save request has been accepted
if sor:
sor.status = SAVE_REQUEST_ACCEPTED
sor.loading_task_id = task["id"]
sor.save()
else:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type,
origin_url=origin_url,
status=save_request_status,
loading_task_id=task["id"],
user_ids=f'"{user_id}"' if user_id else None,
)
# save request must be manually reviewed for acceptation
elif save_request_status == SAVE_REQUEST_PENDING:
# check if there is already such a save request already submitted,
# no need to add it to the database in that case
try:
sor = SaveOriginRequest.objects.get(
visit_type=visit_type, origin_url=origin_url, status=save_request_status
)
user_ids = sor.user_ids if sor.user_ids is not None else ""
if user_id is not None and f'"{user_id}"' not in user_ids:
# update user ids list
sor.user_ids = f'{sor.user_ids},"{user_id}"'
sor.save()
# if not add it to the database
except ObjectDoesNotExist:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type,
origin_url=origin_url,
status=save_request_status,
user_ids=f'"{user_id}"' if user_id else None,
)
# origin can not be saved as its url is blacklisted,
# log the request to the database anyway
else:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type,
origin_url=origin_url,
status=save_request_status,
user_ids=f'"{user_id}"' if user_id else None,
)
# notify the caller of the rejection after the request has been logged
if save_request_status == SAVE_REQUEST_REJECTED:
raise ForbiddenExc(
(
'The "save code now" request has been rejected '
"because the provided origin url is blacklisted."
)
)
assert sor is not None
return _update_save_request_info(sor, task)
def update_save_origin_requests_from_queryset(
    requests_queryset: QuerySet,
) -> List[SaveOriginRequestInfo]:
    """Refresh the status in db of every save request in a queryset and
    return the list of impacted save requests.

    Args:
        requests_queryset: input SaveOriginRequest queryset

    Returns:
        list: A list of save origin request info dicts as described in
        :func:`swh.web.common.origin_save.create_save_origin_request`
    """
    task_ids = [sor.loading_task_id for sor in requests_queryset]
    tasks = {}
    task_runs = {}
    if task_ids:
        try:
            tasks = {task["id"]: task for task in scheduler().get_tasks(task_ids)}
            task_runs = {
                task_run["task"]: task_run
                for task_run in scheduler().get_task_runs(tasks)
            }
        except Exception:
            # allow to avoid mocking api GET responses for /origin/save endpoint
            # when running cypress tests as scheduler is not available
            tasks = {}
            task_runs = {}
    return [
        _update_save_request_info(
            sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id)
        )
        for sor in requests_queryset
    ]
def refresh_save_origin_request_statuses() -> List[SaveOriginRequestInfo]:
    """Refresh non-terminal save origin requests (SOR) in the backend.

    Non-terminal SOR are requests whose status is **accepted** and their task
    status are either **created**, **not yet scheduled**, **scheduled** or
    **running**.

    This shall compute this list of SOR, checks their status in the scheduler
    and optionally elasticsearch for their current status. Then update those
    in db.

    Finally, this returns the refreshed information on those SOR.
    """
    pivot_date = datetime.now(tz=timezone.utc) - timedelta(days=MAX_THRESHOLD_DAYS)
    save_requests = SaveOriginRequest.objects.filter(
        # Retrieve accepted request statuses (all statuses)
        Q(status=SAVE_REQUEST_ACCEPTED),
        # those without the required information we need to update
        Q(visit_date__isnull=True)
        | Q(visit_status__isnull=True)
        | Q(visit_status__in=NON_TERMINAL_STATUSES),
        # limit results to recent ones (that is roughly 30 days old at best)
        Q(request_date__gte=pivot_date),
    )
    # exists() lets the database stop at the first matching row instead of
    # counting them all, unlike count() > 0
    return (
        update_save_origin_requests_from_queryset(save_requests)
        if save_requests.exists()
        else []
    )
def get_save_origin_requests(
    visit_type: str, origin_url: str
) -> List[SaveOriginRequestInfo]:
    """
    Get all save requests for a given software origin.

    Args:
        visit_type: the type of visit
        origin_url: the url of the origin

    Raises:
        BadInputExc: the visit type or origin url is invalid
        swh.web.common.exc.NotFoundExc: no save requests can be found for the
            given origin

    Returns:
        list: A list of save origin requests dict as described in
        :func:`swh.web.common.origin_save.create_save_origin_request`
    """
    _check_visit_type_savable(visit_type)
    _check_origin_url_valid(origin_url)
    sors = SaveOriginRequest.objects.filter(
        visit_type=visit_type, origin_url=origin_url
    )
    # exists() avoids a full COUNT query just to detect emptiness
    if not sors.exists():
        raise NotFoundExc(
            f"No save requests found for visit of type {visit_type} "
            f"on origin with url {origin_url}."
        )
    return update_save_origin_requests_from_queryset(sors)
def get_save_origin_task_info(
save_request_id: int, full_info: bool = True
) -> Dict[str, Any]:
"""
Get detailed information about an accepted save origin request
and its associated loading task.
If the associated loading task info is archived and removed
from the scheduler database, returns an empty dictionary.
Args:
save_request_id: identifier of a save origin request
full_info: whether to return detailed info for staff users
Returns:
A dictionary with the following keys:
- **type**: loading task type
- **arguments**: loading task arguments
- **id**: loading task database identifier
- **backend_id**: loading task celery identifier
- **scheduled**: loading task scheduling date
- **ended**: loading task termination date
- **status**: loading task execution status
- **visit_status**: Actual visit status
Depending on the availability of the task logs in the elasticsearch
cluster of Software Heritage, the returned dictionary may also
contain the following keys:
- **name**: associated celery task name
- **message**: relevant log message from task execution
- **duration**: task execution time (only if it succeeded)
- **worker**: name of the worker that executed the task
"""
try:
save_request = SaveOriginRequest.objects.get(id=save_request_id)
except ObjectDoesNotExist:
# unknown save request: nothing to report
return {}
task_info: Dict[str, Any] = {}
if save_request.note is not None:
task_info["note"] = save_request.note
try:
task = scheduler().get_tasks([save_request.loading_task_id])
except Exception:
# to avoid mocking GET responses of /save/task/info/ endpoint when running
# cypress tests as scheduler is not available in that case
task = None
task = task[0] if task else None
if task is None:
return task_info
task_run = scheduler().get_task_runs([task["id"]])
task_run = task_run[0] if task_run else None
if task_run is None:
return task_info
# merge task run data then overlay task-level fields
task_info.update(task_run)
task_info["type"] = task["type"]
task_info["arguments"] = task["arguments"]
task_info["id"] = task_run["task"]
del task_info["task"]
del task_info["metadata"]
# Enrich the task info with the loading visit status
task_info["visit_status"] = save_request.visit_status
# Optionally enrich with task execution logs from elasticsearch
es_workers_index_url = get_config()["es_workers_index_url"]
if not es_workers_index_url:
return task_info
es_workers_index_url += "/_search"
# time window in which to look the relevant log entries up
if save_request.visit_date:
min_ts = save_request.visit_date
max_ts = min_ts + timedelta(days=7)
else:
min_ts = save_request.request_date
max_ts = min_ts + timedelta(days=MAX_THRESHOLD_DAYS)
min_ts_unix = int(min_ts.timestamp()) * 1000
max_ts_unix = int(max_ts.timestamp()) * 1000
save_task_status = _save_task_status[task["status"]]
# syslog priority: presumably 3 (error) for failed tasks, 6 (info)
# otherwise — TODO confirm against worker logging configuration
priority = "3" if save_task_status == SAVE_TASK_FAILED else "6"
query = {
"bool": {
"must": [
{"match_phrase": {"syslog.priority": {"query": priority}}},
{
"match_phrase": {
"journald.custom.swh_task_id": {"query": task_run["backend_id"]}
}
},
{
"range": {
"@timestamp": {
"gte": min_ts_unix,
"lte": max_ts_unix,
"format": "epoch_millis",
}
}
},
]
}
}
try:
response = requests.post(
es_workers_index_url,
json={"query": query, "sort": ["@timestamp"]},
timeout=30,
)
results = json.loads(response.text)
if results["hits"]["total"]["value"] >= 1:
# keep the most recent matching log entry
task_run_info = results["hits"]["hits"][-1]["_source"]
journald_custom = task_run_info.get("journald", {}).get("custom", {})
task_info["duration"] = journald_custom.get(
"swh_logging_args_runtime", "not available"
)
task_info["message"] = task_run_info.get("message", "not available")
task_info["name"] = journald_custom.get("swh_task_name", "not available")
task_info["worker"] = task_run_info.get("host", {}).get("hostname")
except Exception as exc:
logger.warning("Request to Elasticsearch failed\n%s", exc)
- sentry_sdk.capture_exception(exc)
+ sentry_capture_exception(exc)
if not full_info:
for field in ("id", "backend_id", "worker"):
# remove some staff only fields
task_info.pop(field, None)
if "message" in task_run and "Loading failure" in task_run["message"]:
# hide traceback for non staff users, only display exception
message_lines = task_info["message"].split("\n")
message = ""
for line in message_lines:
if line.startswith("Traceback"):
break
message += f"{line}\n"
message += message_lines[-1]
task_info["message"] = message
return task_info
# Prometheus gauge counting every submitted save request, labeled by request
# status and visit type
SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests"
_submitted_save_requests_gauge = Gauge(
name=SUBMITTED_SAVE_REQUESTS_METRIC,
documentation="Number of submitted origin save requests",
labelnames=["status", "visit_type"],
registry=SWH_WEB_METRICS_REGISTRY,
)
# Prometheus gauge counting accepted save requests, labeled by loading task
# status and visit type
ACCEPTED_SAVE_REQUESTS_METRIC = "swh_web_accepted_save_requests"
_accepted_save_requests_gauge = Gauge(
name=ACCEPTED_SAVE_REQUESTS_METRIC,
documentation="Number of accepted origin save requests",
labelnames=["load_task_status", "visit_type"],
registry=SWH_WEB_METRICS_REGISTRY,
)
# Metric on the delay of save code now request per status and visit_type. This is the
# time difference between the save code now is requested and the time it got ingested.
ACCEPTED_SAVE_REQUESTS_DELAY_METRIC = "swh_web_save_requests_delay_seconds"
_accepted_save_requests_delay_gauge = Gauge(
name=ACCEPTED_SAVE_REQUESTS_DELAY_METRIC,
documentation="Save Requests Duration",
labelnames=["load_task_status", "visit_type"],
registry=SWH_WEB_METRICS_REGISTRY,
)
def compute_save_requests_metrics() -> None:
    """Compute Prometheus metrics related to origin save requests:

    - Number of submitted origin save requests
    - Number of accepted origin save requests
    - Save Code Now requests delay between request time and actual time of
      ingestion
    """
    request_statuses = (
        SAVE_REQUEST_ACCEPTED,
        SAVE_REQUEST_REJECTED,
        SAVE_REQUEST_PENDING,
    )
    load_task_statuses = (
        SAVE_TASK_NOT_CREATED,
        SAVE_TASK_NOT_YET_SCHEDULED,
        SAVE_TASK_SCHEDULED,
        SAVE_TASK_SUCCEEDED,
        SAVE_TASK_FAILED,
        SAVE_TASK_RUNNING,
    )
    duration_load_task_statuses = (
        SAVE_TASK_FAILED,
        SAVE_TASK_SUCCEEDED,
    )
    # for metrics, we want access to all visit types
    visit_types = get_savable_visit_types(privileged_user=True)
    # reset every label combination to zero before recounting
    for status, visit_type in product(request_statuses, visit_types):
        _submitted_save_requests_gauge.labels(status, visit_type).set(0)
    for task_status, visit_type in product(load_task_statuses, visit_types):
        _accepted_save_requests_gauge.labels(task_status, visit_type).set(0)
    for task_status, visit_type in product(duration_load_task_statuses, visit_types):
        _accepted_save_requests_delay_gauge.labels(task_status, visit_type).set(0)
    for sor in SaveOriginRequest.objects.all():
        if sor.status == SAVE_REQUEST_ACCEPTED:
            _accepted_save_requests_gauge.labels(
                load_task_status=sor.loading_task_status,
                visit_type=sor.visit_type,
            ).inc()
        _submitted_save_requests_gauge.labels(
            status=sor.status, visit_type=sor.visit_type
        ).inc()
        if (
            sor.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED)
            and sor.visit_date is not None
            and sor.request_date is not None
        ):
            delay = sor.visit_date.timestamp() - sor.request_date.timestamp()
            _accepted_save_requests_delay_gauge.labels(
                load_task_status=sor.loading_task_status,
                visit_type=sor.visit_type,
            ).inc(delay)
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index d8bb0fcf..41f5d934 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,528 +1,527 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import functools
import os
import re
from typing import Any, Callable, Dict, List, Optional
import urllib.parse
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
-import sentry_sdk
from django.core.cache import cache
from django.core.cache.backends.base import DEFAULT_TIMEOUT
from django.http import HttpRequest, QueryDict
from django.shortcuts import redirect
from django.urls import resolve
from django.urls import reverse as django_reverse
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
ADMIN_LIST_DEPOSIT_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
)
-from swh.web.common.exc import BadInputExc
+from swh.web.common.exc import BadInputExc, sentry_capture_exception
from swh.web.common.typing import QueryParameters
from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search
# registry gathering every Prometheus metric exposed by swh-web
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
# Material Design Icons CSS classes associated to each archive object type,
# used when rendering templates
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"cnt": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"dir": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"ori": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"rel": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"rev": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"snp": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
    viewname: str,
    url_args: Optional[Dict[str, Any]] = None,
    query_params: Optional[QueryParameters] = None,
    current_app: Optional[str] = None,
    urlconf: Optional[str] = None,
    request: Optional[HttpRequest] = None,
) -> str:
    """An override of django reverse function supporting query parameters.

    Args:
        viewname: the name of the django view from which to compute a url
        url_args: dictionary of url arguments indexed by their names
        query_params: dictionary of query parameters to append to the
            reversed url
        current_app: the name of the django app tighten to the view
        urlconf: url configuration module
        request: build an absolute URI if provided

    Returns:
        str: the url of the requested view with processed arguments and
        query parameters
    """
    if url_args:
        # drop url arguments whose value is None
        url_args = {k: v for k, v in url_args.items() if v is not None}
    url = django_reverse(
        viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
    )
    if query_params:
        # drop query parameters whose value is None
        query_params = {k: v for k, v in query_params.items() if v is not None}
    # a non-empty dict is truthy, no need for an extra len() check
    if query_params:
        query_dict = QueryDict("", mutable=True)
        for param_name in sorted(query_params):
            query_dict[param_name] = query_params[param_name]
        url += "?" + query_dict.urlencode(safe="/;:")
    if request is not None:
        url = request.build_absolute_uri(url)
    return url
def datetime_to_utc(date):
    """Returns datetime in UTC without timezone info

    Args:
        date (datetime.datetime): input datetime with timezone info

    Returns:
        datetime.datetime: datetime in UTC without timezone info
    """
    # naive datetimes and datetimes already in UTC are returned unchanged
    if not date.tzinfo or date.tzinfo == timezone.utc:
        return date
    return date.astimezone(tz=timezone.utc)
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
    """Given an ISO 8601 datetime string, parse the result as UTC datetime.

    Returns:
        a timezone-aware datetime representing the parsed date

    Raises:
        swh.web.common.exc.BadInputExc: provided date does not respect
            ISO 8601 format

    Samples:
        - 2016-01-12
        - 2016-01-12T09:19:12+0100
        - 2007-01-14T20:34:22Z
    """
    try:
        parsed = parse_date(iso_date)
    except ParseError as e:
        raise BadInputExc(e)
    return datetime_to_utc(parsed)
def shorten_path(path):
    """Shorten the given path: for each hash present, only return the first
    8 characters followed by an ellipsis"""
    # NOTE(review): the sha256 tail pattern accepts [0-9a-z], broader than
    # hexadecimal — kept as-is for compatibility; confirm intent upstream
    abbreviations = (
        (r"([0-9a-f]{8})[0-9a-z]{56}", r"\1..."),  # sha256-length hashes
        (r"([0-9a-f]{8})[0-9a-f]{32}", r"\1..."),  # sha1 hashes
    )
    shortened = path
    for pattern, replacement in abbreviations:
        shortened = re.sub(pattern, replacement, shortened)
    return shortened
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
    """Turns a string representation of an ISO 8601 datetime string
    to UTC and format it into a more human readable one.

    For instance, from the following input
    string: '2017-05-04T13:27:13+02:00' the following one
    is returned: '04 May 2017, 11:27 UTC'.

    Custom format string may also be provided
    as parameter

    Args:
        iso_date (str): a string representation of an ISO 8601 date
        fmt (str): optional date formatting string

    Returns:
        str: a formatted string representation of the input iso date
    """
    # falsy input (None, empty string) is returned unchanged
    if not iso_date:
        return iso_date
    return parse_iso8601_date_to_utc(iso_date).strftime(fmt)
def gen_path_info(path):
    """Function to generate path data navigation for use
    with a breadcrumb in the swh web ui.

    For instance, from a path /folder1/folder2/folder3,
    it returns the following list::

        [{'name': 'folder1', 'path': 'folder1'},
         {'name': 'folder2', 'path': 'folder1/folder2'},
         {'name': 'folder3', 'path': 'folder1/folder2/folder3'}]

    Args:
        path: a filesystem path

    Returns:
        list: a list of path data for navigation as illustrated above.
    """
    if not path:
        return []
    parts = path.strip("/").split("/")
    # each breadcrumb entry carries the path from the root down to itself
    return [
        {"name": name, "path": "/".join(parts[: depth + 1])}
        for depth, name in enumerate(parts)
    ]
def parse_rst(text, report_level=2):
    """
    Parse a reStructuredText string with docutils.

    Args:
        text (str): string with reStructuredText markups in it
        report_level (int): level of docutils report messages to print
            (1 info 2 warning 3 error 4 severe 5 none)

    Returns:
        docutils.nodes.document: a parsed docutils document
    """
    settings = docutils.frontend.OptionParser(
        components=(docutils.parsers.rst.Parser,)
    ).get_default_values()
    settings.report_level = report_level
    document = docutils.utils.new_document("rst-doc", settings=settings)
    docutils.parsers.rst.Parser().parse(text, document)
    return document
def get_client_ip(request):
    """
    Return the client IP address from an incoming HTTP request.

    Args:
        request (django.http.HttpRequest): the incoming HTTP request

    Returns:
        str: The client IP address
    """
    forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
    if forwarded_for:
        # the first address in the list is the originating client
        return forwarded_for.split(",")[0]
    return request.META.get("REMOTE_ADDR")
def is_swh_web_development(request: HttpRequest) -> bool:
    """Indicate if we are running a development version of swh-web."""
    base_url = request.build_absolute_uri("/")
    dev_hosts = ("localhost", "127.0.0.1", "testserver")
    return any(host in base_url for host in dev_hosts)
def is_swh_web_staging(request: HttpRequest) -> bool:
    """Indicate if we are running a staging version of swh-web."""
    base_url = request.build_absolute_uri("/")
    return any(
        server_name in base_url
        for server_name in get_config()["staging_server_names"]
    )
def is_swh_web_production(request: HttpRequest) -> bool:
    """Indicate if we are running the public production version of swh-web."""
    site_base_url = request.build_absolute_uri("/")
    return SWH_WEB_SERVER_NAME in site_base_url
# MIME types of images common browsers can render natively;
# a set literal avoids building a throwaway list first (flake8-comprehensions C405)
browsers_supported_image_mimes = {
    "image/gif",
    "image/png",
    "image/jpeg",
    "image/bmp",
    "image/webp",
    "image/svg",
    "image/svg+xml",
}
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
# variables below are made available to every template rendering
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
"keycloak": config["keycloak"],
"site_base_url": request.build_absolute_uri("/"),
"DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
"status": config["status"],
"swh_web_dev": is_swh_web_development(request),
"swh_web_staging": is_swh_web_staging(request),
"swh_web_version": get_distribution("swh.web").version,
"iframe_mode": False,
"ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
"ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION,
"FEATURES": get_config()["features"],
"MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION,
}
def resolve_branch_alias(
    snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """
    Resolve branch alias in snapshot content.

    Args:
        snapshot: a full snapshot content
        branch: a branch alias contained in the snapshot

    Returns:
        The real snapshot branch that got aliased.
    """
    # follow alias chains until a concrete branch (or a dead end) is found
    while branch and branch["target_type"] == "alias":
        target = branch["target"]
        local_branches = snapshot["branches"]
        if target in local_branches:
            branch = local_branches[target]
        else:
            # the target branch is not part of the (possibly truncated)
            # snapshot content, fetch it from the archive
            from swh.web.common import archive

            snp = archive.lookup_snapshot(
                snapshot["id"], branches_from=target, branches_count=1
            )
            if snp and target in snp["branches"]:
                branch = snp["branches"][target]
            else:
                branch = None
    return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
    """
    Docutils translator subclass to customize the generation of HTML
    from reST-formatted docstrings
    """
    def __init__(self, document):
        super().__init__(document)
        # drop the default document prologue/epilogue so that only the
        # body markup is emitted when publishing
        self.body_prefix = []
        self.body_suffix = []
# module-level docutils HTML writer reused by rst_to_html, configured with
# the header-stripping translator defined above
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
    """
    Convert reStructuredText document into HTML.

    Args:
        rst: A string containing a reStructuredText document

    Returns:
        Body content of the produced HTML conversion, wrapped in a
        ``div`` with the ``swh-rst`` CSS class.
    """
    settings = {
        "initial_header_level": 2,
        "halt_level": 4,
        "traceback": True,
        # disable directives that could read arbitrary files or inject
        # raw markup, as the input may come from untrusted docstrings
        "file_insertion_enabled": False,
        "raw_enabled": False,
    }
    pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
    # the original return statement was garbled (markup stripped during
    # extraction); restore the valid wrapped f-string
    return f'<div class="swh-rst">{pp["html_body"]}</div>'
def prettify_html(html: str) -> str:
    """
    Prettify an HTML document.

    Args:
        html: Input HTML document

    Returns:
        The prettified HTML document
    """
    # BeautifulSoup re-indents the markup one element per line; the lxml
    # parser is used to parse the input document
    return BeautifulSoup(html, "lxml").prettify()
def django_cache(
    timeout: int = DEFAULT_TIMEOUT,
    catch_exception: bool = False,
    exception_return_value: Any = None,
    invalidate_cache_pred: Callable[[Any], bool] = lambda val: False,
):
    """Decorator to put the result of a function call in Django cache,
    subsequent calls will directly return the cached value.

    Args:
        timeout: The number of seconds value will be hold in cache
        catch_exception: If :const:`True`, any thrown exception by
            the decorated function will be caught and not reraised
        exception_return_value: The value to return if previous
            parameter is set to :const:`True`
        invalidate_cache_pred: A predicate function enabling to
            invalidate the cache under certain conditions, decorated
            function will then be called again

    Returns:
        The returned value of the decorated function for the specified
        parameters
    """

    def inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # the 0 sentinel separates positional from keyword arguments
            # in the cache key to avoid collisions between call shapes
            func_args = args + (0,) + tuple(sorted(kwargs.items()))
            cache_key = str(hash((func.__module__, func.__name__) + func_args))
            ret = cache.get(cache_key)
            if ret is None or invalidate_cache_pred(ret):
                try:
                    ret = func(*args, **kwargs)
                except Exception as exc:
                    if catch_exception:
                        # report the swallowed error to Sentry, then
                        # return the configured fallback value
                        sentry_capture_exception(exc)
                        return exception_return_value
                    else:
                        raise
                else:
                    cache.set(cache_key, ret, timeout=timeout)
            return ret

        return wrapper

    return inner
def _deposits_list_url(
deposits_list_base_url: str, page_size: int, username: Optional[str]
) -> str:
params = {"page_size": str(page_size)}
if username is not None:
params["username"] = username
return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}"
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the list of software deposits using swh-deposit API

    Args:
        username: optional deposit client name to filter the listing on

    Returns:
        The ``results`` entries of the deposit private API listing.
    """
    config = get_config()["deposit"]
    private_api_url = config["private_api_url"].rstrip("/") + "/"
    deposits_list_base_url = private_api_url + "deposits"
    deposits_list_auth = HTTPBasicAuth(
        config["private_api_user"], config["private_api_password"]
    )
    # first fetch only the deposits count (page_size=1): it is used both to
    # size the full listing request and to invalidate the cached listing
    # when new deposits have been created since it was cached
    deposits_list_url = _deposits_list_url(
        deposits_list_base_url, page_size=1, username=username
    )
    nb_deposits = requests.get(
        deposits_list_url, auth=deposits_list_auth, timeout=30
    ).json()["count"]
    @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits)
    def _get_deposits_data():
        # fetch all deposits in a single page sized to the current count
        deposits_list_url = _deposits_list_url(
            deposits_list_base_url, page_size=nb_deposits, username=username
        )
        return requests.get(
            deposits_list_url,
            auth=deposits_list_auth,
            timeout=30,
        ).json()
    deposits_data = _get_deposits_data()
    return deposits_data["results"]
# cache the visit types listing for a day: it is expensive to compute and
# changes very rarely
_origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours
@django_cache(
    timeout=_origin_visit_types_cache_timeout,
    catch_exception=True,
    exception_return_value=[],
)
def origin_visit_types() -> List[str]:
    """Return the exhaustive list of visit types for origins
    ingested into the archive.

    Returns:
        The sorted list of visit type names; an empty list is returned
        when the search backend query fails (see the decorator's
        ``catch_exception`` / ``exception_return_value`` settings).
    """
    return sorted(search().visit_types_count().keys())
def redirect_to_new_route(request, new_route, permanent=True):
    """Redirect a request to another route with url args and query parameters

    e.g. ``/origin/<url>/log?path=test`` can be redirected as
    ``/log?url=<url>&path=test``. This can be used to deprecate routes

    Args:
        request: the request being redirected
        new_route: the name of the route to redirect to
        permanent: whether to issue a permanent (301) or temporary (302)
            redirect

    Returns:
        The redirect response targeting the new route, carrying both the
        original URL keyword arguments and query parameters.
    """
    request_path = resolve(request.path_info)
    # merge URL kwargs with query parameters; on a name clash the query
    # parameter value takes precedence
    args = {**request_path.kwargs, **request.GET.dict()}
    return redirect(
        reverse(new_route, query_params=args),
        permanent=permanent,
    )
def has_add_forge_now_permission(user) -> bool:
    """Is a user considered an add-forge-now moderator?

    Returns
        True if a user is staff or has add forge now moderator permission
    """
    # staff users are implicitly moderators; only query the permission
    # backend when the staff flag is not set
    allowed = user.is_staff
    if not allowed:
        allowed = user.has_perm(ADD_FORGE_MODERATOR_PERMISSION)
    return allowed
diff --git a/swh/web/inbound_email/management/commands/process_inbound_email.py b/swh/web/inbound_email/management/commands/process_inbound_email.py
index 8d49670a..fd7445c3 100644
--- a/swh/web/inbound_email/management/commands/process_inbound_email.py
+++ b/swh/web/inbound_email/management/commands/process_inbound_email.py
@@ -1,73 +1,72 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import email
import email.message
import email.policy
import logging
import sys
from typing import Callable
-import sentry_sdk
-
from django.core.management.base import BaseCommand
+from swh.web.common.exc import sentry_capture_exception
from swh.web.inbound_email import signals
logger = logging.getLogger(__name__)
class Command(BaseCommand):
    help = "Process a new inbound email"

    def handle(self, *args, **options):
        """Read a raw email from stdin and dispatch it to the receivers
        registered on the ``email_received`` signal."""
        raw_message = sys.stdin.buffer.read()
        try:
            message = email.message_from_bytes(raw_message, policy=email.policy.default)
        except Exception as exc:
            # unresolved diff markers in the source made this block invalid
            # python; this is the resolved (post-patch) state
            sentry_capture_exception(exc)
            self.handle_failed_message(raw_message)
            # XXX make sure having logging doesn't make postfix unhappy
            logger.exception("Could not convert email from bytes")
            return

        responses = signals.email_received.send_robust(
            sender=self.__class__, message=message
        )
        handled = False
        for receiver, response in responses:
            if isinstance(response, Exception):
                # a receiver raised: report it but keep dispatching to the
                # remaining receivers
                sentry_capture_exception(response)
                self.handle_failing_receiver(message, receiver)
                logger.error(
                    "Receiver produced the following exception", exc_info=response
                )
            elif response is signals.EmailProcessingStatus.FAILED:
                self.handle_failing_receiver(message, receiver)
            elif response is signals.EmailProcessingStatus.PROCESSED:
                handled = True

        if not handled:
            self.handle_unhandled_message(message)

    def handle_failed_message(self, raw_message: bytes):
        # TODO: forward email as attachment for inspection
        logger.error("Failed message: %s", raw_message.decode("ascii", "replace"))

    def handle_failing_receiver(
        self, message: email.message.EmailMessage, receiver: Callable
    ):
        # TODO: forward email for inspection
        logger.error(
            "Failed receiver %s:%s; message: %s",
            receiver.__module__,
            receiver.__qualname__,
            str(message),
        )

    def handle_unhandled_message(self, message: email.message.EmailMessage):
        # TODO: pass email through to a fallback alias?
        logger.error("Unhandled message: %s", str(message))
diff --git a/swh/web/misc/urls.py b/swh/web/misc/urls.py
index 8f81f17a..03e22969 100644
--- a/swh/web/misc/urls.py
+++ b/swh/web/misc/urls.py
@@ -1,104 +1,104 @@
-# Copyright (C) 2019-2021 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import requests
-import sentry_sdk
from django.conf.urls import include, url
from django.contrib.staticfiles import finders
from django.http import JsonResponse
from django.shortcuts import render
from swh.web.common import archive
+from swh.web.common.exc import sentry_capture_exception
from swh.web.config import get_config
from swh.web.misc.metrics import prometheus_metrics
def _jslicenses(request):
    """Render the page listing the licenses of the JavaScript source files
    bundled with swh-web (from the build-time jslicenses.json asset)."""
    jslicenses_file = finders.find("jssources/jslicenses.json")
    # use a context manager so the file handle is always closed (the
    # previous json.load(open(...)) leaked it)
    with open(jslicenses_file) as f:
        jslicenses_data = json.load(f)
    # sort entries by bundle file name (last path component)
    jslicenses_data = sorted(
        jslicenses_data.items(), key=lambda item: item[0].split("/")[-1]
    )
    return render(request, "misc/jslicenses.html", {"jslicenses_data": jslicenses_data})
def _stat_counters(request):
    """Return archive object counters and their historical values as JSON."""
    stat_counters = archive.stat_counters()
    url = get_config()["history_counters_url"]
    stat_counters_history = {}
    try:
        response = requests.get(url, timeout=5)
        stat_counters_history = json.loads(response.text)
    except Exception as exc:
        # best effort: report the failure to Sentry but still serve the
        # current counters with an empty history
        sentry_capture_exception(exc)
    counters = {
        "stat_counters": stat_counters,
        "stat_counters_history": stat_counters_history,
    }
    return JsonResponse(counters)
urlpatterns = [
    url(r"^", include("swh.web.misc.coverage")),
    url(r"^jslicenses/$", _jslicenses, name="jslicenses"),
    url(r"^", include("swh.web.misc.origin_save")),
    url(r"^stat_counters/$", _stat_counters, name="stat-counters"),
    url(r"^", include("swh.web.misc.badges")),
    url(r"^metrics/prometheus/$", prometheus_metrics, name="metrics-prometheus"),
    url(r"^", include("swh.web.misc.iframe")),
    url(r"^", include("swh.web.misc.fundraising")),
]

# when running end to end tests through cypress, declare some extra
# endpoints to provide input data for some of those tests
if get_config()["e2e_tests_mode"]:
    from swh.web.tests.views import (
        get_content_code_data_all_exts,
        get_content_code_data_all_filenames,
        get_content_code_data_by_ext,
        get_content_code_data_by_filename,
        get_content_other_data_by_ext,
    )

    # NOTE(review): the named groups below read (?P.+) in the source, which
    # is invalid regex syntax — the group names were stripped during
    # extraction; restored here as <ext>/<filename> to match the view
    # keyword arguments
    urlpatterns.append(
        url(
            r"^tests/data/content/code/extension/(?P<ext>.+)/$",
            get_content_code_data_by_ext,
            name="tests-content-code-extension",
        )
    )
    urlpatterns.append(
        url(
            r"^tests/data/content/other/extension/(?P<ext>.+)/$",
            get_content_other_data_by_ext,
            name="tests-content-other-extension",
        )
    )
    urlpatterns.append(
        url(
            r"^tests/data/content/code/extensions/$",
            get_content_code_data_all_exts,
            name="tests-content-code-extensions",
        )
    )
    urlpatterns.append(
        url(
            r"^tests/data/content/code/filename/(?P<filename>.+)/$",
            get_content_code_data_by_filename,
            name="tests-content-code-filename",
        )
    )
    urlpatterns.append(
        url(
            r"^tests/data/content/code/filenames/$",
            get_content_code_data_all_filenames,
            name="tests-content-code-filenames",
        )
    )