diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py index eec132b1..0be07976 100644 --- a/swh/web/admin/deposit.py +++ b/swh/web/admin/deposit.py @@ -1,135 +1,44 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -import sentry_sdk + +import requests +from requests.auth import HTTPBasicAuth from django.conf import settings from django.contrib.auth.decorators import user_passes_test -from django.core.paginator import Paginator from django.http import JsonResponse from django.shortcuts import render from swh.web.admin.adminurls import admin_route from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION -from swh.web.common.utils import ( - get_deposit_raw_metadata, - get_deposits_list, - parse_swh_deposit_origin, - parse_swh_metadata_provenance, -) +from swh.web.config import get_config def _can_list_deposits(user): return user.is_staff or user.has_perm(ADMIN_LIST_DEPOSIT_PERMISSION) @admin_route(r"deposit/", view_name="admin-deposit") @user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL) def _admin_origin_save(request): return render(request, "admin/deposit.html") @admin_route(r"deposit/list/", view_name="admin-deposit-list") @user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL) def _admin_deposit_list(request): - table_data = {} - table_data["draw"] = int(request.GET["draw"]) - try: - deposits = get_deposits_list(request.GET.get("username")) - deposits_count = len(deposits) - search_value = request.GET["search[value]"] - if search_value: - deposits = [ - d - for d in deposits - if any( - search_value.lower() in val - for val in [str(v).lower() for v in d.values()] - ) - ] - - exclude_pattern = request.GET.get("excludePattern") - if exclude_pattern: - deposits = [ - d - for d in deposits - if all( - exclude_pattern.lower() not in val - for val in [str(v).lower() for v in d.values()] - ) - ] - - column_order = request.GET["order[0][column]"] - field_order = request.GET["columns[%s][name]" % column_order] - order_dir = request.GET["order[0][dir]"] - - deposits = sorted(deposits, key=lambda d: d[field_order] or "") - if order_dir == "desc": - deposits = list(reversed(deposits)) - - length = int(request.GET["length"]) - page = int(request.GET["start"]) / length + 1 - paginator = Paginator(deposits, length) - data = paginator.page(page).object_list - table_data["recordsTotal"] = deposits_count - table_data["recordsFiltered"] = len(deposits) - data_list = [] - for d in data: - data_dict = { - "id": d["id"], - "type": d["type"], - "external_id": d["external_id"], - "reception_date": d["reception_date"], - "status": d["status"], - "status_detail": d["status_detail"], - "swhid": d["swhid"], - "swhid_context": d["swhid_context"], - } - provenance = None - raw_metadata = d["raw_metadata"] - # for meta deposit, the uri should be the url provenance - if raw_metadata and d["type"] == "meta": # metadata provenance - provenance = parse_swh_metadata_provenance(d["raw_metadata"]) - # For code deposits the uri is the origin - # First, trying to determine it out of the raw metadata associated with the - # deposit - elif raw_metadata and d["type"] == "code": - provenance = parse_swh_deposit_origin(raw_metadata) - - # For code deposits, if not provided, use the origin_url - if not provenance and d["type"] == "code": - if d["origin_url"]: - provenance = d["origin_url"] - - # If still not found, fallback using the swhid context - if not provenance and d["swhid_context"]: - # Trying to compute the origin as we did before in the js - from swh.model.swhids import QualifiedSWHID - - swhid = QualifiedSWHID.from_string(d["swhid_context"]) - provenance = swhid.origin - - data_dict["uri"] = provenance # could be None - - # This could be large. As this is not displayed yet, drop it to avoid - # cluttering the data dict - data_dict.pop("raw_metadata", None) - - data_list.append(data_dict) - - table_data["data"] = data_list - - for row in table_data["data"]: - metadata = get_deposit_raw_metadata(row["id"]) - if metadata: - row["raw_metadata"] = metadata - else: - row["raw_metadata"] = None - - except Exception as exc: - sentry_sdk.capture_exception(exc) - table_data["error"] = f"Could not retrieve deposits: {exc!r}" - - return JsonResponse(table_data) + config = get_config()["deposit"] + private_api_url = config["private_api_url"].rstrip("/") + "/" + deposits_list_url = private_api_url + "deposits/datatables/" + deposits_list_auth = HTTPBasicAuth( + config["private_api_user"], config["private_api_password"] + ) + + deposits = requests.get( + deposits_list_url, auth=deposits_list_auth, params=request.GET, timeout=30 + ).json() + + return JsonResponse(deposits) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 1853f6a4..d8bb0fcf 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,602 +1,528 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Optional import urllib.parse -from xml.etree import ElementTree from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry import requests from requests.auth import HTTPBasicAuth import sentry_sdk from django.core.cache import cache from django.core.cache.backends.base import DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "FEATURES": get_config()["features"], "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: sentry_sdk.capture_exception(exc) if catch_exception: return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] - deposits_list_base_url = config["private_api_url"] + "deposits" + private_api_url = config["private_api_url"].rstrip("/") + "/" + deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] -@django_cache() -def get_deposit_raw_metadata(deposit_id: int) -> Optional[str]: - config = get_config()["deposit"] - url = f"{config['private_api_url']}/{deposit_id}/meta" - return requests.get(url).json()["raw_metadata"] - - _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) -NAMESPACES = { - "swh": "https://www.softwareheritage.org/schema/2018/deposit", - "schema": "http://schema.org/", -} - - -def parse_swh_metadata_provenance(raw_metadata: str) -> Optional[str]: - """Parse swh metadata-provenance out of the raw metadata deposit. If found, returns the - value, None otherwise. - - .. code-block:: xml - - - - https://example.org/metadata/url - - - - Args: - raw_metadata: raw metadata out of deposits received - - Returns: - Either the metadata provenance url if any or None otherwise - - """ - metadata = ElementTree.fromstring(raw_metadata) - url = metadata.findtext( - "swh:deposit/swh:metadata-provenance/schema:url", - namespaces=NAMESPACES, - ) - return url or None - - -def parse_swh_deposit_origin(raw_metadata: str) -> Optional[str]: - """Parses and from metadata document, - if any. They are mutually exclusive and tested as such in the deposit. - - .. code-block:: xml - - - - - - - - .. code-block:: xml - - - - - - - - Returns: - The one not null if any, None otherwise - - """ - metadata = ElementTree.fromstring(raw_metadata) - for origin_tag in ["create_origin", "add_to_origin"]: - elt = metadata.find( - f"swh:deposit/swh:{origin_tag}/swh:origin[@url]", namespaces=NAMESPACES - ) - if elt is not None: - return elt.attrib["url"] - return None - - def has_add_forge_now_permission(user) -> bool: """Is a user considered an add-forge-now moderator? Returns True if a user is staff or has add forge now moderator permission """ return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION) diff --git a/swh/web/tests/admin/test_deposit.py b/swh/web/tests/admin/test_deposit.py index ceeb2860..75f64aed 100644 --- a/swh/web/tests/admin/test_deposit.py +++ b/swh/web/tests/admin/test_deposit.py @@ -1,37 +1,101 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from base64 import b64encode + import pytest from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION from swh.web.common.utils import reverse -from swh.web.tests.utils import check_html_get_response, create_django_permission +from swh.web.config import get_config +from swh.web.tests.utils import ( + check_html_get_response, + check_http_get_response, + create_django_permission, +) def test_deposit_admin_view_not_available_for_anonymous_user(client): url = reverse("admin-deposit") resp = check_html_get_response(client, url, status_code=302) assert resp["location"] == reverse("login", query_params={"next": url}) @pytest.mark.django_db def test_deposit_admin_view_available_for_staff_user(client, staff_user): client.force_login(staff_user) url = reverse("admin-deposit") check_html_get_response( client, url, status_code=200, template_used="admin/deposit.html" ) @pytest.mark.django_db def test_deposit_admin_view_available_for_user_with_permission(client, regular_user): regular_user.user_permissions.add( create_django_permission(ADMIN_LIST_DEPOSIT_PERMISSION) ) client.force_login(regular_user) url = reverse("admin-deposit") check_html_get_response( client, url, status_code=200, template_used="admin/deposit.html" ) + + +@pytest.mark.django_db +def test_deposit_admin_view_list_deposits(client, staff_user, requests_mock): + deposits_data = { + "data": [ + { + "external_id": "hal-02527986", + "id": 1066, + "raw_metadata": None, + "reception_date": "2022-04-08T14:12:34.143000Z", + "status": "rejected", + "status_detail": None, + "swhid": None, + "swhid_context": None, + "type": "code", + "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-02527986", + }, + { + "external_id": "hal-01243573", + "id": 1065, + "raw_metadata": None, + "reception_date": "2022-04-08T12:53:50.940000Z", + "status": "rejected", + "status_detail": None, + "swhid": None, + "swhid_context": None, + "type": "code", + "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-01243573", + }, + ], + "draw": 2, + "recordsFiltered": 645, + "recordsTotal": 1066, + } + + config = get_config()["deposit"] + private_api_url = config["private_api_url"].rstrip("/") + "/" + deposits_list_url = private_api_url + "deposits/datatables/" + + basic_auth_payload = ( + config["private_api_user"] + ":" + config["private_api_password"] + ).encode() + + requests_mock.get( + deposits_list_url, + json=deposits_data, + request_headers={ + "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" + }, + ) + + client.force_login(staff_user) + url = reverse("admin-deposit-list") + check_http_get_response( + client, url, status_code=200, content_type="application/json" + ) diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py index 65cf28f5..08127875 100644 --- a/swh/web/tests/common/test_utils.py +++ b/swh/web/tests/common/test_utils.py @@ -1,433 +1,392 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode import datetime import math -from os.path import join import sys from urllib.parse import quote import pytest from django.conf.urls import url from django.test.utils import override_settings from django.urls.exceptions import NoReverseMatch from swh.web.common import utils from swh.web.common.exc import BadInputExc from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config def test_shorten_path_noop(): noops = ["/api/", "/browse/", "/content/symbol/foobar/"] for noop in noops: assert utils.shorten_path(noop) == noop def test_shorten_path_sha1(): sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6" short_sha1 = sha1[:8] + "..." templates = [ "/api/1/content/sha1:%s/", "/api/1/content/sha1_git:%s/", "/api/1/directory/%s/", "/api/1/content/sha1:%s/ctags/", ] for template in templates: assert utils.shorten_path(template % sha1) == template % short_sha1 def test_shorten_path_sha256(): sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef" short_sha256 = sha256[:8] + "..." templates = [ "/api/1/content/sha256:%s/", "/api/1/directory/%s/", "/api/1/content/sha256:%s/filetype/", ] for template in templates: assert utils.shorten_path(template % sha256) == template % short_sha256 @pytest.mark.parametrize( "input_timestamp, output_date", [ ( "2016-01-12", datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), ), ( "2016-01-12T09:19:12+0100", datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), ), ( "2007-01-14T20:34:22Z", datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc), ), ], ) def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date): assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date @pytest.mark.parametrize( "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"] ) def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp): with pytest.raises(BadInputExc): utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp) def test_format_utc_iso_date(): assert ( utils.format_utc_iso_date("2017-05-04T13:27:13+02:00") == "04 May 2017, 11:27 UTC" ) def test_gen_path_info(): input_path = "/home/user/swh-environment/swh-web/" expected_result = [ {"name": "home", "path": "home"}, {"name": "user", "path": "home/user"}, {"name": "swh-environment", "path": "home/user/swh-environment"}, {"name": "swh-web", "path": "home/user/swh-environment/swh-web"}, ] path_info = utils.gen_path_info(input_path) assert path_info == expected_result input_path = "home/user/swh-environment/swh-web" path_info = utils.gen_path_info(input_path) assert path_info == expected_result def test_rst_to_html(): rst = ( "Section\n" "=======\n\n" "**Some strong text**\n\n" "* This is a bulleted list.\n" "* It has two items, the second\n" " item uses two lines.\n" "\n" "1. This is a numbered list.\n" "2. It has two items too.\n" "\n" "#. This is a numbered list.\n" "#. It has two items too.\n" ) expected_html = ( '

Section

\n' "

Some strong text

\n" '
    \n' "
  • This is a bulleted list.

  • \n" "
  • It has two items, the second\n" "item uses two lines.

  • \n" "
\n" '
    \n' "
  1. This is a numbered list.

  2. \n" "
  3. It has two items too.

  4. \n" "
  5. This is a numbered list.

  6. \n" "
  7. It has two items too.

  8. \n" "
\n" "
" ) assert utils.rst_to_html(rst) == expected_html def sample_test_view(request, string, number): pass def sample_test_view_no_url_args(request): pass urlpatterns = [ url( r"^sample/test/(?P.+)/view/(?P[0-9]+)/$", sample_test_view, name="sample-test-view", ), url( r"^sample/test/view/no/url/args/$", sample_test_view_no_url_args, name="sample-test-view-no-url-args", ), ] @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ok(): string = "foo" number = 55 url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number} ) assert url == f"/sample/test/{string}/view/{number}/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ko(): string = "foo" with pytest.raises(NoReverseMatch): utils.reverse("sample-test-view", url_args={"string": string, "number": string}) @override_settings(ROOT_URLCONF=__name__) def test_reverse_no_url_args(): url = utils.reverse("sample-test-view-no-url-args") assert url == "/sample/test/view/no/url/args/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_only(): start = 0 scope = "foo" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": scope} ) assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": None} ) assert url == f"/sample/test/view/no/url/args/?start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_encode(): libname = "libstc++" url = utils.reverse( "sample-test-view-no-url-args", query_params={"libname": libname} ) assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_query_params(): string = "foo" number = 55 start = 10 scope = "bar" url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number}, query_params={"start": start, "scope": scope}, ) assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_absolute_uri(request_factory): request = request_factory.get(utils.reverse("sample-test-view-no-url-args")) url = utils.reverse("sample-test-view-no-url-args", request=request) assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/" def test_get_deposits_list(requests_mock): deposits_data = { "count": 2, "results": [ { "check_task_id": "351820217", "client": 2, "collection": 1, "complete_date": "2021-01-21T07:52:19.919312Z", "external_id": "hal-03116143", "id": 1412, "load_task_id": "351820260", "origin_url": "https://hal.archives-ouvertes.fr/hal-03116143", "parent": None, "reception_date": "2021-01-21T07:52:19.471019Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079", }, { "check_task_id": "381576507", "client": 2, "collection": 1, "complete_date": "2021-07-07T08:00:44.726676Z", "external_id": "hal-03275052", "id": 1693, "load_task_id": "381576508", "origin_url": "https://hal.archives-ouvertes.fr/hal-03275052", "parent": None, "reception_date": "2021-07-07T08:00:44.327661Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c", }, ], } config = get_config()["deposit"] - deposits_list_url = config["private_api_url"] + "deposits" + private_api_url = config["private_api_url"].rstrip("/") + "/" + deposits_list_url = private_api_url + "deposits" basic_auth_payload = ( config["private_api_user"] + ":" + config["private_api_password"] ).encode() requests_mock.get( deposits_list_url, json=deposits_data, request_headers={ "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" }, ) assert utils.get_deposits_list() == deposits_data["results"] @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_origin_visit_types(mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config search = mocker.patch("swh.web.common.utils.search") search.return_value = None assert utils.origin_visit_types() == [] else: # see swh/web/tests/data.py for origins added for tests assert utils.origin_visit_types() == ["git", "tar"] @pytest.mark.parametrize("server_name", ["localhost", "127.0.0.1", "testserver"]) def test_is_swh_web_development(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_development(request) @pytest.mark.parametrize("server_name", SWH_WEB_STAGING_SERVER_NAMES) def test_is_swh_web_staging(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_staging(request) def test_is_swh_web_production(request_factory): request = request_factory.get("/", SERVER_NAME=SWH_WEB_SERVER_NAME) assert utils.is_swh_web_production(request) -@pytest.mark.parametrize( - "raw_metadata_file,expected_url", - [ - ("raw-metadata-provenance.xml", "https://example.org/metadata/provenance"), - ("raw-metadata-no-swh.xml", None), - ], -) -def test_parse_swh_provenance(datadir, raw_metadata_file, expected_url): - metadata_path = join(datadir, "deposit", raw_metadata_file) - with open(metadata_path, "r") as f: - raw_metadata = f.read() - - actual_url = utils.parse_swh_metadata_provenance(raw_metadata) - - assert actual_url == expected_url - - -@pytest.mark.parametrize( - "raw_metadata_file,expected_url", - [ - ( - "raw-metadata-create-origin.xml", - "https://example.org/metadata/create-origin", - ), - ( - "raw-metadata-add-to-origin.xml", - "https://example.org/metadata/add-to-origin", - ), - ("raw-metadata-no-swh.xml", None), - ], -) -def test_parse_swh_origins(datadir, raw_metadata_file, expected_url): - metadata_path = join(datadir, "deposit", raw_metadata_file) - with open(metadata_path, "r") as f: - raw_metadata = f.read() - - actual_url = utils.parse_swh_deposit_origin(raw_metadata) - - assert actual_url == expected_url - - def add(x, y): return x + y def test_django_cache(mocker): """Decorated function should be called once and returned value put in django cache.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 1 assert spy_cache_set.call_count == 1 def test_django_cache_invalidate_cache_pred(mocker): """Decorated function should be called twice and returned value put in django cache twice.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache(invalidate_cache_pred=lambda val: val == 3)(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 2 assert spy_cache_set.call_count == 2 def test_django_cache_raise_exception(mocker): """Decorated function should be called twice, exceptions should be raised and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) with pytest.raises(TypeError): cached_add(1, "2") with pytest.raises(TypeError): cached_add(1, "2") assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0 def test_django_cache_catch_exception(mocker): """Decorated function should be called twice, exceptions should not be raised, specified fallback value should be returned and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache( catch_exception=True, exception_return_value=math.nan )(add) val = cached_add(1, "2") val2 = cached_add(1, "2") assert math.isnan(val) assert math.isnan(val2) assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0