diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
index debf4efd..d7160d22 100644
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -1,390 +1,387 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterable, List, Optional
from urllib.parse import quote, unquote
from typing_extensions import TypedDict
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.swhids import ObjectType, QualifiedSWHID
from swh.web.common import archive
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import (
QueryParameters,
SnapshotContext,
SWHIDContext,
SWHIDInfo,
SWHObjectInfo,
)
from swh.web.common.utils import reverse
def parse_object_type(object_type: str) -> ObjectType:
    """Convert an object type name into its ``ObjectType`` variant.

    The lookup is case-insensitive: the name is upper-cased before being
    matched against the enum member names.

    Args:
        object_type: name of a swh object type

    Returns:
        the matching ``ObjectType`` member

    Raises:
        BadInputExc: if the name does not match any ``ObjectType`` member
    """
    enum_name = object_type.upper()
    try:
        return ObjectType[enum_name]
    except KeyError:
        known_names = [member.name.lower() for member in ObjectType]
        valid_types = ", ".join(known_names)
        raise BadInputExc(
            f"Invalid swh object type! Valid types are {valid_types}; not {object_type}"
        )
def gen_swhid(
    object_type: ObjectType,
    object_id: str,
    scheme_version: int = 1,
    metadata: Optional[SWHIDContext] = None,
) -> str:
    """
    Returns the SoftWare Heritage persistent IDentifier for a swh object based on:

        * the object type
        * the object id
        * the SWHID scheme version
        * optional qualifiers

    Args:
        object_type: the swh object type
            (content/directory/release/revision/snapshot)
        object_id: the swh object id (hexadecimal representation
            of its hash value)
        scheme_version: the scheme version of the SWHIDs
        metadata: optional qualifiers, forwarded as keyword arguments
            to ``QualifiedSWHID``; no qualifier is set when omitted

    Returns:
        the SWHID of the object

    Raises:
        BadInputExc: if the provided parameters do not enable to
            generate a valid identifier
    """
    # ``None`` replaces the former shared ``{}`` default so that no mutable
    # object is shared between calls
    qualifiers = metadata if metadata is not None else {}
    try:
        return str(
            QualifiedSWHID(
                object_type=object_type,
                object_id=hash_to_bytes(object_id),
                scheme_version=scheme_version,
                **qualifiers,
            )
        )
    except (ValidationError, KeyError, ValueError) as e:
        # keep the original error attached as the cause for easier debugging
        raise BadInputExc(
            "Invalid object (%s) for SWHID. %s" % (object_id, e)
        ) from e
class ResolvedSWHID(TypedDict):
    """Outcome of a SWHID resolution: the parsed identifier and its browse URL."""

    # parsed SWHID with context
    swhid_parsed: QualifiedSWHID
    # URL to browse object according to SWHID context
    browse_url: Optional[str]
def resolve_swhid(
swhid: str, query_params: Optional[QueryParameters] = None
) -> ResolvedSWHID:
"""
Try to resolve a SoftWare Heritage persistent IDentifier into an url for
browsing the targeted object.
The SWHID qualifiers (origin, path, anchor, visit, lines) are translated
into query parameters and a fragment of the browse url.
Args:
swhid: a SoftWare Heritage persistent IDentifier
query_params: optional dict filled with
query parameters to append to the browse url
Returns:
a dict with the following keys:
* **swhid_parsed**: the parsed identifier
* **browse_url**: the url for browsing the targeted object, or
``None`` if no url argument could be computed for the object type
Raises:
BadInputExc: if the SWHID cannot be parsed or if its visit qualifier
is not a snapshot SWHID
"""
swhid_parsed = get_swhid(swhid)
object_type = swhid_parsed.object_type
object_id = swhid_parsed.object_id
browse_url = None
url_args = {}
# query parameters of the browse url, filled from the caller-provided ones
# and from the SWHID qualifiers
query_dict = QueryDict("", mutable=True)
fragment = ""
# decided from the object type of the SWHID itself, before object_type is
# possibly switched to the anchor revision further below
process_lines = object_type == ObjectType.CONTENT
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
- query_dict[k] = query_params[k]
+ query_dict[k] = str(query_params[k])
# origin qualifier: URL-decode it and use the origin url returned by the
# archive lookup
if swhid_parsed.origin:
origin_url = unquote(swhid_parsed.origin)
origin_url = archive.lookup_origin({"url": origin_url})["url"]
query_dict["origin_url"] = origin_url
# path qualifier: ignored when it only designates the root ("/")
if swhid_parsed.path and swhid_parsed.path != b"/":
query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace")
if swhid_parsed.anchor:
# find the root directory designated by the anchor
directory = b""
if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY:
directory = swhid_parsed.anchor.object_id
elif swhid_parsed.anchor.object_type == ObjectType.REVISION:
revision = archive.lookup_revision(
hash_to_hex(swhid_parsed.anchor.object_id)
)
directory = revision["directory"]
elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
release = archive.lookup_release(
hash_to_hex(swhid_parsed.anchor.object_id)
)
# only releases targeting a revision are followed; otherwise
# directory keeps its empty default
if release["target_type"] == ObjectType.REVISION.name.lower():
revision = archive.lookup_revision(release["target"])
directory = revision["directory"]
# NOTE(review): the branches below read query_dict["path"], which is
# only set above when a non-root path qualifier is present — confirm
# that an anchor is never provided without such a path
if object_type == ObjectType.CONTENT:
if (
not swhid_parsed.origin
and swhid_parsed.anchor.object_type != ObjectType.REVISION
):
# when no origin or revision context, content objects need to have
# their path prefixed by root directory id for breadcrumbs display
query_dict["path"] = hash_to_hex(directory) + query_dict["path"]
else:
# remove leading slash from SWHID content path
- query_dict["path"] = query_dict["path"][1:]
+ query_dict["path"] = str(query_dict["path"]).lstrip("/")
elif object_type == ObjectType.DIRECTORY:
# the directory is browsed from the root directory found above
object_id = directory
# remove leading and trailing slashes from SWHID directory path
- if query_dict["path"].endswith("/"):
- query_dict["path"] = query_dict["path"][1:-1]
- else:
- query_dict["path"] = query_dict["path"][1:]
+ query_dict["path"] = str(query_dict["path"]).strip("/")
# snapshot context
if swhid_parsed.visit:
if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT:
raise BadInputExc("Visit must be a snapshot SWHID.")
query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id)
if swhid_parsed.anchor:
if (
swhid_parsed.anchor.object_type == ObjectType.REVISION
and object_type != ObjectType.REVISION
):
query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id)
elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
release = archive.lookup_release(
hash_to_hex(swhid_parsed.anchor.object_id)
)
if release:
query_dict["release"] = release["name"]
# browsing content or directory without snapshot context
elif (
object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY)
and swhid_parsed.anchor
):
if swhid_parsed.anchor.object_type == ObjectType.REVISION:
# anchor revision, objects are browsed from its view
object_type = ObjectType.REVISION
object_id = swhid_parsed.anchor.object_id
elif (
object_type == ObjectType.DIRECTORY
and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY
):
# a directory is browsed from its root
object_id = swhid_parsed.anchor.object_id
# url arguments expected by the browse view of the (possibly updated)
# object type
if object_type == ObjectType.CONTENT:
url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}"
elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION):
url_args["sha1_git"] = hash_to_hex(object_id)
elif object_type == ObjectType.SNAPSHOT:
url_args["snapshot_id"] = hash_to_hex(object_id)
# lines qualifier: turned into a "#L<start>[-L<end>]" url fragment
if swhid_parsed.lines and process_lines:
lines = swhid_parsed.lines
fragment += "#L" + str(lines[0])
if lines[1]:
fragment += "-L" + str(lines[1])
if url_args:
browse_url = (
reverse(
f"browse-{object_type.name.lower()}",
url_args=url_args,
query_params=query_dict,
)
+ fragment
)
return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
def get_swhid(swhid: str) -> QualifiedSWHID:
    """Parse a SWHID string and return it as a ``QualifiedSWHID``.

    Only the core part of the identifier (everything before the first ``;``)
    is lower-cased before parsing; qualifiers are passed through verbatim.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier.

    Raises:
        BadInputExc: if the provided SWHID can not be parsed.

    Return:
        A parsed SWHID.
    """
    # ensure core part of SWHID is in lower case to avoid parsing error
    head, separator, tail = swhid.partition(";")
    normalized = head.lower() + separator + tail
    try:
        return QualifiedSWHID.from_string(normalized)
    except ValidationError as ve:
        raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
def group_swhids(
    swhids: Iterable[QualifiedSWHID],
) -> Dict[ObjectType, List[bytes]]:
    """
    Bucket parsed SWHIDs by object type.

    Every object type handled here is always present in the result, with an
    empty list when no identifier of that type was provided.

    Args:
        swhids: an iterable of SoftWare Heritage persistent
            IDentifier objects

    Returns:
        A dictionary with:
            keys: object types
            values: object hashes
    """
    grouped: Dict[ObjectType, List[bytes]] = {
        kind: []
        for kind in (
            ObjectType.CONTENT,
            ObjectType.DIRECTORY,
            ObjectType.REVISION,
            ObjectType.RELEASE,
            ObjectType.SNAPSHOT,
        )
    }
    for parsed in swhids:
        raw_id = hash_to_bytes(parsed.object_id)
        grouped[parsed.object_type].append(raw_id)
    return grouped
def get_swhids_info(
swh_objects: Iterable[SWHObjectInfo],
snapshot_context: Optional[SnapshotContext] = None,
extra_context: Optional[Dict[str, Any]] = None,
) -> List[SWHIDInfo]:
"""
Returns a list of dict containing info related to SWHIDs of objects.
For each object, both the core SWHID and, when some context could be
computed, a SWHID qualified with that context are generated along with
their resolving urls.
Args:
swh_objects: an iterable of dict describing archived objects
snapshot_context: optional dict parameter describing the snapshot in
which the objects have been found
extra_context: optional dict filled with extra contextual info about
the objects
Returns:
a list of dict containing SWHIDs info
"""
swhids_info = []
for swh_object in swh_objects:
# objects without identifier get a placeholder entry with empty SWHIDs
if not swh_object["object_id"]:
swhids_info.append(
SWHIDInfo(
object_type=swh_object["object_type"],
object_id="",
swhid="",
swhid_url="",
context={},
swhid_with_context=None,
swhid_with_context_url=None,
)
)
continue
object_type = swh_object["object_type"]
object_id = swh_object["object_id"]
swhid_context: SWHIDContext = {}
# qualifiers derived from the snapshot context: origin, visit and anchor
if snapshot_context:
if snapshot_context["origin_info"] is not None:
swhid_context["origin"] = quote(
snapshot_context["origin_info"]["url"], safe="/?:@&"
)
# a snapshot is not qualified with itself as visit
if object_type != ObjectType.SNAPSHOT:
swhid_context["visit"] = gen_swhid(
ObjectType.SNAPSHOT, snapshot_context["snapshot_id"]
)
if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
# a release anchor takes precedence over a revision one
if snapshot_context["release_id"] is not None:
swhid_context["anchor"] = gen_swhid(
ObjectType.RELEASE, snapshot_context["release_id"]
)
elif snapshot_context["revision_id"] is not None:
swhid_context["anchor"] = gen_swhid(
ObjectType.REVISION, snapshot_context["revision_id"]
)
# qualifiers derived from the extra context; an anchor already set from
# the snapshot context is never overridden
if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
if (
extra_context
and "revision" in extra_context
and extra_context["revision"]
and "anchor" not in swhid_context
):
swhid_context["anchor"] = gen_swhid(
ObjectType.REVISION, extra_context["revision"]
)
elif (
extra_context
and "root_directory" in extra_context
and extra_context["root_directory"]
and "anchor" not in swhid_context
and (
# a directory is not anchored to itself
object_type != ObjectType.DIRECTORY
or extra_context["root_directory"] != object_id
)
):
swhid_context["anchor"] = gen_swhid(
ObjectType.DIRECTORY, extra_context["root_directory"]
)
path = None
if extra_context and "path" in extra_context:
# an empty path is treated as the root
path = extra_context["path"] or "/"
if "filename" in extra_context and object_type == ObjectType.CONTENT:
path += extra_context["filename"]
# no path qualifier for a directory located at the root
if object_type == ObjectType.DIRECTORY and path == "/":
path = None
if path:
swhid_context["path"] = quote(path, safe="/?:@&")
# core SWHID, without any qualifier
swhid = gen_swhid(object_type, object_id)
swhid_url = reverse("browse-swhid", url_args={"swhid": swhid})
swhid_with_context = None
swhid_with_context_url = None
# qualified SWHID, generated only when some context was computed
if swhid_context:
swhid_with_context = gen_swhid(
object_type, object_id, metadata=swhid_context
)
swhid_with_context_url = reverse(
"browse-swhid", url_args={"swhid": swhid_with_context}
)
swhids_info.append(
SWHIDInfo(
object_type=object_type,
object_id=object_id,
swhid=swhid,
swhid_url=swhid_url,
context=swhid_context,
swhid_with_context=swhid_with_context,
swhid_with_context_url=swhid_with_context_url,
)
)
return swhids_info
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index b4e708d9..ab89c23d 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,520 +1,520 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import functools
import os
import re
from typing import Any, Callable, Dict, List, Optional
import urllib.parse
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
from django.core.cache import cache
from django.core.cache.backends.base import DEFAULT_TIMEOUT
from django.http import HttpRequest, QueryDict
from django.shortcuts import redirect
from django.urls import resolve
from django.urls import reverse as django_reverse
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
ADMIN_LIST_DEPOSIT_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
)
from swh.web.common.exc import BadInputExc, sentry_capture_exception
from swh.web.common.typing import QueryParameters
from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search
# Prometheus collector registry instantiated for swh-web
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
# Regular expression pattern for a core SWHID (scheme version 1, three-letter
# object type, 40-character identifier).
# NOTE(review): the identifier part accepts [0-9a-z] rather than hexadecimal
# digits only — confirm whether this leniency is intended
SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}"
# Mapping from object kinds (long names and three-letter short names) to
# icon class strings; injected in templates by context_processor
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"cnt": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"dir": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"ori": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"rel": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"rev": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"snp": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
URL arguments and query parameters whose value is ``None`` are dropped.
Query parameters are appended sorted by key.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
# None values are not passed to the django reverse function
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
# keys are processed in sorted order so the generated url is deterministic
for k in sorted(query_params.keys()):
- query_dict[k] = query_params[k]
+ query_dict[k] = str(query_params[k])
# "/", ";" and ":" are left unescaped in the query string
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
    """Convert a timezone-aware datetime to UTC.

    A datetime without timezone info, or whose timezone already compares
    equal to UTC, is returned unchanged.

    Args:
        date (datetime.datetime): input datetime

    Returns:
        datetime.datetime: the datetime converted to the UTC timezone, or
        the input itself when no conversion is needed
    """
    needs_conversion = date.tzinfo and date.tzinfo != timezone.utc
    if not needs_conversion:
        return date
    return date.astimezone(tz=timezone.utc)
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
    """Parse an ISO 8601 datetime string and convert the result to UTC.

    Args:
        iso_date: a string representation of an ISO 8601 date

    Returns:
        the parsed date, converted to UTC by ``datetime_to_utc``

    Raises:
        swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format

    Samples:
        - 2016-01-12
        - 2016-01-12T09:19:12+0100
        - 2007-01-14T20:34:22Z
    """
    try:
        parsed = parse_date(iso_date)
    except ParseError as e:
        raise BadInputExc(e)
    return datetime_to_utc(parsed)
def shorten_path(path):
    """Shorten the given path: for each hash present, only return the first
    8 characters followed by an ellipsis.

    Both 64-character (sha256) and 40-character (sha1) lowercase hexadecimal
    hashes are handled.

    Args:
        path (str): a path possibly containing hexadecimal hashes

    Returns:
        str: the path with every hash replaced by its first 8 characters
        followed by ``...``
    """
    # The tail of the sha256 pattern must be hexadecimal too: with [0-9a-z]
    # a sha1 followed by ordinary letters was matched as a "sha256" and the
    # trailing text was swallowed.
    sha256_re = r"([0-9a-f]{8})[0-9a-f]{56}"
    sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
    # sha256 first, otherwise the sha1 pattern would split longer hashes
    ret = re.sub(sha256_re, r"\1...", path)
    return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M:%S UTC"):
    """Convert an ISO 8601 datetime string to UTC and render it in a more
    human readable form.

    For instance, with the default format, the input string
    '2017-05-04T13:27:13+02:00' is rendered as
    '04 May 2017, 11:27:13 UTC'.

    Args:
        iso_date (str): a string representation of an ISO 8601 date
        fmt (str): optional date formatting string

    Returns:
        str: a formatted string representation of the input iso date;
        a falsy input (empty string, ``None``) is returned as is
    """
    if iso_date:
        return parse_iso8601_date_to_utc(iso_date).strftime(fmt)
    return iso_date
def gen_path_info(path):
    """Build the breadcrumb entries of a path for the swh web ui.

    Each path component yields one entry holding its name and the path
    leading to it from the root. For instance, from a path
    /folder1/folder2/folder3, it returns the following list::

        [{'name': 'folder1', 'path': 'folder1'},
         {'name': 'folder2', 'path': 'folder1/folder2'},
         {'name': 'folder3', 'path': 'folder1/folder2/folder3'}]

    Args:
        path: a filesystem path

    Returns:
        list: a list of path data for navigation as illustrated above;
        empty when the path is empty or ``None``.
    """
    if not path:
        return []
    breadcrumbs = []
    prefix = ""
    for name in path.strip("/").split("/"):
        prefix = f"{prefix}/{name}"
        breadcrumbs.append({"name": name, "path": prefix.strip("/")})
    return breadcrumbs
def parse_rst(text, report_level=2):
    """
    Parse a reStructuredText string with docutils.

    Args:
        text (str): string with reStructuredText markups in it
        report_level (int): level of docutils report messages to print
            (1 info 2 warning 3 error 4 severe 5 none)

    Returns:
        docutils.nodes.document: a parsed docutils document
    """
    rst_parser_class = docutils.parsers.rst.Parser
    rst_parser = rst_parser_class()
    # default docutils settings for the rst parser component, with the
    # requested report level
    option_parser = docutils.frontend.OptionParser(components=(rst_parser_class,))
    settings = option_parser.get_default_values()
    settings.report_level = report_level
    document = docutils.utils.new_document("rst-doc", settings=settings)
    rst_parser.parse(text, document)
    return document
def get_client_ip(request):
    """
    Return the client IP address from an incoming HTTP request.

    The first address listed in the ``X-Forwarded-For`` header is used when
    that header is set, otherwise the ``REMOTE_ADDR`` value is returned.

    Args:
        request (django.http.HttpRequest): the incoming HTTP request

    Returns:
        str: The client IP address, or ``None`` if neither value is
        available in the request metadata
    """
    x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
    if x_forwarded_for:
        # entries may be separated by ", ": drop surrounding whitespace so
        # the returned address is directly usable
        return x_forwarded_for.split(",")[0].strip()
    return request.META.get("REMOTE_ADDR")
def is_swh_web_development(request: HttpRequest) -> bool:
    """Tell whether the request targets a development instance of swh-web.

    This is the case when the site base URL contains one of the development
    host names (localhost, 127.0.0.1 or testserver).
    """
    site_base_url = request.build_absolute_uri("/")
    for dev_host in ("localhost", "127.0.0.1", "testserver"):
        if dev_host in site_base_url:
            return True
    return False
def is_swh_web_staging(request: HttpRequest) -> bool:
    """Tell whether the request targets a staging instance of swh-web.

    This is the case when the site base URL contains one of the server names
    listed under ``staging_server_names`` in the configuration.
    """
    config = get_config()
    site_base_url = request.build_absolute_uri("/")
    for staging_name in config["staging_server_names"]:
        if staging_name in site_base_url:
            return True
    return False
def is_swh_web_production(request: HttpRequest) -> bool:
    """Tell whether the request targets the public production instance of
    swh-web, i.e. whether the site base URL contains ``SWH_WEB_SERVER_NAME``.
    """
    site_base_url = request.build_absolute_uri("/")
    return SWH_WEB_SERVER_NAME in site_base_url
# MIME types of image formats declared as supported by browsers
browsers_supported_image_mimes = {
    "image/gif",
    "image/png",
    "image/jpeg",
    "image/bmp",
    "image/webp",
    "image/svg",
    "image/svg+xml",
}
def context_processor(request):
    """
    Django context processor injecting shared variables in all swh-web
    templates (configuration excerpts, deployment flags, permission names,
    package version, ...).

    As a side effect, an authenticated user lacking a ``backend`` attribute
    gets the standard Django model backend assigned.
    """
    config = get_config()
    if hasattr(request, "user"):
        user = request.user
        if user.is_authenticated and not hasattr(user, "backend"):
            # To avoid django.template.base.VariableDoesNotExist errors
            # when rendering templates when standard Django user is logged in.
            user.backend = "django.contrib.auth.backends.ModelBackend"

    template_context = {
        "swh_object_icons": swh_object_icons,
        "available_languages": None,
        "swh_client_config": config["client_config"],
        "oidc_enabled": bool(config["keycloak"]["server_url"]),
        "browsers_supported_image_mimes": browsers_supported_image_mimes,
        "keycloak": config["keycloak"],
        "site_base_url": request.build_absolute_uri("/"),
        "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
        "status": config["status"],
        "swh_web_dev": is_swh_web_development(request),
        "swh_web_staging": is_swh_web_staging(request),
        "swh_web_prod": is_swh_web_production(request),
        "swh_web_version": get_distribution("swh.web").version,
        "iframe_mode": False,
        "ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
        "ADD_FORGE_MODERATOR_PERMISSION": ADD_FORGE_MODERATOR_PERMISSION,
        "FEATURES": get_config()["features"],
        "MAILMAP_ADMIN_PERMISSION": MAILMAP_ADMIN_PERMISSION,
    }
    return template_context
def resolve_branch_alias(
    snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """
    Resolve branch alias in snapshot content.

    Aliases are followed until a non-alias branch is reached. A target that
    is missing from the provided snapshot branches is looked up in the
    archive.

    Args:
        snapshot: a full snapshot content
        branch: a branch alias contained in the snapshot

    Returns:
        The real snapshot branch that got aliased, or ``None`` if the alias
        cannot be resolved (unknown target or circular alias chain).
    """
    # targets already followed, to stop on circular alias chains instead of
    # looping forever
    followed_targets = set()
    while branch and branch["target_type"] == "alias":
        target = branch["target"]
        if target in followed_targets:
            return None
        followed_targets.add(target)
        if target in snapshot["branches"]:
            branch = snapshot["branches"][target]
        else:
            # imported here rather than at module level
            from swh.web.common import archive

            snp = archive.lookup_snapshot(
                snapshot["id"], branches_from=target, branches_count=1
            )
            if snp and target in snp["branches"]:
                branch = snp["branches"][target]
            else:
                branch = None
    return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
    """
    Docutils HTML translator customizing the HTML generated from
    reST-formatted docstrings: the body prefix and suffix are emptied.
    """

    def __init__(self, document):
        super().__init__(document)
        self.body_prefix, self.body_suffix = [], []
# Module-level docutils HTML writer used by rst_to_html; it relies on the
# translator subclass that empties the body prefix and suffix
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
File insertion and raw directives are disabled in the docutils settings
used for the conversion.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
# prevent the converted document from including files or raw content
"file_insertion_enabled": False,
"raw_enabled": False,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
# NOTE(review): the string literal below spans several lines although it is
# single-quoted; markup surrounding the HTML body may have been lost —
# confirm against the upstream file
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
    """
    Re-indent an HTML document by parsing it with BeautifulSoup (lxml
    parser) and serializing it back in prettified form.

    Args:
        html: Input HTML document

    Returns:
        The prettified HTML document
    """
    soup = BeautifulSoup(html, "lxml")
    return soup.prettify()
def django_cache(
    timeout: int = DEFAULT_TIMEOUT,
    catch_exception: bool = False,
    exception_return_value: Any = None,
    invalidate_cache_pred: Callable[[Any], bool] = lambda val: False,
):
    """Decorator to put the result of a function call in Django cache,
    subsequent calls will directly return the cached value.

    The cache key is derived from the module and name of the decorated
    function and from the arguments of the call. A ``None`` result is
    indistinguishable from a cache miss, so the decorated function is called
    again whenever the cached value is ``None``.

    Args:
        timeout: The number of seconds value will be hold in cache
        catch_exception: If :const:`True`, any thrown exception by
            the decorated function will be caught and not reraised
        exception_return_value: The value to return if previous
            parameter is set to :const:`True`
        invalidate_cache_pred: A predicate function enabling to
            invalidate the cache under certain conditions, decorated
            function will then be called again

    Returns:
        The returned value of the decorated function for the specified
        parameters
    """
    import hashlib

    def inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # (0,) separates positional arguments from keyword ones
            func_args = args + (0,) + tuple(sorted(kwargs.items()))
            key_parts = (func.__module__, func.__name__) + func_args
            # The builtin hash() of strings is salted per interpreter
            # process, so keys built from it are not stable across processes
            # sharing a cache backend, and it fails on unhashable arguments;
            # a digest of the repr is deterministic.
            cache_key = hashlib.sha256(repr(key_parts).encode("utf-8")).hexdigest()
            ret = cache.get(cache_key)
            if ret is None or invalidate_cache_pred(ret):
                try:
                    ret = func(*args, **kwargs)
                except Exception as exc:
                    if catch_exception:
                        # report the error but do not cache the fallback value
                        sentry_capture_exception(exc)
                        return exception_return_value
                    else:
                        raise
                else:
                    cache.set(cache_key, ret, timeout=timeout)
            return ret

        return wrapper

    return inner
def _deposits_list_url(
deposits_list_base_url: str, page_size: int, username: Optional[str]
) -> str:
params = {"page_size": str(page_size)}
if username is not None:
params["username"] = username
return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}"
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the list of software deposits using swh-deposit API.

    A first request fetches the current number of deposits; the full list is
    then fetched in a single page and cached until that number changes.

    Args:
        username: if provided, only list the deposits of that user

    Returns:
        the ``results`` entry of the deposits list response
    """
    config = get_config()["deposit"]
    private_api_url = config["private_api_url"].rstrip("/") + "/"
    deposits_list_base_url = private_api_url + "deposits"
    deposits_list_auth = HTTPBasicAuth(
        config["private_api_user"], config["private_api_password"]
    )

    # cheap request (single item page) only used to get the deposits count
    deposits_list_url = _deposits_list_url(
        deposits_list_base_url, page_size=1, username=username
    )

    nb_deposits = requests.get(
        deposits_list_url, auth=deposits_list_auth, timeout=30
    ).json()["count"]

    # The username is passed as an argument so that it is part of the cache
    # key: as a closure variable only, the deposits cached for one user could
    # be returned for another one having the same number of deposits.
    @django_cache(invalidate_cache_pred=lambda data: data["count"] != nb_deposits)
    def _get_deposits_data(deposits_username):
        deposits_list_url = _deposits_list_url(
            deposits_list_base_url,
            page_size=nb_deposits,
            username=deposits_username,
        )
        return requests.get(
            deposits_list_url,
            auth=deposits_list_auth,
            timeout=30,
        ).json()

    deposits_data = _get_deposits_data(username)

    return deposits_data["results"]
_origin_visit_types_cache_timeout = 24 * 60 * 60  # 24 hours


@django_cache(
    timeout=_origin_visit_types_cache_timeout,
    catch_exception=True,
    exception_return_value=[],
)
def origin_visit_types() -> List[str]:
    """Return the sorted, exhaustive list of visit types for origins
    ingested into the archive.

    The result is cached for 24 hours; an empty list is returned when the
    lookup raises an exception.
    """
    counts_by_visit_type = search().visit_types_count()
    return sorted(counts_by_visit_type.keys())
def redirect_to_new_route(request, new_route, permanent=True):
    """Redirect a request to another route, turning its URL arguments and
    query parameters into query parameters of the new route.

    The keyword arguments resolved from the request path are merged with the
    GET parameters of the request (the latter win on name conflicts). This
    can be used to deprecate routes.

    Args:
        request: the incoming request to redirect
        new_route: name of the view to redirect to
        permanent: whether to issue a permanent redirection

    Returns:
        the redirect response
    """
    resolved = resolve(request.path_info)
    merged_params = dict(resolved.kwargs)
    merged_params.update(request.GET.dict())
    target_url = reverse(new_route, query_params=merged_params)
    return redirect(target_url, permanent=permanent)
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
index 4a6afdd6..c148985c 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -1,1248 +1,1248 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from datetime import timedelta
import functools
import json
import os
import random
import shutil
from subprocess import PIPE, run
import sys
import time
from typing import Any, Dict, List, Optional
from _pytest.python import Function
from hypothesis import HealthCheck, settings
import pytest
from django.contrib.auth.models import User
from django.core.cache import cache
-from django.test.utils import setup_databases # type: ignore
+from django.test.utils import setup_databases
from rest_framework.test import APIClient, APIRequestFactory
from swh.model.hashutil import (
ALGORITHMS,
DEFAULT_ALGORITHMS,
hash_to_bytes,
hash_to_hex,
)
from swh.model.model import Content, Directory
from swh.model.swhids import CoreSWHID, ObjectType
from swh.scheduler.tests.common import TASK_TYPES
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
from swh.web.auth.utils import (
ADD_FORGE_MODERATOR_PERMISSION,
MAILMAP_ADMIN_PERMISSION,
MAILMAP_PERMISSION,
OIDC_SWH_WEB_CLIENT_ID,
)
from swh.web.common import converters
from swh.web.common.origin_save import get_scheduler_load_task_types
from swh.web.common.typing import OriginVisitInfo
from swh.web.common.utils import browsers_supported_image_mimes
from swh.web.config import get_config
from swh.web.tests.data import (
get_tests_data,
override_storages,
random_content,
random_sha1,
random_sha1_bytes,
random_sha256,
)
from swh.web.tests.utils import create_django_permission
# Set the LC_ALL environment variable for the test session
os.environ["LC_ALL"] = "C.UTF-8"
# Used to skip some tests
# ctags is considered missing when the executable is absent or when its
# version output does not advertise the "+json" feature
ctags_json_missing = (
shutil.which("ctags") is None
or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout
)
# True when the "nomossa" executable is not found in the PATH
fossology_missing = shutil.which("nomossa") is None
# Register some hypothesis profiles
settings.register_profile("default", settings())
# we use getattr here to keep mypy happy regardless hypothesis version
function_scoped_fixture_check = (
[getattr(HealthCheck, "function_scoped_fixture")]
if hasattr(HealthCheck, "function_scoped_fixture")
else []
)
# health checks disabled in the swh-web profiles below
suppress_health_check = [
HealthCheck.too_slow,
HealthCheck.filter_too_much,
] + function_scoped_fixture_check
# profile without deadline, default number of examples
settings.register_profile(
"swh-web",
settings(
deadline=None,
suppress_health_check=suppress_health_check,
),
)
# same as above but limited to 5 examples per test; loaded by
# pytest_configure when no profile is explicitly requested
settings.register_profile(
"swh-web-fast",
settings(
deadline=None,
max_examples=5,
suppress_health_check=suppress_health_check,
),
)
def pytest_addoption(parser):
    """Register the ``--swh-web-random-seed`` command line option, whose
    value is stored as provided and defaults to ``None``."""
    parser.addoption(
        "--swh-web-random-seed",
        default=None,
        action="store",
    )
def pytest_configure(config):
    """Select the hypothesis profile and make sure a webpack-stats.json file
    exists so django templates can be loaded during the tests."""
    # Use fast hypothesis profile by default if none has been
    # explicitly specified in pytest option
    if config.getoption("--hypothesis-profile") is None:
        settings.load_profile("swh-web-fast")

    # Small hack in order to be able to run the unit tests
    # without static assets generated by webpack.
    # Those assets are not really needed for the Python tests
    # but the django templates will fail to load due to missing
    # generated file webpack-stats.json describing the js and css
    # files to include.
    # So generate a dummy webpack-stats.json file to overcome
    # that issue.
    tests_dir = os.path.dirname(__file__)
    # location of the static folder when running tests through tox
    installed_data_dir = os.path.join(sys.prefix, "share/swh/web")
    static_dir = os.path.join(installed_data_dir, "static")
    if not os.path.exists(static_dir):
        # location of the static folder when running tests locally with pytest
        static_dir = os.path.join(tests_dir, "../../../static")

    webpack_stats = os.path.join(static_dir, "webpack-stats.json")
    if os.path.exists(webpack_stats):
        # a stats file is already available, nothing to generate
        return

    bundles_dir = os.path.join(tests_dir, "../../../assets/src/bundles")
    if not os.path.exists(bundles_dir):
        # location of the bundles folder when running tests with tox
        bundles_dir = os.path.join(installed_data_dir, "assets/src/bundles")

    # only the direct sub-directories of the bundles folder are considered
    bundle_names = next(os.walk(bundles_dir))[1]

    chunks = {}
    assets = {}
    for bundle in bundle_names:
        asset = f"js/{bundle}.js"
        chunks[bundle] = [asset]
        assets[asset] = {
            "name": asset,
            "publicPath": f"/static/{asset}",
        }

    mock_webpack_stats = {
        "status": "done",
        "publicPath": "/static",
        "chunks": chunks,
        "assets": assets,
    }
    with open(webpack_stats, "w") as outfile:
        json.dump(mock_webpack_stats, outfile)
# Name of the custom report section holding the hints to reproduce failures
_swh_web_custom_section = "swh-web custom section"
# Django cache key under which the random seed of the current test is stored
_random_seed_cache_key = "swh-web/random-seed"
@pytest.fixture(scope="function", autouse=True)
def random_seed(pytestconfig):
    """Seed the ``random`` module for the duration of a test.

    The seed comes from the ``--swh-web-random-seed`` option, or from the
    current time when the option is not set. It is stored in the Django
    cache and the previous random state is restored after the test.
    """
    saved_state = random.getstate()
    requested_seed = pytestconfig.getoption("--swh-web-random-seed")
    seed = int(time.time() if requested_seed is None else requested_seed)
    cache.set(_random_seed_cache_key, seed)
    random.seed(seed)
    yield seed
    random.setstate(saved_state)
def pytest_report_teststatus(report, *args):
    """Attach to the report of a failed test call a section explaining how to
    reproduce the failure with the same random seed."""
    if report.when != "call" or report.outcome != "failed":
        return
    seed = cache.get(_random_seed_cache_key, None)
    hint = (
        f'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} '
        f'{report.nodeid}" to reproduce that test failure with same inputs'
    )
    report.sections.append((_swh_web_custom_section, hint))
def pytest_terminal_summary(terminalreporter, *args):
    """Print, in a dedicated section of the terminal summary, the
    reproduction hints attached to the reports of failed tests."""
    hints = []
    for report in terminalreporter.getreports("failed"):
        for secname, text in report.sections:
            if secname == _swh_web_custom_section:
                hints.append(text)
    content = os.linesep.join(hints)
    if not content:
        return
    terminalreporter.ensure_newline()
    terminalreporter.section(_swh_web_custom_section, sep="-", blue=True, bold=True)
    terminalreporter.line(content)
@pytest.fixture(autouse=True)
def django_cache_cleared():
    """Auto-used fixture clearing the Django cache before each test."""
    cache.clear()
@pytest.fixture
def request_factory(rf):
    """Alias of the ``rf`` fixture from pytest-django."""
    return rf
@pytest.fixture
def api_client():
    """Fixture returning a test client from Django REST Framework."""
    client = APIClient()
    return client
@pytest.fixture
def api_request_factory():
    """Fixture returning an API request factory from Django REST Framework."""
    factory = APIRequestFactory()
    return factory
@pytest.fixture(scope="function", autouse=True)
def tests_data():
    """Auto-used fixture initializing the tests data.

    The tests data are reset, then the swh-web configuration is updated to
    use the in-memory storages instantiated in the tests.data module.
    """
    data = get_tests_data(reset=True)
    storages = (
        data["storage"],
        data["idx_storage"],
        data["search"],
        data["counters"],
    )
    override_storages(*storages)
    return data
@pytest.fixture(scope="function")
def sha1():
    """Fixture providing a valid sha1 value in hexadecimal form, as generated
    by ``random_sha1``."""
    hex_sha1 = random_sha1()
    return hex_sha1
@pytest.fixture(scope="function")
def invalid_sha1():
    """Fixture returning an invalid sha1: the hexadecimal form of 50 random bytes."""
    random_bytes = bytes(random.randint(0, 255) for _ in range(50))
    return hash_to_hex(random_bytes)
@pytest.fixture(scope="function")
def sha256():
    """Fixture returning a random, valid sha256 in hexadecimal form."""
    value = random_sha256()
    return value
def _known_swh_objects(tests_data, object_type):
return tests_data[object_type]
@pytest.fixture(scope="function")
def content(tests_data):
    """Fixture picking at random one content ingested into the test archive."""
    known_contents = _known_swh_objects(tests_data, "contents")
    return random.choice(known_contents)
@pytest.fixture(scope="function")
def contents(tests_data):
    """Fixture picking at random, with replacement, between 2 and 8 contents
    ingested into the test archive.
    """
    known_contents = _known_swh_objects(tests_data, "contents")
    nb_contents = random.randint(2, 8)
    return random.choices(known_contents, k=nb_contents)
def _new_content(tests_data):
    """Generate random contents until one is found whose data is absent from
    the test storage, and return it.
    """
    storage = tests_data["storage"]
    while True:
        candidate = random_content()
        candidate_sha1 = hash_to_bytes(candidate["sha1"])
        if storage.content_get_data(candidate_sha1) is not None:
            # already in the archive, try another one
            continue
        return candidate
@pytest.fixture(scope="function")
def unknown_content(tests_data):
    """Fixture generating a random content absent from the test archive."""
    new_content = _new_content(tests_data)
    return new_content
@pytest.fixture(scope="function")
def unknown_contents(tests_data):
    """Fixture generating between 2 and 8 distinct random contents absent
    from the test archive.
    """
    nb_contents = random.randint(2, 8)
    # insertion-ordered mapping used to drop contents with a duplicated sha1
    contents_by_sha1 = {}
    while len(contents_by_sha1) != nb_contents:
        candidate = _new_content(tests_data)
        contents_by_sha1.setdefault(candidate["sha1"], candidate)
    return list(contents_by_sha1.values())
@pytest.fixture(scope="function")
def empty_content():
    """Fixture returning the dict representation of the empty content, with
    its default checksums converted to hexadecimal strings.
    """
    content_d = Content.from_data(data=b"").to_dict()
    content_d.update(
        {algo: hash_to_hex(content_d[algo]) for algo in DEFAULT_ALGORITHMS}
    )
    return content_d
@functools.lru_cache(maxsize=None)
def _content_text():
    """Return the known contents whose mimetype starts with ``text/``
    (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"].startswith("text/")
    ]
@pytest.fixture(scope="function")
def content_text():
    """Fixture picking at random one textual content ingested into the test archive."""
    candidates = _content_text()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_text_non_utf8():
    """Return the known textual contents whose encoding is neither ``utf-8``
    nor ``us-ascii`` (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"].startswith("text/")
        and cnt["encoding"] not in ("utf-8", "us-ascii")
    ]
@pytest.fixture(scope="function")
def content_text_non_utf8():
    """Fixture picking at random one textual content, not encoded to UTF-8,
    ingested into the test archive.
    """
    candidates = _content_text_non_utf8()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_application_no_highlight():
    """Return the known contents whose mimetype starts with ``application/``
    and whose highlighting language is ``plaintext`` (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"].startswith("application/")
        and cnt["hljs_language"] == "plaintext"
    ]
@pytest.fixture(scope="function")
def content_application_no_highlight():
    """Fixture picking at random one content ingested into the test archive
    whose mimetype starts with application/ and for which no programming
    language to highlight was detected.
    """
    candidates = _content_application_no_highlight()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_text_no_highlight():
    """Return the known textual contents whose highlighting language is
    ``plaintext`` (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"].startswith("text/")
        and cnt["hljs_language"] == "plaintext"
    ]
@pytest.fixture(scope="function")
def content_text_no_highlight():
    """Fixture picking at random one textual content ingested into the test
    archive for which no programming language to highlight was detected.
    """
    candidates = _content_text_no_highlight()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_image_type():
    """Return the known contents whose mimetype is one of the image mimetypes
    supported by browsers (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"] in browsers_supported_image_mimes
    ]
@pytest.fixture(scope="function")
def content_image_type():
    """Fixture picking at random one image content ingested into the test archive."""
    candidates = _content_image_type()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_unsupported_image_type_rendering():
    """Return the known image contents whose mimetype is not one of those
    supported by browsers (computed once, then cached).
    """
    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if cnt["mimetype"].startswith("image/")
        and cnt["mimetype"] not in browsers_supported_image_mimes
    ]
@pytest.fixture(scope="function")
def content_unsupported_image_type_rendering():
    """Fixture picking at random one image content ingested into the test
    archive that browsers are not able to render.
    """
    candidates = _content_unsupported_image_type_rendering()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _content_utf8_detected_as_binary():
    """Return the known contents whose encoding is ``binary`` although their
    raw data can be decoded as UTF-8 (computed once, then cached).
    """

    def is_utf8_flagged_as_binary(cnt):
        if cnt["encoding"] != "binary":
            return False
        try:
            cnt["raw_data"].decode("utf-8")
        except Exception:
            return False
        return True

    return [
        cnt
        for cnt in _known_swh_objects(get_tests_data(), "contents")
        if is_utf8_flagged_as_binary(cnt)
    ]
@pytest.fixture(scope="function")
def content_utf8_detected_as_binary():
    """Fixture picking at random one content detected as binary by libmagic
    while it is a valid UTF-8 encoded file.
    """
    candidates = _content_utf8_detected_as_binary()
    return random.choice(candidates)
@pytest.fixture(scope="function")
def contents_with_ctags():
    """Fixture returning sha1 checksums of contents ingested into the test
    archive together with a symbol name.

    Those contents are ctags compatible, that is running ctags on them
    produces results.
    """
    sha1s = [
        "0ab37c02043ebff946c1937523f60aadd0844351",
        "15554cf7608dde6bfefac7e3d525596343a85b6f",
        "2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd",
        "30acd0b47fc25e159e27a980102ddb1c4bea0b95",
        "4f81f05aaea3efb981f9d90144f746d6b682285b",
        "5153aa4b6e4455a62525bc4de38ed0ff6e7dd682",
        "59d08bafa6a749110dfb65ba43a61963d5a5bf9f",
        "7568285b2d7f31ae483ae71617bd3db873deaa2c",
        "7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4",
        "8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03",
        "9b3557f1ab4111c8607a4f2ea3c1e53c6992916c",
        "9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd",
        "c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b",
        "e89e55a12def4cd54d5bff58378a3b5119878eb7",
        "e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e",
        "eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5",
    ]
    return {"sha1s": sha1s, "symbol_name": "ABS"}
@pytest.fixture(scope="function")
def directory(tests_data):
    """Fixture picking at random one directory ingested into the test archive."""
    known_directories = _known_swh_objects(tests_data, "directories")
    return random.choice(known_directories)
@functools.lru_cache(maxsize=None)
def _directory_with_entry_type(type_):
    """Return the known directories having at least one entry of a given type.

    The result is computed once per entry type, then cached.

    Args:
        type_: the directory entry type to look for (the fixtures in this
            module use ``"dir"`` and ``"file"``)

    Returns:
        the list of known directory identifiers containing such an entry
    """
    tests_data = get_tests_data()
    storage = tests_data["storage"]

    def has_entry_of_type(directory_id):
        # feed any() with a generator so that the listing stops at the first
        # matching entry instead of building intermediate lists
        entries = storage.directory_ls(hash_to_bytes(directory_id))
        return any(entry["type"] == type_ for entry in entries)

    return [
        directory_id
        for directory_id in _known_swh_objects(tests_data, "directories")
        if has_entry_of_type(directory_id)
    ]
@pytest.fixture(scope="function")
def directory_with_subdirs():
    """Fixture picking at random one directory ingested into the test archive
    that contains sub directories.
    """
    candidates = _directory_with_entry_type("dir")
    return random.choice(candidates)
@pytest.fixture(scope="function")
def directory_with_files():
    """Fixture picking at random one directory that contains at least one regular file."""
    candidates = _directory_with_entry_type("file")
    return random.choice(candidates)
@pytest.fixture(scope="function")
def unknown_directory(tests_data):
    """Fixture generating a random directory identifier absent from the test archive."""
    storage = tests_data["storage"]
    while True:
        candidate = random_sha1()
        missing = list(storage.directory_missing([hash_to_bytes(candidate)]))
        if missing:
            return candidate
@pytest.fixture(scope="function")
def empty_directory():
    """Fixture returning the hexadecimal identifier of the directory without entries."""
    no_entries_directory = Directory(entries=())
    return no_entries_directory.id.hex()
@pytest.fixture(scope="function")
def revision(tests_data):
    """Fixture picking at random one revision ingested into the test archive."""
    known_revisions = _known_swh_objects(tests_data, "revisions")
    return random.choice(known_revisions)
@pytest.fixture(scope="function")
def revisions(tests_data):
    """Fixture picking at random, with replacement, between 2 and 8 revisions
    ingested into the test archive.
    """
    known_revisions = _known_swh_objects(tests_data, "revisions")
    nb_revisions = random.randint(2, 8)
    return random.choices(known_revisions, k=nb_revisions)
@pytest.fixture(scope="function")
def revisions_list(tests_data):
    """Fixture returning a function that picks at random, with replacement,
    a requested number of revisions ingested into the test archive.
    """

    def gen_revisions_list(size):
        known_revisions = _known_swh_objects(tests_data, "revisions")
        return random.choices(known_revisions, k=size)

    return gen_revisions_list
@pytest.fixture(scope="function")
def unknown_revision(tests_data):
    """Fixture generating a random revision identifier absent from the test archive."""
    storage = tests_data["storage"]
    while True:
        candidate = random_sha1()
        found = storage.revision_get([hash_to_bytes(candidate)])[0]
        if found is None:
            return candidate
def _get_origin_dfs_revisions_walker(tests_data):
    """Return a "dfs" revisions walker starting from the HEAD of the latest
    snapshot of an origin picked at random (the last origin of the tests data
    is never picked).
    """
    storage = tests_data["storage"]
    origin = random.choice(tests_data["origins"][:-1])
    snapshot = snapshot_get_latest(storage, origin["url"])
    head_branch = snapshot.branches[b"HEAD"]
    head = head_branch.target
    if head_branch.target_type.value == "alias":
        # HEAD is an alias, resolve it to the branch it points to
        head = snapshot.branches[head].target
    return get_revisions_walker("dfs", storage, head)
@functools.lru_cache(maxsize=None)
def _ancestor_revisions_data():
    """Walk the revisions of one origin of the test archive and return a pair
    made of:

    * the revisions visited until the first revision without parents is
      reached (included)
    * a mapping from each revision identifier to those of its children

    The result is computed once, then cached.
    """
    # get a dfs revisions walker for one of the origins
    # loaded into the test archive
    walker = _get_origin_dfs_revisions_walker(get_tests_data())
    children = defaultdict(list)
    master_revisions = []
    initial_revision_reached = False
    for rev in walker:
        parents = rev["parents"]
        for parent_id in parents:
            children[parent_id].append(rev["id"])
        # get revisions only authored in the master branch
        if not initial_revision_reached:
            master_revisions.append(rev)
        if not parents:
            initial_revision_reached = True
    return master_revisions, children
@pytest.fixture(scope="function")
def ancestor_revisions():
    """Fixture returning two revisions ingested into the test archive, one
    being an ancestor of the other, plus the children of the ancestor.
    """
    master_revisions, children = _ancestor_revisions_data()
    # head revision
    head_rev = master_revisions[0]
    # pick a random revision, different from head, only authored
    # in the master branch
    candidate_indexes = list(range(1, len(master_revisions) - 1))
    ancestor_rev = master_revisions[random.choice(candidate_indexes)]
    child_ids = children[ancestor_rev["id"]]
    return {
        "sha1_git_root": hash_to_hex(head_rev["id"]),
        "sha1_git": hash_to_hex(ancestor_rev["id"]),
        "children": [hash_to_hex(child_id) for child_id in child_ids],
    }
@functools.lru_cache(maxsize=None)
def _non_ancestor_revisions_data():
    """Walk the revisions of one origin of the test archive and return a pair
    made of:

    * the merge revisions, i.e. those having more than one parent
    * a mapping from each revision identifier to those of its children

    The result is computed once, then cached.
    """
    # get a dfs revisions walker for one of the origins
    # loaded into the test archive
    walker = _get_origin_dfs_revisions_walker(get_tests_data())
    children = defaultdict(list)
    merge_revs = []
    for rev in walker:
        parents = rev["parents"]
        if len(parents) > 1:
            merge_revs.append(rev)
        for parent_id in parents:
            children[parent_id].append(rev["id"])
    return merge_revs, children
@pytest.fixture(scope="function")
def non_ancestor_revisions():
    """Fixture returning a pair of revisions ingested into the test archive
    with no ancestor relation.

    The pair is made of the two first parents of a merge revision, picked at
    random among those whose parents all have a unique child revision.
    """
    merge_revs, children = _non_ancestor_revisions_data()
    # The merge revisions list is memoized by lru_cache and shared between
    # tests: work on a shuffled copy instead of shuffling it in place, so
    # that the outcome only depends on the random seed of the current test.
    candidates = random.sample(merge_revs, len(merge_revs))
    # find a merge revision whose parents have a unique child revision
    for merge_rev in candidates:
        parents = merge_rev["parents"]
        if all(len(children[rev_p]) == 1 for rev_p in parents):
            return {
                "sha1_git_root": hash_to_hex(parents[0]),
                "sha1_git": hash_to_hex(parents[1]),
            }
    pytest.fail(
        "No merge revision whose parents have a unique child revision "
        "found in the test archive"
    )
@pytest.fixture(scope="function")
def revision_with_submodules():
    """Fixture returning identifiers of a revision known to point to
    a directory with revision entries (aka git submodules), of that directory
    and the path of a revision entry in it.
    """
    revision_info = {}
    revision_info["rev_sha1_git"] = "ffcb69001f3f6745dfd5b48f72ab6addb560e234"
    revision_info["rev_dir_sha1_git"] = "d92a21446387fa28410e5a74379c934298f39ae2"
    revision_info["rev_dir_rev_path"] = "libtess2"
    return revision_info
@pytest.fixture(scope="function")
def release(tests_data):
    """Fixture picking at random one release ingested into the test archive."""
    known_releases = _known_swh_objects(tests_data, "releases")
    return random.choice(known_releases)
@pytest.fixture(scope="function")
def releases(tests_data):
    """Fixture picking at random, with replacement, between 2 and 8 releases
    ingested into the test archive.
    """
    known_releases = _known_swh_objects(tests_data, "releases")
    nb_releases = random.randint(2, 8)
    return random.choices(known_releases, k=nb_releases)
@pytest.fixture(scope="function")
def unknown_release(tests_data):
    """Fixture generating a random release identifier absent from the test archive."""
    storage = tests_data["storage"]
    while True:
        candidate = random_sha1()
        found = storage.release_get([hash_to_bytes(candidate)])[0]
        if found is None:
            return candidate
@pytest.fixture(scope="function")
def snapshot(tests_data):
    """Fixture picking at random one snapshot ingested into the test archive."""
    known_snapshots = _known_swh_objects(tests_data, "snapshots")
    return random.choice(known_snapshots)
@pytest.fixture(scope="function")
def unknown_snapshot(tests_data):
    """Fixture generating a random snapshot identifier absent from the test archive."""
    storage = tests_data["storage"]
    while True:
        candidate = random_sha1()
        branches = storage.snapshot_get_branches(hash_to_bytes(candidate))
        if branches is None:
            return candidate
@pytest.fixture(scope="function")
def origin(tests_data):
    """Fixture picking at random one origin ingested into the test archive."""
    known_origins = _known_swh_objects(tests_data, "origins")
    return random.choice(known_origins)
@functools.lru_cache(maxsize=None)
def _origin_with_multiple_visits():
    """Return the origins of the tests data for which the first page of
    visits holds more than one visit (computed once, then cached).
    """
    tests_data = get_tests_data()
    storage = tests_data["storage"]
    return [
        candidate
        for candidate in tests_data["origins"]
        if len(storage.origin_visit_get(candidate["url"]).results) > 1
    ]
@pytest.fixture(scope="function")
def origin_with_multiple_visits():
    """Fixture picking at random one origin ingested into the test archive
    that was visited multiple times.
    """
    candidates = _origin_with_multiple_visits()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _origin_with_releases():
    """Return the origins of the tests data whose latest snapshot has at
    least one branch targeting a release (computed once, then cached).
    """
    tests_data = get_tests_data()
    storage = tests_data["storage"]

    def targets_release(origin_url):
        latest_snapshot = snapshot_get_latest(storage, origin_url)
        # a generator lets any() stop at the first release branch
        return any(
            branch.target_type.value == "release"
            for branch in latest_snapshot.branches.values()
        )

    return [
        candidate
        for candidate in tests_data["origins"]
        if targets_release(candidate["url"])
    ]
@pytest.fixture(scope="function")
def origin_with_releases():
    """Fixture picking at random one origin with releases ingested into the test archive."""
    candidates = _origin_with_releases()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _origin_with_pull_request_branches():
    """Return the origins listed by the test storage (at most 1000) whose
    latest snapshot has a branch name containing ``refs/pull/``.

    Origins are returned as yielded by ``storage.origin_list``. The result is
    computed once, then cached.
    """
    storage = get_tests_data()["storage"]

    def has_pull_request_branch(origin_url):
        latest_snapshot = snapshot_get_latest(storage, origin_url)
        # a generator lets any() stop at the first pull request branch
        return any(
            b"refs/pull/" in branch_name for branch_name in latest_snapshot.branches
        )

    return [
        candidate
        for candidate in storage.origin_list(limit=1000).results
        if has_pull_request_branch(candidate.url)
    ]
@pytest.fixture(scope="function")
def origin_with_pull_request_branches():
    """Fixture picking at random one origin ingested into the test archive
    that has pull request branches.
    """
    candidates = _origin_with_pull_request_branches()
    return random.choice(candidates)
@functools.lru_cache(maxsize=None)
def _object_type_swhid(object_type):
    """Return the known SWHIDs having the given object type (computed once
    per object type, then cached).
    """
    return [
        swhid
        for swhid in _known_swh_objects(get_tests_data(), "swhids")
        if swhid.object_type == object_type
    ]
@pytest.fixture(scope="function")
def content_swhid():
    """Fixture picking at random the qualified SWHID of a content object
    ingested into the test archive.
    """
    candidates = _object_type_swhid(ObjectType.CONTENT)
    return random.choice(candidates)
@pytest.fixture(scope="function")
def directory_swhid():
    """Fixture picking at random the qualified SWHID of a directory object
    ingested into the test archive.
    """
    candidates = _object_type_swhid(ObjectType.DIRECTORY)
    return random.choice(candidates)
@pytest.fixture(scope="function")
def release_swhid():
    """Fixture picking at random the qualified SWHID of a release object
    ingested into the test archive.
    """
    candidates = _object_type_swhid(ObjectType.RELEASE)
    return random.choice(candidates)
@pytest.fixture(scope="function")
def revision_swhid():
    """Fixture picking at random the qualified SWHID of a revision object
    ingested into the test archive.
    """
    candidates = _object_type_swhid(ObjectType.REVISION)
    return random.choice(candidates)
@pytest.fixture(scope="function")
def snapshot_swhid():
    """Fixture picking at random the qualified SWHID of a snapshot object
    ingested into the test archive.
    """
    candidates = _object_type_swhid(ObjectType.SNAPSHOT)
    return random.choice(candidates)
@pytest.fixture(scope="function", params=list(ObjectType))
def unknown_core_swhid(request) -> CoreSWHID:
    """Fixture building a core SWHID from a random identifier.

    The fixture is parametrized by object type: tests using it are
    called once per object type.
    """
    random_id = random_sha1_bytes()
    return CoreSWHID(object_type=request.param, object_id=random_id)
@pytest.fixture(scope="function")
def archive_data(tests_data):
    """Fixture to manipulate data from a sample archive used in the tests."""
    helper = _ArchiveData(tests_data)
    return helper
@pytest.fixture(scope="function")
def indexer_data(tests_data):
    """Fixture to manipulate indexer data from a sample archive used in the tests."""
    helper = _IndexerData(tests_data)
    return helper
@pytest.fixture
def datadir():
    """Custom data directory for requests_mock: the ``resources`` folder
    located next to this file.
    """
    tests_dir = os.path.abspath(os.path.dirname(__file__))
    return os.path.join(tests_dir, "resources")
class _ArchiveData:
    """
    Helper class to manage data from a sample test archive.

    It wraps the in-memory storage containing raw tests data and mostly acts
    as a proxy to the Storage interface: methods it does not define are
    looked up on the wrapped storage.

    The methods it overrides return those tests data in a json serializable
    format in order to ease tests implementation.
    """

    def __init__(self, tests_data):
        self.storage = tests_data["storage"]

    def __getattr__(self, key):
        # __getattr__ is only called when regular lookup failed: a missing
        # "storage" attribute must not be searched on self.storage again
        if key == "storage":
            raise AttributeError(key)
        # Forward calls to non overridden Storage methods to wrapped
        # storage instance
        return getattr(self.storage, key)

    def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]:
        """Look up a content from its hexadecimal checksums; the storage
        result is returned as is when it is empty."""
        checksums = {}
        for algo_hash in ALGORITHMS:
            if content.get(algo_hash):
                checksums[algo_hash] = hash_to_bytes(content[algo_hash])
        found = self.storage.content_find(checksums)
        if not found:
            return found
        return converters.from_content(found[0].to_dict())

    def content_get(self, cnt_id: str) -> Dict[str, Any]:
        """Get the metadata of a content, without its ``ctime``."""
        content = self.storage.content_get([hash_to_bytes(cnt_id)])[0]
        content_d = None
        if content:
            content_d = content.to_dict()
            content_d.pop("ctime", None)
        return converters.from_swh(
            content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"}
        )

    def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]:
        """Get the raw data of a content, None if the storage has none."""
        sha1_bytes = hash_to_bytes(cnt_id)
        raw_data = self.storage.content_get_data(sha1_bytes)
        if raw_data is not None:
            return converters.from_content({"data": raw_data, "sha1": sha1_bytes})
        return None

    def directory_get(self, dir_id):
        """Get a directory identifier along with the listing of its entries."""
        entries = self.directory_ls(dir_id)
        return {"id": dir_id, "content": entries}

    def directory_ls(self, dir_id):
        """List the converted entries of a directory."""
        dir_id_bytes = hash_to_bytes(dir_id)
        return [
            converters.from_directory_entry(entry)
            for entry in self.storage.directory_ls(dir_id_bytes)
        ]

    def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]:
        """Get a converted release, None if the storage has none."""
        release = self.storage.release_get([hash_to_bytes(rel_id)])[0]
        if not release:
            return None
        return converters.from_release(release)

    def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]:
        """Get a converted revision, None if the storage has none."""
        revision = self.storage.revision_get([hash_to_bytes(rev_id)])[0]
        if not revision:
            return None
        return converters.from_revision(revision)

    def revision_log(self, rev_id, limit=None):
        """Get the converted revisions of the log of a revision."""
        rev_id_bytes = hash_to_bytes(rev_id)
        log = self.storage.revision_log([rev_id_bytes], limit=limit)
        return [converters.from_revision(rev) for rev in log]

    def snapshot_get_latest(self, origin_url):
        """Get the converted latest snapshot of an origin."""
        latest_snapshot = snapshot_get_latest(self.storage, origin_url)
        return converters.from_snapshot(latest_snapshot.to_dict())

    def origin_get(self, origin_urls):
        """Get the converted origins matching the given URLs."""
        return [
            converters.from_origin(found.to_dict())
            for found in self.storage.origin_get(origin_urls)
        ]

    def origin_visit_get(self, origin_url):
        """Get all the visits of an origin, each one merged with its latest
        visit status, by iterating on the pages returned by the storage."""
        visits = []
        page_token = None
        while True:
            page = self.storage.origin_visit_get(origin_url, page_token=page_token)
            page_token = page.next_page_token
            for visit in page.results:
                latest_status = self.storage.origin_visit_status_get_latest(
                    origin_url, visit.visit
                )
                visit_d = {**latest_status.to_dict(), "type": visit.type}
                visits.append(converters.from_origin_visit(visit_d))
            if not page_token:
                return visits

    def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo:
        """Get a visit of an origin merged with its latest visit status."""
        visit = self.storage.origin_visit_get_by(origin_url, visit_id)
        assert visit is not None
        latest_status = self.storage.origin_visit_status_get_latest(
            origin_url, visit_id
        )
        assert latest_status is not None
        visit_d = {**latest_status.to_dict(), "type": visit.type}
        return converters.from_origin_visit(visit_d)

    def origin_visit_status_get_latest(
        self,
        origin_url,
        type: Optional[str] = None,
        allowed_statuses: Optional[List[str]] = None,
        require_snapshot: bool = False,
    ):
        """Get the converted latest visit status of an origin matching the
        given criteria, None if there is none."""
        latest_status = origin_get_latest_visit_status(
            self.storage,
            origin_url,
            type=type,
            allowed_statuses=allowed_statuses,
            require_snapshot=require_snapshot,
        )
        if not latest_status:
            return None
        return converters.from_origin_visit(latest_status.to_dict())

    def snapshot_get(self, snapshot_id):
        """Get a converted snapshot with all its branches."""
        snapshot_id_bytes = hash_to_bytes(snapshot_id)
        full_snapshot = snapshot_get_all_branches(self.storage, snapshot_id_bytes)
        return converters.from_snapshot(full_snapshot.to_dict())

    def snapshot_get_branches(
        self, snapshot_id, branches_from="", branches_count=1000, target_types=None
    ):
        """Get a converted subset of the branches of a snapshot."""
        partial_branches = self.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id),
            branches_from.encode(),
            branches_count,
            target_types,
        )
        return converters.from_partial_branches(partial_branches)

    def snapshot_get_head(self, snapshot):
        """Get the target of the HEAD branch of a converted snapshot,
        following it once when HEAD is an alias."""
        branches = snapshot["branches"]
        head = branches["HEAD"]["target"]
        if branches["HEAD"]["target_type"] == "alias":
            head = branches[head]["target"]
        return head

    def snapshot_count_branches(self, snapshot_id):
        """Count the branches of a snapshot per target type; the alias,
        release and revision counters are always present and the counter
        stored under the None key is dropped."""
        counts = {"alias": 0, "release": 0, "revision": 0}
        counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id)))
        counts.pop(None, None)
        return counts
class _IndexerData:
    """
    Helper class to manage indexer tests data.

    It wraps the in-memory indexer storage containing raw tests data, along
    with the mimetype, license and ctags indexers of the tests data.

    Its methods run those indexers on contents and return the indexed data
    in a json serializable format in order to ease tests implementation.
    """

    def __init__(self, tests_data):
        self.idx_storage = tests_data["idx_storage"]
        self.mimetype_indexer = tests_data["mimetype_indexer"]
        self.license_indexer = tests_data["license_indexer"]
        self.ctags_indexer = tests_data["ctags_indexer"]

    def content_add_mimetype(self, cnt_id):
        """Run the mimetype indexer on a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        self.mimetype_indexer.run([sha1_bytes])

    def content_get_mimetype(self, cnt_id):
        """Get the converted mimetype data of a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        first_result = self.idx_storage.content_mimetype_get([sha1_bytes])[0]
        return converters.from_filetype(first_result.to_dict())

    def content_add_license(self, cnt_id):
        """Run the license indexer on a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        self.license_indexer.run([sha1_bytes])

    def content_get_license(self, cnt_id):
        """Yield the converted license data of a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        yield from (
            converters.from_swh(found.to_dict(), hashess={"id"})
            for found in self.idx_storage.content_fossology_license_get([sha1_bytes])
        )

    def content_add_ctags(self, cnt_id):
        """Run the ctags indexer on a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        self.ctags_indexer.run([sha1_bytes])

    def content_get_ctags(self, cnt_id):
        """Yield the converted ctags data of a content."""
        sha1_bytes = hash_to_bytes(cnt_id)
        yield from (
            converters.from_swh(found, hashess={"id"})
            for found in self.idx_storage.content_ctags_get([sha1_bytes])
        )
@pytest.fixture
def keycloak_oidc(keycloak_oidc, mocker):
    """Override the ``keycloak_oidc`` fixture: configure it from the swh-web
    keycloak settings and make ``swh.web.auth.views.keycloak_oidc_client``
    return it.
    """
    settings = get_config()["keycloak"]
    keycloak_oidc.server_url = settings["server_url"]
    keycloak_oidc.realm_name = settings["realm_name"]
    keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID

    client_getter = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
    client_getter.return_value = keycloak_oidc
    return keycloak_oidc
@pytest.fixture
def subtest(request):
    """A hack to explicitly set up and tear down fixtures.

    This fixture allows you to set up and tear down fixtures within the test
    function itself. This is useful (necessary!) for using Hypothesis inside
    pytest, as hypothesis will call the test function multiple times, without
    setting up or tearing down fixture state as it is normally the case.

    Copied from the pytest-subtesthack project, public domain license
    (https://github.com/untitaker/pytest-subtesthack).
    """
    parent_test = request.node

    def inner(func):
        test_name = request.function.__name__
        subtest_name = test_name + "[]"
        if hasattr(Function, "from_parent"):
            item = Function.from_parent(
                parent_test,
                name=subtest_name,
                originalname=test_name,
                callobj=func,
            )
        else:
            # Function has no from_parent constructor: instantiate it directly
            item = Function(name=subtest_name, parent=parent_test, callobj=func)
        hooks = item.ihook
        hooks.pytest_runtest_setup(item=item)
        hooks.pytest_runtest_call(item=item)
        # passing the parent test as next item prevents pytest from tearing
        # down module fixtures
        hooks.pytest_runtest_teardown(item=item, nextitem=parent_test)

    return inner
@pytest.fixture
def swh_scheduler(swh_scheduler):
    """Override the ``swh_scheduler`` fixture: plug it into the swh-web
    configuration and register the loader task types used by the tests.

    The scheduler also gets an ``add_load_archive_task_type`` attribute, a
    function registering the load-archive-files task type on demand. Once the
    test is over, the previous scheduler is put back in the configuration and
    the cache of ``get_scheduler_load_task_types`` is cleared.
    """
    config = get_config()
    previous_scheduler = config["scheduler"]
    config["scheduler"] = swh_scheduler

    def loader_task_type(type_, description, backend_name):
        # all loader task types share the same scheduling parameters
        return {
            "type": type_,
            "description": description,
            "backend_name": backend_name,
            "default_interval": timedelta(days=64),
            "min_interval": timedelta(hours=12),
            "max_interval": timedelta(days=64),
            "backoff_factor": 2,
            "max_queue_length": None,
            "num_retries": 7,
            "retry_delay": timedelta(hours=2),
        }

    # create load-git and load-hg task types
    for task_type in TASK_TYPES.values():
        # see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa
        task_type["type"] = task_type["type"].replace("load-test-", "load-", 1)
        swh_scheduler.create_task_type(task_type)

    # create load-svn, load-cvs and load-bzr task types
    # NOTE(review): the load-cvs backend name reuses the svn task class
    # name, which looks like a copy-paste; confirm against swh.loader.cvs
    extra_task_types = (
        (
            "load-svn",
            "Update a Subversion repository",
            "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository",
        ),
        (
            "load-cvs",
            "Update a CVS repository",
            "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository",
        ),
        (
            "load-bzr",
            "Update a Bazaar repository",
            "swh.loader.bzr.tasks.LoadBazaar",
        ),
    )
    for type_, description, backend_name in extra_task_types:
        swh_scheduler.create_task_type(
            loader_task_type(type_, description, backend_name)
        )

    # add method to add load-archive-files task type during tests
    def add_load_archive_task_type():
        swh_scheduler.create_task_type(
            loader_task_type(
                "load-archive-files",
                "Load tarballs",
                "swh.loader.package.archive.tasks.LoadArchive",
            )
        )

    swh_scheduler.add_load_archive_task_type = add_load_archive_task_type

    yield swh_scheduler

    config["scheduler"] = previous_scheduler
    get_scheduler_load_task_types.cache_clear()
@pytest.fixture(scope="session")
def django_db_setup(request, django_db_blocker, postgresql_proc):
    """Override the ``django_db_setup`` fixture: point the default Django
    database to the PostgreSQL process of the ``postgresql_proc`` fixture,
    then set up the test databases.

    The database name is read from the ``test_db`` section of the swh-web
    configuration.
    """
    from django.conf import settings

    # a plain dict (rather than a set of key/value tuples) keeps the update
    # order deterministic and does not require the values to be hashable
    settings.DATABASES["default"].update(
        {
            "ENGINE": "django.db.backends.postgresql",
            "NAME": get_config()["test_db"]["name"],
            "USER": postgresql_proc.user,
            "HOST": postgresql_proc.host,
            "PORT": postgresql_proc.port,
        }
    )
    with django_db_blocker.unblock():
        setup_databases(
            verbosity=request.config.option.verbose, interactive=False, keepdb=False
        )
@pytest.fixture
def staff_user():
    """Fixture creating the staff user named admin."""
    user = User.objects.create_user(username="admin", password="", is_staff=True)
    return user
@pytest.fixture
def regular_user():
    """Fixture creating the user named johndoe."""
    user = User.objects.create_user(username="johndoe", password="")
    return user
@pytest.fixture
def regular_user2():
    """Fixture creating the user named janedoe."""
    user = User.objects.create_user(username="janedoe", password="")
    return user
@pytest.fixture
def add_forge_moderator():
    """Fixture creating a user granted the add-forge moderator permission."""
    user = User.objects.create_user(username="add-forge moderator", password="")
    permission = create_django_permission(ADD_FORGE_MODERATOR_PERMISSION)
    user.user_permissions.add(permission)
    return user
@pytest.fixture
def mailmap_admin():
    """Fixture creating a user granted the mailmap admin permission."""
    user = User.objects.create_user(username="mailmap-admin", password="")
    permission = create_django_permission(MAILMAP_ADMIN_PERMISSION)
    user.user_permissions.add(permission)
    return user
@pytest.fixture
def mailmap_user():
    """Fixture creating a user granted the mailmap permission."""
    user = User.objects.create_user(username="mailmap-user", password="")
    permission = create_django_permission(MAILMAP_PERMISSION)
    user.user_permissions.add(permission)
    return user
diff --git a/swh/web/tests/utils.py b/swh/web/tests/utils.py
index aaa3bd3a..fca57fdc 100644
--- a/swh/web/tests/utils.py
+++ b/swh/web/tests/utils.py
@@ -1,254 +1,254 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Optional, cast
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
-from django.http import HttpResponse, StreamingHttpResponse
+from django.http.response import HttpResponse, HttpResponseBase, StreamingHttpResponse
from django.test.client import Client
from rest_framework.response import Response
from rest_framework.test import APIClient
from swh.web.tests.django_asserts import assert_template_used
def _assert_http_response(
    response: HttpResponseBase, status_code: int, content_type: str
) -> HttpResponseBase:
    """Check the status code and the content type of an HTTP response.

    When the status code is not the expected one, the assertion message is
    the ``traceback`` carried by the response if any, otherwise its payload.

    Args:
        response: the HTTP response to check
        status_code: expected HTTP status code
        content_type: expected prefix of the response content type,
            ``*/*`` disables that check

    Returns:
        The checked HTTP response

    Raises:
        AssertionError: if the status code or content type is not the
            expected one
    """
    if isinstance(response, Response):
        drf_response = cast(Response, response)
        error_context = (
            drf_response.data.pop("traceback")
            if isinstance(drf_response.data, dict) and "traceback" in drf_response.data
            else drf_response.data
        )
    elif isinstance(response, StreamingHttpResponse):
        error_context = getattr(response, "traceback", response.streaming_content)
    elif isinstance(response, HttpResponse):
        error_context = getattr(response, "traceback", response.content)
    else:
        # other kinds of HttpResponseBase expose no known payload attribute:
        # keep error_context bound so that the assertion below stays usable
        error_context = getattr(response, "traceback", None)

    assert response.status_code == status_code, error_context
    if content_type != "*/*":
        assert response["Content-Type"].startswith(content_type)
    return response
def check_http_get_response(
    client: Client,
    url: str,
    status_code: int,
    content_type: str = "*/*",
    http_origin: Optional[str] = None,
    server_name: Optional[str] = None,
) -> HttpResponseBase:
    """Helper function to check HTTP response for a GET request.

    Args:
        client: Django test client
        url: URL to check response
        status_code: expected HTTP status code
        content_type: expected response content type
        http_origin: optional HTTP_ORIGIN header value
        server_name: optional SERVER_NAME value, ``testserver`` is used
            when not provided

    Returns:
        The HTTP response
    """
    response = client.get(
        url,
        HTTP_ACCEPT=content_type,
        HTTP_ORIGIN=http_origin,
        SERVER_NAME=server_name if server_name else "testserver",
    )
    return _assert_http_response(
        response=response, status_code=status_code, content_type=content_type
    )
def check_http_post_response(
    client: Client,
    url: str,
    status_code: int,
    content_type: str = "*/*",
    request_content_type="application/json",
    data: Optional[Dict[str, Any]] = None,
    http_origin: Optional[str] = None,
) -> HttpResponseBase:
    """Helper function to check HTTP response for a POST request.

    Args:
        client: Django test client
        url: URL to check response
        status_code: expected HTTP status code
        content_type: expected response content type
        request_content_type: content type of request body
        data: optional POST data
        http_origin: optional HTTP_ORIGIN header value

    Returns:
        The HTTP response
    """
    response = client.post(
        url,
        data=data,
        content_type=request_content_type,
        HTTP_ACCEPT=content_type,
        HTTP_ORIGIN=http_origin,
    )
    return _assert_http_response(
        response=response, status_code=status_code, content_type=content_type
    )
def check_api_get_responses(
    api_client: APIClient, url: str, status_code: int
) -> Response:
    """Check the Web API responses of a GET request for every accepted
    content type (JSON, HTML, YAML).

    Args:
        api_client: DRF test client
        url: Web API URL to check responses
        status_code: expected HTTP status code

    Returns:
        The Web API JSON response
    """
    # the JSON response is the one handed back to the caller
    json_response = check_http_get_response(
        api_client, url, status_code, content_type="application/json"
    )

    # then the HTML rendering (API Web UI) followed by the YAML one,
    # both only checked and discarded
    for other_content_type in ("text/html", "application/yaml"):
        check_http_get_response(
            api_client, url, status_code, content_type=other_content_type
        )

    return cast(Response, json_response)
def check_api_post_response(
    api_client: APIClient,
    url: str,
    status_code: int,
    content_type: str = "*/*",
    data: Optional[Dict[str, Any]] = None,
) -> HttpResponseBase:
    """Helper function to check Web API response for a POST request
    for a single accepted content type.

    The request body is always sent with ``format="json"``; the response
    is then handed to ``_assert_http_response`` for validation.

    Args:
        api_client: DRF test client
        url: Web API URL to check response
        status_code: expected HTTP status code
        content_type: expected response content type, also sent as the
            HTTP_ACCEPT value of the request
        data: optional POST data

    Returns:
        The HTTP response
    """
    response = api_client.post(
        url,
        data=data,
        format="json",
        HTTP_ACCEPT=content_type,
    )
    return _assert_http_response(
        response=response,
        status_code=status_code,
        content_type=content_type,
    )
def check_api_post_responses(
    api_client: APIClient,
    url: str,
    status_code: int,
    data: Optional[Dict[str, Any]] = None,
) -> Response:
    """Check the Web API responses of a POST request for every accepted
    content type (JSON, YAML).

    Args:
        api_client: DRF test client
        url: Web API URL to check responses
        status_code: expected HTTP status code
        data: optional POST data

    Returns:
        The Web API JSON response
    """
    # the JSON response is the one handed back to the caller
    json_response = check_api_post_response(
        api_client,
        url,
        status_code,
        content_type="application/json",
        data=data,
    )

    # the YAML response is only checked, its value is discarded
    check_api_post_response(
        api_client,
        url,
        status_code,
        content_type="application/yaml",
        data=data,
    )

    return cast(Response, json_response)
def check_html_get_response(
    client: Client,
    url: str,
    status_code: int,
    template_used: Optional[str] = None,
    http_origin: Optional[str] = None,
    server_name: Optional[str] = None,
) -> HttpResponseBase:
    """Helper function to check HTML responses for a GET request.

    Args:
        client: Django test client
        url: URL to check responses
        status_code: expected HTTP status code
        template_used: optional used Django template to check
        http_origin: optional HTTP_ORIGIN header value, forwarded to
            ``check_http_get_response``
        server_name: optional SERVER_NAME value, forwarded to
            ``check_http_get_response``

    Returns:
        The HTML response
    """
    response = check_http_get_response(
        client,
        url,
        status_code,
        content_type="text/html",
        http_origin=http_origin,
        server_name=server_name,
    )
    # the template check is skipped entirely when no template name is given
    if template_used is not None:
        assert_template_used(response, template_used)
    return response
def create_django_permission(perm_name: str) -> Permission:
    """Build and persist a Django permission from its dotted name.

    Everything before the last dot is used as the app label of a newly
    created content type; the last component is used as the content type
    model as well as the permission codename and name.

    Args:
        perm_name: Permission name (e.g. swh.web.api.throttling_exempted,
            swh.ambassador, ...)

    Returns:
        The persisted permission
    """
    # split on the last dot only: "a.b.c" -> ("a.b", "c"), "c" -> ("", "c")
    app_label, _, codename = perm_name.rpartition(".")

    # explicit ids are derived from the current row counts, offset by 1000
    content_type_id = 1000 + ContentType.objects.count()
    content_type = ContentType.objects.create(
        id=content_type_id,
        app_label=app_label,
        model=codename,
    )

    permission_id = 1000 + Permission.objects.count()
    return Permission.objects.create(
        codename=codename,
        name=codename,
        content_type=content_type,
        id=permission_id,
    )