diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py
index b454274b..58e2e097 100644
--- a/swh/web/admin/deposit.py
+++ b/swh/web/admin/deposit.py
@@ -1,109 +1,87 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import requests
-from requests.auth import HTTPBasicAuth
import sentry_sdk
from django.conf import settings
from django.contrib.admin.views.decorators import staff_member_required
-from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import JsonResponse
from django.shortcuts import render
from swh.web.admin.adminurls import admin_route
-from swh.web.config import get_config
-
-config = get_config()["deposit"]
+from swh.web.common.utils import get_deposits_list
@admin_route(r"deposit/", view_name="admin-deposit")
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def _admin_origin_save(request):
return render(request, "admin/deposit.html")
@admin_route(r"deposit/list/", view_name="admin-deposit-list")
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def _admin_deposit_list(request):
table_data = {}
table_data["draw"] = int(request.GET["draw"])
- deposits_list_url = config["private_api_url"] + "deposits"
- deposits_list_auth = HTTPBasicAuth(
- config["private_api_user"], config["private_api_password"]
- )
try:
- nb_deposits = requests.get(
- "%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30
- ).json()["count"]
-
- deposits_data = cache.get("swh-deposit-list")
- if not deposits_data or deposits_data["count"] != nb_deposits:
- deposits_data = requests.get(
- "%s?page_size=%s" % (deposits_list_url, nb_deposits),
- auth=deposits_list_auth,
- timeout=30,
- ).json()
- cache.set("swh-deposit-list", deposits_data)
-
- deposits = deposits_data["results"]
-
+ deposits = get_deposits_list()
+ deposits_count = len(deposits)
search_value = request.GET["search[value]"]
if search_value:
deposits = [
d
for d in deposits
if any(
search_value.lower() in val
for val in [str(v).lower() for v in d.values()]
)
]
exclude_pattern = request.GET.get("excludePattern")
if exclude_pattern:
deposits = [
d
for d in deposits
if all(
exclude_pattern.lower() not in val
for val in [str(v).lower() for v in d.values()]
)
]
column_order = request.GET["order[0][column]"]
field_order = request.GET["columns[%s][name]" % column_order]
order_dir = request.GET["order[0][dir]"]
deposits = sorted(deposits, key=lambda d: d[field_order] or "")
if order_dir == "desc":
deposits = list(reversed(deposits))
length = int(request.GET["length"])
page = int(request.GET["start"]) / length + 1
paginator = Paginator(deposits, length)
data = paginator.page(page).object_list
- table_data["recordsTotal"] = deposits_data["count"]
+ table_data["recordsTotal"] = deposits_count
table_data["recordsFiltered"] = len(deposits)
table_data["data"] = [
{
"id": d["id"],
"external_id": d["external_id"],
"reception_date": d["reception_date"],
"status": d["status"],
"status_detail": d["status_detail"],
"swhid": d["swhid"],
"swhid_context": d["swhid_context"],
}
for d in data
]
except Exception as exc:
sentry_sdk.capture_exception(exc)
table_data["error"] = (
"An error occurred while retrieving " "the list of deposits !"
)
return JsonResponse(table_data)
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index 48491564..b15f1014 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,356 +1,384 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import os
import re
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
+import requests
+from requests.auth import HTTPBasicAuth
+from django.core.cache import cache
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import ORIGIN_VISIT_TYPES, get_config
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo and date.tzinfo != timezone.utc:
return date.astimezone(tz=timezone.utc)
else:
return date
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
"""Given an ISO 8601 datetime string, parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed date
Raises:
swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- 2007-01-14T20:34:22Z
"""
try:
date = parse_date(iso_date)
return datetime_to_utc(date)
except ParseError as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
"""Turns a string representation of an ISO 8601 datetime string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_iso8601_date_to_utc(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
browsers_supported_image_mimes = set(
[
"image/gif",
"image/png",
"image/jpeg",
"image/bmp",
"image/webp",
"image/svg",
"image/svg+xml",
]
)
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
site_base_url = request.build_absolute_uri("/")
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
"keycloak": config["keycloak"],
"site_base_url": site_base_url,
"DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
"status": config["status"],
"swh_web_dev": "localhost" in site_base_url,
"swh_web_staging": any(
[
server_name in site_base_url
for server_name in config["staging_server_names"]
]
),
"swh_web_version": get_distribution("swh.web").version,
"visit_types": ORIGIN_VISIT_TYPES,
}
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import archive
snp = archive.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
+
+
+def get_deposits_list() -> List[Dict[str, Any]]:
+ """Return the list of software deposits using swh-deposit API
+ """
+ config = get_config()["deposit"]
+ deposits_list_url = config["private_api_url"] + "deposits"
+ deposits_list_auth = HTTPBasicAuth(
+ config["private_api_user"], config["private_api_password"]
+ )
+
+ nb_deposits = requests.get(
+ "%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30
+ ).json()["count"]
+
+ deposits_data = cache.get("swh-deposit-list")
+ if not deposits_data or deposits_data["count"] != nb_deposits:
+ deposits_data = requests.get(
+ "%s?page_size=%s" % (deposits_list_url, nb_deposits),
+ auth=deposits_list_auth,
+ timeout=30,
+ ).json()
+ cache.set("swh-deposit-list", deposits_data)
+
+ return deposits_data["results"]
diff --git a/swh/web/config.py b/swh/web/config.py
index 19c4ee81..d36564ab 100644
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -1,214 +1,214 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
from swh.core import config
from swh.counters import get_counters
from swh.indexer.storage import get_indexer_storage
from swh.scheduler import get_scheduler
from swh.search import get_search
from swh.storage import get_storage
from swh.vault import get_vault
from swh.web import settings
SWH_WEB_INTERNAL_SERVER_NAME = "archive.internal.softwareheritage.org"
STAGING_SERVER_NAMES = [
"webapp.staging.swh.network",
"webapp.internal.staging.swh.network",
]
ORIGIN_VISIT_TYPES = [
"cran",
"deb",
"deposit",
"ftp",
"hg",
"git",
"nixguix",
"npm",
"pypi",
"svn",
"tar",
]
SETTINGS_DIR = os.path.dirname(settings.__file__)
DEFAULT_CONFIG = {
"allowed_hosts": ("list", []),
"search": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5010/", "timeout": 10,},
),
"storage": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5002/", "timeout": 10,},
),
"indexer_storage": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5007/", "timeout": 1,},
),
"counters": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5011/", "timeout": 1,},
),
"log_dir": ("string", "/tmp/swh/log"),
"debug": ("bool", False),
"serve_assets": ("bool", False),
"host": ("string", "127.0.0.1"),
"port": ("int", 5004),
"secret_key": ("string", "development key"),
# do not display code highlighting for content > 1MB
"content_display_max_size": ("int", 5 * 1024 * 1024),
"snapshot_content_max_size": ("int", 1000),
"throttling": (
"dict",
{
"cache_uri": None, # production: memcached as cache (127.0.0.1:11211)
# development: in-memory cache so None
"scopes": {
"swh_api": {
"limiter_rate": {"default": "120/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_search": {
"limiter_rate": {"default": "10/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_vault_cooking": {
"limiter_rate": {"default": "120/h", "GET": "60/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_save_origin": {
"limiter_rate": {"default": "120/h", "POST": "10/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_visit_latest": {
"limiter_rate": {"default": "700/m"},
"exempted_networks": ["127.0.0.0/8"],
},
},
},
),
"vault": ("dict", {"cls": "remote", "args": {"url": "http://127.0.0.1:5005/",}}),
"scheduler": ("dict", {"cls": "remote", "url": "http://127.0.0.1:5008/"}),
"development_db": ("string", os.path.join(SETTINGS_DIR, "db.sqlite3")),
"test_db": ("string", os.path.join(SETTINGS_DIR, "testdb.sqlite3")),
"production_db": ("dict", {"name": "swh-web"}),
"deposit": (
"dict",
{
"private_api_url": "https://deposit.softwareheritage.org/1/private/",
"private_api_user": "swhworker",
- "private_api_password": "",
+ "private_api_password": "some-password",
},
),
"coverage_count_origins": ("bool", False),
"e2e_tests_mode": ("bool", False),
"es_workers_index_url": ("string", ""),
"history_counters_url": (
"string",
"https://stats.export.softwareheritage.org/history_counters.json",
),
"client_config": ("dict", {}),
"keycloak": ("dict", {"server_url": "", "realm_name": ""}),
"graph": (
"dict",
{"server_url": "http://graph.internal.softwareheritage.org:5009/graph/"},
),
"status": (
"dict",
{
"server_url": "https://status.softwareheritage.org/",
"json_path": "1.0/status/578e5eddcdc0cc7951000520",
},
),
"metadata_search_backend": ("string", "swh-indexer-storage"), # or "swh-search"
"counters_backend": ("string", "swh-storage"), # or "swh-counters"
"staging_server_names": ("list", STAGING_SERVER_NAMES),
"instance_name": ("str", "archive-test.softwareheritage.org"),
}
swhweb_config: Dict[str, Any] = {}
def get_config(config_file="web/web"):
"""Read the configuration file `config_file`.
If an environment variable SWH_CONFIG_FILENAME is defined, this
takes precedence over the config_file parameter.
In any case, update the app with parameters (secret_key, conf)
and return the parsed configuration as a dict.
If no configuration file is provided, return a default
configuration.
"""
if not swhweb_config:
config_filename = os.environ.get("SWH_CONFIG_FILENAME")
if config_filename:
config_file = config_filename
cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
swhweb_config.update(cfg)
config.prepare_folders(swhweb_config, "log_dir")
if swhweb_config.get("search"):
swhweb_config["search"] = get_search(**swhweb_config["search"])
else:
swhweb_config["search"] = None
swhweb_config["storage"] = get_storage(**swhweb_config["storage"])
swhweb_config["vault"] = get_vault(**swhweb_config["vault"])
swhweb_config["indexer_storage"] = get_indexer_storage(
**swhweb_config["indexer_storage"]
)
swhweb_config["scheduler"] = get_scheduler(**swhweb_config["scheduler"])
swhweb_config["counters"] = get_counters(**swhweb_config["counters"])
return swhweb_config
def search():
"""Return the current application's search.
"""
return get_config()["search"]
def storage():
"""Return the current application's storage.
"""
return get_config()["storage"]
def vault():
"""Return the current application's vault.
"""
return get_config()["vault"]
def indexer_storage():
"""Return the current application's indexer storage.
"""
return get_config()["indexer_storage"]
def scheduler():
"""Return the current application's scheduler.
"""
return get_config()["scheduler"]
def counters():
"""Return the current application's counters.
"""
return get_config()["counters"]
diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index 8c8fb274..1c260bea 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,230 +1,287 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from base64 import b64encode
import datetime
from urllib.parse import quote
import pytest
from django.conf.urls import url
from django.test.utils import override_settings
from django.urls.exceptions import NoReverseMatch
from swh.web.common import utils
from swh.web.common.exc import BadInputExc
+from swh.web.config import get_config
def test_shorten_path_noop():
noops = ["/api/", "/browse/", "/content/symbol/foobar/"]
for noop in noops:
assert utils.shorten_path(noop) == noop
def test_shorten_path_sha1():
sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6"
short_sha1 = sha1[:8] + "..."
templates = [
"/api/1/content/sha1:%s/",
"/api/1/content/sha1_git:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha1:%s/ctags/",
]
for template in templates:
assert utils.shorten_path(template % sha1) == template % short_sha1
def test_shorten_path_sha256():
sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef"
short_sha256 = sha256[:8] + "..."
templates = [
"/api/1/content/sha256:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha256:%s/filetype/",
]
for template in templates:
assert utils.shorten_path(template % sha256) == template % short_sha256
@pytest.mark.parametrize(
"input_timestamp, output_date",
[
(
"2016-01-12",
datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
),
(
"2016-01-12T09:19:12+0100",
datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc),
),
(
"2007-01-14T20:34:22Z",
datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc),
),
],
)
def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date):
assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date
@pytest.mark.parametrize(
"invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"]
)
def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp):
with pytest.raises(BadInputExc):
utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp)
def test_format_utc_iso_date():
assert (
utils.format_utc_iso_date("2017-05-04T13:27:13+02:00")
== "04 May 2017, 11:27 UTC"
)
def test_gen_path_info():
input_path = "/home/user/swh-environment/swh-web/"
expected_result = [
{"name": "home", "path": "home"},
{"name": "user", "path": "home/user"},
{"name": "swh-environment", "path": "home/user/swh-environment"},
{"name": "swh-web", "path": "home/user/swh-environment/swh-web"},
]
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
input_path = "home/user/swh-environment/swh-web"
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
def test_rst_to_html():
rst = (
"Section\n"
"=======\n\n"
"**Some strong text**\n\n"
"* This is a bulleted list.\n"
"* It has two items, the second\n"
" item uses two lines.\n"
"\n"
"1. This is a numbered list.\n"
"2. It has two items too.\n"
"\n"
"#. This is a numbered list.\n"
"#. It has two items too.\n"
)
expected_html = (
'Section
\n'
"
Some strong text
\n"
'
\n"
'
\n'
"This is a numbered list.
\n"
"It has two items too.
\n"
"This is a numbered list.
\n"
"It has two items too.
\n"
"
\n"
"
"
)
assert utils.rst_to_html(rst) == expected_html
def sample_test_view(request, string, number):
pass
def sample_test_view_no_url_args(request):
pass
urlpatterns = [
url(
r"^sample/test/(?P.+)/view/(?P[0-9]+)/$",
sample_test_view,
name="sample-test-view",
),
url(
r"^sample/test/view/no/url/args/$",
sample_test_view_no_url_args,
name="sample-test-view-no-url-args",
),
]
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ok():
string = "foo"
number = 55
url = utils.reverse(
"sample-test-view", url_args={"string": string, "number": number}
)
assert url == f"/sample/test/{string}/view/{number}/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ko():
string = "foo"
with pytest.raises(NoReverseMatch):
utils.reverse("sample-test-view", url_args={"string": string, "number": string})
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_no_url_args():
url = utils.reverse("sample-test-view-no-url-args")
assert url == "/sample/test/view/no/url/args/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_only():
start = 0
scope = "foo"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": scope}
)
assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": None}
)
assert url == f"/sample/test/view/no/url/args/?start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_encode():
libname = "libstc++"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"libname": libname}
)
assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_query_params():
string = "foo"
number = 55
start = 10
scope = "bar"
url = utils.reverse(
"sample-test-view",
url_args={"string": string, "number": number},
query_params={"start": start, "scope": scope},
)
assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_absolute_uri(request_factory):
request = request_factory.get(utils.reverse("sample-test-view-no-url-args"))
url = utils.reverse("sample-test-view-no-url-args", request=request)
assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/"
+
+
+def test_get_deposits_list(requests_mock):
+ deposits_data = {
+ "count": 2,
+ "results": [
+ {
+ "check_task_id": "351820217",
+ "client": 2,
+ "collection": 1,
+ "complete_date": "2021-01-21T07:52:19.919312Z",
+ "external_id": "hal-03116143",
+ "id": 1412,
+ "load_task_id": "351820260",
+ "origin_url": "https://hal.archives-ouvertes.fr/hal-03116143",
+ "parent": None,
+ "reception_date": "2021-01-21T07:52:19.471019Z",
+ "status": "done",
+ "status_detail": None,
+ "swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079",
+ },
+ {
+ "check_task_id": "381576507",
+ "client": 2,
+ "collection": 1,
+ "complete_date": "2021-07-07T08:00:44.726676Z",
+ "external_id": "hal-03275052",
+ "id": 1693,
+ "load_task_id": "381576508",
+ "origin_url": "https://hal.archives-ouvertes.fr/hal-03275052",
+ "parent": None,
+ "reception_date": "2021-07-07T08:00:44.327661Z",
+ "status": "done",
+ "status_detail": None,
+ "swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c",
+ },
+ ],
+ }
+
+ config = get_config()["deposit"]
+ deposits_list_url = config["private_api_url"] + "deposits"
+
+ basic_auth_payload = (
+ config["private_api_user"] + ":" + config["private_api_password"]
+ ).encode()
+
+ requests_mock.get(
+ deposits_list_url,
+ json=deposits_data,
+ request_headers={
+ "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}"
+ },
+ )
+
+ assert utils.get_deposits_list() == deposits_data["results"]