diff --git a/requirements-swh.txt b/requirements-swh.txt index 3dbc25c3..59e11c3d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,9 +1,9 @@ -swh.auth[django] >= 0.5.3 +swh.auth[django] >= 0.6.7 swh.core >= 0.0.95 swh.counters >= 0.5.1 swh.indexer >= 2.0.0 swh.model >= 6.3.0 swh.scheduler >= 0.7.0 swh.search >= 0.16.0 swh.storage >= 1.4.0 swh.vault >= 1.0.0 diff --git a/swh/web/add_forge_now/admin_views.py b/swh/web/add_forge_now/admin_views.py index bdaefbc7..b1ce90c5 100644 --- a/swh/web/add_forge_now/admin_views.py +++ b/swh/web/add_forge_now/admin_views.py @@ -1,35 +1,34 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf import settings from django.contrib.auth.decorators import user_passes_test from django.shortcuts import render from swh.web.add_forge_now.models import RequestStatus from swh.web.auth.utils import is_add_forge_now_moderator -@user_passes_test(is_add_forge_now_moderator, login_url=settings.LOGIN_URL) +@user_passes_test(is_add_forge_now_moderator) def add_forge_now_requests_moderation_dashboard(request): """Moderation dashboard to allow listing current requests.""" return render( request, "add-forge-requests-moderation.html", {"heading": "Add forge now requests moderation"}, ) -@user_passes_test(is_add_forge_now_moderator, login_url=settings.LOGIN_URL) +@user_passes_test(is_add_forge_now_moderator) def add_forge_now_request_dashboard(request, request_id): """Moderation dashboard to allow listing current requests.""" return render( request, "add-forge-request-dashboard.html", { "request_id": request_id, "heading": "Add forge now request dashboard", "next_statuses_for": RequestStatus.next_statuses_str(), }, ) diff --git a/swh/web/add_forge_now/templates/add-forge-creation-form.html b/swh/web/add_forge_now/templates/add-forge-creation-form.html index 42433b32..50e465d0 100644 --- a/swh/web/add_forge_now/templates/add-forge-creation-form.html +++ b/swh/web/add_forge_now/templates/add-forge-creation-form.html @@ -1,129 +1,128 @@ {% extends "./add-forge-common.html" %} {% comment %} Copyright (C) 2022 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% block tab_content %}
{% if not user.is_authenticated %}

You must be logged in to submit an add forge request. Please log in + href="{% url login_url %}?next={% url 'forge-add-create' %}"> + log in +

+ {% else %}
{% csrf_token %}
Supported forge types in the software archive.
Remote URL of the forge.
Name of the forge administrator.
Email of the forge administrator. The given email address will not be used for any purpose outside the “add forge now” process.
Optionally, leave a comment for the moderator regarding your request.

Once an add-forge request is submitted, its status can be viewed in the submitted requests list. This process involves moderator approval and might take a few days to complete (it primarily depends on the response time from the forge).
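For reference, a request carrying the same information can presumably also be submitted through the Web API. The endpoint and field names below are assumptions derived from the form fields described above (and an illustrative placeholder token), so double-check them against the API documentation before relying on them.

import requests

response = requests.post(
    "https://archive.softwareheritage.org/api/1/add-forge/request/create/",
    headers={"Authorization": "Bearer <your-api-token>"},  # placeholder token, requests must be authenticated
    data={
        "forge_type": "gitlab",                          # illustrative values throughout
        "forge_url": "https://gitlab.example.org",
        "forge_contact_name": "Forge Admin",
        "forge_contact_email": "admin@example.org",
        "forge_contact_comment": "Please archive our forge.",
    },
)
print(response.status_code, response.json())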

{% endif %}
{% endblock %} diff --git a/swh/web/add_forge_now/views.py b/swh/web/add_forge_now/views.py index a2f41c10..3968d8f5 100644 --- a/swh/web/add_forge_now/views.py +++ b/swh/web/add_forge_now/views.py @@ -1,142 +1,137 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List -from django.conf import settings from django.contrib.auth.decorators import user_passes_test from django.core.paginator import Paginator from django.db.models import Q from django.http.request import HttpRequest from django.http.response import HttpResponse, JsonResponse from django.shortcuts import render from swh.web.add_forge_now.api_views import ( AddForgeNowRequestPublicSerializer, AddForgeNowRequestSerializer, ) from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestHistory from swh.web.auth.utils import is_add_forge_now_moderator def add_forge_request_list_datatables(request: HttpRequest) -> HttpResponse: """Dedicated endpoint used by datatables to display the add-forge requests in the Web UI. """ draw = int(request.GET.get("draw", 0)) add_forge_requests = AddForgeRequest.objects.all() table_data: Dict[str, Any] = { "recordsTotal": add_forge_requests.count(), "draw": draw, } search_value = request.GET.get("search[value]") column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "id") order_dir = request.GET.get("order[0][dir]", "desc") if field_order: if order_dir == "desc": field_order = "-" + field_order add_forge_requests = add_forge_requests.order_by(field_order) per_page = int(request.GET.get("length", 10)) page_num = int(request.GET.get("start", 0)) // per_page + 1 if search_value: add_forge_requests = add_forge_requests.filter( Q(forge_type__icontains=search_value) | Q(forge_url__icontains=search_value) | Q(status__icontains=search_value) ) if ( int(request.GET.get("user_requests_only", "0")) and request.user.is_authenticated ): add_forge_requests = add_forge_requests.filter( submitter_name=request.user.username ) paginator = Paginator(add_forge_requests, per_page) page = paginator.page(page_num) if is_add_forge_now_moderator(request.user): requests = AddForgeNowRequestSerializer(page.object_list, many=True).data else: requests = AddForgeNowRequestPublicSerializer(page.object_list, many=True).data results = [dict(req) for req in requests] table_data["recordsFiltered"] = add_forge_requests.count() table_data["data"] = results return JsonResponse(table_data) FORGE_TYPES: List[str] = [ "bitbucket", "cgit", "gitlab", "gitea", "heptapod", ] def create_request_create(request): """View to create a new 'add_forge_now' request.""" return render( request, "add-forge-creation-form.html", {"forge_types": FORGE_TYPES}, ) def create_request_list(request): """View to list existing 'add_forge_now' requests.""" return render( request, "add-forge-list.html", ) def create_request_help(request): """View to explain 'add_forge_now'.""" return render( request, "add-forge-help.html", ) -@user_passes_test( - is_add_forge_now_moderator, - redirect_field_name="next_path", - login_url=settings.LOGIN_URL, -) +@user_passes_test(is_add_forge_now_moderator) def create_request_message_source(request: HttpRequest, id: int) -> HttpResponse: """View to retrieve the message source for a given 
request history entry""" try: history_entry = RequestHistory.objects.select_related("request").get( pk=id, message_source__isnull=False ) assert history_entry.message_source is not None except RequestHistory.DoesNotExist: return HttpResponse(status=404) response = HttpResponse( bytes(history_entry.message_source), content_type="text/email" ) filename = f"add-forge-now-{history_entry.request.forge_domain}-message{id}.eml" response["Content-Disposition"] = f'attachment; filename="{filename}"' return response diff --git a/swh/web/api/templates/api.html b/swh/web/api/templates/api.html index edb12962..e2165db2 100644 --- a/swh/web/api/templates/api.html +++ b/swh/web/api/templates/api.html @@ -1,307 +1,311 @@ {% extends "layout.html" %} {% comment %} Copyright (C) 2015-2020 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% block title %} Overview – Software Heritage API {% endblock %} {% block navbar-content %}

Web API

{% endblock %} {% block content %}

Endpoint index

You can jump directly to the endpoint index, which lists all available API functionalities, or read on for more general information about the API.

Data model

The Software Heritage project harvests publicly available source code by tracking software distribution channels such as version control systems, tarball releases, and distribution packages.

All retrieved source code and related metadata are stored in the Software Heritage archive, which is conceptually a Merkle DAG. All nodes in the graph are content-addressable, i.e., their node identifiers are computed by hashing their content and, transitively, that of all nodes reachable from them. No node or edge is ever removed from the graph: the Software Heritage archive is an append-only data structure.
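As a rough illustration of content addressing, the swh.model package (already a dependency of this code base) derives an object's identifier from its bytes; the sample data below is made up.

from swh.model.hashutil import hash_to_hex
from swh.model.model import Content

# Build a content object from raw bytes; its identifiers are hashes of the data itself.
content = Content.from_data(data=b"Hello, Software Heritage!")
print(hash_to_hex(content.sha1_git))
# Ingesting the same bytes again yields the same identifier, which is what
# makes the archive a deduplicated, append-only Merkle DAG.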

The following types of objects (i.e., graph nodes) can be found in the Software Heritage archive (for more information see the Software Heritage glossary):

Version

The current version of the API is v1.

Warning: this version of the API is not to be considered stable yet. Non-backward-compatible changes might happen even without changing the API version number.

Schema

API access is over HTTPS.

All API endpoints are rooted at https://archive.softwareheritage.org/api/1/.

Data is sent and received as JSON by default.

Example:
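For instance, a minimal sketch using Python's requests library; the counters endpoint (reused from the rate-limiting example further down) is only an illustration.

import requests

# JSON is the default response format, so response.json() can be used directly.
response = requests.get("https://archive.softwareheritage.org/api/1/stat/counters/")
response.raise_for_status()
print(response.json())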

Response format override

The response format can be overridden using the Accept request header. In particular, Accept: text/html (which web browsers send by default) requests HTML pretty-printing, whereas Accept: application/yaml requests YAML-encoded responses.

Example:
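A small sketch of overriding the response format from Python; the endpoint is the same illustrative one as above.

import requests

# Request a YAML-encoded response instead of the default JSON.
response = requests.get(
    "https://archive.softwareheritage.org/api/1/stat/counters/",
    headers={"Accept": "application/yaml"},
)
print(response.text)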

Parameters

Some API endpoints can be tweaked by passing optional parameters. For GET requests, optional parameters can be passed as an HTTP query string.

The optional parameter fields is accepted by all endpoints that return dictionaries and can be used to restrict the list of fields returned by the API, in case you are not interested in all of them. By default, all available fields are returned.

Example:
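As an illustration of the fields parameter, assuming the counters endpoint returns a dictionary containing (among others) content and origin entries, the response can be restricted to those keys; the field names here are assumptions, not guaranteed.

import requests

# Only keep the "content" and "origin" fields of the returned dictionary.
response = requests.get(
    "https://archive.softwareheritage.org/api/1/stat/counters/",
    params={"fields": "content,origin"},
)
print(response.json())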

Errors

While API endpoints will return different kinds of errors depending on their own semantics, some error patterns are common across all endpoints.

Sending malformed data, including syntactically incorrect object identifiers, will result in a 400 Bad Request HTTP response. Example:

Requesting non-existent resources will result in a 404 Not Found HTTP response. Example:

Unavailability of the underlying storage backend will result in a 503 Service Unavailable HTTP response.
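A small sketch of handling these common error cases from Python; the deliberately malformed identifier is illustrative only.

import requests

response = requests.get(
    "https://archive.softwareheritage.org/api/1/content/sha1:not-a-valid-hash/"
)
if response.status_code == 400:
    print("Malformed request:", response.json())
elif response.status_code == 404:
    print("Resource not found:", response.json())
elif response.status_code == 503:
    print("Archive temporarily unavailable, retry later")
else:
    response.raise_for_status()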

UTF-8 decoding errors

While decoding UTF-8 strings from raw bytes stored in the archive, errors might occur when generating an API response. In that case, an extra field decoding_failures will be added to each affected JSON object (possibly nested). It will contain the list of its key names for which UTF-8 decoding failed.

A string that could not be decoded will have the bytes of its invalid UTF-8 sequences escaped as \\x<hex value>.
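A helper sketch for spotting such objects in a (possibly nested) response:

def collect_decoding_failures(obj, found=None):
    """Collect key names reported in 'decoding_failures' fields, recursively."""
    found = [] if found is None else found
    if isinstance(obj, dict):
        found.extend(obj.get("decoding_failures", []))
        for value in obj.values():
            collect_decoding_failures(value, found)
    elif isinstance(obj, list):
        for item in obj:
            collect_decoding_failures(item, found)
    return found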

Pagination

Requests that might return many items will be paginated.

Page size is set to a default (usually: 10 items), but might be overridden with the per_page query parameter up to a maximum (usually: 50 items). Example:

curl https://archive.softwareheritage.org/api/1/origin/1/visits/?per_page=2

To navigate through paginated results, a Link HTTP response header is available to link the current result page to the next one. Example:

curl -i https://archive.softwareheritage.org/api/1/origin/1/visits/?per_page=2 | grep ^Link:
 Link: </api/1/origin/1/visits/?last_visit=2&per_page=2>; rel="next",
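The same navigation can be done from Python, since requests exposes the parsed Link header as response.links; a sketch, assuming the visits endpoint shown above.

from urllib.parse import urljoin
import requests

base = "https://archive.softwareheritage.org"
url = urljoin(base, "/api/1/origin/1/visits/")
params = {"per_page": 2}
visits = []
while url:
    response = requests.get(url, params=params)
    visits.extend(response.json())
    # Follow the "next" relation from the Link header, if any (it may be a relative URL).
    next_url = response.links.get("next", {}).get("url")
    url = urljoin(base, next_url) if next_url else None
    params = None  # the next URL already embeds the query parameters
print(len(visits))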

Rate limiting

Due to limited resource availability on the backend side, API usage is currently rate limited. API users can be either anonymous or authenticated. For rate-limiting purposes, anonymous users are identified by their origin IP address; authenticated users identify themselves via user-specific credentials, such as authentication tokens.
A higher rate-limit quota is available by default for authenticated users.

Three HTTP response headers will inform you about the current state of the limits that apply to your rate-limiting bucket:

Example:

curl -i https://archive.softwareheritage.org/api/1/stat/counters/ | grep ^X-RateLimit
 X-RateLimit-Limit: 120
 X-RateLimit-Remaining: 119
 X-RateLimit-Reset: 1620639052
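From Python, the same headers can be inspected to back off before the quota is exhausted; a defensive sketch, not an official client.

import time

import requests

response = requests.get("https://archive.softwareheritage.org/api/1/stat/counters/")
remaining = int(response.headers.get("X-RateLimit-Remaining", "1"))
if remaining == 0:
    # X-RateLimit-Reset is a Unix timestamp (see the example value above).
    reset_at = int(response.headers["X-RateLimit-Reset"])
    time.sleep(max(0, reset_at - time.time()))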
-

Authentication

-

- It is possible to perform authenticated requests to the Web API through the use of a bearer token - sent in HTTP Authorization headers. -
- To obtain such a token, an account to the - Software Heritage Authentication service must be created. -
- To generate and manage bearer tokens, a dedicated interface is available on the - user profile page once logged in. -

-

- The following shows how to perform an authenticated request to the Web API using curl. -

export TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtO...
-curl -H "Authorization: Bearer ${TOKEN}" {{ site_base_url }}api/...
-

-

- Authenticated requests can be used to lift rate limiting if the user account has the adequate - permission. - If you are in such a need, please contact us - and we will review your request. -

+ {% if oidc_enabled %} +

Authentication

+

+ It is possible to perform authenticated requests to the Web API through the use of a bearer token + sent in HTTP Authorization headers. +
+ To obtain such a token, an account on the + Software Heritage Authentication service must be created. +
+ To generate and manage bearer tokens, a dedicated interface is available on the + user profile page once logged in. +

+

+ The following shows how to perform an authenticated request to the Web API using curl. +

export TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtO...
+  curl -H "Authorization: Bearer ${TOKEN}" {{ site_base_url }}api/...
+
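The Python equivalent of the curl call above, assuming the token was exported in the TOKEN environment variable; the queried endpoint is only an illustration.

import os

import requests

token = os.environ["TOKEN"]  # bearer token generated from the user profile page
response = requests.get(
    "https://archive.softwareheritage.org/api/1/stat/counters/",
    headers={"Authorization": f"Bearer {token}"},
)
print(response.headers.get("X-RateLimit-Limit"))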

+

+ Authenticated requests can be used to lift rate limiting if the user account has the adequate + permission. + If you need this, please contact us + and we will review your request. +

+ {% endif %}
{% endblock %} diff --git a/swh/web/auth/templates/logout.html b/swh/web/auth/templates/logout.html index 744e7337..67b57b2d 100644 --- a/swh/web/auth/templates/logout.html +++ b/swh/web/auth/templates/logout.html @@ -1,42 +1,38 @@ {% extends "layout.html" %} {% comment %} Copyright (C) 2018 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load static %} {% block title %}Logged out – Software Heritage archive {% endblock %} {% block navbar-content %}

Logged out

{% endblock %} {% block content %} -{% if 'next_path' in request.GET %} +{% if 'next' in request.GET %}

Your authenticated session expired, so you have been automatically logged out.

{% else %}

You have been successfully logged out.

{% endif %}

-{% if oidc_enabled and 'remote_user' in request.GET %} -{% if 'next_path' in request.GET %} - +{% if 'next' in request.GET %} + {% else %} - -{% endif %} -{% else %} - + {% endif %} Log in again?

-{% if 'next_path' in request.GET %} +{% if 'next' in request.GET %}

Or go back to the previous page.

{% endif %} {% endblock %} diff --git a/swh/web/auth/urls.py b/swh/web/auth/urls.py index f08e033f..3308836d 100644 --- a/swh/web/auth/urls.py +++ b/swh/web/auth/urls.py @@ -1,52 +1,75 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.contrib.auth.views import LoginView, LogoutView from django.urls import re_path as url from swh.auth.django.views import urlpatterns as auth_urlpatterns from swh.web.auth.views import ( oidc_generate_bearer_token, oidc_generate_bearer_token_complete, oidc_get_bearer_token, oidc_list_bearer_tokens, oidc_profile_view, oidc_revoke_bearer_tokens, ) +from swh.web.config import get_config -urlpatterns = auth_urlpatterns + [ - url( - r"^oidc/generate-bearer-token/$", - oidc_generate_bearer_token, - name="oidc-generate-bearer-token", - ), - url( - r"^oidc/generate-bearer-token-complete/$", - oidc_generate_bearer_token_complete, - name="oidc-generate-bearer-token-complete", - ), - url( - r"^oidc/list-bearer-token/$", - oidc_list_bearer_tokens, - name="oidc-list-bearer-tokens", - ), - url( - r"^oidc/get-bearer-token/$", - oidc_get_bearer_token, - name="oidc-get-bearer-token", - ), - url( - r"^oidc/revoke-bearer-tokens/$", - oidc_revoke_bearer_tokens, - name="oidc-revoke-bearer-tokens", - ), +config = get_config() + +oidc_enabled = bool(config["keycloak"]["server_url"]) + +urlpatterns = [] + +if not oidc_enabled: + urlpatterns = [ + url( + r"^login/$", + LoginView.as_view(template_name="login.html"), + name="login", + ) + ] + +if oidc_enabled or config["e2e_tests_mode"]: + urlpatterns += auth_urlpatterns + [ + url( + r"^oidc/generate-bearer-token/$", + oidc_generate_bearer_token, + name="oidc-generate-bearer-token", + ), + url( + r"^oidc/generate-bearer-token-complete/$", + oidc_generate_bearer_token_complete, + name="oidc-generate-bearer-token-complete", + ), + url( + r"^oidc/list-bearer-token/$", + oidc_list_bearer_tokens, + name="oidc-list-bearer-tokens", + ), + url( + r"^oidc/get-bearer-token/$", + oidc_get_bearer_token, + name="oidc-get-bearer-token", + ), + url( + r"^oidc/revoke-bearer-tokens/$", + oidc_revoke_bearer_tokens, + name="oidc-revoke-bearer-tokens", + ), + url( + r"^oidc/profile/$", + oidc_profile_view, + name="oidc-profile", + ), + ] + +urlpatterns.append( url( - r"^oidc/profile/$", - oidc_profile_view, - name="oidc-profile", - ), - url(r"^login/$", LoginView.as_view(template_name="login.html"), name="login"), - url(r"^logout/$", LogoutView.as_view(template_name="logout.html"), name="logout"), -] + r"^logout/$", + LogoutView.as_view(template_name="logout.html"), + name="logout", + ) +) diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 0efd1c99..297edacd 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,155 +1,155 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union, cast from cryptography.fernet import InvalidToken from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, 
HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser from swh.auth.django.utils import keycloak_oidc_client from swh.auth.django.views import get_oidc_login_data, oidc_login_view from swh.auth.keycloak import KeycloakError, keycloak_error_message from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import decrypt_data, encrypt_data from swh.web.config import get_config from swh.web.utils import reverse from swh.web.utils.exc import ForbiddenExc def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) login_data = get_oidc_login_data(request) oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes: # token has been retrieved from a PosgreSQL database if isinstance(token, memoryview): return token.tobytes() else: return token @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) refresh_token = decrypted_token.decode("ascii") # check token is still valid oidc_client = keycloak_oidc_client() oidc_client.refresh_token(refresh_token) return HttpResponse(refresh_token, 
content_type="text/plain") except InvalidToken: return HttpResponse(status=401) except KeycloakError as ke: error_msg = keycloak_error_message(ke) if error_msg in ( "invalid_grant: Offline session not active", "invalid_grant: Offline user session not found", ): error_msg = "Bearer token has expired, please generate a new one." return HttpResponseBadRequest(error_msg, content_type="text/plain") @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) -@login_required(login_url="/oidc/login/", redirect_field_name="next_path") +@login_required(login_url="oidc-login") def oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "profile.html") diff --git a/swh/web/deposit/urls.py b/swh/web/deposit/urls.py index 84e747d9..7f82bd9a 100644 --- a/swh/web/deposit/urls.py +++ b/swh/web/deposit/urls.py @@ -1,48 +1,47 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import requests from requests.auth import HTTPBasicAuth -from django.conf import settings from django.contrib.auth.decorators import user_passes_test from django.http import JsonResponse from django.shortcuts import render from django.urls import re_path as url from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION from swh.web.config import get_config def can_list_deposits(user): return user.is_staff or user.has_perm(ADMIN_LIST_DEPOSIT_PERMISSION) -@user_passes_test(can_list_deposits, login_url=settings.LOGIN_URL) +@user_passes_test(can_list_deposits) def admin_deposit(request): return render(request, "deposit-admin.html") -@user_passes_test(can_list_deposits, login_url=settings.LOGIN_URL) +@user_passes_test(can_list_deposits) def admin_deposit_list(request): config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_url = private_api_url + "deposits/datatables/" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits = requests.get( deposits_list_url, auth=deposits_list_auth, params=request.GET, timeout=30 ).json() return JsonResponse(deposits) urlpatterns = [ url(r"^admin/deposit/$", admin_deposit, name="admin-deposit"), url(r"^admin/deposit/list/$", admin_deposit_list, name="admin-deposit-list"), ] diff --git a/swh/web/save_code_now/templates/origin-save-help.html b/swh/web/save_code_now/templates/origin-save-help.html index 05c9fadb..115e5dcf 100644 --- a/swh/web/save_code_now/templates/origin-save-help.html +++ b/swh/web/save_code_now/templates/origin-save-help.html @@ -1,54 +1,54 @@ {% extends "./origin-save.html" %} {% comment %} Copyright (C) 2018-2021 The Software Heritage developers See the AUTHORS file at 
the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% block tab_content %}

A "Save code now" request takes the following parameters:

Once submitted, your save request can either be:

Once a save request has been accepted, you can follow its current status in the submitted save requests list.
- If you submitted requests while authenticated, you will be able + If you submitted requests while authenticated, you will be able to display only your own requests.
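A save request can presumably also be triggered through the Web API; the endpoint below is assumed from the public API documentation, and the origin URL is illustrative.

import requests

origin_url = "https://github.com/example/project"  # illustrative origin
response = requests.post(
    f"https://archive.softwareheritage.org/api/1/origin/save/git/url/{origin_url}/"
)
print(response.json())  # reports the save request status once it has been handled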

{% endblock %} diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index 5ae708a2..72f04e72 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,351 +1,364 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ from importlib.util import find_spec import os import sys from typing import Any, Dict from django.utils import encoding from swh.web.config import get_config # Fix django-js-reverse 0.9.1 compatibility with django 4.x # TODO: Remove that hack once a new django-js-reverse release # is available on PyPI if not hasattr(encoding, "force_text"): setattr(encoding, "force_text", encoding.force_str) swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config["secret_key"] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config["debug"] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config["debug"] ALLOWED_HOSTS = ["127.0.0.1", "localhost"] + swh_web_config["allowed_hosts"] # Application definition SWH_BASE_DJANGO_APPS = [ "swh.web.webapp", "swh.web.auth", "swh.web.browse", "swh.web.utils", "swh.web.tests", "swh.web.api", ] SWH_EXTRA_DJANGO_APPS = [ app for app in swh_web_config["swh_extra_django_apps"] if app not in SWH_BASE_DJANGO_APPS ] # swh.web.api must be the last loaded application due to the way # its URLS are registered SWH_DJANGO_APPS = SWH_EXTRA_DJANGO_APPS + SWH_BASE_DJANGO_APPS INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "webpack_loader", "django_js_reverse", "corsheaders", ] + SWH_DJANGO_APPS MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "corsheaders.middleware.CorsMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", - "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.utils.middlewares.ThrottlingHeadersMiddleware", "swh.web.utils.middlewares.ExceptionMiddleware", ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. if swh_web_config["serve_assets"]: MIDDLEWARE.insert(0, "django.middleware.gzip.GZipMiddleware") ROOT_URLCONF = "swh.web.urls" SWH_APP_TEMPLATES = [os.path.join(PROJECT_DIR, "../templates")] # Add templates directory from each SWH Django application for app in SWH_DJANGO_APPS: try: app_spec = find_spec(app) assert app_spec is not None, f"Django application {app} not found !" 
assert app_spec.origin is not None SWH_APP_TEMPLATES.append( os.path.join(os.path.dirname(app_spec.origin), "templates") ) except ModuleNotFoundError: assert False, f"Django application {app} not found !" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": SWH_APP_TEMPLATES, "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", "swh.web.utils.context_processor", ], "libraries": { "swh_templatetags": "swh.web.utils.swh_templatetags", }, }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config.get("development_db", ""), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", # noqa }, { "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", }, { "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", }, { "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = "en-us" TIME_ZONE = "UTC" USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = "/static/" # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, "../../../static") STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ["127.0.0.1"] throttle_rates = {} http_requests = ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"] throttling = swh_web_config["throttling"] for limiter_scope, limiter_conf in throttling["scopes"].items(): if "default" in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"]["default"] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope + "_" + http_request.lower()] = limiter_conf[ "limiter_rate" ][http_request] REST_FRAMEWORK: Dict[str, Any] = { "DEFAULT_RENDERER_CLASSES": ( "rest_framework.renderers.JSONRenderer", "swh.web.api.renderers.YAMLRenderer", "rest_framework.renderers.TemplateHTMLRenderer", ), "DEFAULT_THROTTLE_CLASSES": ( "swh.web.api.throttling.SwhWebRateThrottle", "swh.web.api.throttling.SwhWebUserRateThrottle", ), "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } LOGGING = { "version": 1, "disable_existing_loggers": False, "filters": { "require_debug_false": { "()": "django.utils.log.RequireDebugFalse", }, "require_debug_true": { "()": "django.utils.log.RequireDebugTrue", }, }, "formatters": { "request": { "format": "[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "simple": { "format": "[%(asctime)s] [%(levelname)s] 
%(message)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "verbose": { "format": ( "[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s " "- %(message)s" ), "datefmt": "%d/%b/%Y %H:%M:%S", }, }, "handlers": { "console": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "simple", }, "file": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "simple", }, "file_request": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "request", }, "console_verbose": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "verbose", }, "file_verbose": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "verbose", }, "null": { "class": "logging.NullHandler", }, }, "loggers": { "": { "handlers": ["console_verbose", "file_verbose"], "level": "DEBUG" if DEBUG else "WARNING", }, "django": { "handlers": ["console"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.request": { "handlers": ["file_request"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.db.backends": {"handlers": ["null"], "propagate": False}, "django.utils.autoreload": { "level": "INFO", }, "swh.core.statsd": { "level": "INFO", }, "urllib3": { "level": "INFO", }, }, } WEBPACK_LOADER = { "DEFAULT": { "CACHE": False, "BUNDLE_DIR_NAME": "./", "STATS_FILE": os.path.join(STATIC_DIR, "webpack-stats.json"), "POLL_INTERVAL": 0.1, "TIMEOUT": None, "IGNORE": [".+\\.hot-update.js", ".+\\.map"], } } -LOGIN_URL = "/login/" +AUTHENTICATION_BACKENDS = [ + "django.contrib.auth.backends.ModelBackend", +] + +oidc_enabled = bool(get_config()["keycloak"]["server_url"]) + +if not oidc_enabled: + LOGIN_URL = "login" + LOGOUT_URL = "logout" +else: + LOGIN_URL = "oidc-login" + LOGOUT_URL = "oidc-logout" + AUTHENTICATION_BACKENDS.append( + "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", + ) + MIDDLEWARE.insert( + MIDDLEWARE.index("django.contrib.auth.middleware.AuthenticationMiddleware") + 1, + "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", + ) + LOGIN_REDIRECT_URL = "swh-web-homepage" SESSION_ENGINE = "django.contrib.sessions.backends.cache" CACHES = { "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}, } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r"^/(badge|api)/.*$" -AUTHENTICATION_BACKENDS = [ - "django.contrib.auth.backends.ModelBackend", - "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", -] - OIDC_SWH_WEB_CLIENT_ID = "swh-web" SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"] SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"] SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" DEFAULT_AUTO_FIELD = "django.db.models.AutoField" diff --git a/swh/web/settings/production.py b/swh/web/settings/production.py index f4053d19..6a3dfe62 100644 --- a/swh/web/settings/production.py +++ b/swh/web/settings/production.py @@ -1,71 +1,72 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or 
any later version # See top-level LICENSE file for more information """ Django production settings for swh-web. """ from .common import ( CACHES, DEBUG, MIDDLEWARE, REST_FRAMEWORK, WEBPACK_LOADER, swh_web_config, ) from .common import * # noqa MIDDLEWARE += [ "swh.web.utils.middlewares.HtmlMinifyMiddleware", ] if swh_web_config.get("throttling", {}).get("cache_uri"): CACHES.update( { "default": { "BACKEND": "django.core.cache.backends.memcached.MemcachedCache", "LOCATION": swh_web_config["throttling"]["cache_uri"], } } ) # Setup support for proxy headers USE_X_FORWARDED_HOST = True SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https") # We're going through seven (or, in that case, 2) proxies thanks to Varnish REST_FRAMEWORK["NUM_PROXIES"] = 2 db_conf = swh_web_config["production_db"] if db_conf.get("name", "").startswith("postgresql://"): # poor man's support for dsn connection string... import psycopg2 with psycopg2.connect(db_conf.get("name")) as cnx: dsn_dict = cnx.get_dsn_parameters() db_conf["name"] = dsn_dict.get("dbname") db_conf["host"] = dsn_dict.get("host") db_conf["port"] = dsn_dict.get("port") db_conf["user"] = dsn_dict.get("user") db_conf["password"] = dsn_dict.get("password") # https://docs.djangoproject.com/en/1.10/ref/settings/#databases DATABASES = { "default": { "ENGINE": "django.db.backends.postgresql", "NAME": db_conf.get("name"), "HOST": db_conf.get("host"), "PORT": db_conf.get("port"), "USER": db_conf.get("user"), "PASSWORD": db_conf.get("password"), } } WEBPACK_LOADER["DEFAULT"]["CACHE"] = not DEBUG -LOGIN_URL = "/oidc/login/" -LOGIN_REDIRECT_URL = "/oidc/profile/" +LOGIN_URL = "oidc-login" +LOGIN_REDIRECT_URL = "oidc-profile" +LOGOUT_URL = "oidc-logout" diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py index 53dd08c5..e2e600ce 100644 --- a/swh/web/settings/tests.py +++ b/swh/web/settings/tests.py @@ -1,161 +1,164 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django tests settings for swh-web. 
""" import os import sys from swh.web.config import get_config scope1_limiter_rate = 3 scope1_limiter_rate_post = 1 scope2_limiter_rate = 5 scope2_limiter_rate_post = 2 scope3_limiter_rate = 1 scope3_limiter_rate_post = 1 save_origin_rate_post = 5 api_raw_object_rate = 5 swh_web_config = get_config() _pytest = "pytest" in sys.argv[0] or "PYTEST_XDIST_WORKER" in os.environ swh_web_config.update( { # enable django debug mode only when running pytest "debug": _pytest, "secret_key": "test", "history_counters_url": "", "throttling": { "cache_uri": None, "scopes": { "swh_api": { "limiter_rate": {"default": "60/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_search": { "limiter_rate": {"default": "100/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_visit_latest": { "limiter_rate": {"default": "6000/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_vault_cooking": { "limiter_rate": {"default": "120/h", "GET": "60/m"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_save_origin": { "limiter_rate": { "default": "120/h", "POST": "%s/h" % save_origin_rate_post, } }, "swh_raw_object": { "limiter_rate": {"default": f"{api_raw_object_rate}/h"}, }, "scope1": { "limiter_rate": { "default": "%s/min" % scope1_limiter_rate, "POST": "%s/min" % scope1_limiter_rate_post, } }, "scope2": { "limiter_rate": { "default": "%s/min" % scope2_limiter_rate, "POST": "%s/min" % scope2_limiter_rate_post, } }, "scope3": { "limiter_rate": { "default": "%s/min" % scope3_limiter_rate, "POST": "%s/min" % scope3_limiter_rate_post, }, "exempted_networks": ["127.0.0.0/8"], }, }, }, "keycloak": { # disable keycloak use when not running pytest "server_url": "http://localhost:8080/auth/" if _pytest else "", "realm_name": "SoftwareHeritage", }, "swh_extra_django_apps": [ "swh.web.add_forge_now", "swh.web.archive_coverage", "swh.web.badges", "swh.web.banners", "swh.web.deposit", "swh.web.inbound_email", "swh.web.jslicenses", "swh.web.mailmap", "swh.web.metrics", "swh.web.save_code_now", "swh.web.vault", ], } ) from .common import * # noqa from .common import LOGGING # noqa, isort: skip ALLOWED_HOSTS = ["*"] DATABASES = { "default": { "ENGINE": "django.db.backends.postgresql", "NAME": swh_web_config["test_db"]["name"], } } # when running cypress tests, make the webapp fetch data from memory storages if not _pytest: swh_web_config.update( { "debug": True, "e2e_tests_mode": True, # ensure scheduler not available to avoid side effects in cypress tests "scheduler": {"cls": "remote", "url": ""}, } ) from django.conf import settings from swh.web.tests.data import get_tests_data, override_storages test_data = get_tests_data() override_storages( test_data["storage"], test_data["idx_storage"], test_data["search"], test_data["counters"], ) # using sqlite3 for frontend tests build_id = os.environ.get("CYPRESS_PARALLEL_BUILD_ID", "") settings.DATABASES["default"].update( { "ENGINE": "django.db.backends.sqlite3", "NAME": f"swh-web-test{build_id}.sqlite3", } ) # to prevent "database is locked" error when running cypress tests from django.db.backends.signals import connection_created def activate_wal_journal_mode(sender, connection, **kwargs): cursor = connection.cursor() cursor.execute("PRAGMA journal_mode = WAL;") connection_created.connect(activate_wal_journal_mode) else: # Silent DEBUG output when running unit tests LOGGING["handlers"]["console"]["level"] = "INFO" # type: ignore + +LOGIN_URL = "login" if not _pytest else "oidc-login" +LOGOUT_URL = "logout" if not _pytest else "oidc-logout" diff --git 
a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 5a02662e..560ddc0c 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,313 +1,313 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.conf import settings from django.http import QueryDict from swh.auth.keycloak import KeycloakError from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import decrypt_data from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.helpers import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.utils import reverse from swh.web.webapp.urls import default_view as homepage_view def _check_oidc_login_code_flow_data( request, response, keycloak_oidc, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == settings.OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. 
""" url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_http_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token.tobytes() secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ _generate_and_test_bearer_token(client, keycloak_oidc) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_list_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. 
""" url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_get_bearer_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): token = _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_get_bearer_token_expired_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ _generate_and_test_bearer_token(client, keycloak_oidc) for kc_err_msg in ("Offline session not active", "Offline user session not found"): kc_error_dict = { "error": "invalid_grant", "error_description": kc_err_msg, } keycloak_oidc.refresh_token.side_effect = KeycloakError( error_message=json.dumps(kc_error_dict).encode(), response_code=400 ) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=400, data={"token_id": 1}, content_type="text/plain", ) assert ( response.content == b"Bearer token has expired, please generate a new one." ) def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_revoke_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") - login_url = reverse("oidc-login", query_params={"next_path": url}) + login_url = reverse("oidc-login", query_params={"next": url}) resp = check_http_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_oidc_profile_view(client, keycloak_oidc): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. 
""" url = reverse("oidc-profile") kc_config = get_config()["keycloak"] client_permissions = ["perm1", "perm2"] keycloak_oidc.client_permissions = client_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in client_permissions: assert_contains(resp, perm) diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index d7c94baf..245b7c99 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,1254 +1,1261 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from datetime import timedelta import functools from importlib import import_module, reload import json import os import random import shutil import sys import time from typing import Any, Dict, List, Optional from _pytest.python import Function from hypothesis import HealthCheck from hypothesis import settings as hypothesis_settings import pytest from pytest_django.fixtures import SettingsWrapper from django.conf import settings from django.contrib.auth.models import User from django.core.cache import cache from django.test.utils import setup_databases from django.urls import clear_url_caches from rest_framework.test import APIClient, APIRequestFactory from swh.model.hashutil import ( ALGORITHMS, DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex, ) from swh.model.model import Content, Directory from swh.model.swhids import CoreSWHID, ObjectType from swh.scheduler.tests.common import TASK_TYPES from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.revisions_walker import get_revisions_walker from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, MAILMAP_ADMIN_PERMISSION, MAILMAP_PERMISSION, ) from swh.web.config import get_config from swh.web.save_code_now.origin_save import get_scheduler_load_task_types from swh.web.tests.data import ( get_tests_data, override_storages, random_content, random_sha1, random_sha1_bytes, random_sha256, ) from swh.web.tests.helpers import create_django_permission from swh.web.utils import browsers_supported_image_mimes, converters from swh.web.utils.typing import OriginVisitInfo os.environ["LC_ALL"] = "C.UTF-8" fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles hypothesis_settings.register_profile("default", hypothesis_settings()) # we use getattr here to keep mypy happy regardless hypothesis version function_scoped_fixture_check = ( [getattr(HealthCheck, "function_scoped_fixture")] if hasattr(HealthCheck, "function_scoped_fixture") else [] ) suppress_health_check = [ HealthCheck.too_slow, HealthCheck.filter_too_much, ] + function_scoped_fixture_check hypothesis_settings.register_profile( "swh-web", hypothesis_settings( deadline=None, suppress_health_check=suppress_health_check, ), ) hypothesis_settings.register_profile( "swh-web-fast", hypothesis_settings( 
deadline=None, max_examples=5, suppress_health_check=suppress_health_check, ), ) def pytest_addoption(parser): parser.addoption("--swh-web-random-seed", action="store", default=None) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: hypothesis_settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox data_dir = os.path.join(sys.prefix, "share/swh/web") static_dir = os.path.join(data_dir, "static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return django_apps_dir = os.path.join(test_dir, "../../../swh/web") if not os.path.exists(django_apps_dir): # location of the applications folder when running tests with tox django_apps_dir = os.path.join(data_dir, "swh/web") bundles = [] _, apps, _ = next(os.walk(django_apps_dir)) for app in apps: app_assets_dir = os.path.join(django_apps_dir, app, "assets") if os.path.exists(app_assets_dir): if os.path.exists(os.path.join(app_assets_dir, "index.js")): bundles.append(app) else: _, app_bundles, _ = next(os.walk(app_assets_dir)) for app_bundle in app_bundles: if os.path.exists( os.path.join(app_assets_dir, app_bundle, "index.js") ): bundles.append(app_bundle) print(bundles) mock_webpack_stats = { "status": "done", "publicPath": "/static", "chunks": {}, "assets": {}, } for bundle in bundles: asset = f"js/{bundle}.js" mock_webpack_stats["chunks"][bundle] = [asset] mock_webpack_stats["assets"][asset] = { "name": asset, "publicPath": f"/static/{asset}", } with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) _swh_web_custom_section = "swh-web custom section" _random_seed_cache_key = "swh-web/random-seed" @pytest.fixture(scope="function", autouse=True) def random_seed(pytestconfig): state = random.getstate() seed = pytestconfig.getoption("--swh-web-random-seed") if seed is None: seed = time.time() seed = int(seed) cache.set(_random_seed_cache_key, seed) random.seed(seed) yield seed random.setstate(state) def pytest_report_teststatus(report, *args): if report.when == "call" and report.outcome == "failed": seed = cache.get(_random_seed_cache_key, None) line = ( f'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} ' f'{report.nodeid}" to reproduce that test failure with same inputs' ) report.sections.append((_swh_web_custom_section, line)) def pytest_terminal_summary(terminalreporter, *args): reports = terminalreporter.getreports("failed") content = os.linesep.join( text for report in reports for secname, text in report.sections if secname == _swh_web_custom_section ) if content: terminalreporter.ensure_newline() terminalreporter.section(_swh_web_custom_section, sep="-", blue=True, bold=True) terminalreporter.line(content) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias 
rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="function", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages( data["storage"], data["idx_storage"], data["search"], data["counters"] ) return data @pytest.fixture(scope="function") def sha1(): """Fixture returning a valid hexadecimal sha1 value.""" return random_sha1() @pytest.fixture(scope="function") def invalid_sha1(): """Fixture returning an invalid sha1 representation.""" return hash_to_hex(bytes(random.randint(0, 255) for _ in range(50))) @pytest.fixture(scope="function") def sha256(): """Fixture returning a valid hexadecimal sha256 value.""" return random_sha256() def _known_swh_objects(tests_data, object_type): return tests_data[object_type] @pytest.fixture(scope="function") def content(tests_data): """Fixture returning a random content ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "contents")) @pytest.fixture(scope="function") def contents(tests_data): """Fixture returning random contents ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "contents"), k=random.randint(2, 8) ) def _new_content(tests_data): while True: new_content = random_content() sha1_bytes = hash_to_bytes(new_content["sha1"]) if tests_data["storage"].content_get_data(sha1_bytes) is None: return new_content @pytest.fixture(scope="function") def unknown_content(tests_data): """Fixture returning a random content not ingested into the test archive.""" return _new_content(tests_data) @pytest.fixture(scope="function") def unknown_contents(tests_data): """Fixture returning random contents not ingested into the test archive.""" new_contents = [] new_content_ids = set() nb_contents = random.randint(2, 8) while len(new_contents) != nb_contents: new_content = _new_content(tests_data) if new_content["sha1"] not in new_content_ids: new_contents.append(new_content) new_content_ids.add(new_content["sha1"]) return list(new_contents) @pytest.fixture(scope="function") def empty_content(): """Fixture returning the empty content ingested into the test archive.""" empty_content = Content.from_data(data=b"").to_dict() for algo in DEFAULT_ALGORITHMS: empty_content[algo] = hash_to_hex(empty_content[algo]) return empty_content @functools.lru_cache(maxsize=None) def _content_text(): return list( filter( lambda c: c["mimetype"].startswith("text/"), _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text(): """ Fixture returning a random textual content ingested into the test archive. """ return random.choice(_content_text()) @functools.lru_cache(maxsize=None) def _content_text_non_utf8(): return list( filter( lambda c: c["mimetype"].startswith("text/") and c["encoding"] not in ("utf-8", "us-ascii"), _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text_non_utf8(): """Fixture returning a random textual content not encoded to UTF-8 ingested into the test archive. 
""" return random.choice(_content_text_non_utf8()) @functools.lru_cache(maxsize=None) def _content_application_no_highlight(): return list( filter( lambda c: c["mimetype"].startswith("application/") and c["hljs_language"] == "plaintext", _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_application_no_highlight(): """Fixture returning a random textual content with mimetype starting with application/ and no detected programming language to highlight ingested into the test archive. """ return random.choice(_content_application_no_highlight()) @functools.lru_cache(maxsize=None) def _content_text_no_highlight(): return list( filter( lambda c: c["mimetype"].startswith("text/") and c["hljs_language"] == "plaintext", _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text_no_highlight(): """Fixture returning a random textual content with no detected programming language to highlight ingested into the test archive. """ return random.choice(_content_text_no_highlight()) @functools.lru_cache(maxsize=None) def _content_image_type(): return list( filter( lambda c: c["mimetype"] in browsers_supported_image_mimes, _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_image_type(): """Fixture returning a random image content ingested into the test archive.""" return random.choice(_content_image_type()) @functools.lru_cache(maxsize=None) def _content_unsupported_image_type_rendering(): return list( filter( lambda c: c["mimetype"].startswith("image/") and c["mimetype"] not in browsers_supported_image_mimes, _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_unsupported_image_type_rendering(): """Fixture returning a random image content ingested into the test archive that can not be rendered by browsers. """ return random.choice(_content_unsupported_image_type_rendering()) @functools.lru_cache(maxsize=None) def _content_utf8_detected_as_binary(): def utf8_binary_detected(content): if content["encoding"] != "binary": return False try: content["raw_data"].decode("utf-8") except Exception: return False else: return True return list( filter(utf8_binary_detected, _known_swh_objects(get_tests_data(), "contents")) ) @pytest.fixture(scope="function") def content_utf8_detected_as_binary(): """Fixture returning a random textual content detected as binary by libmagic while they are valid UTF-8 encoded files. """ return random.choice(_content_utf8_detected_as_binary()) @pytest.fixture(scope="function") def directory(tests_data): """Fixture returning a random directory ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "directories")) @functools.lru_cache(maxsize=None) def _directory_with_entry_type(type_): tests_data = get_tests_data() return list( filter( lambda d: any( [ e["type"] == type_ for e in list(tests_data["storage"].directory_ls(hash_to_bytes(d))) ] ), _known_swh_objects(tests_data, "directories"), ) ) @pytest.fixture(scope="function") def directory_with_subdirs(): """Fixture returning a random directory containing sub directories ingested into the test archive. 
""" return random.choice(_directory_with_entry_type("dir")) @pytest.fixture(scope="function") def directory_with_files(): """Fixture returning a random directory containing at least one regular file.""" return random.choice(_directory_with_entry_type("file")) @pytest.fixture(scope="function") def unknown_directory(tests_data): """Fixture returning a random directory not ingested into the test archive.""" while True: new_directory = random_sha1() sha1_bytes = hash_to_bytes(new_directory) if list(tests_data["storage"].directory_missing([sha1_bytes])): return new_directory @pytest.fixture(scope="function") def empty_directory(): """Fixture returning the empty directory ingested into the test archive.""" return Directory(entries=()).id.hex() @pytest.fixture(scope="function") def revision(tests_data): """Fixturereturning a random revision ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "revisions")) @pytest.fixture(scope="function") def revisions(tests_data): """Fixture returning random revisions ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "revisions"), k=random.randint(2, 8), ) @pytest.fixture(scope="function") def revisions_list(tests_data): """Fixture returning random revisions ingested into the test archive.""" def gen_revisions_list(size): return random.choices( _known_swh_objects(tests_data, "revisions"), k=size, ) return gen_revisions_list @pytest.fixture(scope="function") def unknown_revision(tests_data): """Fixture returning a random revision not ingested into the test archive.""" while True: new_revision = random_sha1() sha1_bytes = hash_to_bytes(new_revision) if tests_data["storage"].revision_get([sha1_bytes])[0] is None: return new_revision def _get_origin_dfs_revisions_walker(tests_data): storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) @functools.lru_cache(maxsize=None) def _ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True return master_revisions, children @pytest.fixture(scope="function") def ancestor_revisions(): """Fixture returning a pair of revisions ingested into the test archive with an ancestor relation. 
""" master_revisions, children = _ancestor_revisions_data() # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } @functools.lru_cache(maxsize=None) def _non_ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) return merge_revs, children @pytest.fixture(scope="function") def non_ancestor_revisions(): """Fixture returning a pair of revisions ingested into the test archive with no ancestor relation. """ merge_revs, children = _non_ancestor_revisions_data() # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } @pytest.fixture(scope="function") def revision_with_submodules(): """Fixture returning a revision that is known to point to a directory with revision entries (aka git submodules) """ return { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } @pytest.fixture(scope="function") def release(tests_data): """Fixture returning a random release ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "releases")) @pytest.fixture(scope="function") def releases(tests_data): """Fixture returning random releases ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "releases"), k=random.randint(2, 8) ) @pytest.fixture(scope="function") def unknown_release(tests_data): """Fixture returning a random release not ingested into the test archive.""" while True: new_release = random_sha1() sha1_bytes = hash_to_bytes(new_release) if tests_data["storage"].release_get([sha1_bytes])[0] is None: return new_release @pytest.fixture(scope="function") def snapshot(tests_data): """Fixture returning a random snapshot ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "snapshots")) @pytest.fixture(scope="function") def unknown_snapshot(tests_data): """Fixture returning a random snapshot not ingested into the test archive.""" while True: new_snapshot = random_sha1() sha1_bytes = hash_to_bytes(new_snapshot) if tests_data["storage"].snapshot_get_branches(sha1_bytes) is None: return new_snapshot @pytest.fixture(scope="function") def origin(tests_data): """Fixture returning a random origin ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "origins")) @functools.lru_cache(maxsize=None) def _origin_with_multiple_visits(): tests_data = get_tests_data() origins = [] storage = tests_data["storage"] for origin in tests_data["origins"]: visit_page = 
storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_multiple_visits(): """Fixture returning a random origin with multiple visits ingested into the test archive. """ return random.choice(_origin_with_multiple_visits()) @functools.lru_cache(maxsize=None) def _origin_with_releases(): tests_data = get_tests_data() origins = [] for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_releases(): """Fixture returning a random origin with releases ingested into the test archive.""" return random.choice(_origin_with_releases()) @functools.lru_cache(maxsize=None) def _origin_with_pull_request_branches(): tests_data = get_tests_data() origins = [] storage = tests_data["storage"] for origin in storage.origin_list(limit=1000).results: snapshot = snapshot_get_latest(storage, origin.url) if any([b"refs/pull/" in b for b in snapshot.branches]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_pull_request_branches(): """Fixture returning a random origin with pull request branches ingested into the test archive. """ return random.choice(_origin_with_pull_request_branches()) @functools.lru_cache(maxsize=None) def _object_type_swhid(object_type): return list( filter( lambda swhid: swhid.object_type == object_type, _known_swh_objects(get_tests_data(), "swhids"), ) ) @pytest.fixture(scope="function") def content_swhid(): """Fixture returning a qualified SWHID for a random content object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.CONTENT)) @pytest.fixture(scope="function") def directory_swhid(): """Fixture returning a qualified SWHID for a random directory object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.DIRECTORY)) @pytest.fixture(scope="function") def release_swhid(): """Fixture returning a qualified SWHID for a random release object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.RELEASE)) @pytest.fixture(scope="function") def revision_swhid(): """Fixture returning a qualified SWHID for a random revision object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.REVISION)) @pytest.fixture(scope="function") def snapshot_swhid(): """Fixture returning a qualified SWHID for a snapshot object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.SNAPSHOT)) @pytest.fixture(scope="function", params=list(ObjectType)) def unknown_core_swhid(request) -> CoreSWHID: """Fixture returning an unknown core SWHID. Tests using this will be called once per object type. 
""" return CoreSWHID( object_type=request.param, object_id=random_sha1_bytes(), ) # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="function") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="function") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = 
self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) @pytest.fixture def keycloak_oidc(keycloak_oidc, mocker): keycloak_config = get_config()["keycloak"] keycloak_oidc.server_url = keycloak_config["server_url"] keycloak_oidc.realm_name = keycloak_config["realm_name"] keycloak_oidc.client_id = settings.OIDC_SWH_WEB_CLIENT_ID keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") keycloak_oidc_client.return_value = keycloak_oidc return keycloak_oidc @pytest.fixture def subtest(request): """A hack to explicitly set up and tear down fixtures. 
This fixture allows you to set up and tear down fixtures within the test function itself. This is useful (necessary!) for using Hypothesis inside pytest, as hypothesis will call the test function multiple times, without setting up or tearing down fixture state as it is normally the case. Copied from the pytest-subtesthack project, public domain license (https://github.com/untitaker/pytest-subtesthack). """ parent_test = request.node def inner(func): if hasattr(Function, "from_parent"): item = Function.from_parent( parent_test, name=request.function.__name__ + "[]", originalname=request.function.__name__, callobj=func, ) else: item = Function( name=request.function.__name__ + "[]", parent=parent_test, callobj=func ) nextitem = parent_test # prevents pytest from tearing down module fixtures item.ihook.pytest_runtest_setup(item=item) try: item.ihook.pytest_runtest_call(item=item) finally: item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem) return inner @pytest.fixture def swh_scheduler(swh_scheduler): config = get_config() scheduler = config["scheduler"] config["scheduler"] = swh_scheduler # create load-git and load-hg task types for task_type in TASK_TYPES.values(): # see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa task_type["type"] = task_type["type"].replace("load-test-", "load-", 1) swh_scheduler.create_task_type(task_type) # create load-svn task type swh_scheduler.create_task_type( { "type": "load-svn", "description": "Update a Subversion repository", "backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-cvs task type swh_scheduler.create_task_type( { "type": "load-cvs", "description": "Update a CVS repository", "backend_name": "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-bzr task type swh_scheduler.create_task_type( { "type": "load-bzr", "description": "Update a Bazaar repository", "backend_name": "swh.loader.bzr.tasks.LoadBazaar", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # add method to add load-archive-files task type during tests def add_load_archive_task_type(): swh_scheduler.create_task_type( { "type": "load-archive-files", "description": "Load tarballs", "backend_name": "swh.loader.package.archive.tasks.LoadArchive", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) swh_scheduler.add_load_archive_task_type = add_load_archive_task_type yield swh_scheduler config["scheduler"] = scheduler get_scheduler_load_task_types.cache_clear() @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", get_config()["test_db"]["name"]), ("USER", 
postgresql_proc.user), ("HOST", postgresql_proc.host), ("PORT", postgresql_proc.port), } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) @pytest.fixture def staff_user(): return User.objects.create_user(username="admin", password="", is_staff=True) @pytest.fixture def regular_user(): return User.objects.create_user(username="johndoe", password="") @pytest.fixture def regular_user2(): return User.objects.create_user(username="janedoe", password="") @pytest.fixture def add_forge_moderator(): moderator = User.objects.create_user(username="add-forge moderator", password="") moderator.user_permissions.add( create_django_permission(ADD_FORGE_MODERATOR_PERMISSION) ) return moderator @pytest.fixture def mailmap_admin(): mailmap_admin = User.objects.create_user(username="mailmap-admin", password="") mailmap_admin.user_permissions.add( create_django_permission(MAILMAP_ADMIN_PERMISSION) ) return mailmap_admin @pytest.fixture def mailmap_user(): mailmap_user = User.objects.create_user(username="mailmap-user", password="") mailmap_user.user_permissions.add(create_django_permission(MAILMAP_PERMISSION)) return mailmap_user def reload_urlconf(): from django.conf import settings clear_url_caches() - urlconf = settings.ROOT_URLCONF - if urlconf in sys.modules: - reload(sys.modules[urlconf]) - else: - import_module(urlconf) + # force reloading of all URLs as they depend on django settings + # and swh-web configuration + urlconfs = [settings.ROOT_URLCONF] + urlconfs += [f"{app}.urls" for app in settings.SWH_DJANGO_APPS] + for urlconf in urlconfs: + try: + if urlconf in sys.modules: + reload(sys.modules[urlconf]) + else: + import_module(urlconf) + except ModuleNotFoundError: + pass class SwhSettingsWrapper(SettingsWrapper): def __setattr__(self, attr: str, value) -> None: super().__setattr__(attr, value) reload_urlconf() def finalize(self) -> None: super().finalize() reload_urlconf() @pytest.fixture def django_settings(): """Override pytest-django settings fixture in order to reload URLs when modifying settings in test and after test execution as most of them depend on installed django apps in swh-web. 
""" settings = SwhSettingsWrapper() yield settings settings.finalize() diff --git a/swh/web/tests/deposit/test_views.py b/swh/web/tests/deposit/test_views.py index a1120f4b..7707fb7e 100644 --- a/swh/web/tests/deposit/test_views.py +++ b/swh/web/tests/deposit/test_views.py @@ -1,101 +1,103 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode import pytest +from django.conf import settings + from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION from swh.web.config import get_config from swh.web.tests.helpers import ( check_html_get_response, check_http_get_response, create_django_permission, ) from swh.web.utils import reverse def test_deposit_admin_view_not_available_for_anonymous_user(client): url = reverse("admin-deposit") resp = check_html_get_response(client, url, status_code=302) - assert resp["location"] == reverse("login", query_params={"next": url}) + assert resp["location"] == reverse(settings.LOGIN_URL, query_params={"next": url}) @pytest.mark.django_db def test_deposit_admin_view_available_for_staff_user(client, staff_user): client.force_login(staff_user) url = reverse("admin-deposit") check_html_get_response( client, url, status_code=200, template_used="deposit-admin.html" ) @pytest.mark.django_db def test_deposit_admin_view_available_for_user_with_permission(client, regular_user): regular_user.user_permissions.add( create_django_permission(ADMIN_LIST_DEPOSIT_PERMISSION) ) client.force_login(regular_user) url = reverse("admin-deposit") check_html_get_response( client, url, status_code=200, template_used="deposit-admin.html" ) @pytest.mark.django_db def test_deposit_admin_view_list_deposits(client, staff_user, requests_mock): deposits_data = { "data": [ { "external_id": "hal-02527986", "id": 1066, "raw_metadata": None, "reception_date": "2022-04-08T14:12:34.143000Z", "status": "rejected", "status_detail": None, "swhid": None, "swhid_context": None, "type": "code", "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-02527986", }, { "external_id": "hal-01243573", "id": 1065, "raw_metadata": None, "reception_date": "2022-04-08T12:53:50.940000Z", "status": "rejected", "status_detail": None, "swhid": None, "swhid_context": None, "type": "code", "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-01243573", }, ], "draw": 2, "recordsFiltered": 645, "recordsTotal": 1066, } config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_url = private_api_url + "deposits/datatables/" basic_auth_payload = ( config["private_api_user"] + ":" + config["private_api_password"] ).encode() requests_mock.get( deposits_list_url, json=deposits_data, request_headers={ "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" }, ) client.force_login(staff_user) url = reverse("admin-deposit-list") check_http_get_response( client, url, status_code=200, content_type="application/json" ) diff --git a/swh/web/tests/save_code_now/test_origin_save_admin.py b/swh/web/tests/save_code_now/test_origin_save_admin.py index 5d7f08f9..c31e13e3 100644 --- a/swh/web/tests/save_code_now/test_origin_save_admin.py +++ b/swh/web/tests/save_code_now/test_origin_save_admin.py @@ -1,227 +1,229 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this 
distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import unquote import pytest +from django.conf import settings + from swh.web.save_code_now.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_YET_SCHEDULED, SaveAuthorizedOrigin, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.save_code_now.origin_save import can_save_origin from swh.web.tests.helpers import check_http_get_response, check_http_post_response from swh.web.utils import reverse _authorized_origin_url = "https://scm.ourproject.org/anonscm/" _unauthorized_origin_url = "https://www.softwareheritage.org/" pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): SaveAuthorizedOrigin.objects.create(url=_authorized_origin_url) SaveUnauthorizedOrigin.objects.create(url=_unauthorized_origin_url) def check_not_login(client, url): - login_url = reverse("login", query_params={"next": url}) + login_url = reverse(settings.LOGIN_URL, query_params={"next": url}) resp = check_http_post_response(client, url, status_code=302) assert unquote(resp.url) == login_url def test_add_authorized_origin_url(client, staff_user): authorized_url = "https://scm.adullact.net/anonscm/" assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-authorized-url", url_args={"origin_url": authorized_url} ) check_not_login(client, url) assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(authorized_url) == SAVE_REQUEST_ACCEPTED def test_remove_authorized_origin_url(client, staff_user): assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED url = reverse( "admin-origin-save-remove-authorized-url", url_args={"origin_url": _authorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_PENDING def test_add_unauthorized_origin_url(client, staff_user): unauthorized_url = "https://www.yahoo./" assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-unauthorized-url", url_args={"origin_url": unauthorized_url}, ) check_not_login(client, url) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_REJECTED def test_remove_unauthorized_origin_url(client, staff_user): assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED url = reverse( "admin-origin-save-remove-unauthorized-url", url_args={"origin_url": _unauthorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_PENDING def test_accept_pending_save_request(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://v2.pikacode.com/bthate/botlib.git" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) response = check_http_post_response(client, 
save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING accept_request_url = reverse( "admin-origin-save-request-accept", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, accept_request_url) client.force_login(staff_user) response = check_http_post_response(client, accept_request_url, status_code=200) response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_ACCEPTED assert response.data[0]["save_task_status"] == SAVE_TASK_NOT_YET_SCHEDULED def test_reject_pending_save_request(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://wikipedia.com" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING reject_request_url = reverse( "admin-origin-save-request-reject", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, reject_request_url) client.force_login(staff_user) response = check_http_post_response(client, reject_request_url, status_code=200) response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_REJECTED assert response.data[0]["note"] is None def test_reject_pending_save_request_not_found(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://example.org" reject_request_url = reverse( "admin-origin-save-request-reject", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) client.force_login(staff_user) check_http_post_response(client, reject_request_url, status_code=404) def test_reject_pending_save_request_with_note(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://wikipedia.com" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING reject_request_url = reverse( "admin-origin-save-request-reject", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) data = {"note": "The URL does not target a git repository"} client.force_login(staff_user) response = check_http_post_response( client, reject_request_url, status_code=200, data=data ) response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_REJECTED assert response.data[0]["note"] == data["note"] def test_remove_save_request(client, staff_user): sor = SaveOriginRequest.objects.create( visit_type="git", origin_url="https://wikipedia.com", status=SAVE_REQUEST_PENDING, ) assert SaveOriginRequest.objects.count() == 1 remove_request_url = reverse( "admin-origin-save-request-remove", url_args={"sor_id": sor.id} ) check_not_login(client, remove_request_url) client.force_login(staff_user) check_http_post_response(client, remove_request_url, status_code=200) assert SaveOriginRequest.objects.count() == 0 diff --git a/swh/web/tests/webapp/test_templates.py b/swh/web/tests/webapp/test_templates.py index 4c21c5a8..b8998e85 100644 --- a/swh/web/tests/webapp/test_templates.py +++ b/swh/web/tests/webapp/test_templates.py @@ -1,113 +1,123 @@ # Copyright (C) 2021 The Software Heritage 
developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy import random from pkg_resources import get_distribution import pytest +from django.conf import settings + from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config from swh.web.tests.django_asserts import assert_contains, assert_not_contains from swh.web.tests.helpers import check_http_get_response, create_django_permission from swh.web.utils import reverse swh_web_version = get_distribution("swh.web").version def test_layout_without_ribbon(client): url = reverse("swh-web-homepage") resp = check_http_get_response( client, url, status_code=200, server_name=SWH_WEB_SERVER_NAME ) assert_not_contains(resp, "swh-corner-ribbon") def test_layout_with_staging_ribbon(client): url = reverse("swh-web-homepage") resp = check_http_get_response( client, url, status_code=200, server_name=random.choice(SWH_WEB_STAGING_SERVER_NAMES), ) assert_contains(resp, "swh-corner-ribbon") assert_contains(resp, f"Staging
v{swh_web_version}") def test_layout_with_development_ribbon(client): url = reverse("swh-web-homepage") resp = check_http_get_response( client, url, status_code=200, server_name="localhost", ) assert_contains(resp, "swh-corner-ribbon") assert_contains(resp, f"Development
v{swh_web_version.split('+')[0]}") def test_layout_with_oidc_auth_enabled(client): url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_contains(resp, reverse("oidc-login")) -def test_layout_without_oidc_auth_enabled(client, mocker): +def test_layout_without_oidc_auth_enabled(client, django_settings, mocker): config = deepcopy(get_config()) config["keycloak"]["server_url"] = "" - mock_get_config = mocker.patch("swh.web.utils.get_config") + mock_get_config = mocker.patch("swh.web.config.get_config") mock_get_config.return_value = config + django_settings.LOGIN_URL = "login" + django_settings.LOGOUT_URL = "logout" + django_settings.MIDDLEWARE = [ + mid + for mid in django_settings.MIDDLEWARE + if mid != "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware" + ] + url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) - assert_contains(resp, reverse("login")) + assert_contains(resp, reverse(settings.LOGIN_URL)) def test_layout_swh_web_version_number_display(client): url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_contains(resp, f"swh-web v{swh_web_version}") @pytest.mark.django_db def test_layout_no_deposit_admin_for_anonymous_user(client): url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_not_contains(resp, "swh-deposit-admin-link") @pytest.mark.django_db def test_layout_deposit_admin_for_staff_user(client, staff_user): client.force_login(staff_user) url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_contains(resp, "swh-deposit-admin-link") @pytest.mark.django_db def test_layout_deposit_admin_for_user_with_permission(client, regular_user): regular_user.user_permissions.add( create_django_permission(ADMIN_LIST_DEPOSIT_PERMISSION) ) client.force_login(regular_user) url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_contains(resp, "swh-deposit-admin-link") def test_layout_no_piwik_by_default(client): url = reverse("swh-web-homepage") resp = check_http_get_response(client, url, status_code=200) assert_not_contains(resp, "https://piwik.inria.fr") def test_layout_piwik_in_production(client): url = reverse("swh-web-homepage") resp = check_http_get_response( client, url, status_code=200, server_name=SWH_WEB_SERVER_NAME ) assert_contains(resp, "https://piwik.inria.fr") diff --git a/swh/web/utils/__init__.py b/swh/web/utils/__init__.py index e68a799c..c95660ef 100644 --- a/swh/web/utils/__init__.py +++ b/swh/web/utils/__init__.py @@ -1,520 +1,522 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Mapping, Optional import urllib.parse from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution import requests from requests.auth import HTTPBasicAuth from django.conf import settings from django.core.cache import cache from django.core.cache.backends.base import 
DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search from swh.web.utils.exc import BadInputExc, sentry_capture_exception SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[Mapping[str, Optional[str]]] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) params: Dict[str, str] = {} if query_params: params = {k: v for k, v in query_params.items() if v is not None} if params: query_dict = QueryDict("", mutable=True) query_dict.update(dict(sorted(params.items()))) url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. 
Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.utils.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M:%S UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. 
Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_prod": is_swh_web_production(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, "lang": "en", "sidebar_state": request.COOKIES.get("sidebar-state", "expanded"), "SWH_DJANGO_APPS": settings.SWH_DJANGO_APPS, + "login_url": settings.LOGIN_URL, + "logout_url": settings.LOGOUT_URL, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. 
""" while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.utils import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: if catch_exception: sentry_capture_exception(exc) return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. 
This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) diff --git a/swh/web/webapp/templates/layout.html b/swh/web/webapp/templates/layout.html index f411dc34..d47bd986 100644 --- a/swh/web/webapp/templates/layout.html +++ b/swh/web/webapp/templates/layout.html @@ -1,331 +1,324 @@ {% comment %} Copyright (C) 2015-2022 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load js_reverse %} {% load static %} {% load render_bundle from webpack_loader %} {% load swh_templatetags %} {% block title %}{% endblock %} {% render_bundle 'vendors' %} {% render_bundle 'webapp' %} {% render_bundle 'guided_tour' %} {{ request.user.is_authenticated|json_script:"swh_user_logged_in" }} {% include "includes/favicon.html" %} {% block header %}{% endblock %} {% if swh_web_prod %} {% endif %}
{% if "swh.web.banners" in SWH_DJANGO_APPS %}
{% include "hiring-banner.html" %}
{% endif %}
{% if swh_web_staging %}
Staging
v{{ swh_web_version }}
{% elif swh_web_dev %}
Development
v{{ swh_web_version|split:"+"|first }}
{% endif %} {% block content %}{% endblock %}
{% include "includes/global-modals.html" %}
Software Heritage — Copyright (C) 2015–{% now "Y" %}, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API- Contact- {% if "swh.web.jslicenses" in SWH_DJANGO_APPS %} JavaScript license information- {% endif %} Web API
{% if "production" not in DJANGO_SETTINGS_MODULE %} swh-web v{{ swh_web_version }} {% endif %}
back to top
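
Note on the new template context variables: the context processor change in swh/web/utils/__init__.py above exposes login_url and logout_url (the values of settings.LOGIN_URL and settings.LOGOUT_URL) to every template. The sketch below is a minimal, hypothetical illustration of how a template could consume them, assuming both settings hold URL pattern names; the surrounding markup is illustrative only and is not part of this patch.

    {% if request.user.is_authenticated %}
      {# logout_url holds the name of the logout URL pattern, e.g. "oidc-logout" or "logout" #}
      <a href="{% url logout_url %}?next={{ request.path }}">logout</a>
    {% else %}
      {# login_url holds the name of the login URL pattern, e.g. "oidc-login" or "login" #}
      <a href="{% url login_url %}?next={{ request.path }}">log in</a>
    {% endif %}

Passing the pattern name through the context instead of hardcoding "login" is what lets the same templates work whether OIDC or the plain Django login backend is configured, which is also why the tests above now reverse settings.LOGIN_URL rather than the literal "login" route.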