diff --git a/cypress/integration/origin-visits.spec.js b/cypress/integration/origin-visits.spec.js index ec0c4bdf..188b0f3e 100644 --- a/cypress/integration/origin-visits.spec.js +++ b/cypress/integration/origin-visits.spec.js @@ -1,94 +1,94 @@ /** * Copyright (C) 2019-2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ import {getTime} from '../utils'; let origin; function checkTimeLink(element) { expect(element.text()).not.to.be.empty; const urlParams = new URLSearchParams(element.attr('href').split('?')[1]); const timeStringLink = urlParams.get('timestamp'); // time in link should be equal to that in text assert.deepEqual(getTime(timeStringLink), getTime(element.text())); } function searchInCalendar(date) { cy.contains('label', 'Show all visits') .click(); cy.get(`.year${date.year}`) .click({force: true}); cy.contains('.month', date.monthName) .find('.day-content') .eq(date.date - 1) .trigger('mouseenter') .get('.popover-body') .should('be.visible') - .and('contain', `${date.hours}:${date.minutes} UTC`); + .and('contain', `${date.hours}:${date.minutes}:${date.seconds} UTC`); } describe('Visits tests', function() { before(function() { origin = this.origin[1]; }); beforeEach(function() { cy.visit(`${this.Urls.browse_origin_visits()}?origin_url=${origin.url}`); }); it('should display first full visit time', function() { cy.get('#swh-first-full-visit > .swh-visit-full') .then(($el) => { checkTimeLink($el); searchInCalendar(getTime($el.text())); }); }); it('should display last full visit time', function() { cy.get('#swh-last-full-visit > .swh-visit-full') .then(($el) => { checkTimeLink($el); searchInCalendar(getTime($el.text())); }); }); it('should display last visit time', function() { cy.get('#swh-last-visit > .swh-visit-full') .then(($el) => { checkTimeLink($el); searchInCalendar(getTime($el.text())); }); }); it('should display list of visits and mark them on calendar', function() { cy.get('.swh-visits-list-row .swh-visit-full') .should('be.visible') .each(($el) => { checkTimeLink($el); searchInCalendar(getTime($el.text())); }); }); it('should close calendar popover when leaving day', function() { cy.get('#swh-last-visit > .swh-visit-full') .then(($el) => { const date = getTime($el.text()); searchInCalendar(date); cy.contains('.month', date.monthName) .find('.day-content') .eq(date.date - 1) .trigger('mouseout', {force: true}); cy.get('.popover') .should('not.exist'); }); }); }); diff --git a/cypress/utils/index.js b/cypress/utils/index.js index 063100b7..a2c84253 100644 --- a/cypress/utils/index.js +++ b/cypress/utils/index.js @@ -1,47 +1,48 @@ /** * Copyright (C) 2019-2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ /** * Converts string with Time information * to an object with Time information */ export function getTime(text) { const date = new Date(text); function pad(n) { return n < 10 ? '0' + n : n; } const time = { date: date.getUTCDate(), month: date.getUTCMonth(), monthName: date.toLocaleString('en', {month: 'long'}), year: date.getUTCFullYear(), hours: pad(date.getUTCHours()), - minutes: pad(date.getUTCMinutes()) + minutes: pad(date.getUTCMinutes()), + seconds: pad(date.getUTCSeconds()) }; return time; } export function checkLanguageHighlighting(language) { cy.get('code') .should('be.visible') .and('have.class', 'hljs') .and('have.class', language) .and('not.be.empty') .find('table.hljs-ln') .should('be.visible') .and('not.be.empty'); } export function random(start, end) { const range = end - start; return Math.floor(Math.random() * range) + start; } export const describeSlowTests = Cypress.env('SKIP_SLOW_TESTS') === 1 ? describe.skip : describe; diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 0eaa259f..fcd3f154 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,529 +1,529 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools import os import re from typing import Any, Callable, Dict, List, Optional import urllib.parse from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry import requests from requests.auth import HTTPBasicAuth from django.core.cache import cache from django.core.cache.backends.base import DEFAULT_TIMEOUT from django.http import HttpRequest, QueryDict from django.shortcuts import redirect from django.urls import resolve from django.urls import reverse as django_reverse from swh.web.auth.utils import ( ADD_FORGE_MODERATOR_PERMISSION, ADMIN_LIST_DEPOSIT_PERMISSION, MAILMAP_ADMIN_PERMISSION, ) from swh.web.common.exc import BadInputExc, sentry_capture_exception from swh.web.common.typing import QueryParameters from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "cnt": "mdi mdi-file-document", "directory": "mdi mdi-folder", "dir": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "ori": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "rel": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "rev": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "snp": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) -def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): +def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M:%S UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip def is_swh_web_development(request: HttpRequest) -> bool: """Indicate if we are running a development version of swh-web.""" site_base_url = request.build_absolute_uri("/") return any( host in site_base_url for host in ("localhost", "127.0.0.1", "testserver") ) def is_swh_web_staging(request: HttpRequest) -> bool: """Indicate if we are running a staging version of swh-web.""" config = get_config() site_base_url = request.build_absolute_uri("/") return any( server_name in site_base_url for server_name in config["staging_server_names"] ) def is_swh_web_production(request: HttpRequest) -> bool: """Indicate if we are running the public production version of swh-web.""" return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/") browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": request.build_absolute_uri("/"), "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": is_swh_web_development(request), "swh_web_staging": is_swh_web_staging(request), "swh_web_version": get_distribution("swh.web").version, "iframe_mode": False, "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION, "FEATURES": get_config()["features"], "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, "file_insertion_enabled": False, "raw_enabled": False, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() def django_cache( timeout: int = DEFAULT_TIMEOUT, catch_exception: bool = False, exception_return_value: Any = None, invalidate_cache_pred: Callable[[Any], bool] = lambda val: False, ): """Decorator to put the result of a function call in Django cache, subsequent calls will directly return the cached value. Args: timeout: The number of seconds value will be hold in cache catch_exception: If :const:`True`, any thrown exception by the decorated function will be caught and not reraised exception_return_value: The value to return if previous parameter is set to :const:`True` invalidate_cache_pred: A predicate function enabling to invalidate the cache under certain conditions, decorated function will then be called again Returns: The returned value of the decorated function for the specified parameters """ def inner(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_args = args + (0,) + tuple(sorted(kwargs.items())) cache_key = str(hash((func.__module__, func.__name__) + func_args)) ret = cache.get(cache_key) if ret is None or invalidate_cache_pred(ret): try: ret = func(*args, **kwargs) except Exception as exc: if catch_exception: sentry_capture_exception(exc) return exception_return_value else: raise else: cache.set(cache_key, ret, timeout=timeout) return ret return wrapper return inner def _deposits_list_url( deposits_list_base_url: str, page_size: int, username: Optional[str] ) -> str: params = {"page_size": str(page_size)} if username is not None: params["username"] = username return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}" def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: """Return the list of software deposits using swh-deposit API""" config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_base_url = private_api_url + "deposits" deposits_list_auth = HTTPBasicAuth( config["private_api_user"], config["private_api_password"] ) deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=1, username=username ) nb_deposits = requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30 ).json()["count"] @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits) def _get_deposits_data(): deposits_list_url = _deposits_list_url( deposits_list_base_url, page_size=nb_deposits, username=username ) return requests.get( deposits_list_url, auth=deposits_list_auth, timeout=30, ).json() deposits_data = _get_deposits_data() return deposits_data["results"] _origin_visit_types_cache_timeout = 24 * 60 * 60 # 24 hours @django_cache( timeout=_origin_visit_types_cache_timeout, catch_exception=True, exception_return_value=[], ) def origin_visit_types() -> List[str]: """Return the exhaustive list of visit types for origins ingested into the archive. """ return sorted(search().visit_types_count().keys()) def redirect_to_new_route(request, new_route, permanent=True): """Redirect a request to another route with url args and query parameters eg: /origin//log?path=test can be redirected as /log?url=&path=test. This can be used to deprecate routes """ request_path = resolve(request.path_info) args = {**request_path.kwargs, **request.GET.dict()} return redirect( reverse(new_route, query_params=args), permanent=permanent, ) def has_add_forge_now_permission(user) -> bool: """Is a user considered an add-forge-now moderator? Returns True if a user is staff or has add forge now moderator permission """ return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION) diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py index 08127875..38551e6a 100644 --- a/swh/web/tests/common/test_utils.py +++ b/swh/web/tests/common/test_utils.py @@ -1,392 +1,392 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode import datetime import math import sys from urllib.parse import quote import pytest from django.conf.urls import url from django.test.utils import override_settings from django.urls.exceptions import NoReverseMatch from swh.web.common import utils from swh.web.common.exc import BadInputExc from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config def test_shorten_path_noop(): noops = ["/api/", "/browse/", "/content/symbol/foobar/"] for noop in noops: assert utils.shorten_path(noop) == noop def test_shorten_path_sha1(): sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6" short_sha1 = sha1[:8] + "..." templates = [ "/api/1/content/sha1:%s/", "/api/1/content/sha1_git:%s/", "/api/1/directory/%s/", "/api/1/content/sha1:%s/ctags/", ] for template in templates: assert utils.shorten_path(template % sha1) == template % short_sha1 def test_shorten_path_sha256(): sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef" short_sha256 = sha256[:8] + "..." templates = [ "/api/1/content/sha256:%s/", "/api/1/directory/%s/", "/api/1/content/sha256:%s/filetype/", ] for template in templates: assert utils.shorten_path(template % sha256) == template % short_sha256 @pytest.mark.parametrize( "input_timestamp, output_date", [ ( "2016-01-12", datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), ), ( "2016-01-12T09:19:12+0100", datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), ), ( "2007-01-14T20:34:22Z", datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc), ), ], ) def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date): assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date @pytest.mark.parametrize( "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"] ) def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp): with pytest.raises(BadInputExc): utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp) def test_format_utc_iso_date(): assert ( utils.format_utc_iso_date("2017-05-04T13:27:13+02:00") - == "04 May 2017, 11:27 UTC" + == "04 May 2017, 11:27:13 UTC" ) def test_gen_path_info(): input_path = "/home/user/swh-environment/swh-web/" expected_result = [ {"name": "home", "path": "home"}, {"name": "user", "path": "home/user"}, {"name": "swh-environment", "path": "home/user/swh-environment"}, {"name": "swh-web", "path": "home/user/swh-environment/swh-web"}, ] path_info = utils.gen_path_info(input_path) assert path_info == expected_result input_path = "home/user/swh-environment/swh-web" path_info = utils.gen_path_info(input_path) assert path_info == expected_result def test_rst_to_html(): rst = ( "Section\n" "=======\n\n" "**Some strong text**\n\n" "* This is a bulleted list.\n" "* It has two items, the second\n" " item uses two lines.\n" "\n" "1. This is a numbered list.\n" "2. It has two items too.\n" "\n" "#. This is a numbered list.\n" "#. It has two items too.\n" ) expected_html = ( '

Section

\n' "

Some strong text

\n" '
    \n' "
  • This is a bulleted list.

  • \n" "
  • It has two items, the second\n" "item uses two lines.

  • \n" "
\n" '
    \n' "
  1. This is a numbered list.

  2. \n" "
  3. It has two items too.

  4. \n" "
  5. This is a numbered list.

  6. \n" "
  7. It has two items too.

  8. \n" "
\n" "
" ) assert utils.rst_to_html(rst) == expected_html def sample_test_view(request, string, number): pass def sample_test_view_no_url_args(request): pass urlpatterns = [ url( r"^sample/test/(?P.+)/view/(?P[0-9]+)/$", sample_test_view, name="sample-test-view", ), url( r"^sample/test/view/no/url/args/$", sample_test_view_no_url_args, name="sample-test-view-no-url-args", ), ] @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ok(): string = "foo" number = 55 url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number} ) assert url == f"/sample/test/{string}/view/{number}/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ko(): string = "foo" with pytest.raises(NoReverseMatch): utils.reverse("sample-test-view", url_args={"string": string, "number": string}) @override_settings(ROOT_URLCONF=__name__) def test_reverse_no_url_args(): url = utils.reverse("sample-test-view-no-url-args") assert url == "/sample/test/view/no/url/args/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_only(): start = 0 scope = "foo" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": scope} ) assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": None} ) assert url == f"/sample/test/view/no/url/args/?start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_encode(): libname = "libstc++" url = utils.reverse( "sample-test-view-no-url-args", query_params={"libname": libname} ) assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_query_params(): string = "foo" number = 55 start = 10 scope = "bar" url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number}, query_params={"start": start, "scope": scope}, ) assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_absolute_uri(request_factory): request = request_factory.get(utils.reverse("sample-test-view-no-url-args")) url = utils.reverse("sample-test-view-no-url-args", request=request) assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/" def test_get_deposits_list(requests_mock): deposits_data = { "count": 2, "results": [ { "check_task_id": "351820217", "client": 2, "collection": 1, "complete_date": "2021-01-21T07:52:19.919312Z", "external_id": "hal-03116143", "id": 1412, "load_task_id": "351820260", "origin_url": "https://hal.archives-ouvertes.fr/hal-03116143", "parent": None, "reception_date": "2021-01-21T07:52:19.471019Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079", }, { "check_task_id": "381576507", "client": 2, "collection": 1, "complete_date": "2021-07-07T08:00:44.726676Z", "external_id": "hal-03275052", "id": 1693, "load_task_id": "381576508", "origin_url": "https://hal.archives-ouvertes.fr/hal-03275052", "parent": None, "reception_date": "2021-07-07T08:00:44.327661Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c", }, ], } config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_url = private_api_url + "deposits" basic_auth_payload = ( config["private_api_user"] + ":" + config["private_api_password"] ).encode() requests_mock.get( deposits_list_url, json=deposits_data, request_headers={ "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" }, ) assert utils.get_deposits_list() == deposits_data["results"] @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_origin_visit_types(mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config search = mocker.patch("swh.web.common.utils.search") search.return_value = None assert utils.origin_visit_types() == [] else: # see swh/web/tests/data.py for origins added for tests assert utils.origin_visit_types() == ["git", "tar"] @pytest.mark.parametrize("server_name", ["localhost", "127.0.0.1", "testserver"]) def test_is_swh_web_development(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_development(request) @pytest.mark.parametrize("server_name", SWH_WEB_STAGING_SERVER_NAMES) def test_is_swh_web_staging(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_staging(request) def test_is_swh_web_production(request_factory): request = request_factory.get("/", SERVER_NAME=SWH_WEB_SERVER_NAME) assert utils.is_swh_web_production(request) def add(x, y): return x + y def test_django_cache(mocker): """Decorated function should be called once and returned value put in django cache.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 1 assert spy_cache_set.call_count == 1 def test_django_cache_invalidate_cache_pred(mocker): """Decorated function should be called twice and returned value put in django cache twice.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache(invalidate_cache_pred=lambda val: val == 3)(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 2 assert spy_cache_set.call_count == 2 def test_django_cache_raise_exception(mocker): """Decorated function should be called twice, exceptions should be raised and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) with pytest.raises(TypeError): cached_add(1, "2") with pytest.raises(TypeError): cached_add(1, "2") assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0 def test_django_cache_catch_exception(mocker): """Decorated function should be called twice, exceptions should not be raised, specified fallback value should be returned and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache( catch_exception=True, exception_return_value=math.nan )(add) val = cached_add(1, "2") val2 = cached_add(1, "2") assert math.isnan(val) assert math.isnan(val2) assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0