diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js index 90ac8e1c..84acf2cd 100644 --- a/cypress/integration/api-tokens.spec.js +++ b/cypress/integration/api-tokens.spec.js @@ -1,211 +1,211 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ describe('Test API tokens UI', function() { it('should ask for user to login', function() { - cy.visit(this.Urls.api_tokens(), {failOnStatusCode: false}); + cy.visit(`${this.Urls.oidc_profile()}#tokens`, {failOnStatusCode: false}); cy.location().should(loc => { expect(loc.pathname).to.eq(this.Urls.oidc_login()); }); }); function initTokensPage(Urls, tokens) { cy.server(); cy.route({ method: 'GET', url: `${Urls.oidc_list_bearer_tokens()}/**`, response: { 'recordsTotal': tokens.length, 'draw': 2, 'recordsFiltered': tokens.length, 'data': tokens } }); // the tested UI should not be accessible for standard Django users // but we need a user logged in for testing it cy.adminLogin(); - cy.visit(Urls.api_tokens()); + cy.visit(`${Urls.oidc_profile()}#tokens`); } function generateToken(Urls, status, tokenValue = '') { cy.route({ method: 'POST', url: `${Urls.oidc_generate_bearer_token()}/**`, response: tokenValue, status: status }).as('generateTokenRequest'); cy.contains('Generate new token') .click(); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Bearer token generation'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') .type('secret'); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') .click(); cy.wait('@generateTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should generate and display bearer token', function() { initTokensPage(this.Urls, []); const tokenValue = 'bearer-token-value'; generateToken(this.Urls, 200, tokenValue); cy.get('#swh-token-success-message') .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token generation failed', function() { initTokensPage(this.Urls, []); generateToken(this.Urls, 400); cy.get('#swh-token-error-message') .should('contain', 'You are not allowed to generate bearer tokens'); cy.get('#swh-web-modal-html .close').click(); generateToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); generateToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); function displayToken(Urls, status, tokenValue = '') { cy.route({ method: 'POST', url: `${Urls.oidc_get_bearer_token()}/**`, response: tokenValue, status: status }).as('getTokenRequest'); cy.contains('Display token') - .click(); + .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Display bearer token'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') - .type('secret'); + .type('secret', {force: true}); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') - .click(); + .click({force: true}); cy.wait('@getTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should show a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); const tokenValue = 'token-value'; displayToken(this.Urls, 200, tokenValue); cy.get('#swh-token-success-message') .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token display failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); displayToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); displayToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); function revokeToken(Urls, status) { cy.route({ method: 'POST', url: `${Urls.oidc_revoke_bearer_tokens()}/**`, response: '', status: status }).as('revokeTokenRequest'); cy.contains('Revoke token') - .click(); + .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Revoke bearer token'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') - .type('secret'); + .type('secret', {force: true}); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') - .click(); + .click({force: true}); cy.wait('@revokeTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should revoke a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 200); cy.get('#swh-token-success-message') .should('contain', 'Bearer token successfully revoked'); }); it('should report errors when token revoke failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); revokeToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); }); diff --git a/swh/web/api/urls.py b/swh/web/api/urls.py index dea648c9..9033c291 100644 --- a/swh/web/api/urls.py +++ b/swh/web/api/urls.py @@ -1,31 +1,20 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf.urls import url -from django.contrib.auth.decorators import login_required -from django.shortcuts import render - from swh.web.api.apiurls import APIUrls import swh.web.api.views.content # noqa import swh.web.api.views.directory # noqa import swh.web.api.views.graph # noqa import swh.web.api.views.identifiers # noqa import swh.web.api.views.origin # noqa import swh.web.api.views.origin_save # noqa import swh.web.api.views.ping # noqa import swh.web.api.views.release # noqa import swh.web.api.views.revision # noqa import swh.web.api.views.snapshot # noqa import swh.web.api.views.stat # noqa import swh.web.api.views.vault # noqa - -@login_required(login_url="/oidc/login/", redirect_field_name="next_path") -def _tokens_view(request): - return render(request, "api/tokens.html") - - urlpatterns = APIUrls.get_url_patterns() -urlpatterns.append(url(r"^tokens/$", _tokens_view, name="api-tokens")) diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js index 1b67e5e1..afcf5fe6 100644 --- a/swh/web/assets/src/bundles/auth/index.js +++ b/swh/web/assets/src/bundles/auth/index.js @@ -1,216 +1,226 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ -import {handleFetchError, csrfPost} from 'utils/functions'; +import {handleFetchError, csrfPost, removeUrlFragment} from 'utils/functions'; import './auth.css'; let apiTokensTable; function updateSubmitButtonState() { const val = $('#swh-user-password').val(); $('#swh-user-password-submit').prop('disabled', val.length === 0); } function passwordForm(infoText, buttonText) { const form = `
`; return form; } function errorMessage(message) { return `${message}
`; } function successMessage(message) { return `${message}
`; } function disableSubmitButton() { $('#swh-user-password-submit').prop('disabled', true); $('#swh-user-password').off('change'); $('#swh-user-password').off('keyup'); } function generateToken() { csrfPost(Urls.oidc_generate_bearer_token(), {}, JSON.stringify({password: $('#swh-user-password').val()})) .then(handleFetchError) .then(response => response.text()) .then(token => { disableSubmitButton(); const tokenHtml = `${successMessage('Below is your token.')}${token}`; $(`#swh-password-form`).append(tokenHtml); apiTokensTable.draw(); }) .catch(response => { if (response.status === 400) { $(`#swh-password-form`).append( errorMessage('You are not allowed to generate bearer tokens.')); } else if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function displayToken(tokenId) { const postData = { password: $('#swh-user-password').val(), token_id: tokenId }; csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(response => response.text()) .then(token => { disableSubmitButton(); const tokenHtml = `${successMessage('Below is your token.')}
${token}`; $(`#swh-password-form`).append(tokenHtml); }) .catch(response => { if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function revokeTokens(tokenIds) { const postData = { password: $('#swh-user-password').val(), token_ids: tokenIds }; csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(() => { disableSubmitButton(); $(`#swh-password-form`).append( successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`)); apiTokensTable.draw(); }) .catch(response => { if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function revokeToken(tokenId) { revokeTokens([tokenId]); } function revokeAllTokens() { const tokenIds = []; const rowsData = apiTokensTable.rows().data(); for (let i = 0; i < rowsData.length; ++i) { tokenIds.push(rowsData[i].id); } revokeTokens(tokenIds); } export function applyTokenAction(action, tokenId) { const actionData = { generate: { modalTitle: 'Bearer token generation', infoText: 'Enter your password and click on the button to generate the token.', buttonText: 'Generate token', submitCallback: generateToken }, display: { modalTitle: 'Display bearer token', infoText: 'Enter your password and click on the button to display the token.', buttonText: 'Display token', submitCallback: displayToken }, revoke: { modalTitle: 'Revoke bearer token', infoText: 'Enter your password and click on the button to revoke the token.', buttonText: 'Revoke token', submitCallback: revokeToken }, revokeAll: { modalTitle: 'Revoke all bearer tokens', infoText: 'Enter your password and click on the button to revoke all tokens.', buttonText: 'Revoke tokens', submitCallback: revokeAllTokens } }; if (!actionData[action]) { return; } const passwordFormHtml = passwordForm( actionData[action].infoText, actionData[action].buttonText); swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml); $('#swh-user-password').change(updateSubmitButtonState); $('#swh-user-password').keyup(updateSubmitButtonState); $(`#swh-password-form`).submit(event => { event.preventDefault(); event.stopPropagation(); actionData[action].submitCallback(tokenId); }); } -export function initApiTokensPage() { +export function initProfilePage() { $(document).ready(() => { apiTokensTable = $('#swh-bearer-tokens-table') .on('error.dt', (e, settings, techNote, message) => { $('#swh-origin-save-request-list-error').text( 'An error occurred while retrieving the tokens list'); console.log(message); }) .DataTable({ serverSide: true, ajax: Urls.oidc_list_bearer_tokens(), columns: [ { data: 'creation_date', name: 'creation_date', render: (data, type, row) => { if (type === 'display') { let date = new Date(data); return date.toLocaleString(); } return data; } }, { render: (data, type, row) => { const html = ` `; return html; } } ], ordering: false, searching: false, scrollY: '50vh', scrollCollapse: true }); + $('#swh-oidc-profile-tokens-tab').on('shown.bs.tab', () => { + apiTokensTable.draw(); + window.location.hash = '#tokens'; + }); + $('#swh-oidc-profile-account-tab').on('shown.bs.tab', () => { + removeUrlFragment(); + }); + if (window.location.hash === '#tokens') { + $('.nav-tabs a[href="#swh-oidc-profile-tokens"]').tab('show'); + } }); } diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 9e3d8615..8f019a99 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,243 +1,251 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast import uuid from cryptography.fernet import InvalidToken from keycloak.exceptions import KeycloakError import sentry_sdk from django.conf.urls import url from django.contrib.auth import authenticate, login, logout +from django.contrib.auth.decorators import login_required from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, HttpResponseServerError, JsonResponse, ) +from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import ( decrypt_data, encrypt_data, gen_oidc_pkce_codes, get_oidc_client, ) from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ # generate a CSRF token state = str(uuid.uuid4()) redirect_uri = reverse("oidc-login-complete", request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), "prompt": request.GET.get("prompt", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": "openid", "prompt": request.GET.get("prompt", ""), } oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) return HttpResponseRedirect(authorization_url) def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ if "login_data" not in request.session: raise Exception("Login process has not been initialized.") login_data = request.session["login_data"] next_path = login_data["next_path"] or request.build_absolute_uri("/") if "error" in request.GET: if login_data["prompt"] == "none": # Silent login failed because OIDC session expired. # Redirect to logout page and inform user. logout(request) logout_url = reverse( "logout", query_params={"next_path": next_path, "remote_user": 1} ) return HttpResponseRedirect(logout_url) return HttpResponseServerError(request.GET["error"]) if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: raise Exception("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f"oidc_user_{user.id}") logout_url = reverse("logout", query_params={"remote_user": 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) @require_http_methods(["POST"]) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) oidc_client = get_oidc_client() token = oidc_client.offline_token(user.username, data["password"]) password = data["password"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), password, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponse(token, content_type="text/plain") except KeycloakError as e: sentry_sdk.capture_exception(e) return HttpResponse(status=e.response_code or 500) def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) password = data["password"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, password, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) password = data["password"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, password, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) +@login_required(login_url="/oidc/login/", redirect_field_name="next_path") +def _oidc_profile_view(request: HttpRequest) -> HttpResponse: + return render(request, "auth/profile.html") + + urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), + url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 09d5109c..88bf56ce 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,350 +1,351 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import re from typing import Any, Dict, Optional from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from prometheus_client.registry import CollectorRegistry from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from rest_framework.authentication import SessionAuthentication from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "directory": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, + "keycloak": config["keycloak"], } class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. """ def authenticate(self, request): user = getattr(request._request, "user", None) self.enforce_csrf(request) return (user, None) def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
- That interface enables to manage bearer tokens for Web API authentication. - A token has to be sent in HTTP authorization headers to make authenticated API requests. -
-
- For instance when using curl
proceed as follows:
-
curl -H "Authorization: Bearer ${TOKEN}" {{ request.scheme }}://{{ request.META.HTTP_HOST }}/api/...- - -
Creation date | -Actions | -
---|
+ Below are the details of your user account. + You can edit your personal information in the + + Software Heritage Account Management + interface. +
+Username | +{{ user.username }} | +
---|---|
First name | +{{ user.first_name }} | +
Last name | +{{ user.last_name }} | +
{{ user.email }} | +|
Permissions: | +
+ {% for perm in user.get_all_permissions %}
+ {{ perm }} + {% endfor %} + |
+
+ That interface enables to manage bearer tokens for Web API authentication. + A token has to be sent in HTTP authorization headers to make authenticated API requests. +
+
+ For instance when using curl
proceed as follows:
+
curl -H "Authorization: Bearer ${TOKEN}" {{ request.scheme }}://{{ request.META.HTTP_HOST }}/api/...+ +
Creation date | +Actions | +
---|