diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index ca17202a..0ffc772d 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,429 +1,449 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import os
import re
from typing import Any, Dict, List, Optional
import urllib.parse
from bs4 import BeautifulSoup
from docutils.core import publish_parts
import docutils.parsers.rst
import docutils.utils
from docutils.writers.html5_polyglot import HTMLTranslator, Writer
from iso8601 import ParseError, parse_date
from pkg_resources import get_distribution
from prometheus_client.registry import CollectorRegistry
import requests
from requests.auth import HTTPBasicAuth
from django.core.cache import cache
from django.http import HttpRequest, QueryDict
from django.shortcuts import redirect
from django.urls import resolve
from django.urls import reverse as django_reverse
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
-from swh.web.config import get_config, search
+from swh.web.config import SWH_WEB_SERVER_NAME, get_config, search
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"alias": "mdi mdi-star",
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"cnt": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"dir": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"ori": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"rel": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"rev": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"snp": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo and date.tzinfo != timezone.utc:
return date.astimezone(tz=timezone.utc)
else:
return date
def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
"""Given an ISO 8601 datetime string, parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed date
Raises:
swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- 2007-01-14T20:34:22Z
"""
try:
date = parse_date(iso_date)
return datetime_to_utc(date)
except ParseError as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
"""Turns a string representation of an ISO 8601 datetime string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_iso8601_date_to_utc(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
+def is_swh_web_development(request: HttpRequest) -> bool:
+ """Indicate if we are running a development version of swh-web.
+ """
+ site_base_url = request.build_absolute_uri("/")
+ return any(
+ host in site_base_url for host in ("localhost", "127.0.0.1", "testserver")
+ )
+
+
+def is_swh_web_staging(request: HttpRequest) -> bool:
+ """Indicate if we are running a staging version of swh-web.
+ """
+ config = get_config()
+ site_base_url = request.build_absolute_uri("/")
+ return any(
+ server_name in site_base_url for server_name in config["staging_server_names"]
+ )
+
+
+def is_swh_web_production(request: HttpRequest) -> bool:
+ """Indicate if we are running the public production version of swh-web.
+ """
+ return SWH_WEB_SERVER_NAME in request.build_absolute_uri("/")
+
+
browsers_supported_image_mimes = set(
[
"image/gif",
"image/png",
"image/jpeg",
"image/bmp",
"image/webp",
"image/svg",
"image/svg+xml",
]
)
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
- site_base_url = request.build_absolute_uri("/")
+
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
"keycloak": config["keycloak"],
- "site_base_url": site_base_url,
+ "site_base_url": request.build_absolute_uri("/"),
"DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"],
"status": config["status"],
- "swh_web_dev": "localhost" in site_base_url,
- "swh_web_staging": any(
- [
- server_name in site_base_url
- for server_name in config["staging_server_names"]
- ]
- ),
+ "swh_web_dev": is_swh_web_development(request),
+ "swh_web_staging": is_swh_web_staging(request),
"swh_web_version": get_distribution("swh.web").version,
"iframe_mode": False,
"ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION,
}
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import archive
snp = archive.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
"halt_level": 4,
"traceback": True,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
def _deposits_list_url(
deposits_list_base_url: str, page_size: int, username: Optional[str]
) -> str:
params = {"page_size": str(page_size)}
if username is not None:
params["username"] = username
return f"{deposits_list_base_url}?{urllib.parse.urlencode(params)}"
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]:
"""Return the list of software deposits using swh-deposit API
"""
config = get_config()["deposit"]
deposits_list_base_url = config["private_api_url"] + "deposits"
deposits_list_auth = HTTPBasicAuth(
config["private_api_user"], config["private_api_password"]
)
deposits_list_url = _deposits_list_url(
deposits_list_base_url, page_size=1, username=username
)
nb_deposits = requests.get(
deposits_list_url, auth=deposits_list_auth, timeout=30
).json()["count"]
deposits_data = cache.get(f"swh-deposit-list-{username}")
if not deposits_data or deposits_data["count"] != nb_deposits:
deposits_list_url = _deposits_list_url(
deposits_list_base_url, page_size=nb_deposits, username=username
)
deposits_data = requests.get(
deposits_list_url, auth=deposits_list_auth, timeout=30,
).json()
cache.set(f"swh-deposit-list-{username}", deposits_data)
return deposits_data["results"]
def origin_visit_types() -> List[str]:
"""Return the exhaustive list of visit types for origins
ingested into the archive.
"""
try:
return sorted(search().visit_types_count().keys())
except Exception:
return []
def redirect_to_new_route(request, new_route, permanent=True):
"""Redirect a request to another route with url args and query parameters
eg: /origin//log?path=test can be redirected as
/log?url=&path=test. This can be used to deprecate routes
"""
request_path = resolve(request.path_info)
args = {**request_path.kwargs, **request.GET.dict()}
return redirect(reverse(new_route, query_params=args), permanent=permanent,)
diff --git a/swh/web/config.py b/swh/web/config.py
index 8b62d009..432b034f 100644
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -1,208 +1,209 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
from swh.core import config
from swh.counters import get_counters
from swh.indexer.storage import get_indexer_storage
from swh.scheduler import get_scheduler
from swh.search import get_search
from swh.storage import get_storage
from swh.vault import get_vault
from swh.web import settings
+SWH_WEB_SERVER_NAME = "archive.softwareheritage.org"
SWH_WEB_INTERNAL_SERVER_NAME = "archive.internal.softwareheritage.org"
-STAGING_SERVER_NAMES = [
+SWH_WEB_STAGING_SERVER_NAMES = [
"webapp.staging.swh.network",
"webapp.internal.staging.swh.network",
]
SETTINGS_DIR = os.path.dirname(settings.__file__)
DEFAULT_CONFIG = {
"allowed_hosts": ("list", []),
"storage": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5002/", "timeout": 10,},
),
"indexer_storage": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5007/", "timeout": 1,},
),
"counters": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5011/", "timeout": 1,},
),
"search": (
"dict",
{"cls": "remote", "url": "http://127.0.0.1:5010/", "timeout": 10,},
),
"search_config": (
"dict",
{"metadata_backend": "swh-indexer-storage",}, # or "swh-search"
),
"log_dir": ("string", "/tmp/swh/log"),
"debug": ("bool", False),
"serve_assets": ("bool", False),
"host": ("string", "127.0.0.1"),
"port": ("int", 5004),
"secret_key": ("string", "development key"),
# do not display code highlighting for content > 1MB
"content_display_max_size": ("int", 5 * 1024 * 1024),
"snapshot_content_max_size": ("int", 1000),
"throttling": (
"dict",
{
"cache_uri": None, # production: memcached as cache (127.0.0.1:11211)
# development: in-memory cache so None
"scopes": {
"swh_api": {
"limiter_rate": {"default": "120/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_search": {
"limiter_rate": {"default": "10/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_vault_cooking": {
"limiter_rate": {"default": "120/h", "GET": "60/m"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_save_origin": {
"limiter_rate": {"default": "120/h", "POST": "10/h"},
"exempted_networks": ["127.0.0.0/8"],
},
"swh_api_origin_visit_latest": {
"limiter_rate": {"default": "700/m"},
"exempted_networks": ["127.0.0.0/8"],
},
},
},
),
"vault": ("dict", {"cls": "remote", "args": {"url": "http://127.0.0.1:5005/",}}),
"scheduler": ("dict", {"cls": "remote", "url": "http://127.0.0.1:5008/"}),
"development_db": ("string", os.path.join(SETTINGS_DIR, "db.sqlite3")),
"test_db": ("dict", {"name": "swh-web-test"}),
"production_db": ("dict", {"name": "swh-web"}),
"deposit": (
"dict",
{
"private_api_url": "https://deposit.softwareheritage.org/1/private/",
"private_api_user": "swhworker",
"private_api_password": "some-password",
},
),
"e2e_tests_mode": ("bool", False),
"es_workers_index_url": ("string", ""),
"history_counters_url": (
"string",
(
"http://counters1.internal.softwareheritage.org:5011"
"/counters_history/history.json"
),
),
"client_config": ("dict", {}),
"keycloak": ("dict", {"server_url": "", "realm_name": ""}),
"graph": (
"dict",
{
"server_url": "http://graph.internal.softwareheritage.org:5009/graph/",
"max_edges": {"staff": 0, "user": 100000, "anonymous": 1000},
},
),
"status": (
"dict",
{
"server_url": "https://status.softwareheritage.org/",
"json_path": "1.0/status/578e5eddcdc0cc7951000520",
},
),
"counters_backend": ("string", "swh-storage"), # or "swh-counters"
- "staging_server_names": ("list", STAGING_SERVER_NAMES),
+ "staging_server_names": ("list", SWH_WEB_STAGING_SERVER_NAMES),
"instance_name": ("str", "archive-test.softwareheritage.org"),
"give": ("dict", {"public_key": "", "token": ""}),
}
swhweb_config: Dict[str, Any] = {}
def get_config(config_file="web/web"):
"""Read the configuration file `config_file`.
If an environment variable SWH_CONFIG_FILENAME is defined, this
takes precedence over the config_file parameter.
In any case, update the app with parameters (secret_key, conf)
and return the parsed configuration as a dict.
If no configuration file is provided, return a default
configuration.
"""
if not swhweb_config:
config_filename = os.environ.get("SWH_CONFIG_FILENAME")
if config_filename:
config_file = config_filename
cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
swhweb_config.update(cfg)
config.prepare_folders(swhweb_config, "log_dir")
if swhweb_config.get("search"):
swhweb_config["search"] = get_search(**swhweb_config["search"])
else:
swhweb_config["search"] = None
swhweb_config["storage"] = get_storage(**swhweb_config["storage"])
swhweb_config["vault"] = get_vault(**swhweb_config["vault"])
swhweb_config["indexer_storage"] = get_indexer_storage(
**swhweb_config["indexer_storage"]
)
swhweb_config["scheduler"] = get_scheduler(**swhweb_config["scheduler"])
swhweb_config["counters"] = get_counters(**swhweb_config["counters"])
return swhweb_config
def search():
"""Return the current application's search.
"""
return get_config()["search"]
def storage():
"""Return the current application's storage.
"""
return get_config()["storage"]
def vault():
"""Return the current application's vault.
"""
return get_config()["vault"]
def indexer_storage():
"""Return the current application's indexer storage.
"""
return get_config()["indexer_storage"]
def scheduler():
"""Return the current application's scheduler.
"""
return get_config()["scheduler"]
def counters():
"""Return the current application's counters.
"""
return get_config()["counters"]
diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index dee4d6ac..31593a48 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,299 +1,316 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from base64 import b64encode
import datetime
from urllib.parse import quote
import pytest
from django.conf.urls import url
from django.test.utils import override_settings
from django.urls.exceptions import NoReverseMatch
from swh.web.common import utils
from swh.web.common.exc import BadInputExc
-from swh.web.config import get_config
+from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config
def test_shorten_path_noop():
noops = ["/api/", "/browse/", "/content/symbol/foobar/"]
for noop in noops:
assert utils.shorten_path(noop) == noop
def test_shorten_path_sha1():
sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6"
short_sha1 = sha1[:8] + "..."
templates = [
"/api/1/content/sha1:%s/",
"/api/1/content/sha1_git:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha1:%s/ctags/",
]
for template in templates:
assert utils.shorten_path(template % sha1) == template % short_sha1
def test_shorten_path_sha256():
sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef"
short_sha256 = sha256[:8] + "..."
templates = [
"/api/1/content/sha256:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha256:%s/filetype/",
]
for template in templates:
assert utils.shorten_path(template % sha256) == template % short_sha256
@pytest.mark.parametrize(
"input_timestamp, output_date",
[
(
"2016-01-12",
datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
),
(
"2016-01-12T09:19:12+0100",
datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc),
),
(
"2007-01-14T20:34:22Z",
datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc),
),
],
)
def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date):
assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date
@pytest.mark.parametrize(
"invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"]
)
def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp):
with pytest.raises(BadInputExc):
utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp)
def test_format_utc_iso_date():
assert (
utils.format_utc_iso_date("2017-05-04T13:27:13+02:00")
== "04 May 2017, 11:27 UTC"
)
def test_gen_path_info():
input_path = "/home/user/swh-environment/swh-web/"
expected_result = [
{"name": "home", "path": "home"},
{"name": "user", "path": "home/user"},
{"name": "swh-environment", "path": "home/user/swh-environment"},
{"name": "swh-web", "path": "home/user/swh-environment/swh-web"},
]
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
input_path = "home/user/swh-environment/swh-web"
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
def test_rst_to_html():
rst = (
"Section\n"
"=======\n\n"
"**Some strong text**\n\n"
"* This is a bulleted list.\n"
"* It has two items, the second\n"
" item uses two lines.\n"
"\n"
"1. This is a numbered list.\n"
"2. It has two items too.\n"
"\n"
"#. This is a numbered list.\n"
"#. It has two items too.\n"
)
expected_html = (
'Section
\n'
"
Some strong text
\n"
'
\n"
'
\n'
"This is a numbered list.
\n"
"It has two items too.
\n"
"This is a numbered list.
\n"
"It has two items too.
\n"
"
\n"
"
"
)
assert utils.rst_to_html(rst) == expected_html
def sample_test_view(request, string, number):
pass
def sample_test_view_no_url_args(request):
pass
urlpatterns = [
url(
r"^sample/test/(?P.+)/view/(?P[0-9]+)/$",
sample_test_view,
name="sample-test-view",
),
url(
r"^sample/test/view/no/url/args/$",
sample_test_view_no_url_args,
name="sample-test-view-no-url-args",
),
]
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ok():
string = "foo"
number = 55
url = utils.reverse(
"sample-test-view", url_args={"string": string, "number": number}
)
assert url == f"/sample/test/{string}/view/{number}/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_only_ko():
string = "foo"
with pytest.raises(NoReverseMatch):
utils.reverse("sample-test-view", url_args={"string": string, "number": string})
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_no_url_args():
url = utils.reverse("sample-test-view-no-url-args")
assert url == "/sample/test/view/no/url/args/"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_only():
start = 0
scope = "foo"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": scope}
)
assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"start": start, "scope": None}
)
assert url == f"/sample/test/view/no/url/args/?start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_query_params_encode():
libname = "libstc++"
url = utils.reverse(
"sample-test-view-no-url-args", query_params={"libname": libname}
)
assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_url_args_query_params():
string = "foo"
number = 55
start = 10
scope = "bar"
url = utils.reverse(
"sample-test-view",
url_args={"string": string, "number": number},
query_params={"start": start, "scope": scope},
)
assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}"
@override_settings(ROOT_URLCONF=__name__)
def test_reverse_absolute_uri(request_factory):
request = request_factory.get(utils.reverse("sample-test-view-no-url-args"))
url = utils.reverse("sample-test-view-no-url-args", request=request)
assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/"
def test_get_deposits_list(requests_mock):
deposits_data = {
"count": 2,
"results": [
{
"check_task_id": "351820217",
"client": 2,
"collection": 1,
"complete_date": "2021-01-21T07:52:19.919312Z",
"external_id": "hal-03116143",
"id": 1412,
"load_task_id": "351820260",
"origin_url": "https://hal.archives-ouvertes.fr/hal-03116143",
"parent": None,
"reception_date": "2021-01-21T07:52:19.471019Z",
"status": "done",
"status_detail": None,
"swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079",
},
{
"check_task_id": "381576507",
"client": 2,
"collection": 1,
"complete_date": "2021-07-07T08:00:44.726676Z",
"external_id": "hal-03275052",
"id": 1693,
"load_task_id": "381576508",
"origin_url": "https://hal.archives-ouvertes.fr/hal-03275052",
"parent": None,
"reception_date": "2021-07-07T08:00:44.327661Z",
"status": "done",
"status_detail": None,
"swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c",
},
],
}
config = get_config()["deposit"]
deposits_list_url = config["private_api_url"] + "deposits"
basic_auth_payload = (
config["private_api_user"] + ":" + config["private_api_password"]
).encode()
requests_mock.get(
deposits_list_url,
json=deposits_data,
request_headers={
"Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}"
},
)
assert utils.get_deposits_list() == deposits_data["results"]
@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_origin_visit_types(mocker, backend):
if backend != "swh-search":
# equivalent to not configuring search in the config
search = mocker.patch("swh.web.common.utils.search")
search.return_value = None
assert utils.origin_visit_types() == []
else:
# see swh/web/tests/data.py for origins added for tests
assert utils.origin_visit_types() == ["git", "tar"]
+
+
+@pytest.mark.parametrize("server_name", ["localhost", "127.0.0.1", "testserver"])
+def test_is_swh_web_development(request_factory, server_name):
+ request = request_factory.get("/", SERVER_NAME=server_name)
+ assert utils.is_swh_web_development(request)
+
+
+@pytest.mark.parametrize("server_name", SWH_WEB_STAGING_SERVER_NAMES)
+def test_is_swh_web_staging(request_factory, server_name):
+ request = request_factory.get("/", SERVER_NAME=server_name)
+ assert utils.is_swh_web_staging(request)
+
+
+def test_is_swh_web_production(request_factory):
+ request = request_factory.get("/", SERVER_NAME=SWH_WEB_SERVER_NAME)
+ assert utils.is_swh_web_production(request)
diff --git a/swh/web/tests/test_templates.py b/swh/web/tests/test_templates.py
index a053a4ce..10f53ac2 100644
--- a/swh/web/tests/test_templates.py
+++ b/swh/web/tests/test_templates.py
@@ -1,91 +1,96 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import deepcopy
import random
from pkg_resources import get_distribution
import pytest
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION
from swh.web.common.utils import reverse
-from swh.web.config import STAGING_SERVER_NAMES, get_config
+from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config
from swh.web.tests.django_asserts import assert_contains, assert_not_contains
from swh.web.tests.utils import check_http_get_response, create_django_permission
swh_web_version = get_distribution("swh.web").version
def test_layout_without_ribbon(client):
url = reverse("swh-web-homepage")
- resp = check_http_get_response(client, url, status_code=200)
+ resp = check_http_get_response(
+ client, url, status_code=200, server_name=SWH_WEB_SERVER_NAME
+ )
assert_not_contains(resp, "swh-corner-ribbon")
def test_layout_with_staging_ribbon(client):
url = reverse("swh-web-homepage")
resp = check_http_get_response(
- client, url, status_code=200, server_name=random.choice(STAGING_SERVER_NAMES),
+ client,
+ url,
+ status_code=200,
+ server_name=random.choice(SWH_WEB_STAGING_SERVER_NAMES),
)
assert_contains(resp, "swh-corner-ribbon")
assert_contains(resp, f"Staging
v{swh_web_version}")
def test_layout_with_development_ribbon(client):
url = reverse("swh-web-homepage")
resp = check_http_get_response(
client, url, status_code=200, server_name="localhost",
)
assert_contains(resp, "swh-corner-ribbon")
assert_contains(resp, f"Development
v{swh_web_version.split('+')[0]}")
def test_layout_with_oidc_auth_enabled(client):
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_contains(resp, reverse("oidc-login"))
def test_layout_without_oidc_auth_enabled(client, mocker):
config = deepcopy(get_config())
config["keycloak"]["server_url"] = ""
mock_get_config = mocker.patch("swh.web.common.utils.get_config")
mock_get_config.return_value = config
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_contains(resp, reverse("login"))
def test_layout_swh_web_version_number_display(client):
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_contains(resp, f"swh-web v{swh_web_version}")
@pytest.mark.django_db
def test_layout_no_deposit_admin_for_anonymous_user(client):
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_not_contains(resp, "swh-deposit-admin-link")
@pytest.mark.django_db
def test_layout_deposit_admin_for_staff_user(client, staff_user):
client.force_login(staff_user)
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_contains(resp, "swh-deposit-admin-link")
@pytest.mark.django_db
def test_layout_deposit_admin_for_user_with_permission(client, regular_user):
regular_user.user_permissions.add(
create_django_permission(ADMIN_LIST_DEPOSIT_PERMISSION)
)
client.force_login(regular_user)
url = reverse("swh-web-homepage")
resp = check_http_get_response(client, url, status_code=200)
assert_contains(resp, "swh-deposit-admin-link")