diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py index b454274b..58e2e097 100644 --- a/swh/web/admin/deposit.py +++ b/swh/web/admin/deposit.py @@ -1,109 +1,87 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -import requests -from requests.auth import HTTPBasicAuth import sentry_sdk from django.conf import settings from django.contrib.admin.views.decorators import staff_member_required -from django.core.cache import cache from django.core.paginator import Paginator from django.http import JsonResponse from django.shortcuts import render from swh.web.admin.adminurls import admin_route -from swh.web.config import get_config - -config = get_config()["deposit"] +from swh.web.common.utils import get_deposits_list @admin_route(r"deposit/", view_name="admin-deposit") @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def _admin_origin_save(request): return render(request, "admin/deposit.html") @admin_route(r"deposit/list/", view_name="admin-deposit-list") @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def _admin_deposit_list(request): table_data = {} table_data["draw"] = int(request.GET["draw"]) - deposits_list_url = config["private_api_url"] + "deposits" - deposits_list_auth = HTTPBasicAuth( - config["private_api_user"], config["private_api_password"] - ) try: - nb_deposits = requests.get( - "%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30 - ).json()["count"] - - deposits_data = cache.get("swh-deposit-list") - if not deposits_data or deposits_data["count"] != nb_deposits: - deposits_data = requests.get( - "%s?page_size=%s" % (deposits_list_url, nb_deposits), - auth=deposits_list_auth, - timeout=30, - ).json() - cache.set("swh-deposit-list", deposits_data) - - deposits = deposits_data["results"] - + deposits = get_deposits_list() + deposits_count = len(deposits) search_value = request.GET["search[value]"] if search_value: deposits = [ d for d in deposits if any( search_value.lower() in val for val in [str(v).lower() for v in d.values()] ) ] exclude_pattern = request.GET.get("excludePattern") if exclude_pattern: deposits = [ d for d in deposits if all( exclude_pattern.lower() not in val for val in [str(v).lower() for v in d.values()] ) ] column_order = request.GET["order[0][column]"] field_order = request.GET["columns[%s][name]" % column_order] order_dir = request.GET["order[0][dir]"] deposits = sorted(deposits, key=lambda d: d[field_order] or "") if order_dir == "desc": deposits = list(reversed(deposits)) length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(deposits, length) data = paginator.page(page).object_list - table_data["recordsTotal"] = deposits_data["count"] + table_data["recordsTotal"] = deposits_count table_data["recordsFiltered"] = len(deposits) table_data["data"] = [ { "id": d["id"], "external_id": d["external_id"], "reception_date": d["reception_date"], "status": d["status"], "status_detail": d["status_detail"], "swhid": d["swhid"], "swhid_context": d["swhid_context"], } for d in data ] except Exception as exc: sentry_sdk.capture_exception(exc) table_data["error"] = ( "An error occurred while retrieving " "the list of deposits !" ) return JsonResponse(table_data) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 48491564..b15f1014 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,356 +1,384 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os import re -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry +import requests +from requests.auth import HTTPBasicAuth +from django.core.cache import cache from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import ORIGIN_VISIT_TYPES, get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "directory": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" site_base_url = request.build_absolute_uri("/") return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": site_base_url, "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": "localhost" in site_base_url, "swh_web_staging": any( [ server_name in site_base_url for server_name in config["staging_server_names"] ] ), "swh_web_version": get_distribution("swh.web").version, "visit_types": ORIGIN_VISIT_TYPES, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, "halt_level": 4, "traceback": True, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() + + +def get_deposits_list() -> List[Dict[str, Any]]: + """Return the list of software deposits using swh-deposit API + """ + config = get_config()["deposit"] + deposits_list_url = config["private_api_url"] + "deposits" + deposits_list_auth = HTTPBasicAuth( + config["private_api_user"], config["private_api_password"] + ) + + nb_deposits = requests.get( + "%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30 + ).json()["count"] + + deposits_data = cache.get("swh-deposit-list") + if not deposits_data or deposits_data["count"] != nb_deposits: + deposits_data = requests.get( + "%s?page_size=%s" % (deposits_list_url, nb_deposits), + auth=deposits_list_auth, + timeout=30, + ).json() + cache.set("swh-deposit-list", deposits_data) + + return deposits_data["results"] diff --git a/swh/web/config.py b/swh/web/config.py index 19c4ee81..d36564ab 100644 --- a/swh/web/config.py +++ b/swh/web/config.py @@ -1,214 +1,214 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict from swh.core import config from swh.counters import get_counters from swh.indexer.storage import get_indexer_storage from swh.scheduler import get_scheduler from swh.search import get_search from swh.storage import get_storage from swh.vault import get_vault from swh.web import settings SWH_WEB_INTERNAL_SERVER_NAME = "archive.internal.softwareheritage.org" STAGING_SERVER_NAMES = [ "webapp.staging.swh.network", "webapp.internal.staging.swh.network", ] ORIGIN_VISIT_TYPES = [ "cran", "deb", "deposit", "ftp", "hg", "git", "nixguix", "npm", "pypi", "svn", "tar", ] SETTINGS_DIR = os.path.dirname(settings.__file__) DEFAULT_CONFIG = { "allowed_hosts": ("list", []), "search": ( "dict", {"cls": "remote", "url": "http://127.0.0.1:5010/", "timeout": 10,}, ), "storage": ( "dict", {"cls": "remote", "url": "http://127.0.0.1:5002/", "timeout": 10,}, ), "indexer_storage": ( "dict", {"cls": "remote", "url": "http://127.0.0.1:5007/", "timeout": 1,}, ), "counters": ( "dict", {"cls": "remote", "url": "http://127.0.0.1:5011/", "timeout": 1,}, ), "log_dir": ("string", "/tmp/swh/log"), "debug": ("bool", False), "serve_assets": ("bool", False), "host": ("string", "127.0.0.1"), "port": ("int", 5004), "secret_key": ("string", "development key"), # do not display code highlighting for content > 1MB "content_display_max_size": ("int", 5 * 1024 * 1024), "snapshot_content_max_size": ("int", 1000), "throttling": ( "dict", { "cache_uri": None, # production: memcached as cache (127.0.0.1:11211) # development: in-memory cache so None "scopes": { "swh_api": { "limiter_rate": {"default": "120/h"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_search": { "limiter_rate": {"default": "10/m"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_vault_cooking": { "limiter_rate": {"default": "120/h", "GET": "60/m"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_save_origin": { "limiter_rate": {"default": "120/h", "POST": "10/h"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_visit_latest": { "limiter_rate": {"default": "700/m"}, "exempted_networks": ["127.0.0.0/8"], }, }, }, ), "vault": ("dict", {"cls": "remote", "args": {"url": "http://127.0.0.1:5005/",}}), "scheduler": ("dict", {"cls": "remote", "url": "http://127.0.0.1:5008/"}), "development_db": ("string", os.path.join(SETTINGS_DIR, "db.sqlite3")), "test_db": ("string", os.path.join(SETTINGS_DIR, "testdb.sqlite3")), "production_db": ("dict", {"name": "swh-web"}), "deposit": ( "dict", { "private_api_url": "https://deposit.softwareheritage.org/1/private/", "private_api_user": "swhworker", - "private_api_password": "", + "private_api_password": "some-password", }, ), "coverage_count_origins": ("bool", False), "e2e_tests_mode": ("bool", False), "es_workers_index_url": ("string", ""), "history_counters_url": ( "string", "https://stats.export.softwareheritage.org/history_counters.json", ), "client_config": ("dict", {}), "keycloak": ("dict", {"server_url": "", "realm_name": ""}), "graph": ( "dict", {"server_url": "http://graph.internal.softwareheritage.org:5009/graph/"}, ), "status": ( "dict", { "server_url": "https://status.softwareheritage.org/", "json_path": "1.0/status/578e5eddcdc0cc7951000520", }, ), "metadata_search_backend": ("string", "swh-indexer-storage"), # or "swh-search" "counters_backend": ("string", "swh-storage"), # or "swh-counters" "staging_server_names": ("list", STAGING_SERVER_NAMES), "instance_name": ("str", "archive-test.softwareheritage.org"), } swhweb_config: Dict[str, Any] = {} def get_config(config_file="web/web"): """Read the configuration file `config_file`. If an environment variable SWH_CONFIG_FILENAME is defined, this takes precedence over the config_file parameter. In any case, update the app with parameters (secret_key, conf) and return the parsed configuration as a dict. If no configuration file is provided, return a default configuration. """ if not swhweb_config: config_filename = os.environ.get("SWH_CONFIG_FILENAME") if config_filename: config_file = config_filename cfg = config.load_named_config(config_file, DEFAULT_CONFIG) swhweb_config.update(cfg) config.prepare_folders(swhweb_config, "log_dir") if swhweb_config.get("search"): swhweb_config["search"] = get_search(**swhweb_config["search"]) else: swhweb_config["search"] = None swhweb_config["storage"] = get_storage(**swhweb_config["storage"]) swhweb_config["vault"] = get_vault(**swhweb_config["vault"]) swhweb_config["indexer_storage"] = get_indexer_storage( **swhweb_config["indexer_storage"] ) swhweb_config["scheduler"] = get_scheduler(**swhweb_config["scheduler"]) swhweb_config["counters"] = get_counters(**swhweb_config["counters"]) return swhweb_config def search(): """Return the current application's search. """ return get_config()["search"] def storage(): """Return the current application's storage. """ return get_config()["storage"] def vault(): """Return the current application's vault. """ return get_config()["vault"] def indexer_storage(): """Return the current application's indexer storage. """ return get_config()["indexer_storage"] def scheduler(): """Return the current application's scheduler. """ return get_config()["scheduler"] def counters(): """Return the current application's counters. """ return get_config()["counters"] diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py index 8c8fb274..1c260bea 100644 --- a/swh/web/tests/common/test_utils.py +++ b/swh/web/tests/common/test_utils.py @@ -1,230 +1,287 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from base64 import b64encode import datetime from urllib.parse import quote import pytest from django.conf.urls import url from django.test.utils import override_settings from django.urls.exceptions import NoReverseMatch from swh.web.common import utils from swh.web.common.exc import BadInputExc +from swh.web.config import get_config def test_shorten_path_noop(): noops = ["/api/", "/browse/", "/content/symbol/foobar/"] for noop in noops: assert utils.shorten_path(noop) == noop def test_shorten_path_sha1(): sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6" short_sha1 = sha1[:8] + "..." templates = [ "/api/1/content/sha1:%s/", "/api/1/content/sha1_git:%s/", "/api/1/directory/%s/", "/api/1/content/sha1:%s/ctags/", ] for template in templates: assert utils.shorten_path(template % sha1) == template % short_sha1 def test_shorten_path_sha256(): sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef" short_sha256 = sha256[:8] + "..." templates = [ "/api/1/content/sha256:%s/", "/api/1/directory/%s/", "/api/1/content/sha256:%s/filetype/", ] for template in templates: assert utils.shorten_path(template % sha256) == template % short_sha256 @pytest.mark.parametrize( "input_timestamp, output_date", [ ( "2016-01-12", datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), ), ( "2016-01-12T09:19:12+0100", datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), ), ( "2007-01-14T20:34:22Z", datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc), ), ], ) def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date): assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date @pytest.mark.parametrize( "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"] ) def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp): with pytest.raises(BadInputExc): utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp) def test_format_utc_iso_date(): assert ( utils.format_utc_iso_date("2017-05-04T13:27:13+02:00") == "04 May 2017, 11:27 UTC" ) def test_gen_path_info(): input_path = "/home/user/swh-environment/swh-web/" expected_result = [ {"name": "home", "path": "home"}, {"name": "user", "path": "home/user"}, {"name": "swh-environment", "path": "home/user/swh-environment"}, {"name": "swh-web", "path": "home/user/swh-environment/swh-web"}, ] path_info = utils.gen_path_info(input_path) assert path_info == expected_result input_path = "home/user/swh-environment/swh-web" path_info = utils.gen_path_info(input_path) assert path_info == expected_result def test_rst_to_html(): rst = ( "Section\n" "=======\n\n" "**Some strong text**\n\n" "* This is a bulleted list.\n" "* It has two items, the second\n" " item uses two lines.\n" "\n" "1. This is a numbered list.\n" "2. It has two items too.\n" "\n" "#. This is a numbered list.\n" "#. It has two items too.\n" ) expected_html = ( '

Section

\n' "

Some strong text

\n" '\n" '
    \n' "
  1. This is a numbered list.

  2. \n" "
  3. It has two items too.

  4. \n" "
  5. This is a numbered list.

  6. \n" "
  7. It has two items too.

  8. \n" "
\n" "
" ) assert utils.rst_to_html(rst) == expected_html def sample_test_view(request, string, number): pass def sample_test_view_no_url_args(request): pass urlpatterns = [ url( r"^sample/test/(?P.+)/view/(?P[0-9]+)/$", sample_test_view, name="sample-test-view", ), url( r"^sample/test/view/no/url/args/$", sample_test_view_no_url_args, name="sample-test-view-no-url-args", ), ] @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ok(): string = "foo" number = 55 url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number} ) assert url == f"/sample/test/{string}/view/{number}/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ko(): string = "foo" with pytest.raises(NoReverseMatch): utils.reverse("sample-test-view", url_args={"string": string, "number": string}) @override_settings(ROOT_URLCONF=__name__) def test_reverse_no_url_args(): url = utils.reverse("sample-test-view-no-url-args") assert url == "/sample/test/view/no/url/args/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_only(): start = 0 scope = "foo" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": scope} ) assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": None} ) assert url == f"/sample/test/view/no/url/args/?start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_encode(): libname = "libstc++" url = utils.reverse( "sample-test-view-no-url-args", query_params={"libname": libname} ) assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_query_params(): string = "foo" number = 55 start = 10 scope = "bar" url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number}, query_params={"start": start, "scope": scope}, ) assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_absolute_uri(request_factory): request = request_factory.get(utils.reverse("sample-test-view-no-url-args")) url = utils.reverse("sample-test-view-no-url-args", request=request) assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/" + + +def test_get_deposits_list(requests_mock): + deposits_data = { + "count": 2, + "results": [ + { + "check_task_id": "351820217", + "client": 2, + "collection": 1, + "complete_date": "2021-01-21T07:52:19.919312Z", + "external_id": "hal-03116143", + "id": 1412, + "load_task_id": "351820260", + "origin_url": "https://hal.archives-ouvertes.fr/hal-03116143", + "parent": None, + "reception_date": "2021-01-21T07:52:19.471019Z", + "status": "done", + "status_detail": None, + "swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079", + }, + { + "check_task_id": "381576507", + "client": 2, + "collection": 1, + "complete_date": "2021-07-07T08:00:44.726676Z", + "external_id": "hal-03275052", + "id": 1693, + "load_task_id": "381576508", + "origin_url": "https://hal.archives-ouvertes.fr/hal-03275052", + "parent": None, + "reception_date": "2021-07-07T08:00:44.327661Z", + "status": "done", + "status_detail": None, + "swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c", + }, + ], + } + + config = get_config()["deposit"] + deposits_list_url = config["private_api_url"] + "deposits" + + basic_auth_payload = ( + config["private_api_user"] + ":" + config["private_api_password"] + ).encode() + + requests_mock.get( + deposits_list_url, + json=deposits_data, + request_headers={ + "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" + }, + ) + + assert utils.get_deposits_list() == deposits_data["results"]