diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py
index eec132b1..0be07976 100644
--- a/swh/web/admin/deposit.py
+++ b/swh/web/admin/deposit.py
@@ -1,135 +1,44 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import sentry_sdk
+
+import requests
+from requests.auth import HTTPBasicAuth
from django.conf import settings
from django.contrib.auth.decorators import user_passes_test
-from django.core.paginator import Paginator
from django.http import JsonResponse
from django.shortcuts import render
from swh.web.admin.adminurls import admin_route
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
-from swh.web.common.utils import (
- get_deposit_raw_metadata,
- get_deposits_list,
- parse_swh_deposit_origin,
- parse_swh_metadata_provenance,
-)
+from swh.web.config import get_config
def _can_list_deposits(user):
return user.is_staff or user.has_perm(ADMIN_LIST_DEPOSIT_PERMISSION)
@admin_route(r"deposit/", view_name="admin-deposit")
@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_origin_save(request):
    """Render the deposit administration page (``admin/deposit.html``).

    Access is guarded by ``_can_list_deposits``; users failing that check are
    redirected to ``settings.LOGIN_URL``.
    """
    # NOTE(review): the function name mentions "origin save" although the view
    # serves the deposit admin page -- looks like a copy/paste leftover,
    # confirm nothing refers to it by name before renaming.
    return render(request, "admin/deposit.html")
@admin_route(r"deposit/list/", view_name="admin-deposit-list")
@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_deposit_list(request):
    """Proxy a DataTables request to the swh-deposit private API.

    The query parameters of the incoming request are forwarded unchanged to
    the ``deposits/datatables/`` endpoint of the deposit private API, using
    HTTP basic authentication with the configured credentials, and the JSON
    document it returns is sent back to the client.

    When the deposit API cannot be reached, answers with an HTTP error status
    or returns a body that is not JSON, the exception is reported to Sentry
    and a JSON object holding an ``error`` message is returned instead of
    letting the view fail with a server error.

    Args:
        request: the incoming HTTP request issued by the deposits datatable

    Returns:
        A JSON response with the deposits listing, or with an ``error`` key
        when the listing could not be retrieved.
    """
    config = get_config()["deposit"]
    # normalize the base URL so that it ends with exactly one slash
    private_api_url = config["private_api_url"].rstrip("/") + "/"
    deposits_list_url = private_api_url + "deposits/datatables/"
    deposits_list_auth = HTTPBasicAuth(
        config["private_api_user"], config["private_api_password"]
    )

    try:
        response = requests.get(
            deposits_list_url, auth=deposits_list_auth, params=request.GET, timeout=30
        )
        response.raise_for_status()
        deposits = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures raised by older versions
        # of requests. sentry_sdk is imported locally as this module does not
        # import it at top level.
        import sentry_sdk

        sentry_sdk.capture_exception(exc)
        deposits = {"error": f"Could not retrieve deposits: {exc!r}"}

    return JsonResponse(deposits)
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index 1853f6a4..d8bb0fcf 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,602 +1,528 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import functools
import os
import re
from typing import Any, Callable, Dict, List, Optional
import urllib.parse
-from xml.etree import ElementTree
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
import sentry_sdk
from django.core.cache import cache
from django.core.cache.backends.base import DEFAULT_TIMEOUT
from django.http import HttpRequest, QueryDict
from django.shortcuts import redirect
from django.urls import resolve
from django.urls import reverse as django_reverse
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
ADMIN_LIST_DEPOSIT_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
)
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search
# Prometheus collector registry dedicated to swh-web metrics
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)

# Mapping from an archive object kind (long name, short name or plural form)
# to the icon used to represent it; it is injected in every template by
# ``context_processor``.
# NOTE(review): values look like Material Design Icons CSS classes -- confirm.
swh_object_icons = {
    "alias": "mdi mdi-star",
    "branch": "mdi mdi-source-branch",
    "branches": "mdi mdi-source-branch",
    "content": "mdi mdi-file-document",
    "cnt": "mdi mdi-file-document",
    "directory": "mdi mdi-folder",
    "dir": "mdi mdi-folder",
    "origin": "mdi mdi-source-repository",
    "ori": "mdi mdi-source-repository",
    "person": "mdi mdi-account",
    "revisions history": "mdi mdi-history",
    "release": "mdi mdi-tag",
    "rel": "mdi mdi-tag",
    "releases": "mdi mdi-tag",
    "revision": "mdi mdi-rotate-90 mdi-source-commit",
    "rev": "mdi mdi-rotate-90 mdi-source-commit",
    "snapshot": "mdi mdi-camera",
    "snp": "mdi mdi-camera",
    "visits": "mdi mdi-calendar-month",
}
def reverse(
    viewname: str,
    url_args: Optional[Dict[str, Any]] = None,
    query_params: Optional[QueryParameters] = None,
    current_app: Optional[str] = None,
    urlconf: Optional[str] = None,
    request: Optional[HttpRequest] = None,
) -> str:
    """Django ``reverse`` wrapper that also handles query parameters.

    URL arguments and query parameters whose value is :const:`None` are
    ignored. Query parameters are appended sorted by name.

    Args:
        viewname: name of the django view to compute the url of
        url_args: url arguments of the view, indexed by their names
        query_params: query parameters to append to the reversed url
        current_app: name of the django app the view belongs to
        urlconf: url configuration module
        request: when provided, an absolute URI is built from it

    Returns:
        The url of the requested view, with its arguments and query
        parameters processed.
    """
    if url_args:
        url_args = {
            name: value for name, value in url_args.items() if value is not None
        }

    url = django_reverse(
        viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
    )

    if query_params:
        query_params = {
            name: value for name, value in query_params.items() if value is not None
        }

    # the filtering above may have emptied the parameters
    if query_params:
        query_dict = QueryDict("", mutable=True)
        for name in sorted(query_params):
            query_dict[name] = query_params[name]
        url += "?" + query_dict.urlencode(safe="/;:")

    if request is not None:
        url = request.build_absolute_uri(url)

    return url
def datetime_to_utc(date):
    """Convert a timezone-aware datetime to UTC.

    Args:
        date (datetime.datetime): input datetime

    Returns:
        datetime.datetime: the same instant expressed in UTC (timezone-aware)
        when the input carries a non-UTC timezone; the input itself,
        untouched, when it is naive or already in UTC
    """
    tz = date.tzinfo
    if tz and tz != timezone.utc:
        return date.astimezone(tz=timezone.utc)
    return date
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
    """Parse an ISO 8601 datetime string and convert the result to UTC.

    Accepted inputs look like::

        2016-01-12
        2016-01-12T09:19:12+0100
        2007-01-14T20:34:22Z

    Args:
        iso_date: the ISO 8601 string to parse

    Returns:
        The parsed date, converted with :func:`datetime_to_utc`

    Raises:
        swh.web.common.exc.BadInputExc: provided date does not respect
            ISO 8601 format
    """
    try:
        parsed = parse_date(iso_date)
    except ParseError as e:
        raise BadInputExc(e)
    else:
        return datetime_to_utc(parsed)
def shorten_path(path):
    """Shorten the hashes found in the given path.

    Every SHA-256 (64 hexadecimal characters) or SHA-1 (40 hexadecimal
    characters) checksum present in ``path`` is replaced by its first
    8 characters followed by an ellipsis (``...``).

    Args:
        path (str): the path to shorten

    Returns:
        str: the path with its hashes abbreviated
    """
    # Only hexadecimal digits are part of a checksum: the tail of the SHA-256
    # pattern previously accepted any lowercase letter, which shortened text
    # that is not a hash.
    # SHA-256 is processed first, otherwise the SHA-1 pattern would match the
    # first 40 characters of a 64 characters digest.
    sha256_re = r"([0-9a-f]{8})[0-9a-f]{56}"
    sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"

    ret = re.sub(sha256_re, r"\1...", path)
    return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
    """Format an ISO 8601 datetime string as a human readable UTC date.

    The input is converted to UTC before being formatted, for instance
    '2017-05-04T13:27:13+02:00' gives '04 May 2017, 11:27 UTC' with the
    default format.

    Args:
        iso_date (str): a string representation of an ISO 8601 date
        fmt (str): optional date formatting string

    Returns:
        str: the formatted date, or the input itself when it is empty
        or :const:`None`
    """
    if iso_date:
        return parse_iso8601_date_to_utc(iso_date).strftime(fmt)
    # nothing to format
    return iso_date
def gen_path_info(path):
    """Generate the navigation data of a path, used to build
    breadcrumbs in the swh web ui.

    For the path /folder1/folder2/folder3, the result is::

        [{'name': 'folder1', 'path': 'folder1'},
         {'name': 'folder2', 'path': 'folder1/folder2'},
         {'name': 'folder3', 'path': 'folder1/folder2/folder3'}]

    Args:
        path: a filesystem path

    Returns:
        list: one entry per path component, each holding the component name
        and its path from the root; empty when no path is given.
    """
    if not path:
        return []

    components = path.strip("/").split("/")
    return [
        {"name": name, "path": "/".join(components[: idx + 1]).strip("/")}
        for idx, name in enumerate(components)
    ]
def parse_rst(text, report_level=2):
    """
    Build a docutils document out of a reStructuredText string.

    Args:
        text (str): string with reStructuredText markups in it
        report_level (int): level of docutils report messages to print
            (1 info 2 warning 3 error 4 severe 5 none)

    Returns:
        docutils.nodes.document: a parsed docutils document
    """
    rst_parser = docutils.parsers.rst.Parser()

    # default docutils settings for the reST parser, with a custom report level
    option_parser = docutils.frontend.OptionParser(
        components=(docutils.parsers.rst.Parser,)
    )
    settings = option_parser.get_default_values()
    settings.report_level = report_level

    document = docutils.utils.new_document("rst-doc", settings=settings)
    rst_parser.parse(text, document)
    return document
def get_client_ip(request):
    """
    Extract the client IP address of an incoming HTTP request.

    The first address of the ``X-Forwarded-For`` header is used when that
    header is set, the ``REMOTE_ADDR`` value otherwise.

    Args:
        request (django.http.HttpRequest): the incoming HTTP request

    Returns:
        str: The client IP address
    """
    forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
    if not forwarded_for:
        return request.META.get("REMOTE_ADDR")
    return forwarded_for.split(",", 1)[0]
def is_swh_web_development(request: HttpRequest) -> bool:
    """Tell whether the request targets a development instance of swh-web,
    i.e. its base URL contains a local or test server host name."""
    site_base_url = request.build_absolute_uri("/")
    for host in ("localhost", "127.0.0.1", "testserver"):
        if host in site_base_url:
            return True
    return False
def is_swh_web_staging(request: HttpRequest) -> bool:
    """Tell whether the request targets a staging instance of swh-web,
    i.e. its base URL contains one of the configured staging server names."""
    staging_server_names = get_config()["staging_server_names"]
    site_base_url = request.build_absolute_uri("/")
    for server_name in staging_server_names:
        if server_name in site_base_url:
            return True
    return False
def is_swh_web_production(request: HttpRequest) -> bool:
    """Tell whether the request targets the public production instance of
    swh-web, i.e. its base URL contains ``SWH_WEB_SERVER_NAME``."""
    site_base_url = request.build_absolute_uri("/")
    return SWH_WEB_SERVER_NAME in site_base_url
# MIME types of the images handled as displayable by web browsers,
# injected in every template by the context processor
browsers_supported_image_mimes = {
    "image/gif",
    "image/png",
    "image/jpeg",
    "image/bmp",
    "image/webp",
    "image/svg",
    "image/svg+xml",
}
def context_processor(request):
    """
    Django context processor providing the variables made available
    to all swh-web templates.

    Args:
        request: the incoming HTTP request

    Returns:
        dict: the variables to add to the template context
    """
    config = get_config()

    if hasattr(request, "user"):
        user = request.user
        if user.is_authenticated and not hasattr(user, "backend"):
            # To avoid django.template.base.VariableDoesNotExist errors
            # when rendering templates when standard Django user is logged in.
            user.backend = "django.contrib.auth.backends.ModelBackend"

    context = {
        "swh_object_icons": swh_object_icons,
        "available_languages": None,
        "swh_client_config": config["client_config"],
        "oidc_enabled": bool(config["keycloak"]["server_url"]),
        "browsers_supported_image_mimes": browsers_supported_image_mimes,
        "keycloak": config["keycloak"],
        "site_base_url": request.build_absolute_uri("/"),
        "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
        "status": config["status"],
        "swh_web_dev": is_swh_web_development(request),
        "swh_web_staging": is_swh_web_staging(request),
        "swh_web_version": get_distribution("swh.web").version,
        "iframe_mode": False,
        "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
        "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION,
        "FEATURES": get_config()["features"],
        "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION,
    }
    return context
def resolve_branch_alias(
    snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """
    Resolve branch alias in snapshot content.

    Aliases are followed until a branch that is not an alias is found. When
    the target of an alias is missing from the provided snapshot content, it
    is looked up in the archive.

    Args:
        snapshot: a full snapshot content
        branch: a branch alias contained in the snapshot

    Returns:
        The real snapshot branch that got aliased, or :const:`None` when it
        cannot be resolved (no branch provided, dangling alias or aliases
        referencing each other in a cycle).
    """
    # targets already followed, used to detect aliases forming a cycle
    # (e.g. a -> b -> a) which would otherwise be followed forever
    visited_targets = set()

    while branch and branch["target_type"] == "alias":
        target = branch["target"]
        if target in visited_targets:
            return None
        visited_targets.add(target)

        if target in snapshot["branches"]:
            branch = snapshot["branches"][target]
        else:
            # imported locally, as in the original implementation
            from swh.web.common import archive

            snp = archive.lookup_snapshot(
                snapshot["id"], branches_from=target, branches_count=1
            )
            if snp and target in snp["branches"]:
                branch = snp["branches"][target]
            else:
                branch = None
    return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
    """
    Docutils translator subclass to customize the generation of HTML
    from reST-formatted docstrings
    """

    def __init__(self, document):
        super().__init__(document)
        # Empty the markup emitted around the document body.
        # NOTE(review): presumably this drops the page-level wrapper docutils
        # adds by default so that only an HTML fragment is produced -- confirm
        # against the docutils HTMLTranslator implementation.
        self.body_prefix = []
        self.body_suffix = []


# Module-level docutils HTML writer configured to use the translator above
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
"file_insertion_enabled": False,
"raw_enabled": False,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
    """
    Reformat an HTML document to make it easier to read.

    Args:
        html: Input HTML document

    Returns:
        The prettified HTML document
    """
    soup = BeautifulSoup(html, "lxml")
    return soup.prettify()
def django_cache(
    timeout: int = DEFAULT_TIMEOUT,
    catch_exception: bool = False,
    exception_return_value: Any = None,
    invalidate_cache_pred: Callable[[Any], bool] = lambda val: False,
):
    """Decorator to put the result of a function call in Django cache,
    subsequent calls will directly return the cached value.

    The cache key is computed from the module and name of the decorated
    function and from the arguments of the call, which must therefore be
    hashable.

    Args:
        timeout: The number of seconds value will be hold in cache
        catch_exception: If :const:`True`, any thrown exception by
            the decorated function will be caught and not reraised
        exception_return_value: The value to return if previous
            parameter is set to :const:`True`
        invalidate_cache_pred: A predicate function enabling to
            invalidate the cache under certain conditions, decorated
            function will then be called again

    Returns:
        The returned value of the decorated function for the specified
        parameters

    Notes:
        - A cached value of :const:`None` is handled like a cache miss: the
          decorated function is called again on every call when it
          returns :const:`None`.
        - Exceptions raised by the decorated function are always reported to
          Sentry and nothing is put in cache in that case.
    """

    def inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # positional and keyword arguments are separated by a 0 marker;
            # keyword arguments are sorted to get a key independent of the
            # order they were passed in
            func_args = args + (0,) + tuple(sorted(kwargs.items()))
            # NOTE(review): the key relies on the builtin hash(), whose value
            # for str objects is randomized per interpreter process unless
            # PYTHONHASHSEED is set -- keys are then presumably not shared
            # between worker processes; confirm whether that is intended.
            cache_key = str(hash((func.__module__, func.__name__) + func_args))
            ret = cache.get(cache_key)
            if ret is None or invalidate_cache_pred(ret):
                try:
                    ret = func(*args, **kwargs)
                except Exception as exc:
                    sentry_sdk.capture_exception(exc)
                    if catch_exception:
                        return exception_return_value
                    else:
                        raise
                else:
                    # only successful calls are cached
                    cache.set(cache_key, ret, timeout=timeout)
            return ret

        return wrapper

    return inner
def _deposits_list_url(
deposits_list_base_url: str, page_size: int, username: Optional[str]
) -> str:
params = {"page_size": str(page_size)}
if username is not None:
params["username"] = username
return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}"
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the list of software deposits using swh-deposit API

    A first request fetching a single deposit is sent to get the current
    number of deposits, then all of them are retrieved at once. The full
    listing is kept in cache and fetched again only when the number of
    deposits changed.

    Args:
        username: when provided, forwarded to the deposit API as the
            ``username`` query parameter

    Returns:
        The content of the ``results`` field of the deposit API response
    """
    config = get_config()["deposit"]
    # normalize the base URL so that it ends with exactly one slash
    private_api_url = config["private_api_url"].rstrip("/") + "/"
    deposits_list_base_url = private_api_url + "deposits"
    deposits_list_auth = HTTPBasicAuth(
        config["private_api_user"], config["private_api_password"]
    )

    deposits_list_url = _deposits_list_url(
        deposits_list_base_url, page_size=1, username=username
    )

    nb_deposits = requests.get(
        deposits_list_url, auth=deposits_list_auth, timeout=30
    ).json()["count"]

    @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits)
    def _get_deposits_data(username):
        # username is a parameter (and not only a closure variable) so that
        # it is part of the cache key computed by django_cache: listings of
        # different users having the same number of deposits must not share
        # a cache entry
        deposits_list_url = _deposits_list_url(
            deposits_list_base_url, page_size=nb_deposits, username=username
        )
        return requests.get(
            deposits_list_url,
            auth=deposits_list_auth,
            timeout=30,
        ).json()

    deposits_data = _get_deposits_data(username)

    return deposits_data["results"]
-@django_cache()
-def get_deposit_raw_metadata(deposit_id: int) -> Optional[str]:
- config = get_config()["deposit"]
- url = f"{config['private_api_url']}/{deposit_id}/meta"
- return requests.get(url).json()["raw_metadata"]
-
-
_origin_visit_types_cache_timeout = 24 * 60 * 60  # 24 hours


@django_cache(
    timeout=_origin_visit_types_cache_timeout,
    catch_exception=True,
    exception_return_value=[],
)
def origin_visit_types() -> List[str]:
    """List, in sorted order, every visit type of the origins
    ingested into the archive.

    The result is cached for 24 hours; an empty list is returned when the
    visit types cannot be retrieved.
    """
    visit_types_count = search().visit_types_count()
    return sorted(visit_types_count.keys())
def redirect_to_new_route(request, new_route, permanent=True):
    """Redirect a request to another route, turning the url arguments of
    the requested route into query parameters of the new one.

    For instance /origin/ORIGIN_URL/log?path=test can be redirected to
    /log?url=ORIGIN_URL&path=test. This can be used to deprecate routes.

    Args:
        request: the incoming HTTP request
        new_route: name of the view to redirect to
        permanent: whether to issue a permanent redirection

    Returns:
        The redirect response targeting the new route
    """
    resolved = resolve(request.path_info)

    # query parameters take precedence over url arguments of the same name
    query_params = dict(resolved.kwargs)
    query_params.update(request.GET.dict())

    target_url = reverse(new_route, query_params=query_params)
    return redirect(target_url, permanent=permanent)
-NAMESPACES = {
- "swh": "https://www.softwareheritage.org/schema/2018/deposit",
- "schema": "http://schema.org/",
-}
-
-
-def parse_swh_metadata_provenance(raw_metadata: str) -> Optional[str]:
- """Parse swh metadata-provenance out of the raw metadata deposit. If found, returns the
- value, None otherwise.
-
- .. code-block:: xml
-
-
-
- https://example.org/metadata/url
-
-
-
- Args:
- raw_metadata: raw metadata out of deposits received
-
- Returns:
- Either the metadata provenance url if any or None otherwise
-
- """
- metadata = ElementTree.fromstring(raw_metadata)
- url = metadata.findtext(
- "swh:deposit/swh:metadata-provenance/schema:url",
- namespaces=NAMESPACES,
- )
- return url or None
-
-
-def parse_swh_deposit_origin(raw_metadata: str) -> Optional[str]:
- """Parses and from metadata document,
- if any. They are mutually exclusive and tested as such in the deposit.
-
- .. code-block:: xml
-
-
-
-
-
-
-
- .. code-block:: xml
-
-
-
-
-
-
-
- Returns:
- The one not null if any, None otherwise
-
- """
- metadata = ElementTree.fromstring(raw_metadata)
- for origin_tag in ["create_origin", "add_to_origin"]:
- elt = metadata.find(
- f"swh:deposit/swh:{origin_tag}/swh:origin[@url]", namespaces=NAMESPACES
- )
- if elt is not None:
- return elt.attrib["url"]
- return None
-
-
def has_add_forge_now_permission(user) -> bool:
    """Tell whether a user is considered an add-forge-now moderator.

    Args:
        user: the user to check

    Returns:
        True if a user is staff or has add forge now moderator permission
    """
    is_staff = user.is_staff
    # the permission lookup is only performed for non-staff users
    return is_staff if is_staff else user.has_perm(ADD_FORGE_MODERATOR_PERMISSION)
diff --git a/swh/web/tests/admin/test_deposit.py b/swh/web/tests/admin/test_deposit.py
index ceeb2860..75f64aed 100644
--- a/swh/web/tests/admin/test_deposit.py
+++ b/swh/web/tests/admin/test_deposit.py
@@ -1,37 +1,101 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from base64 import b64encode
+
import pytest
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.utils import reverse
-from swh.web.tests.utils import check_html_get_response, create_django_permission
+from swh.web.config import get_config
+from swh.web.tests.utils import (
+ check_html_get_response,
+ check_http_get_response,
+ create_django_permission,
+)
def test_deposit_admin_view_not_available_for_anonymous_user(client):
url = reverse("admin-deposit")
resp = check_html_get_response(client, url, status_code=302)
assert resp["location"] == reverse("login", query_params={"next": url})
@pytest.mark.django_db
def test_deposit_admin_view_available_for_staff_user(client, staff_user):
client.force_login(staff_user)
url = reverse("admin-deposit")
check_html_get_response(
client, url, status_code=200, template_used="admin/deposit.html"
)
@pytest.mark.django_db
def test_deposit_admin_view_available_for_user_with_permission(client, regular_user):
regular_user.user_permissions.add(
create_django_permission(ADMIN_LIST_DEPOSIT_PERMISSION)
)
client.force_login(regular_user)
url = reverse("admin-deposit")
check_html_get_response(
client, url, status_code=200, template_used="admin/deposit.html"
)
+
+
@pytest.mark.django_db
def test_deposit_admin_view_list_deposits(client, staff_user, requests_mock):
    """The deposits listing view must query the deposit private API with the
    configured credentials and relay the JSON payload it returns."""
    deposits_data = {
        "data": [
            {
                "external_id": "hal-02527986",
                "id": 1066,
                "raw_metadata": None,
                "reception_date": "2022-04-08T14:12:34.143000Z",
                "status": "rejected",
                "status_detail": None,
                "swhid": None,
                "swhid_context": None,
                "type": "code",
                "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-02527986",
            },
            {
                "external_id": "hal-01243573",
                "id": 1065,
                "raw_metadata": None,
                "reception_date": "2022-04-08T12:53:50.940000Z",
                "status": "rejected",
                "status_detail": None,
                "swhid": None,
                "swhid_context": None,
                "type": "code",
                "uri": "https://inria.halpreprod.archives-ouvertes.fr/hal-01243573",
            },
        ],
        "draw": 2,
        "recordsFiltered": 645,
        "recordsTotal": 1066,
    }

    config = get_config()["deposit"]
    private_api_url = config["private_api_url"].rstrip("/") + "/"
    deposits_list_url = private_api_url + "deposits/datatables/"

    basic_auth_payload = (
        config["private_api_user"] + ":" + config["private_api_password"]
    ).encode()

    # the mocked endpoint only answers requests carrying the expected
    # basic authentication header
    requests_mock.get(
        deposits_list_url,
        json=deposits_data,
        request_headers={
            "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}"
        },
    )

    client.force_login(staff_user)
    url = reverse("admin-deposit-list")
    # NOTE(review): assumes check_http_get_response returns the response, as
    # check_html_get_response does in the tests above -- confirm.
    resp = check_http_get_response(
        client, url, status_code=200, content_type="application/json"
    )
    # the payload of the deposit API must be relayed untouched
    assert resp.json() == deposits_data
diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index 65cf28f5..08127875 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,433 +1,392 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from base64 import b64encode
import datetime
import math
-from os.path import join
import sys
from urllib.parse import quote
import pytest
from django.conf.urls import url
from django.test.utils import override_settings
from django.urls.exceptions import NoReverseMatch
from swh.web.common import utils
from swh.web.common.exc import BadInputExc
from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config
def test_shorten_path_noop():
noops = ["/api/", "/browse/", "/content/symbol/foobar/"]
for noop in noops:
assert utils.shorten_path(noop) == noop
def test_shorten_path_sha1():
sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6"
short_sha1 = sha1[:8] + "..."
templates = [
"/api/1/content/sha1:%s/",
"/api/1/content/sha1_git:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha1:%s/ctags/",
]
for template in templates:
assert utils.shorten_path(template % sha1) == template % short_sha1
def test_shorten_path_sha256():
sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef"
short_sha256 = sha256[:8] + "..."
templates = [
"/api/1/content/sha256:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha256:%s/filetype/",
]
for template in templates:
assert utils.shorten_path(template % sha256) == template % short_sha256
@pytest.mark.parametrize(
"input_timestamp, output_date",
[
(
"2016-01-12",
datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
),
(
"2016-01-12T09:19:12+0100",
datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc),
),
(
"2007-01-14T20:34:22Z",
datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc),
),
],
)
def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date):
assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date
@pytest.mark.parametrize(
"invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"]
)
def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp):
with pytest.raises(BadInputExc):
utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp)
def test_format_utc_iso_date():
assert (
utils.format_utc_iso_date("2017-05-04T13:27:13+02:00")
== "04 May 2017, 11:27 UTC"
)
def test_gen_path_info():
input_path = "/home/user/swh-environment/swh-web/"
expected_result = [
{"name": "home", "path": "home"},
{"name": "user", "path": "home/user"},
{"name": "swh-environment", "path": "home/user/swh-environment"},
{"name": "swh-web", "path": "home/user/swh-environment/swh-web"},
]
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
input_path = "home/user/swh-environment/swh-web"
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
def test_rst_to_html():
rst = (
"Section\n"
"=======\n\n"
"**Some strong text**\n\n"
"* This is a bulleted list.\n"
"* It has two items, the second\n"
" item uses two lines.\n"
"\n"
"1. This is a numbered list.\n"
"2. It has two items too.\n"
"\n"
"#. This is a numbered list.\n"
"#. It has two items too.\n"
)
expected_html = (
'Section
\n'
"
Some strong text
\n"
'
\n"
'
\n'
"This is a numbered list.
\n"
"It has two items too.
\n"
"This is a numbered list.
\n"
"It has two items too.
\n"
"
\n"
"
"
)
assert utils.rst_to_html(rst) == expected_html
def sample_test_view(request, string, number):
pass
def sample_test_view_no_url_args(request):
pass
urlpatterns = [
url(
r"^sample/test/(?P.+)/view/(?P[0-9]+)/$",
sample_test_view,
name="sample-test-view",
),
url(
r"^sample/test/view/no/url/args/$",
sample_test_view_no_url_args,
name="sample-test-view-no-url-args",
),
]
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ok():
string = "foo"
number = 55
url = utils.reverse(
"sample-test-view", url_args={"string": string, "number": number}
)
assert url == f"/sample/test/{string}/view/{number}/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ko():
string = "foo"
with pytest.raises(NoReverseMatch):
utils.reverse("sample-test-view", url_args={"string": string, "number": string})
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_no_url_args():
url = utils.reverse("sample-test-view-no-url-args")
assert url == "/sample/test/view/no/url/args/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_only():
start = 0
scope = "foo"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": scope}
)
assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": None}
)
assert url == f"/sample/test/view/no/url/args/?start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_encode():
libname = "libstc++"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"libname": libname}
)
assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_query_params():
string = "foo"
number = 55
start = 10
scope = "bar"
url = utils.reverse(
"sample-test-view",
url_args={"string": string, "number": number},
query_params={"start": start, "scope": scope},
)
assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_absolute_uri(request_factory):
request = request_factory.get(utils.reverse("sample-test-view-no-url-args"))
url = utils.reverse("sample-test-view-no-url-args", request=request)
assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/"
def test_get_deposits_list(requests_mock):
deposits_data = {
"count": 2,
"results": [
{
"check_task_id": "351820217",
"client": 2,
"collection": 1,
"complete_date": "2021-01-21T07:52:19.919312Z",
"external_id": "hal-03116143",
"id": 1412,
"load_task_id": "351820260",
"origin_url": "https://hal.archives-ouvertes.fr/hal-03116143",
"parent": None,
"reception_date": "2021-01-21T07:52:19.471019Z",
"status": "done",
"status_detail": None,
"swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079",
},
{
"check_task_id": "381576507",
"client": 2,
"collection": 1,
"complete_date": "2021-07-07T08:00:44.726676Z",
"external_id": "hal-03275052",
"id": 1693,
"load_task_id": "381576508",
"origin_url": "https://hal.archives-ouvertes.fr/hal-03275052",
"parent": None,
"reception_date": "2021-07-07T08:00:44.327661Z",
"status": "done",
"status_detail": None,
"swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c",
},
],
}
config = get_config()["deposit"]
- deposits_list_url = config["private_api_url"] + "deposits"
+ private_api_url = config["private_api_url"].rstrip("/") + "/"
+ deposits_list_url = private_api_url + "deposits"
basic_auth_payload = (
config["private_api_user"] + ":" + config["private_api_password"]
).encode()
requests_mock.get(
deposits_list_url,
json=deposits_data,
request_headers={
"Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}"
},
)
assert utils.get_deposits_list() == deposits_data["results"]
@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_origin_visit_types(mocker, backend):
if backend != "swh-search":
# equivalent to not configuring search in the config
search = mocker.patch("swh.web.common.utils.search")
search.return_value = None
assert utils.origin_visit_types() == []
else:
# see swh/web/tests/data.py for origins added for tests
assert utils.origin_visit_types() == ["git", "tar"]
@pytest.mark.parametrize("server_name", ["localhost", "127.0.0.1", "testserver"])
def test_is_swh_web_development(request_factory, server_name):
request = request_factory.get("/", SERVER_NAME=server_name)
assert utils.is_swh_web_development(request)
@pytest.mark.parametrize("server_name", SWH_WEB_STAGING_SERVER_NAMES)
def test_is_swh_web_staging(request_factory, server_name):
request = request_factory.get("/", SERVER_NAME=server_name)
assert utils.is_swh_web_staging(request)
def test_is_swh_web_production(request_factory):
request = request_factory.get("/", SERVER_NAME=SWH_WEB_SERVER_NAME)
assert utils.is_swh_web_production(request)
-@pytest.mark.parametrize(
- "raw_metadata_file,expected_url",
- [
- ("raw-metadata-provenance.xml", "https://example.org/metadata/provenance"),
- ("raw-metadata-no-swh.xml", None),
- ],
-)
-def test_parse_swh_provenance(datadir, raw_metadata_file, expected_url):
- metadata_path = join(datadir, "deposit", raw_metadata_file)
- with open(metadata_path, "r") as f:
- raw_metadata = f.read()
-
- actual_url = utils.parse_swh_metadata_provenance(raw_metadata)
-
- assert actual_url == expected_url
-
-
-@pytest.mark.parametrize(
- "raw_metadata_file,expected_url",
- [
- (
- "raw-metadata-create-origin.xml",
- "https://example.org/metadata/create-origin",
- ),
- (
- "raw-metadata-add-to-origin.xml",
- "https://example.org/metadata/add-to-origin",
- ),
- ("raw-metadata-no-swh.xml", None),
- ],
-)
-def test_parse_swh_origins(datadir, raw_metadata_file, expected_url):
- metadata_path = join(datadir, "deposit", raw_metadata_file)
- with open(metadata_path, "r") as f:
- raw_metadata = f.read()
-
- actual_url = utils.parse_swh_deposit_origin(raw_metadata)
-
- assert actual_url == expected_url
-
-
def add(x, y):
return x + y
def test_django_cache(mocker):
"""Decorated function should be called once and returned value
put in django cache."""
spy_add = mocker.spy(sys.modules[__name__], "add")
spy_cache_set = mocker.spy(utils.cache, "set")
cached_add = utils.django_cache()(add)
val = cached_add(1, 2)
val2 = cached_add(1, 2)
assert val == val2 == 3
assert spy_add.call_count == 1
assert spy_cache_set.call_count == 1
def test_django_cache_invalidate_cache_pred(mocker):
"""Decorated function should be called twice and returned value
put in django cache twice."""
spy_add = mocker.spy(sys.modules[__name__], "add")
spy_cache_set = mocker.spy(utils.cache, "set")
cached_add = utils.django_cache(invalidate_cache_pred=lambda val: val == 3)(add)
val = cached_add(1, 2)
val2 = cached_add(1, 2)
assert val == val2 == 3
assert spy_add.call_count == 2
assert spy_cache_set.call_count == 2
def test_django_cache_raise_exception(mocker):
"""Decorated function should be called twice, exceptions should be
raised and no value put in django cache"""
spy_add = mocker.spy(sys.modules[__name__], "add")
spy_cache_set = mocker.spy(utils.cache, "set")
cached_add = utils.django_cache()(add)
with pytest.raises(TypeError):
cached_add(1, "2")
with pytest.raises(TypeError):
cached_add(1, "2")
assert spy_add.call_count == 2
assert spy_cache_set.call_count == 0
def test_django_cache_catch_exception(mocker):
"""Decorated function should be called twice, exceptions should not be
raised, specified fallback value should be returned and no value put
in django cache"""
spy_add = mocker.spy(sys.modules[__name__], "add")
spy_cache_set = mocker.spy(utils.cache, "set")
cached_add = utils.django_cache(
catch_exception=True, exception_return_value=math.nan
)(add)
val = cached_add(1, "2")
val2 = cached_add(1, "2")
assert math.isnan(val)
assert math.isnan(val2)
assert spy_add.call_count == 2
assert spy_cache_set.call_count == 0