diff --git a/swh/web/add_forge_now/views.py b/swh/web/add_forge_now/views.py index a675ae0c..ce6883c3 100644 --- a/swh/web/add_forge_now/views.py +++ b/swh/web/add_forge_now/views.py @@ -1,160 +1,160 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List from django.conf import settings from django.conf.urls import url from django.contrib.auth.decorators import user_passes_test from django.core.paginator import Paginator from django.db.models import Q from django.http.request import HttpRequest from django.http.response import HttpResponse, JsonResponse from django.shortcuts import render from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestHistory from swh.web.api.views.add_forge_now import ( AddForgeNowRequestPublicSerializer, AddForgeNowRequestSerializer, ) -from swh.web.common.utils import has_add_forge_now_permission +from swh.web.auth.utils import is_add_forge_now_moderator def add_forge_request_list_datatables(request: HttpRequest) -> HttpResponse: """Dedicated endpoint used by datatables to display the add-forge requests in the Web UI. """ draw = int(request.GET.get("draw", 0)) add_forge_requests = AddForgeRequest.objects.all() table_data: Dict[str, Any] = { "recordsTotal": add_forge_requests.count(), "draw": draw, } search_value = request.GET.get("search[value]") column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "id") order_dir = request.GET.get("order[0][dir]", "desc") if field_order: if order_dir == "desc": field_order = "-" + field_order add_forge_requests = add_forge_requests.order_by(field_order) per_page = int(request.GET.get("length", 10)) page_num = int(request.GET.get("start", 0)) // per_page + 1 if search_value: add_forge_requests = add_forge_requests.filter( Q(forge_type__icontains=search_value) | Q(forge_url__icontains=search_value) | Q(status__icontains=search_value) ) if ( int(request.GET.get("user_requests_only", "0")) and request.user.is_authenticated ): add_forge_requests = add_forge_requests.filter( submitter_name=request.user.username ) paginator = Paginator(add_forge_requests, per_page) page = paginator.page(page_num) - if has_add_forge_now_permission(request.user): + if is_add_forge_now_moderator(request.user): requests = AddForgeNowRequestSerializer(page.object_list, many=True).data else: requests = AddForgeNowRequestPublicSerializer(page.object_list, many=True).data results = [dict(req) for req in requests] table_data["recordsFiltered"] = add_forge_requests.count() table_data["data"] = results return JsonResponse(table_data) FORGE_TYPES: List[str] = [ "bitbucket", "cgit", "gitlab", "gitea", "heptapod", ] def create_request_create(request): """View to create a new 'add_forge_now' request.""" return render( request, "add_forge_now/creation_form.html", {"forge_types": FORGE_TYPES}, ) def create_request_list(request): """View to list existing 'add_forge_now' requests.""" return render( request, "add_forge_now/list.html", ) def create_request_help(request): """View to explain 'add_forge_now'.""" return render( request, "add_forge_now/help.html", ) @user_passes_test( - has_add_forge_now_permission, + is_add_forge_now_moderator, redirect_field_name="next_path", login_url=settings.LOGIN_URL, ) def create_request_message_source(request: HttpRequest, id: int) -> HttpResponse: """View to retrieve the message source for a given request history entry""" try: history_entry = RequestHistory.objects.select_related("request").get( pk=id, message_source__isnull=False ) assert history_entry.message_source is not None except RequestHistory.DoesNotExist: return HttpResponse(status=404) response = HttpResponse( bytes(history_entry.message_source), content_type="text/email" ) filename = f"add-forge-now-{history_entry.request.forge_domain}-message{id}.eml" response["Content-Disposition"] = f'attachment; filename="{filename}"' return response urlpatterns = [ url( r"^add-forge/request/list/datatables/$", add_forge_request_list_datatables, name="add-forge-request-list-datatables", ), url(r"^add-forge/request/create/$", create_request_create, name="forge-add-create"), url(r"^add-forge/request/list/$", create_request_list, name="forge-add-list"), url( r"^add-forge/request/message-source/(?P\d+)/$", create_request_message_source, name="forge-add-message-source", ), url(r"^add-forge/request/help/$", create_request_help, name="forge-add-help"), ] diff --git a/swh/web/admin/add_forge_now.py b/swh/web/admin/add_forge_now.py index aa21ca6a..a407bd99 100644 --- a/swh/web/admin/add_forge_now.py +++ b/swh/web/admin/add_forge_now.py @@ -1,39 +1,39 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.conf import settings from django.contrib.auth.decorators import user_passes_test from django.shortcuts import render from swh.web.admin.adminurls import admin_route -from swh.web.common.utils import has_add_forge_now_permission +from swh.web.auth.utils import is_add_forge_now_moderator @admin_route( r"add-forge/requests/", view_name="add-forge-now-requests-moderation", ) -@user_passes_test(has_add_forge_now_permission, login_url=settings.LOGIN_URL) +@user_passes_test(is_add_forge_now_moderator, login_url=settings.LOGIN_URL) def add_forge_now_requests_moderation_dashboard(request): """Moderation dashboard to allow listing current requests.""" return render( request, "add_forge_now/requests-moderation.html", {"heading": "Add forge now requests moderation"}, ) @admin_route( r"add-forge/request/(?P(\d)+)/", view_name="add-forge-now-request-dashboard", ) -@user_passes_test(has_add_forge_now_permission, login_url=settings.LOGIN_URL) +@user_passes_test(is_add_forge_now_moderator, login_url=settings.LOGIN_URL) def add_forge_now_request_dashboard(request, request_id): """Moderation dashboard to allow listing current requests.""" return render( request, "add_forge_now/request-dashboard.html", {"request_id": request_id, "heading": "Add forge now request dashboard"}, ) diff --git a/swh/web/api/views/add_forge_now.py b/swh/web/api/views/add_forge_now.py index 8040baed..be649272 100644 --- a/swh/web/api/views/add_forge_now.py +++ b/swh/web/api/views/add_forge_now.py @@ -1,412 +1,410 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union from django.core.exceptions import ObjectDoesNotExist from django.core.paginator import Paginator from django.db import transaction from django.db.models.query import QuerySet from django.forms import CharField, ModelForm from django.http import HttpResponseBadRequest from django.http.request import HttpRequest from django.http.response import HttpResponse, HttpResponseForbidden from rest_framework import serializers from rest_framework.request import Request from rest_framework.response import Response from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestActorRole as AddForgeNowRequestActorRole from swh.web.add_forge_now.models import RequestHistory as AddForgeNowRequestHistory from swh.web.add_forge_now.models import RequestStatus as AddForgeNowRequestStatus from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route -from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION +from swh.web.auth.utils import is_add_forge_now_moderator from swh.web.common.exc import BadInputExc -from swh.web.common.utils import has_add_forge_now_permission, reverse +from swh.web.common.utils import reverse def _block_while_testing(): """Replaced by tests to check concurrency behavior""" pass class AddForgeNowRequestForm(ModelForm): forge_contact_comment = CharField( required=False, ) class Meta: model = AddForgeRequest fields = ( "forge_type", "forge_url", "forge_contact_email", "forge_contact_name", "forge_contact_comment", "submitter_forward_username", ) class AddForgeNowRequestHistoryForm(ModelForm): new_status = CharField( max_length=200, required=False, ) class Meta: model = AddForgeNowRequestHistory fields = ("text", "new_status") class AddForgeNowRequestSerializer(serializers.ModelSerializer): inbound_email_address = serializers.CharField() forge_domain = serializers.CharField() last_moderator = serializers.SerializerMethodField() last_modified_date = serializers.SerializerMethodField() history: Dict[int, QuerySet] = {} class Meta: model = AddForgeRequest fields = "__all__" def _gethistory(self, request): if request.id not in self.history: self.history[request.id] = AddForgeNowRequestHistory.objects.filter( request=request ).order_by("id") return self.history[request.id] def get_last_moderator(self, request): last_history_with_moderator = ( self._gethistory(request).filter(actor_role="MODERATOR").last() ) return ( last_history_with_moderator.actor if last_history_with_moderator else "None" ) def get_last_modified_date(self, request): last_history = self._gethistory(request).last() return ( last_history.date.isoformat().replace("+00:00", "Z") if last_history else None ) class AddForgeNowRequestPublicSerializer(serializers.ModelSerializer): """Serializes AddForgeRequest without private fields.""" class Meta: model = AddForgeRequest fields = ("id", "forge_url", "forge_type", "status", "submission_date") class AddForgeNowRequestHistorySerializer(serializers.ModelSerializer): message_source_url = serializers.SerializerMethodField() class Meta: model = AddForgeNowRequestHistory exclude = ("request", "message_source") def get_message_source_url(self, request_history): if request_history.message_source is None: return None return reverse( "forge-add-message-source", url_args={"id": request_history.pk}, request=self.context["request"], ) class AddForgeNowRequestHistoryPublicSerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory fields = ("id", "date", "new_status", "actor_role") @api_route( r"/add-forge/request/create/", "api-1-add-forge-request-create", methods=["POST"], ) @api_doc("/add-forge/request/create") @format_docstring() @transaction.atomic def api_add_forge_request_create(request: Union[HttpRequest, Request]) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/create/ Create a new request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/update/", "api-1-add-forge-request-update", methods=["POST"], ) @api_doc("/add-forge/request/update", tags=["hidden"]) @format_docstring() @transaction.atomic def api_add_forge_request_update( request: Union[HttpRequest, Request], id: int ) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/update/ Update a request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/get/", "api-1-add-forge-request-get", methods=["GET"], ) @api_doc("/add-forge/request/get") @format_docstring() def api_add_forge_request_get(request: Request, id: int): """ .. http:get:: /api/1/add-forge/request/get/ Return all details about an add-forge request. {common_headers} :param int id: add-forge request identifier :statuscode 200: request details successfully returned :statuscode 400: request identifier does not exist """ try: add_forge_request = AddForgeRequest.objects.get(id=id) except ObjectDoesNotExist: raise BadInputExc("Request id does not exist") request_history = AddForgeNowRequestHistory.objects.filter( request=add_forge_request ).order_by("id") - if request.user.is_authenticated and request.user.has_perm( - ADD_FORGE_MODERATOR_PERMISSION - ): + if is_add_forge_now_moderator(request.user): data = AddForgeNowRequestSerializer(add_forge_request).data history = AddForgeNowRequestHistorySerializer( request_history, many=True, context={"request": request} ).data else: data = AddForgeNowRequestPublicSerializer(add_forge_request).data history = AddForgeNowRequestHistoryPublicSerializer( request_history, many=True ).data return {"request": data, "history": history} diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py index 96e1cae0..5d8da6ff 100644 --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -1,116 +1,126 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import urlsafe_b64encode from typing import List from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from django.contrib.auth.decorators import user_passes_test from django.http.request import HttpRequest from swh.web.common.exc import ForbiddenExc OIDC_SWH_WEB_CLIENT_ID = "swh-web" SWH_AMBASSADOR_PERMISSION = "swh.ambassador" API_SAVE_ORIGIN_PERMISSION = "swh.web.api.save_origin" ADMIN_LIST_DEPOSIT_PERMISSION = "swh.web.admin.list_deposits" MAILMAP_PERMISSION = "swh.web.mailmap" ADD_FORGE_MODERATOR_PERMISSION = "swh.web.add_forge_now.moderator" MAILMAP_ADMIN_PERMISSION = "swh.web.admin.mailmap" API_RAW_OBJECT_PERMISSION = "swh.web.api.raw_object" def _get_fernet(password: bytes, salt: bytes) -> Fernet: """ Instantiate a Fernet system from a password and a salt value (see https://cryptography.io/en/latest/fernet/). Args: password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The Fernet system """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend(), ) key = urlsafe_b64encode(kdf.derive(password)) return Fernet(key) def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Encrypt data using Fernet system (symmetric encryption). Args: data: input data to encrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The encrypted data """ return _get_fernet(password, salt).encrypt(data) def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Decrypt data using Fernet system (symmetric encryption). Args: data: input data to decrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The decrypted data """ return _get_fernet(password, salt).decrypt(data) def privileged_user(request: HttpRequest, permissions: List[str] = []) -> bool: """Determine whether a user is authenticated and is a privileged one (e.g ambassador). This allows such user to have access to some more actions (e.g. bypass save code now review, access to 'archives' type...). A user is considered as privileged if he is a staff member or has any permission from those provided as parameters. Args: request: Input django HTTP request permissions: list of permission names to determine if user is privileged or not Returns: Whether the user is privileged or not. """ user = request.user return user.is_authenticated and ( user.is_staff or any([user.has_perm(perm) for perm in permissions]) ) def any_permission_required(*perms): """View decorator granting access to it if user has at least one permission among those passed as parameters. """ def check_perms(user): if any(user.has_perm(perm) for perm in perms): return True raise ForbiddenExc return user_passes_test(check_perms) + + +def is_add_forge_now_moderator(user) -> bool: + """Is a user considered an add-forge-now moderator? + + Returns + True if a user is staff or has add forge now moderator permission + + """ + return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index fcd3f154..21ff9dc3 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,529 +1,519 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Optional import urllib.parse from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry import requests from requests.auth import HTTPBasicAuth from django.core.cache import cache from django.core.cache.backends.base import DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) from swh.web.common.exc import BadInputExc, sentry_capture_exception from swh.web.common.typing import QueryParameters from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M:%S UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "FEATURES": get_config()["features"], "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: if catch_exception: sentry_capture_exception(exc) return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) - - -def has_add_forge_now_permission(user) -> bool: - """Is a user considered an add-forge-now moderator? - - Returns - True if a user is staff or has add forge now moderator permission - - """ - return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION)