diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py index 122b945e..c56c0b95 100644 --- a/swh/web/common/identifiers.py +++ b/swh/web/common/identifiers.py @@ -1,168 +1,184 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Dict, Iterable, List, Optional +from typing_extensions import TypedDict + from django.http import QueryDict from swh.model.exceptions import ValidationError from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, - CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT + CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT, + PersistentId ) from swh.web.common.exc import BadInputExc +from swh.web.common.typing import QueryParameters from swh.web.common.utils import reverse -def get_swh_persistent_id(object_type, object_id, scheme_version=1): +def get_swh_persistent_id(object_type: str, object_id: str, + scheme_version: int = 1) -> str: """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: - object_type (str): the swh object type + object_type: the swh object type (content/directory/release/revision/snapshot) - object_id (str): the swh object id (hexadecimal representation + object_id: the swh object id (hexadecimal representation of its hash value) - scheme_version (int): the scheme version of the swh + scheme_version: the scheme version of the swh persistent identifiers Returns: - str: the swh object persistent identifier + the swh object persistent identifier Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version) except ValidationError as e: raise BadInputExc('Invalid object (%s) for swh persistent id. %s' % (object_id, e)) else: return swh_id -def resolve_swh_persistent_id(swh_id, query_params=None): +ResolvedPersistentId = TypedDict('ResolvedPersistentId', { + 'swh_id_parsed': PersistentId, + 'browse_url': Optional[str] +}) + + +def resolve_swh_persistent_id(swh_id: str, + query_params: Optional[QueryParameters] = None + ) -> ResolvedPersistentId: """ Try to resolve a Software Heritage persistent id into an url for - browsing the pointed object. + browsing the targeted object. Args: - swh_id (str): a Software Heritage persistent identifier - query_params (django.http.QueryDict): optional dict filled with + swh_id: a Software Heritage persistent identifier + query_params: optional dict filled with query parameters to append to the browse url Returns: - dict: a dict with the following keys: + a dict with the following keys: - * **swh_id_parsed (swh.model.identifiers.PersistentId)**: - the parsed identifier - * **browse_url (str)**: the url for browsing the pointed object + * **swh_id_parsed**: the parsed identifier + * **browse_url**: the url for browsing the targeted object """ swh_id_parsed = get_persistent_identifier(swh_id) object_type = swh_id_parsed.object_type object_id = swh_id_parsed.object_id browse_url = None query_dict = QueryDict('', mutable=True) if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if 'origin' in swh_id_parsed.metadata: query_dict['origin'] = swh_id_parsed.metadata['origin'] if object_type == CONTENT: query_string = 'sha1_git:' + object_id fragment = '' if 'lines' in swh_id_parsed.metadata: lines = swh_id_parsed.metadata['lines'].split('-') fragment += '#L' + lines[0] if len(lines) > 1: fragment += '-L' + lines[1] browse_url = reverse('browse-content', url_args={'query_string': query_string}, query_params=query_dict) + fragment elif object_type == DIRECTORY: browse_url = reverse('browse-directory', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == RELEASE: browse_url = reverse('browse-release', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == REVISION: browse_url = reverse('browse-revision', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == SNAPSHOT: browse_url = reverse('browse-snapshot', url_args={'snapshot_id': object_id}, query_params=query_dict) elif object_type == ORIGIN: raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' 'publicly resolvable because they are for ' 'internal usage only')) - return {'swh_id_parsed': swh_id_parsed, - 'browse_url': browse_url} + return { + 'swh_id_parsed': swh_id_parsed, + 'browse_url': browse_url + } -def get_persistent_identifier(persistent_id): +def get_persistent_identifier(persistent_id: str) -> PersistentId: """Check if a persistent identifier is valid. - Args: - persistent_id: A string representing a Software Heritage - persistent identifier. + Args: + persistent_id: A string representing a Software Heritage + persistent identifier. - Raises: - BadInputExc: if the provided persistent identifier can - not be parsed. + Raises: + BadInputExc: if the provided persistent identifier can + not be parsed. - Return: - A persistent identifier object. + Return: + A persistent identifier object. """ try: pid_object = parse_persistent_identifier(persistent_id) except ValidationError as ve: raise BadInputExc('Error when parsing identifier: %s' % ' '.join(ve.messages)) else: return pid_object -def group_swh_persistent_identifiers(persistent_ids): +def group_swh_persistent_identifiers(persistent_ids: Iterable[PersistentId] + ) -> Dict[str, List[bytes]]: """ Groups many Software Heritage persistent identifiers into a dictionary depending on their type. Args: - persistent_ids (list): a list of Software Heritage persistent - identifier objects + persistent_ids: an iterable of Software Heritage persistent + identifier objects Returns: A dictionary with: keys: persistent identifier types - values: list(bytes) persistent identifiers id + values: persistent identifiers id Raises: BadInputExc: if one of the provided persistent identifier can not be parsed. """ - pids_by_type = { + pids_by_type: Dict[str, List[bytes]] = { CONTENT: [], DIRECTORY: [], REVISION: [], RELEASE: [], SNAPSHOT: [] } for pid in persistent_ids: obj_id = pid.object_id obj_type = pid.object_type pids_by_type[obj_type].append(hash_to_bytes(obj_id)) return pids_by_type diff --git a/swh/web/common/typing.py b/swh/web/common/typing.py new file mode 100644 index 00000000..01ba4af4 --- /dev/null +++ b/swh/web/common/typing.py @@ -0,0 +1,10 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Union + +from django.http import QueryDict + +QueryParameters = Union[Dict[str, Any], QueryDict] diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index b262b88e..561b6989 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,341 +1,342 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from datetime import datetime, timezone from dateutil import parser as date_parser from dateutil import tz from typing import Optional, Dict, Any import docutils.parsers.rst import docutils.utils from bs4 import BeautifulSoup from docutils.core import publish_parts from docutils.writers.html5_polyglot import Writer, HTMLTranslator from django.urls import reverse as django_reverse from django.http import QueryDict, HttpRequest from prometheus_client.registry import CollectorRegistry from rest_framework.authentication import SessionAuthentication from swh.web.common.exc import BadInputExc +from swh.web.common.typing import QueryParameters from swh.web.config import get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { 'branch': 'fa fa-code-fork', 'branches': 'fa fa-code-fork', 'content': 'fa fa-file-text', 'directory': 'fa fa-folder', 'person': 'fa fa-user', 'revisions history': 'fa fa-history', 'release': 'fa fa-tag', 'releases': 'fa fa-tag', 'revision': 'octicon-git-commit', 'snapshot': 'fa fa-camera', 'visits': 'fa fa-calendar', } def reverse(viewname: str, url_args: Optional[Dict[str, Any]] = None, - query_params: Optional[Dict[str, Any]] = None, + query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse(viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app) if query_params: query_params = {k: v for k, v in query_params.items() if v} if query_params and len(query_params) > 0: query_dict = QueryDict('', mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += ('?' + query_dict.urlencode(safe='/;:')) if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo: return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc) else: return date def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as UTC datetime. Returns: datetime.datetime: a timezone-aware datetime representing the parsed value or None if the parsing fails. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ if not timestamp: return None try: date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True) return datetime_to_utc(date) except Exception: try: return datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=timezone.utc) except (ValueError, OverflowError) as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' ret = re.sub(sha256_re, r'\1...', path) return re.sub(sha1_re, r'\1...', ret) def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'): """Turns a string representation of an ISO 8601 date string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_timestamp(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip('/').split('/') path_from_root = '' for p in sub_paths: path_from_root += '/' + p path_info.append({'name': p, 'path': path_from_root.strip('/')}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components).get_default_values() settings.report_level = report_level document = docutils.utils.new_document('rst-doc', settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if request.user.is_authenticated and not hasattr(request.user, 'backend'): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = 'django.contrib.auth.backends.ModelBackend' return { 'swh_object_icons': swh_object_icons, 'available_languages': None, 'swh_client_config': config['client_config'], 'oidc_enabled': bool(config['keycloak']['server_url']), } class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. """ def authenticate(self, request): user = getattr(request._request, 'user', None) self.enforce_csrf(request) return (user, None) def resolve_branch_alias(snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch['target_type'] == 'alias': if branch['target'] in snapshot['branches']: branch = snapshot['branches'][branch['target']] else: from swh.web.common import service snp = service.lookup_snapshot( snapshot['id'], branches_from=branch['target'], branches_count=1) if snp and branch['target'] in snp['branches']: branch = snp['branches'][branch['target']] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { 'initial_header_level': 2, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, 'lxml').prettify() diff --git a/swh/web/misc/badges.py b/swh/web/misc/badges.py index 75bc4cde..596d38c9 100644 --- a/swh/web/misc/badges.py +++ b/swh/web/misc/badges.py @@ -1,170 +1,170 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode from typing import cast, Optional from django.conf.urls import url from django.contrib.staticfiles import finders from django.http import HttpResponse, HttpRequest from pybadges import badge from swh.model.exceptions import ValidationError from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT ) from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.common.identifiers import resolve_swh_persistent_id from swh.web.common.utils import reverse _orange = '#f36a24' _blue = '#0172b2' _red = '#cd5741' _swh_logo_data = None _badge_config = { CONTENT: { 'color': _blue, 'title': 'Archived source file', }, DIRECTORY: { 'color': _blue, 'title': 'Archived source tree', }, ORIGIN: { 'color': _orange, 'title': 'Archived software repository', }, RELEASE: { 'color': _blue, 'title': 'Archived software release', }, REVISION: { 'color': _blue, 'title': 'Archived commit', }, SNAPSHOT: { 'color': _blue, 'title': 'Archived software repository snapshot', }, 'error': { 'color': _red, 'title': 'An error occurred when generating the badge' } } def _get_logo_data() -> str: """ Get data-URI for Software Heritage SVG logo to embed it in the generated badges. """ global _swh_logo_data if _swh_logo_data is None: swh_logo_path = cast(str, finders.find('img/swh-logo-white.svg')) with open(swh_logo_path, 'rb') as swh_logo_file: _swh_logo_data = ('data:image/svg+xml;base64,%s' % b64encode(swh_logo_file.read()).decode('ascii')) return _swh_logo_data def _swh_badge(request: HttpRequest, object_type: str, object_id: str, object_pid: Optional[str] = '') -> HttpResponse: """ Generate a Software Heritage badge for a given object type and id. Args: request: input http request object_type: The type of swh object to generate a badge for, either *content*, *directory*, *revision*, *release*, *origin* or *snapshot* object_id: The id of the swh object, either an url for origin type or a *sha1* for other object types object_pid: If provided, the object persistent identifier will not be recomputed Returns: HTTP response with content type *image/svg+xml* containing the SVG badge data. If the provided parameters are invalid, HTTP 400 status code will be returned. If the object can not be found in the archive, HTTP 404 status code will be returned. """ left_text = 'error' - whole_link = '' + whole_link = None try: if object_type == ORIGIN: service.lookup_origin({'url': object_id}) right_text = 'repository' whole_link = reverse('browse-origin', url_args={'origin_url': object_id}) else: # when pid is provided, object type and id will be parsed # from it if object_pid: parsed_pid = parse_persistent_identifier(object_pid) object_type = parsed_pid.object_type object_id = parsed_pid.object_id swh_object = service.lookup_object(object_type, object_id) if object_pid: right_text = object_pid else: right_text = persistent_identifier(object_type, object_id) whole_link = resolve_swh_persistent_id(right_text)['browse_url'] # remove pid metadata if any for badge text if object_pid: right_text = right_text.split(';')[0] # use release name for badge text if object_type == RELEASE: right_text = 'release %s' % swh_object['name'] left_text = 'archived' except (BadInputExc, ValidationError): right_text = f'invalid {object_type if object_type else "object"} id' object_type = 'error' except NotFoundExc: right_text = f'{object_type if object_type else "object"} not found' object_type = 'error' badge_data = badge(left_text=left_text, right_text=right_text, right_color=_badge_config[object_type]['color'], whole_link=request.build_absolute_uri(whole_link), whole_title=_badge_config[object_type]['title'], logo=_get_logo_data(), embed_logo=True) return HttpResponse(badge_data, content_type='image/svg+xml') def _swh_badge_pid(request: HttpRequest, object_pid: str) -> HttpResponse: """ Generate a Software Heritage badge for a given object persistent identifier. Args: request (django.http.HttpRequest): input http request object_pid (str): A swh object persistent identifier Returns: django.http.HttpResponse: An http response with content type *image/svg+xml* containing the SVG badge data. If any error occurs, a status code of 400 will be returned. """ return _swh_badge(request, '', '', object_pid) urlpatterns = [ url(r'^badge/(?P[a-z]+)/(?P.+)/$', _swh_badge, name='swh-badge'), url(r'^badge/(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$', _swh_badge_pid, name='swh-badge-pid'), ]