diff --git a/swh/web/api/views/metadata.py b/swh/web/api/views/metadata.py index f0271eff..2595ce0e 100644 --- a/swh/web/api/views/metadata.py +++ b/swh/web/api/views/metadata.py @@ -1,254 +1,252 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import re import iso8601 from django.http import HttpResponse from swh.model import hashutil, swhids from swh.model.model import MetadataAuthority, MetadataAuthorityType from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.common import archive, converters from swh.web.common.exc import BadInputExc, NotFoundExc -from swh.web.common.utils import reverse - -SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" +from swh.web.common.utils import SWHID_RE, reverse @api_route( f"/raw-extrinsic-metadata/swhid/(?P{SWHID_RE})/", "api-1-raw-extrinsic-metadata-swhid", ) @api_doc("/raw-extrinsic-metadata/swhid/") @format_docstring() def api_raw_extrinsic_metadata_swhid(request, target): """ .. http:get:: /api/1/raw-extrinsic-metadata/swhid/(target) Returns raw `extrinsic metadata `__ collected on a given object. :param string target: The core SWHID of the object whose metadata should be returned :query string authority: A metadata authority identifier, formatted as `` ``. Required. :query string after: ISO8601 representation of the minimum timestamp of metadata to fetch. Defaults to allowing all metadata. :query int limit: Maximum number of metadata objects to return. {common_headers} :>jsonarr string target: SWHID of the object described by this metadata :>jsonarr string discovery_date: ISO8601/RFC3339 timestamp of the moment this metadata was collected. :>jsonarr object authority: authority this metadata is coming from :>jsonarr object fetcher: tool used to fetch the metadata :>jsonarr string format: short identifier of the format of the metadata :>jsonarr string metadata_url: link to download the metadata "blob" itself :>jsonarr string origin: URL of the origin in which context's the metadata is valid, if any :>jsonarr int visit: identifier of the visit in which context's the metadata is valid, if any :>jsonarr string snapshot: SWHID of the snapshot in which context's the metadata is valid, if any :>jsonarr string release: SWHID of the release in which context's the metadata is valid, if any :>jsonarr string revision: SWHID of the revision in which context's the metadata is valid, if any :>jsonarr string path: SWHID of the path in which context's is valid if any, relative to a release or revision as anchor :>jsonarr string directory: SWHID of the directory in which context's the metadata is valid, if any :statuscode 200: no error **Example:** .. 
parsed-literal:: :swh_web_api:`raw-extrinsic-metadata/swhid/swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307/?authority=forge%20https://pypi.org/` """ # noqa authority_str: str = request.query_params.get("authority") after_str: str = request.query_params.get("after") limit_str: str = request.query_params.get("limit", "100") page_token_str: str = request.query_params.get("page_token") if not authority_str: raise BadInputExc("The 'authority' query parameter is required.") if " " not in authority_str.strip(): raise BadInputExc("The 'authority' query parameter should contain a space.") (authority_type_str, authority_url) = authority_str.split(" ", 1) try: authority_type = MetadataAuthorityType(authority_type_str) except ValueError: raise BadInputExc( f"Invalid 'authority' type, should be one of: " f"{', '.join(member.value for member in MetadataAuthorityType)}" ) authority = MetadataAuthority(authority_type, authority_url) if after_str: try: after = iso8601.parse_date(after_str) except iso8601.ParseError: raise BadInputExc("Invalid format for 'after' parameter.") from None else: after = None try: limit = int(limit_str) except ValueError: raise BadInputExc("'limit' parameter must be an integer.") from None limit = min(limit, 10000) try: target = swhids.CoreSWHID.from_string(target).to_extended() except swhids.ValidationError as e: raise BadInputExc(f"Invalid target SWHID: {e.args[0]}") from None if page_token_str: page_token = base64.urlsafe_b64decode(page_token_str) else: page_token = None result_page = archive.storage.raw_extrinsic_metadata_get( target=target, authority=authority, after=after, page_token=page_token, limit=limit, ) results = [] for metadata in result_page.results: result = converters.from_raw_extrinsic_metadata(metadata) # We can't reliably send metadata directly, because it is a bytestring, # and we have to return JSON documents. result["metadata_url"] = reverse( "api-1-raw-extrinsic-metadata-get", url_args={"id": hashutil.hash_to_hex(metadata.id)}, query_params={"filename": f"{target}_metadata"}, request=request, ) results.append(result) response = { "results": results, "headers": {}, } if result_page.next_page_token is not None: response["headers"]["link-next"] = reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": target}, query_params=dict( authority=authority_str, after=after_str, limit=limit_str, page_token=base64.urlsafe_b64encode( result_page.next_page_token.encode() ), ), request=request, ) return response @api_route( "/raw-extrinsic-metadata/get/(?P[0-9a-z]+)/", "api-1-raw-extrinsic-metadata-get", ) def api_raw_extrinsic_metadata_get(request, id): # This is an internal endpoint that should only be accessed via URLs given # by /raw-extrinsic-metadata/swhid/; so it is not documented. metadata = archive.storage.raw_extrinsic_metadata_get_by_ids( [hashutil.hash_to_bytes(id)] ) if not metadata: raise NotFoundExc( "Metadata not found. Use /raw-extrinsic-metadata/swhid/ to access metadata." ) response = HttpResponse( metadata[0].metadata, content_type="application/octet-stream" ) filename = request.query_params.get("filename") if filename and re.match("[a-zA-Z0-9:._-]+", filename): response["Content-disposition"] = f'attachment; filename="{filename}"' else: # It should always be not-None and match the regexp if the URL was created by # /raw-extrinsic-metadata/swhid/, but we're better safe than sorry. 
response["Content-disposition"] = "attachment" return response @api_route( f"/raw-extrinsic-metadata/swhid/(?P{SWHID_RE})/authorities/", "api-1-raw-extrinsic-metadata-swhid-authorities", ) @api_doc("/raw-extrinsic-metadata/swhid/authorities/") @format_docstring() def api_raw_extrinsic_metadata_swhid_authorities(request, target): """ .. http:get:: /api/1/raw-extrinsic-metadata/swhid/(target)/authorities/ Returns a list of metadata authorities that provided metadata on the given target. They can then be used to get the raw `extrinsic metadata `__ collected on that object from each of the authorities. :param string target: The core SWHID of the object whose metadata-providing authorities should be returned {common_headers} :>jsonarr string type: Type of authority (deposit_client, forge, registry) :>jsonarr string url: Unique IRI identifying the authority :>jsonarr object metadata_list_url: URL to get the list of metadata objects on the given object from this authority :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`raw-extrinsic-metadata/swhid/swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307/authorities/` """ # noqa target_str = target try: target = swhids.CoreSWHID.from_string(target_str).to_extended() except swhids.ValidationError as e: raise BadInputExc(f"Invalid target SWHID: {e.args[0]}") from None authorities = archive.storage.raw_extrinsic_metadata_get_authorities(target=target) results = [ { **authority.to_dict(), "metadata_list_url": reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": target_str}, query_params={"authority": f"{authority.type.value} {authority.url}"}, request=request, ), } for authority in authorities ] return { "results": results, "headers": {}, } diff --git a/swh/web/api/views/vault.py b/swh/web/api/views/vault.py index 07ad8d9c..6e277607 100644 --- a/swh/web/api/views/vault.py +++ b/swh/web/api/views/vault.py @@ -1,514 +1,513 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict from django.http import HttpResponse from django.shortcuts import redirect from swh.model.hashutil import hash_to_hex from swh.model.swhids import CoreSWHID, ObjectType from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup from swh.web.common import archive, query from swh.web.common.exc import BadInputExc -from swh.web.common.utils import reverse +from swh.web.common.utils import SWHID_RE, reverse + ###################################################### # Common -SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" - # XXX: a bit spaghetti. Would be better with class-based views. 
def _dispatch_cook_progress(request, bundle_type: str, swhid: CoreSWHID): if request.method == "GET": return api_lookup( archive.vault_progress, bundle_type, swhid, notfound_msg=f"Cooking of {swhid} was never requested.", request=request, ) elif request.method == "POST": email = request.POST.get("email", request.GET.get("email", None)) return api_lookup( archive.vault_cook, bundle_type, swhid, email, notfound_msg=f"{swhid} not found.", request=request, ) def _vault_response( vault_response: Dict[str, Any], add_legacy_items: bool ) -> Dict[str, Any]: d = { "fetch_url": vault_response["fetch_url"], "progress_message": vault_response["progress_msg"], "id": vault_response["task_id"], "status": vault_response["task_status"], "swhid": str(vault_response["swhid"]), } if add_legacy_items: d["obj_type"] = vault_response["swhid"].object_type.name.lower() d["obj_id"] = hash_to_hex(vault_response["swhid"].object_id) return d ###################################################### # Flat bundles @api_route( f"/vault/flat/(?P{SWHID_RE})/", "api-1-vault-cook-flat", methods=["GET", "POST"], throttle_scope="swh_vault_cooking", never_cache=True, ) @api_doc("/vault/flat/") @format_docstring() def api_vault_cook_flat(request, swhid): """ .. http:get:: /api/1/vault/flat/(swhid)/ .. http:post:: /api/1/vault/flat/(swhid)/ Request the cooking of a simple archive, typically for a directory. That endpoint enables to create a vault cooking task for a directory through a POST request or check the status of a previously created one through a GET request. Once the cooking task has been executed, the resulting archive can be downloaded using the dedicated endpoint :http:get:`/api/1/vault/flat/(swhid)/raw/`. Then to extract the cooked directory in the current one, use:: $ tar xvf path/to/swh_1_*.tar.gz :param string swhid: the object's SWHID :query string email: e-mail to notify when the archive is ready {common_headers} :>json string fetch_url: the url from which to download the archive once it has been cooked (see :http:get:`/api/1/vault/flat/(swhid)/raw/`) :>json string progress_message: message describing the cooking task progress :>json number id: the cooking task id :>json string status: the cooking task status (either **new**, **pending**, **done** or **failed**) :>json string swhid: the identifier of the object to cook :statuscode 200: no error :statuscode 400: an invalid directory identifier has been provided :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ swhid = CoreSWHID.from_string(swhid) if swhid.object_type == ObjectType.DIRECTORY: res = _dispatch_cook_progress(request, "flat", swhid) res["fetch_url"] = reverse( "api-1-vault-fetch-flat", url_args={"swhid": str(swhid)}, request=request, ) return _vault_response(res, add_legacy_items=False) elif swhid.object_type == ObjectType.CONTENT: raise BadInputExc( "Content objects do not need to be cooked, " "use `/api/1/content/raw/` instead." ) elif swhid.object_type == ObjectType.REVISION: # TODO: support revisions too? (the vault allows it) raise BadInputExc( "Only directories can be cooked as 'flat' bundles. " "Use `/api/1/vault/gitfast/` to cook revisions, as gitfast bundles." 
) else: raise BadInputExc("Only directories can be cooked as 'flat' bundles.") @api_route( r"/vault/directory/(?P[0-9a-f]+)/", "api-1-vault-cook-directory", methods=["GET", "POST"], checksum_args=["dir_id"], throttle_scope="swh_vault_cooking", never_cache=True, ) @api_doc("/vault/directory/", tags=["deprecated"]) @format_docstring() def api_vault_cook_directory(request, dir_id): """ .. http:get:: /api/1/vault/directory/(dir_id)/ This endpoint was replaced by :http:get:`/api/1/vault/flat/(swhid)/` """ _, obj_id = query.parse_hash_with_algorithms_or_throws( dir_id, ["sha1"], "Only sha1_git is supported." ) swhid = f"swh:1:dir:{obj_id.hex()}" res = _dispatch_cook_progress(request, "flat", CoreSWHID.from_string(swhid)) res["fetch_url"] = reverse( "api-1-vault-fetch-flat", url_args={"swhid": swhid}, request=request, ) return _vault_response(res, add_legacy_items=True) @api_route( f"/vault/flat/(?P{SWHID_RE})/raw/", "api-1-vault-fetch-flat", ) @api_doc("/vault/flat/raw/") def api_vault_fetch_flat(request, swhid): """ .. http:get:: /api/1/vault/flat/(swhid)/raw/ Fetch the cooked archive for a flat bundle. See :http:get:`/api/1/vault/flat/(swhid)/` to get more details on 'flat' bundle cooking. :param string swhid: the SWHID of the object to cook :resheader Content-Type: application/gzip :statuscode 200: no error :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ res = api_lookup( archive.vault_fetch, "flat", CoreSWHID.from_string(swhid), notfound_msg=f"Cooked archive for {swhid} not found.", request=request, ) fname = "{}.tar.gz".format(swhid) response = HttpResponse(res, content_type="application/gzip") response["Content-disposition"] = "attachment; filename={}".format( fname.replace(":", "_") ) return response @api_route( r"/vault/directory/(?P[0-9a-f]+)/raw/", "api-1-vault-fetch-directory", checksum_args=["dir_id"], ) @api_doc("/vault/directory/raw/", tags=["hidden", "deprecated"]) def api_vault_fetch_directory(request, dir_id): """ .. http:get:: /api/1/vault/directory/(dir_id)/raw/ This endpoint was replaced by :http:get:`/api/1/vault/flat/(swhid)/raw/` """ _, obj_id = query.parse_hash_with_algorithms_or_throws( dir_id, ["sha1"], "Only sha1_git is supported." ) rev_flat_raw_url = reverse( "api-1-vault-fetch-flat", url_args={"swhid": f"swh:1:dir:{dir_id}"} ) return redirect(rev_flat_raw_url) ###################################################### # gitfast bundles @api_route( f"/vault/gitfast/(?P{SWHID_RE})/", "api-1-vault-cook-gitfast", methods=["GET", "POST"], throttle_scope="swh_vault_cooking", never_cache=True, ) @api_doc("/vault/gitfast/") @format_docstring() def api_vault_cook_gitfast(request, swhid): """ .. http:get:: /api/1/vault/gitfast/(swhid)/ .. http:post:: /api/1/vault/gitfast/(swhid)/ Request the cooking of a gitfast archive for a revision or check its cooking status. That endpoint enables to create a vault cooking task for a revision through a POST request or check the status of a previously created one through a GET request. Once the cooking task has been executed, the resulting gitfast archive can be downloaded using the dedicated endpoint :http:get:`/api/1/vault/gitfast/(swhid)/raw/`. 
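# Illustration (not part of the diff): a minimal sketch of the cook/poll/fetch cycle
# described in the vault docstrings above, for a 'flat' bundle. It assumes a
# reachable swh-web instance at `base_url`; the SWHID and output path are
# placeholders, and statuses follow the documented new/pending/done/failed set.
import time

import requests


def cook_and_fetch_flat(base_url, dir_swhid, output_path):
    cook_url = f"{base_url}/api/1/vault/flat/{dir_swhid}/"
    # POST creates (or re-triggers) the cooking task...
    status = requests.post(cook_url, timeout=30).json()
    # ...and GET polls its progress until the bundle is ready.
    while status["status"] not in ("done", "failed"):
        time.sleep(10)
        status = requests.get(cook_url, timeout=30).json()
    if status["status"] == "failed":
        raise RuntimeError(status.get("progress_message"))
    # fetch_url points to the /api/1/vault/flat/(swhid)/raw/ endpoint
    archive_bytes = requests.get(status["fetch_url"], timeout=300).content
    with open(output_path, "wb") as f:
        f.write(archive_bytes)


# cook_and_fetch_flat(
#     "https://archive.softwareheritage.org",
#     "swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307",
#     "swh_dir.tar.gz",
# )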
Then to import the revision in the current directory, use:: $ git init $ zcat path/to/swh_1_rev_*.gitfast.gz | git fast-import $ git checkout HEAD :param string swhid: the revision's permanent identifiers :query string email: e-mail to notify when the gitfast archive is ready {common_headers} :>json string fetch_url: the url from which to download the archive once it has been cooked (see :http:get:`/api/1/vault/gitfast/(swhid)/raw/`) :>json string progress_message: message describing the cooking task progress :>json number id: the cooking task id :>json string status: the cooking task status (new/pending/done/failed) :>json string swhid: the identifier of the object to cook :statuscode 200: no error :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ swhid = CoreSWHID.from_string(swhid) if swhid.object_type == ObjectType.REVISION: res = _dispatch_cook_progress(request, "gitfast", swhid) res["fetch_url"] = reverse( "api-1-vault-fetch-gitfast", url_args={"swhid": str(swhid)}, request=request, ) return _vault_response(res, add_legacy_items=False) elif swhid.object_type == ObjectType.CONTENT: raise BadInputExc( "Content objects do not need to be cooked, " "use `/api/1/content/raw/` instead." ) elif swhid.object_type == ObjectType.DIRECTORY: raise BadInputExc( "Only revisions can be cooked as 'gitfast' bundles. " "Use `/api/1/vault/flat/` to cook directories, as flat bundles." ) else: raise BadInputExc("Only revisions can be cooked as 'gitfast' bundles.") @api_route( r"/vault/revision/(?P[0-9a-f]+)/gitfast/", "api-1-vault-cook-revision_gitfast", methods=["GET", "POST"], checksum_args=["rev_id"], throttle_scope="swh_vault_cooking", never_cache=True, ) @api_doc("/vault/revision/gitfast/", tags=["deprecated"]) @format_docstring() def api_vault_cook_revision_gitfast(request, rev_id): """ .. http:get:: /api/1/vault/revision/(rev_id)/gitfast/ This endpoint was replaced by :http:get:`/api/1/vault/gitfast/(swhid)/` """ _, obj_id = query.parse_hash_with_algorithms_or_throws( rev_id, ["sha1"], "Only sha1_git is supported." ) swhid = f"swh:1:rev:{obj_id.hex()}" res = _dispatch_cook_progress(request, "gitfast", CoreSWHID.from_string(swhid)) res["fetch_url"] = reverse( "api-1-vault-fetch-gitfast", url_args={"swhid": swhid}, request=request, ) return _vault_response(res, add_legacy_items=True) @api_route( f"/vault/gitfast/(?P{SWHID_RE})/raw/", "api-1-vault-fetch-gitfast", ) @api_doc("/vault/gitfast/raw/") def api_vault_fetch_revision_gitfast(request, swhid): """ .. http:get:: /api/1/vault/gitfast/(swhid)/raw/ Fetch the cooked gitfast archive for a revision. See :http:get:`/api/1/vault/gitfast/(swhid)/` to get more details on gitfast cooking. 
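# Illustration (not part of the diff): the object-type dispatch performed by the
# cook endpoints above, shown in isolation. CoreSWHID.from_string() and ObjectType
# are the same swh.model.swhids helpers already imported by vault.py; the mapping
# below is a simplified sketch of the checks done in the views.
from swh.model.swhids import CoreSWHID, ObjectType


def bundle_type_for(swhid_str: str) -> str:
    """Map a SWHID to the vault bundle type that can cook it (sketch)."""
    swhid = CoreSWHID.from_string(swhid_str)  # raises ValidationError on bad input
    if swhid.object_type == ObjectType.DIRECTORY:
        return "flat"
    elif swhid.object_type == ObjectType.REVISION:
        return "gitfast"  # git_bare bundles are also available for revisions
    raise ValueError(f"{swhid_str} cannot be cooked directly")


# bundle_type_for("swh:1:rev:" + "0" * 40)  -> "gitfast"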
:param string rev_id: the revision's sha1 identifier :resheader Content-Type: application/gzip :statuscode 200: no error :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ res = api_lookup( archive.vault_fetch, "gitfast", CoreSWHID.from_string(swhid), notfound_msg="Cooked archive for {} not found.".format(swhid), request=request, ) fname = "{}.gitfast.gz".format(swhid) response = HttpResponse(res, content_type="application/gzip") response["Content-disposition"] = "attachment; filename={}".format( fname.replace(":", "_") ) return response @api_route( r"/vault/revision/(?P[0-9a-f]+)/gitfast/raw/", "api-1-vault-fetch-revision_gitfast", checksum_args=["rev_id"], ) @api_doc("/vault/revision_gitfast/raw/", tags=["hidden", "deprecated"]) def _api_vault_revision_gitfast_raw(request, rev_id): """ .. http:get:: /api/1/vault/revision/(rev_id)/gitfast/raw/ This endpoint was replaced by :http:get:`/api/1/vault/gitfast/(swhid)/raw/` """ rev_gitfast_raw_url = reverse( "api-1-vault-fetch-gitfast", url_args={"swhid": f"swh:1:rev:{rev_id}"} ) return redirect(rev_gitfast_raw_url) ###################################################### # git_bare bundles @api_route( f"/vault/git-bare/(?P{SWHID_RE})/", "api-1-vault-cook-git-bare", methods=["GET", "POST"], throttle_scope="swh_vault_cooking", never_cache=True, ) @api_doc("/vault/git-bare/") @format_docstring() def api_vault_cook_git_bare(request, swhid): """ .. http:get:: /api/1/vault/git-bare/(swhid)/ .. http:post:: /api/1/vault/git-bare/(swhid)/ Request the cooking of a git-bare archive for a revision or check its cooking status. That endpoint enables to create a vault cooking task for a revision through a POST request or check the status of a previously created one through a GET request. Once the cooking task has been executed, the resulting git-bare archive can be downloaded using the dedicated endpoint :http:get:`/api/1/vault/git-bare/(swhid)/raw/`. Then to import the revision in the current directory, use:: $ tar -xf path/to/swh_1_rev_*.git.tar $ git clone swh:1:rev:*.git new_repository (replace ``swh:1:rev:*`` with the SWHID of the requested revision) This will create a directory called ``new_repository``, which is a git repository containing the requested objects. :param string swhid: the revision's permanent identifier :query string email: e-mail to notify when the git-bare archive is ready {common_headers} :>json string fetch_url: the url from which to download the archive once it has been cooked (see :http:get:`/api/1/vault/git-bare/(swhid)/raw/`) :>json string progress_message: message describing the cooking task progress :>json number id: the cooking task id :>json string status: the cooking task status (new/pending/done/failed) :>json string swhid: the identifier of the object to cook :statuscode 200: no error :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ swhid = CoreSWHID.from_string(swhid) if swhid.object_type == ObjectType.REVISION: res = _dispatch_cook_progress(request, "git_bare", swhid) res["fetch_url"] = reverse( "api-1-vault-fetch-git-bare", url_args={"swhid": str(swhid)}, request=request, ) return _vault_response(res, add_legacy_items=False) elif swhid.object_type == ObjectType.CONTENT: raise BadInputExc( "Content objects do not need to be cooked, " "use `/api/1/content/raw/` instead." 
) elif swhid.object_type == ObjectType.DIRECTORY: raise BadInputExc( "Only revisions can be cooked as 'git-bare' bundles. " "Use `/api/1/vault/flat/` to cook directories, as flat bundles." ) else: raise BadInputExc("Only revisions can be cooked as 'git-bare' bundles.") @api_route( f"/vault/git-bare/(?P{SWHID_RE})/raw/", "api-1-vault-fetch-git-bare", ) @api_doc("/vault/git-bare/raw/") def api_vault_fetch_revision_git_bare(request, swhid): """ .. http:get:: /api/1/vault/git-bare/(swhid)/raw/ Fetch the cooked git-bare archive for a revision. See :http:get:`/api/1/vault/git-bare/(swhid)/` to get more details on git-bare cooking. :param string swhid: the revision's permanent identifier :resheader Content-Type: application/x-tar :statuscode 200: no error :statuscode 404: requested directory did not receive any cooking request yet (in case of GET) or can not be found in the archive (in case of POST) """ res = api_lookup( archive.vault_fetch, "git_bare", CoreSWHID.from_string(swhid), notfound_msg="Cooked archive for {} not found.".format(swhid), request=request, ) fname = "{}.git.tar".format(swhid) response = HttpResponse(res, content_type="application/x-tar") response["Content-disposition"] = "attachment; filename={}".format( fname.replace(":", "_") ) return response diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 41f5d934..0eaa259f 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,527 +1,529 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Optional import urllib.parse from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry import requests from requests.auth import HTTPBasicAuth from django.core.cache import cache from django.core.cache.backends.base import DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) from swh.web.common.exc import BadInputExc, sentry_capture_exception from swh.web.common.typing import QueryParameters from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) +SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" + swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", 
"visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. 
""" path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "FEATURES": get_config()["features"], "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. 
Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: if catch_exception: sentry_capture_exception(exc) return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. 
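# Illustration (not part of the diff): how the django_cache() decorator defined
# above is meant to be applied, mirroring its use on origin_visit_types() and
# _get_deposits_data(). The decorated function and its timeout are hypothetical.
@django_cache(
    timeout=15 * 60,              # keep the value for 15 minutes
    catch_exception=True,         # swallow (and report) exceptions...
    exception_return_value={},    # ...returning this fallback value instead
)
def _cached_expensive_lookup():
    # any expensive computation or remote call; its result is stored in the
    # Django cache under a key derived from the function name and arguments
    return {"computed": True}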
This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) def has_add_forge_now_permission(user) -> bool: """Is a user considered an add-forge-now moderator? Returns True if a user is staff or has add forge now moderator permission """ return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION)
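# Illustration (not part of the diff): after this change, SWHID_RE lives in a single
# place and other modules import it from swh.web.common.utils, as the two API views
# above now do. A quick standalone check of what the pattern accepts; anchoring with
# re.fullmatch is only for the demo, the API routes interpolate it into URL patterns.
import re

from swh.web.common.utils import SWHID_RE

assert re.fullmatch(SWHID_RE, "swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307")
assert re.fullmatch(SWHID_RE, "not-a-swhid") is None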