`);
// Adding exclusion pattern update behavior, when typing, update search
$('#swh-admin-deposit-list-exclude-filter').keyup(function() {
depositsTable.draw();
});
// at last draw the table
depositsTable.draw();
});
$('a.toggle-col').on('click', function(e) {
e.preventDefault();
var column = depositsTable.column($(this).attr('data-column'));
column.visible(!column.visible());
if (column.visible()) {
$(this).removeClass('col-hidden');
} else {
$(this).addClass('col-hidden');
}
});
}
diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py
index 58e2e097..e6bb3cf5 100644
--- a/swh/web/admin/deposit.py
+++ b/swh/web/admin/deposit.py
@@ -1,87 +1,92 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import sentry_sdk
from django.conf import settings
-from django.contrib.admin.views.decorators import staff_member_required
+from django.contrib.auth.decorators import user_passes_test
from django.core.paginator import Paginator
from django.http import JsonResponse
from django.shortcuts import render
from swh.web.admin.adminurls import admin_route
+from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.utils import get_deposits_list
+def _can_list_deposits(user):
+ return user.is_staff or user.has_perm(ADMIN_LIST_DEPOSIT_PERMISSION)
+
+
@admin_route(r"deposit/", view_name="admin-deposit")
-@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
+@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_origin_save(request):
return render(request, "admin/deposit.html")
@admin_route(r"deposit/list/", view_name="admin-deposit-list")
-@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
+@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_deposit_list(request):
table_data = {}
table_data["draw"] = int(request.GET["draw"])
try:
- deposits = get_deposits_list()
+ deposits = get_deposits_list(request.GET.get("username"))
deposits_count = len(deposits)
search_value = request.GET["search[value]"]
if search_value:
deposits = [
d
for d in deposits
if any(
search_value.lower() in val
for val in [str(v).lower() for v in d.values()]
)
]
exclude_pattern = request.GET.get("excludePattern")
if exclude_pattern:
deposits = [
d
for d in deposits
if all(
exclude_pattern.lower() not in val
for val in [str(v).lower() for v in d.values()]
)
]
column_order = request.GET["order[0][column]"]
field_order = request.GET["columns[%s][name]" % column_order]
order_dir = request.GET["order[0][dir]"]
deposits = sorted(deposits, key=lambda d: d[field_order] or "")
if order_dir == "desc":
deposits = list(reversed(deposits))
length = int(request.GET["length"])
page = int(request.GET["start"]) / length + 1
paginator = Paginator(deposits, length)
data = paginator.page(page).object_list
table_data["recordsTotal"] = deposits_count
table_data["recordsFiltered"] = len(deposits)
table_data["data"] = [
{
"id": d["id"],
"external_id": d["external_id"],
"reception_date": d["reception_date"],
"status": d["status"],
"status_detail": d["status_detail"],
"swhid": d["swhid"],
"swhid_context": d["swhid_context"],
}
for d in data
]
except Exception as exc:
sentry_sdk.capture_exception(exc)
table_data["error"] = (
"An error occurred while retrieving " "the list of deposits !"
)
return JsonResponse(table_data)
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
index 0f13e78e..e213a714 100644
--- a/swh/web/auth/utils.py
+++ b/swh/web/auth/utils.py
@@ -1,95 +1,96 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from base64 import urlsafe_b64encode
from typing import List
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.http.request import HttpRequest
OIDC_SWH_WEB_CLIENT_ID = "swh-web"
SWH_AMBASSADOR_PERMISSION = "swh.ambassador"
API_SAVE_ORIGIN_PERMISSION = "swh.web.api.save_origin"
+ADMIN_LIST_DEPOSIT_PERMISSION = "swh.web.admin.list_deposits"
def _get_fernet(password: bytes, salt: bytes) -> Fernet:
"""
Instantiate a Fernet system from a password and a salt value
(see https://cryptography.io/en/latest/fernet/).
Args:
password: user password that will be used to generate a Fernet key
derivation function
salt: value that will be used to generate a Fernet key
derivation function
Returns:
The Fernet system
"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend(),
)
key = urlsafe_b64encode(kdf.derive(password))
return Fernet(key)
def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes:
"""
Encrypt data using Fernet system (symmetric encryption).
Args:
data: input data to encrypt
password: user password that will be used to generate a Fernet key
derivation function
salt: value that will be used to generate a Fernet key
derivation function
Returns:
The encrypted data
"""
return _get_fernet(password, salt).encrypt(data)
def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes:
"""
Decrypt data using Fernet system (symmetric encryption).
Args:
data: input data to decrypt
password: user password that will be used to generate a Fernet key
derivation function
salt: value that will be used to generate a Fernet key
derivation function
Returns:
The decrypted data
"""
return _get_fernet(password, salt).decrypt(data)
def privileged_user(request: HttpRequest, permissions: List[str] = []) -> bool:
"""Determine whether a user is authenticated and is a privileged one (e.g ambassador).
This allows such user to have access to some more actions (e.g. bypass save code now
review, access to 'archives' type...).
A user is considered as privileged if he is a staff member or has any permission
from those provided as parameters.
Args:
request: Input django HTTP request
permissions: list of permission names to determine if user is privileged or not
Returns:
Whether the user is privileged or not.
"""
user = request.user
return user.is_authenticated and (
user.is_staff or any([user.has_perm(perm) for perm in permissions])
)
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index 34d3a65a..21ff2135 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,400 +1,416 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import os
import re
from typing import Any, Dict, List, Optional
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
from django.core.cache import cache
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse
+from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import get_config, search
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"cnt": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"dir": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"ori": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"rel": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"rev": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"snp": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo and date.tzinfo != timezone.utc:
return date.astimezone(tz=timezone.utc)
else:
return date
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
"""Given an ISO 8601 datetime string, parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed date
Raises:
swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- 2007-01-14T20:34:22Z
"""
try:
date = parse_date(iso_date)
return datetime_to_utc(date)
except ParseError as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
"""Turns a string representation of an ISO 8601 datetime string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_iso8601_date_to_utc(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
browsers_supported_image_mimes = set(
[
"image/gif",
"image/png",
"image/jpeg",
"image/bmp",
"image/webp",
"image/svg",
"image/svg+xml",
]
)
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
site_base_url = request.build_absolute_uri("/")
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
"keycloak": config["keycloak"],
"site_base_url": site_base_url,
"DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
"status": config["status"],
"swh_web_dev": "localhost" in site_base_url,
"swh_web_staging": any(
[
server_name in site_base_url
for server_name in config["staging_server_names"]
]
),
"swh_web_version": get_distribution("swh.web").version,
"iframe_mode": False,
+ "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
}
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import archive
snp = archive.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
-def get_deposits_list() -> List[Dict[str, Any]]:
+def _deposits_list_url(
+ deposits_list_base_url: str, page_size: int, username: Optional[str]
+) -> str:
+ deposits_list_url = f"{deposits_list_base_url}?page_size={page_size}"
+ if username is not None:
+ deposits_list_url += f"&username={username}"
+ return deposits_list_url
+
+
+def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
"""Return the list of software deposits using swh-deposit API
"""
config = get_config()["deposit"]
- deposits_list_url = config["private_api_url"] + "deposits"
+ deposits_list_base_url = config["private_api_url"] + "deposits"
deposits_list_auth = HTTPBasicAuth(
config["private_api_user"], config["private_api_password"]
)
+ deposits_list_url = _deposits_list_url(
+ deposits_list_base_url, page_size=1, username=username
+ )
+
nb_deposits = requests.get(
- "%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30
+ deposits_list_url, auth=deposits_list_auth, timeout=30
).json()["count"]
- deposits_data = cache.get("swh-deposit-list")
+ deposits_data = cache.get(f"swh-deposit-list-{username}")
if not deposits_data or deposits_data["count"] != nb_deposits:
+ deposits_list_url = _deposits_list_url(
+ deposits_list_base_url, page_size=nb_deposits, username=username
+ )
deposits_data = requests.get(
- "%s?page_size=%s" % (deposits_list_url, nb_deposits),
- auth=deposits_list_auth,
- timeout=30,
+ deposits_list_url, auth=deposits_list_auth, timeout=30,
).json()
- cache.set("swh-deposit-list", deposits_data)
+ cache.set(f"swh-deposit-list-{username}", deposits_data)
return deposits_data["results"]
def origin_visit_types() -> List[str]:
"""Return the exhaustive list of visit types for origins
ingested into the archive.
"""
try:
return sorted(search().visit_types_count().keys())
except Exception:
return []
diff --git a/swh/web/templates/admin/deposit.html b/swh/web/templates/admin/deposit.html
index a3865ec7..a15647c0 100644
--- a/swh/web/templates/admin/deposit.html
+++ b/swh/web/templates/admin/deposit.html
@@ -1,59 +1,59 @@
{% extends "layout.html" %}
{% comment %}
-Copyright (C) 2018-2019 The Software Heritage developers
+Copyright (C) 2018-2021 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% load swh_templatetags %}
{% load render_bundle from webpack_loader %}
{% block header %}
{{ block.super }}
{% render_bundle 'admin' %}
{% endblock %}
{% block title %} Deposit administration {% endblock %}
{% block navbar-content %}
Deposit administration
{% endblock %}
{% block content %}
The table below displays the list of software deposits into the archive
submitted through HAL.
{% endblock content %}
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
index d9b52a33..fb5622aa 100644
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -1,288 +1,290 @@
{% comment %}
Copyright (C) 2015-2021 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% load js_reverse %}
{% load static %}
{% load render_bundle from webpack_loader %}
{% load swh_templatetags %}
{% block title %}{% endblock %}
{% render_bundle 'vendors' %}
{% render_bundle 'webapp' %}
{% render_bundle 'guided_tour' %}
{{ request.user.is_authenticated|json_script:"swh_user_logged_in" }}
{% block header %}{% endblock %}
{% if not swh_web_dev and not swh_web_staging %}
{% endif %}