Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/utils.py
Show All 17 Lines | |||||
from prometheus_client.registry import CollectorRegistry | from prometheus_client.registry import CollectorRegistry | ||||
import requests | import requests | ||||
from requests.auth import HTTPBasicAuth | from requests.auth import HTTPBasicAuth | ||||
from django.core.cache import cache | from django.core.cache import cache | ||||
from django.http import HttpRequest, QueryDict | from django.http import HttpRequest, QueryDict | ||||
from django.urls import reverse as django_reverse | from django.urls import reverse as django_reverse | ||||
from swh.web.auth.utils import ADMIN_LIST_DEPOSIT_PERMISSION | |||||
from swh.web.common.exc import BadInputExc | from swh.web.common.exc import BadInputExc | ||||
from swh.web.common.typing import QueryParameters | from swh.web.common.typing import QueryParameters | ||||
from swh.web.config import get_config, search | from swh.web.config import get_config, search | ||||
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) | SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) | ||||
swh_object_icons = { | swh_object_icons = { | ||||
"alias": "mdi mdi-star", | "alias": "mdi mdi-star", | ||||
▲ Show 20 Lines • Show All 246 Lines • ▼ Show 20 Lines | return { | ||||
"swh_web_staging": any( | "swh_web_staging": any( | ||||
[ | [ | ||||
server_name in site_base_url | server_name in site_base_url | ||||
for server_name in config["staging_server_names"] | for server_name in config["staging_server_names"] | ||||
] | ] | ||||
), | ), | ||||
"swh_web_version": get_distribution("swh.web").version, | "swh_web_version": get_distribution("swh.web").version, | ||||
"iframe_mode": False, | "iframe_mode": False, | ||||
"ADMIN_LIST_DEPOSIT_PERMISSION": ADMIN_LIST_DEPOSIT_PERMISSION, | |||||
} | } | ||||
def resolve_branch_alias( | def resolve_branch_alias( | ||||
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] | snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
""" | """ | ||||
Resolve branch alias in snapshot content. | Resolve branch alias in snapshot content. | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | Args: | ||||
html: Input HTML document | html: Input HTML document | ||||
Returns: | Returns: | ||||
The prettified HTML document | The prettified HTML document | ||||
""" | """ | ||||
return BeautifulSoup(html, "lxml").prettify() | return BeautifulSoup(html, "lxml").prettify() | ||||
def get_deposits_list() -> List[Dict[str, Any]]: | def _deposits_list_url( | ||||
deposits_list_base_url: str, page_size: int, username: Optional[str] | |||||
) -> str: | |||||
deposits_list_url = f"{deposits_list_base_url}?page_size={page_size}" | |||||
if username is not None: | |||||
deposits_list_url += f"&username={username}" | |||||
return deposits_list_url | |||||
def get_deposits_list(username: Optional[str] = None) -> List[Dict[str, Any]]: | |||||
"""Return the list of software deposits using swh-deposit API | """Return the list of software deposits using swh-deposit API | ||||
""" | """ | ||||
config = get_config()["deposit"] | config = get_config()["deposit"] | ||||
deposits_list_url = config["private_api_url"] + "deposits" | deposits_list_base_url = config["private_api_url"] + "deposits" | ||||
deposits_list_auth = HTTPBasicAuth( | deposits_list_auth = HTTPBasicAuth( | ||||
config["private_api_user"], config["private_api_password"] | config["private_api_user"], config["private_api_password"] | ||||
) | ) | ||||
deposits_list_url = _deposits_list_url( | |||||
deposits_list_base_url, page_size=1, username=username | |||||
) | |||||
nb_deposits = requests.get( | nb_deposits = requests.get( | ||||
"%s?page_size=1" % deposits_list_url, auth=deposits_list_auth, timeout=30 | deposits_list_url, auth=deposits_list_auth, timeout=30 | ||||
).json()["count"] | ).json()["count"] | ||||
deposits_data = cache.get("swh-deposit-list") | deposits_data = cache.get(f"swh-deposit-list-{username}") | ||||
if not deposits_data or deposits_data["count"] != nb_deposits: | if not deposits_data or deposits_data["count"] != nb_deposits: | ||||
deposits_list_url = _deposits_list_url( | |||||
deposits_list_base_url, page_size=nb_deposits, username=username | |||||
) | |||||
deposits_data = requests.get( | deposits_data = requests.get( | ||||
"%s?page_size=%s" % (deposits_list_url, nb_deposits), | deposits_list_url, auth=deposits_list_auth, timeout=30, | ||||
auth=deposits_list_auth, | |||||
timeout=30, | |||||
).json() | ).json() | ||||
cache.set("swh-deposit-list", deposits_data) | cache.set(f"swh-deposit-list-{username}", deposits_data) | ||||
return deposits_data["results"] | return deposits_data["results"] | ||||
def origin_visit_types() -> List[str]: | def origin_visit_types() -> List[str]: | ||||
"""Return the exhaustive list of visit types for origins | """Return the exhaustive list of visit types for origins | ||||
ingested into the archive. | ingested into the archive. | ||||
""" | """ | ||||
try: | try: | ||||
return sorted(search().visit_types_count().keys()) | return sorted(search().visit_types_count().keys()) | ||||
except Exception: | except Exception: | ||||
return [] | return [] |