`);
+
+ // Show a modal when the "metadata" button is clicked
+ $('#swh-admin-deposit-list tbody').on('click', 'tr button.metadata', function() {
+ var row = depositsTable.row(this.parentNode.parentNode).data();
+ var metadata = row.raw_metadata;
+ var escapedMetadata = $('').text(metadata).html();
+ swh.webapp.showModalHtml(`Metadata of deposit ${row.id}`,
+ `
${escapedMetadata}
`,
+ '90%');
+ swh.webapp.highlightCode();
+ });
+
// Adding exclusion pattern update behavior, when typing, update search
$('#swh-admin-deposit-list-exclude-filter').keyup(function() {
depositsTable.draw();
});
+
// at last draw the table
depositsTable.draw();
});
$('a.toggle-col').on('click', function(e) {
e.preventDefault();
var column = depositsTable.column($(this).attr('data-column'));
column.visible(!column.visible());
if (column.visible()) {
$(this).removeClass('col-hidden');
} else {
$(this).addClass('col-hidden');
}
});
}
diff --git a/cypress/integration/deposit-admin.spec.js b/cypress/integration/deposit-admin.spec.js
index c8949e39..f3e18a49 100644
--- a/cypress/integration/deposit-admin.spec.js
+++ b/cypress/integration/deposit-admin.spec.js
@@ -1,208 +1,226 @@
/**
* Copyright (C) 2020-2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
// data to use as request query response
let responseDeposits;
let expectedOrigins;
let depositModerationUrl;
let depositListUrl;
describe('Test moderation deposit Login/logout', function() {
before(function() {
depositModerationUrl = this.Urls.admin_deposit();
});
it('should not display deposit moderation link in sidebar when anonymous', function() {
cy.visit(depositModerationUrl);
cy.get(`.sidebar a[href="${depositModerationUrl}"]`)
.should('not.exist');
});
it('should not display deposit moderation link when connected as unprivileged user', function() {
cy.userLogin();
cy.visit(depositModerationUrl);
cy.get(`.sidebar a[href="${depositModerationUrl}"]`)
.should('not.exist');
});
it('should display deposit moderation link in sidebar when connected as privileged user', function() {
cy.depositLogin();
cy.visit(depositModerationUrl);
cy.get(`.sidebar a[href="${depositModerationUrl}"]`)
.should('exist');
});
it('should display deposit moderation link in sidebar when connected as staff member', function() {
cy.adminLogin();
cy.visit(depositModerationUrl);
cy.get(`.sidebar a[href="${depositModerationUrl}"]`)
.should('exist');
});
});
describe('Test admin deposit page', function() {
before(function() {
depositModerationUrl = this.Urls.admin_deposit();
depositListUrl = this.Urls.admin_deposit_list();
});
beforeEach(() => {
responseDeposits = [
{
'id': 614,
'type': 'code',
'external_id': 'ch-de-1',
'reception_date': '2020-05-18T13:48:27Z',
'status': 'done',
'status_detail': null,
'swhid': 'swh:1:dir:ef04a768',
'swhid_context': 'swh:1:dir:ef04a768;origin=https://w.s.o/c-d-1;visit=swh:1:snp:b234be1e;anchor=swh:1:rev:d24a75c9;path=/',
+ 'raw_metadata': 'bar',
'uri': 'https://w.s.o/c-d-1'
},
{
'id': 613,
'type': 'code',
'external_id': 'ch-de-2',
'reception_date': '2020-05-18T11:20:16Z',
'status': 'done',
'status_detail': null,
'swhid': 'swh:1:dir:181417fb',
'swhid_context': 'swh:1:dir:181417fb;origin=https://w.s.o/c-d-2;visit=swh:1:snp:8c32a2ef;anchor=swh:1:rev:3d1eba04;path=/',
+ 'raw_metadata': null,
'uri': 'https://w.s.o/c-d-2'
},
{
'id': 612,
'type': 'code',
'external_id': 'ch-de-3',
'reception_date': '2020-05-18T11:20:16Z',
'status': 'rejected',
'status_detail': 'incomplete deposit!',
'swhid': null,
'swhid_context': null,
+ 'raw_metadata': null,
'uri': null
}
];
// those are computed from the
expectedOrigins = {
614: 'https://w.s.o/c-d-1',
613: 'https://w.s.o/c-d-2',
612: ''
};
});
- it('Should display properly entries', function() {
+ it('Should properly display entries', function() {
cy.adminLogin();
const testDeposits = responseDeposits;
cy.intercept(`${depositListUrl}**`, {
body: {
'draw': 10,
'recordsTotal': testDeposits.length,
'recordsFiltered': testDeposits.length,
'data': testDeposits
}
}).as('listDeposits');
cy.visit(depositModerationUrl);
cy.location('pathname')
.should('be.equal', depositModerationUrl);
cy.get('#swh-admin-deposit-list')
.should('exist');
cy.wait('@listDeposits').then((xhr) => {
cy.log('response:', xhr.response);
cy.log(xhr.response.body);
const deposits = xhr.response.body.data;
cy.log('Deposits: ', deposits);
expect(deposits.length).to.equal(testDeposits.length);
cy.get('#swh-admin-deposit-list').find('tbody > tr').as('rows');
// only 2 entries
cy.get('@rows').each((row, idx, collection) => {
const deposit = deposits[idx];
const responseDeposit = testDeposits[idx];
assert.isNotNull(deposit);
assert.isNotNull(responseDeposit);
expect(deposit.id).to.be.equal(responseDeposit['id']);
expect(deposit.uri).to.be.equal(responseDeposit['uri']);
expect(deposit.type).to.be.equal(responseDeposit['type']);
expect(deposit.external_id).to.be.equal(responseDeposit['external_id']);
expect(deposit.status).to.be.equal(responseDeposit['status']);
expect(deposit.status_detail).to.be.equal(responseDeposit['status_detail']);
expect(deposit.swhid).to.be.equal(responseDeposit['swhid']);
expect(deposit.swhid_context).to.be.equal(responseDeposit['swhid_context']);
const expectedOrigin = expectedOrigins[deposit.id];
// ensure it's in the dom
cy.contains(deposit.id).should('be.visible');
if (deposit.status !== 'rejected') {
expect(row).to.not.contain(deposit.external_id);
cy.contains(expectedOrigin).should('be.visible');
}
cy.contains(deposit.status).should('be.visible');
// those are hidden by default, so now visible
if (deposit.status_detail !== null) {
cy.contains(deposit.status_detail).should('not.exist');
}
// those are hidden by default
if (deposit.swhid !== null) {
cy.contains(deposit.swhid).should('not.exist');
cy.contains(deposit.swhid_context).should('not.exist');
}
+
+ if (deposit.raw_metadata !== null) {
+ cy.get('button.metadata', {withinSubject: row})
+ .should('exist')
+ .click({force: true});
+ cy.get('#swh-web-modal-html code.xml').should('be.visible');
+
+ // Dismiss the modal
+ cy.get('body').wait(500).type('{esc}');
+ cy.get('#swh-web-modal-html code.xml').should('not.be.visible');
+ } else {
+ cy.get('button.metadata', {withinSubject: row}).should('not.exist');
+ cy.get('#swh-web-modal-html code.xml').should('not.be.visible');
+ }
+
});
// toggling all links and ensure, the previous checks are inverted
cy.get('a.toggle-col').click({'multiple': true}).then(() => {
cy.get('#swh-admin-deposit-list').find('tbody > tr').as('rows');
cy.get('@rows').each((row, idx, collection) => {
const deposit = deposits[idx];
const expectedOrigin = expectedOrigins[deposit.id];
// ensure it's in the dom
- cy.contains(deposit.id).should('not.exist');
+ expect(row).to.not.contain(deposit.id);
if (deposit.status !== 'rejected') {
expect(row).to.not.contain(deposit.external_id);
expect(row).to.contain(expectedOrigin);
}
expect(row).to.not.contain(deposit.status);
// those are hidden by default, so now visible
if (deposit.status_detail !== null) {
cy.contains(deposit.status_detail).should('be.visible');
}
// those are hidden by default, so now they should be visible
if (deposit.swhid !== null) {
cy.contains(deposit.swhid).should('be.visible');
cy.contains(deposit.swhid_context).should('be.visible');
// check SWHID link text formatting
cy.contains(deposit.swhid_context).then(elt => {
expect(elt[0].innerHTML).to.equal(deposit.swhid_context.replace(/;/g, '; '));
});
}
});
});
cy.get('#swh-admin-deposit-list-error')
.should('not.contain',
'An error occurred while retrieving the list of deposits');
});
});
});
diff --git a/swh/web/admin/deposit.py b/swh/web/admin/deposit.py
index f36e801f..0a613170 100644
--- a/swh/web/admin/deposit.py
+++ b/swh/web/admin/deposit.py
@@ -1,129 +1,135 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import sentry_sdk
from django.conf import settings
from django.contrib.auth.decorators import user_passes_test
from django.core.paginator import Paginator
from django.http import JsonResponse
from django.shortcuts import render
from swh.web.admin.adminurls import admin_route
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.utils import (
+ get_deposit_raw_metadata,
get_deposits_list,
parse_swh_deposit_origin,
parse_swh_metadata_provenance,
)
def _can_list_deposits(user):
return user.is_staff or user.has_perm(ADMIN_LIST_DEPOSIT_PERMISSION)
@admin_route(r"deposit/", view_name="admin-deposit")
@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_origin_save(request):
return render(request, "admin/deposit.html")
@admin_route(r"deposit/list/", view_name="admin-deposit-list")
@user_passes_test(_can_list_deposits, login_url=settings.LOGIN_URL)
def _admin_deposit_list(request):
table_data = {}
table_data["draw"] = int(request.GET["draw"])
try:
deposits = get_deposits_list(request.GET.get("username"))
deposits_count = len(deposits)
search_value = request.GET["search[value]"]
if search_value:
deposits = [
d
for d in deposits
if any(
search_value.lower() in val
for val in [str(v).lower() for v in d.values()]
)
]
exclude_pattern = request.GET.get("excludePattern")
if exclude_pattern:
deposits = [
d
for d in deposits
if all(
exclude_pattern.lower() not in val
for val in [str(v).lower() for v in d.values()]
)
]
column_order = request.GET["order[0][column]"]
field_order = request.GET["columns[%s][name]" % column_order]
order_dir = request.GET["order[0][dir]"]
deposits = sorted(deposits, key=lambda d: d[field_order] or "")
if order_dir == "desc":
deposits = list(reversed(deposits))
length = int(request.GET["length"])
page = int(request.GET["start"]) / length + 1
paginator = Paginator(deposits, length)
data = paginator.page(page).object_list
table_data["recordsTotal"] = deposits_count
table_data["recordsFiltered"] = len(deposits)
data_list = []
for d in data:
data_dict = {
"id": d["id"],
"type": d["type"],
"external_id": d["external_id"],
"reception_date": d["reception_date"],
"status": d["status"],
"status_detail": d["status_detail"],
"swhid": d["swhid"],
"swhid_context": d["swhid_context"],
}
provenance = None
raw_metadata = d["raw_metadata"]
# for meta deposit, the uri should be the url provenance
if raw_metadata and d["type"] == "meta": # metadata provenance
provenance = parse_swh_metadata_provenance(d["raw_metadata"])
# For code deposits the uri is the origin
# First, trying to determine it out of the raw metadata associated with the
# deposit
elif raw_metadata and d["type"] == "code":
provenance = parse_swh_deposit_origin(raw_metadata)
# For code deposits, if not provided, use the origin_url
if not provenance and d["type"] == "code":
if d["origin_url"]:
provenance = d["origin_url"]
# If still not found, fallback using the swhid context
if not provenance and d["swhid_context"]:
# Trying to compute the origin as we did before in the js
from swh.model.swhids import QualifiedSWHID
swhid = QualifiedSWHID.from_string(d["swhid_context"])
provenance = swhid.origin
data_dict["uri"] = provenance # could be None
# This could be large. As this is not displayed yet, drop it to avoid
# cluttering the data dict
data_dict.pop("raw_metadata", None)
data_list.append(data_dict)
table_data["data"] = data_list
+ for row in table_data["data"]:
+ metadata = get_deposit_raw_metadata(row["id"])
+ if metadata:
+ row["raw_metadata"] = metadata[-1]
+ else:
+ row["raw_metadata"] = None
+
except Exception as exc:
sentry_sdk.capture_exception(exc)
- table_data[
- "error"
- ] = "An error occurred while retrieving the list of deposits !"
+ table_data["error"] = f"Could not retrieve deposits: {exc!r}"
return JsonResponse(table_data)
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index b79c7868..098bbad1 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,535 +1,549 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import os
import re
from typing import Any, Dict, List, Optional
import urllib.parse
from xml.etree import ElementTree
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
from django.core.cache import cache
from django.http import HttpRequest, QueryDict
from django.shortcuts import redirect
from django.urls import resolve
from django.urls import reverse as django_reverse
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
ADMIN_LIST_DEPOSIT_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
)
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"cnt": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"dir": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"ori": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"rel": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"rev": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"snp": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo and date.tzinfo != timezone.utc:
return date.astimezone(tz=timezone.utc)
else:
return date
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
"""Given an ISO 8601 datetime string, parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed date
Raises:
swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- 2007-01-14T20:34:22Z
"""
try:
date = parse_date(iso_date)
return datetime_to_utc(date)
except ParseError as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
"""Turns a string representation of an ISO 8601 datetime string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_iso8601_date_to_utc(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
def is_swh_web_development(request: HttpRequest) -> bool:
"""Indicate if we are running a development version of swh-web.
"""
site_base_url = request.build_absolute_uri("/")
return any(
host in site_base_url for host in ("localhost", "127.0.0.1", "testserver")
)
def is_swh_web_staging(request: HttpRequest) -> bool:
"""Indicate if we are running a staging version of swh-web.
"""
config = get_config()
site_base_url = request.build_absolute_uri("/")
return any(
server_name in site_base_url for server_name in config["staging_server_names"]
)
def is_swh_web_production(request: HttpRequest) -> bool:
"""Indicate if we are running the public production version of swh-web.
"""
return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/")
browsers_supported_image_mimes = set(
[
"image/gif",
"image/png",
"image/jpeg",
"image/bmp",
"image/webp",
"image/svg",
"image/svg+xml",
]
)
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
"keycloak": config["keycloak"],
"site_base_url": request.build_absolute_uri("/"),
"DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
"status": config["status"],
"swh_web_dev": is_swh_web_development(request),
"swh_web_staging": is_swh_web_staging(request),
"swh_web_version": get_distribution("swh.web").version,
"iframe_mode": False,
"ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
"ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION,
"FEATURES": get_config()["features"],
"MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION,
}
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import archive
snp = archive.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
"file_insertion_enabled": False,
"raw_enabled": False,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
def _deposits_list_url(
deposits_list_base_url: str, page_size: int, username: Optional[str]
) -> str:
params = {"page_size": str(page_size)}
if username is not None:
params["username"] = username
return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}"
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
"""Return the list of software deposits using swh-deposit API
"""
config = get_config()["deposit"]
deposits_list_base_url = config["private_api_url"] + "deposits"
deposits_list_auth = HTTPBasicAuth(
config["private_api_user"], config["private_api_password"]
)
deposits_list_url = _deposits_list_url(
deposits_list_base_url, page_size=1, username=username
)
nb_deposits = requests.get(
deposits_list_url, auth=deposits_list_auth, timeout=30
).json()["count"]
- deposits_data = cache.get(f"swh-deposit-list-{username}")
+ cache_key = f"swh-deposit-list-{username}"
+ deposits_data = cache.get(cache_key)
if not deposits_data or deposits_data["count"] != nb_deposits:
deposits_list_url = _deposits_list_url(
deposits_list_base_url, page_size=nb_deposits, username=username
)
deposits_data = requests.get(
deposits_list_url, auth=deposits_list_auth, timeout=30,
).json()
- cache.set(f"swh-deposit-list-{username}", deposits_data)
+ cache.set(cache_key, deposits_data)
return deposits_data["results"]
+def get_deposit_raw_metadata(deposit_id: int) -> Optional[str]:
+ cache_key = f"swh-deposit-raw-metadata-{deposit_id}"
+ metadata = cache.get(cache_key)
+ if metadata is None:
+ config = get_config()["deposit"]
+
+ url = f"{config['private_api_url']}/{deposit_id}/meta"
+ metadata = requests.get(url).json()["metadata_raw"]
+ cache.set(cache_key, metadata)
+
+ return metadata
+
+
def origin_visit_types() -> List[str]:
"""Return the exhaustive list of visit types for origins
ingested into the archive.
"""
try:
return sorted(search().visit_types_count().keys())
except Exception:
return []
def redirect_to_new_route(request, new_route, permanent=True):
"""Redirect a request to another route with url args and query parameters
eg: /origin//log?path=test can be redirected as
/log?url=&path=test. This can be used to deprecate routes
"""
request_path = resolve(request.path_info)
args = {**request_path.kwargs, **request.GET.dict()}
return redirect(reverse(new_route, query_params=args), permanent=permanent,)
NAMESPACES = {
"swh": "https://www.softwareheritage.org/schema/2018/deposit",
"schema": "http://schema.org/",
}
def parse_swh_metadata_provenance(raw_metadata: str) -> Optional[str]:
"""Parse swh metadata-provenance out of the raw metadata deposit. If found, returns the
value, None otherwise.
.. code-block:: xml
https://example.org/metadata/url
Args:
raw_metadata: raw metadata out of deposits received
Returns:
Either the metadata provenance url if any or None otherwise
"""
metadata = ElementTree.fromstring(raw_metadata)
url = metadata.findtext(
"swh:deposit/swh:metadata-provenance/schema:url", namespaces=NAMESPACES,
)
return url or None
def parse_swh_deposit_origin(raw_metadata: str) -> Optional[str]:
"""Parses and from metadata document,
if any. They are mutually exclusive and tested as such in the deposit.
.. code-block:: xml
.. code-block:: xml
Returns:
The one not null if any, None otherwise
"""
metadata = ElementTree.fromstring(raw_metadata)
for origin_tag in ["create_origin", "add_to_origin"]:
elt = metadata.find(
f"swh:deposit/swh:{origin_tag}/swh:origin[@url]", namespaces=NAMESPACES
)
if elt is not None:
return elt.attrib["url"]
return None
def has_add_forge_now_permission(user) -> bool:
"""Is a user considered an add-forge-now moderator?
Returns
True if a user is staff or has add forge now moderator permission
"""
return user.is_staff or user.has_perm(ADD_FORGE_MODERATOR_PERMISSION)
diff --git a/swh/web/templates/admin/deposit.html b/swh/web/templates/admin/deposit.html
index ac64670b..ba776f41 100644
--- a/swh/web/templates/admin/deposit.html
+++ b/swh/web/templates/admin/deposit.html
@@ -1,61 +1,63 @@
{% extends "layout.html" %}
{% comment %}
Copyright (C) 2018-2021 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% load swh_templatetags %}
{% load render_bundle from webpack_loader %}
{% block header %}
{{ block.super }}
{% render_bundle 'admin' %}
{% endblock %}
{% block title %} Deposit administration {% endblock %}
{% block navbar-content %}
Deposit administration
{% endblock %}
{% block content %}
The table below displays the list of software artifacts deposited to
Software Heritage.