diff --git a/swh/web/api/apidoc.py b/swh/web/api/apidoc.py index 0d484217..a8ca7257 100644 --- a/swh/web/api/apidoc.py +++ b/swh/web/api/apidoc.py @@ -1,440 +1,478 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import functools from functools import wraps import os import re import textwrap from typing import List import docutils.nodes import docutils.parsers.rst import docutils.utils from rest_framework.decorators import api_view from swh.web.api.apiresponse import make_api_response from swh.web.api.apiurls import APIUrls from swh.web.common.utils import parse_rst class _HTTPDomainDocVisitor(docutils.nodes.NodeVisitor): """ - docutils visitor for walking on a parsed rst document containing sphinx + docutils visitor for walking on a parsed docutils document containing sphinx httpdomain roles. Its purpose is to extract relevant info regarding swh api endpoints (for instance url arguments) from their docstring written - using sphinx httpdomain. + using sphinx httpdomain; and produce the main description back into a ReST + string """ # httpdomain roles we want to parse (based on sphinxcontrib.httpdomain 1.6) parameter_roles = ("param", "parameter", "arg", "argument") request_json_object_roles = ("reqjsonobj", "reqjson", "jsonobj", ">json") response_json_array_roles = ("resjsonarr", ">jsonarr") query_parameter_roles = ("queryparameter", "queryparam", "qparam", "query") request_header_roles = ("header", "resheader", "responseheader") status_code_roles = ("statuscode", "status", "code") def __init__(self, document, data): super().__init__(document) self.data = data self.args_set = set() self.params_set = set() self.inputs_set = set() self.returns_set = set() self.status_codes_set = set() self.reqheaders_set = set() self.resheaders_set = set() self.field_list_visited = False self.current_json_obj = None - def process_paragraph(self, par): - """ - Process extracted paragraph text before display. - Cleanup document model markups and transform the - paragraph into a valid raw rst string (as the apidoc - documentation transform rst to html when rendering). - """ - par = par.replace("\n", " ") - # keep emphasized, strong and literal text - par = par.replace("", "*") - par = par.replace("", "*") - par = par.replace("", "**") - par = par.replace("", "**") - par = par.replace("", "``") - par = par.replace("", "``") - # keep links to web pages - if "', - r"`\1 <\2>`_", - par, - ) - # remove parsed document markups but keep rst links - par = re.sub(r"<[^<]+?>(?!`_)", "", par) - # api urls cleanup to generate valid links afterwards - subs_made = 1 - while subs_made: - (par, subs_made) = re.subn(r"(:http:.*)(\(\w+\))", r"\1", par) - subs_made = 1 - while subs_made: - (par, subs_made) = re.subn(r"(:http:.*)(\[.*\])", r"\1", par) - par = re.sub(r"([^:])//", r"\1/", par) - # transform references to api endpoints doc into valid rst links - par = re.sub(":http:get:`([^,`]*)`", r"`\1 <\1doc/>`_", par) - # transform references to some elements into bold text - par = re.sub(":http:header:`(.*)`", r"**\1**", par) - par = re.sub(":func:`(.*)`", r"**\1**", par) - return par + def _default_visit(self, node: docutils.nodes.Element) -> str: + """Simply visits a text node, drops its start and end tags, visits + the children, and concatenates their results.""" + return "".join(map(self.dispatch_visit, node.children)) + + def visit_emphasis(self, node: docutils.nodes.emphasis) -> str: + return f"*{self._default_visit(node)}*" + + def visit_strong(self, node: docutils.nodes.emphasis) -> str: + return f"**{self._default_visit(node)}**" + + def visit_reference(self, node: docutils.nodes.reference) -> str: + text = self._default_visit(node) + refuri = node.attributes.get("refuri") + if refuri is not None: + return f"`{text} <{refuri}>`__" + else: + return f"`{text}`_" + + def visit_target(self, node: docutils.nodes.reference) -> str: + parts = ["\n"] + parts.extend( + f".. _{name}: {node.attributes['refuri']}" + for name in node.attributes["names"] + ) + return "\n".join(parts) + + def visit_literal(self, node: docutils.nodes.literal) -> str: + return f"``{self._default_visit(node)}``" def visit_field_list(self, node): """ Visit parsed rst field lists to extract relevant info regarding api endpoint. """ self.field_list_visited = True for child in node.traverse(): + # TODO: instead of traversing recursively, we should inspect the children + # directly (they can be and directly, or + # a node containing both) + # get the parsed field name if isinstance(child, docutils.nodes.field_name): field_name = child.astext() # parse field text - elif isinstance(child, docutils.nodes.paragraph): - text = self.process_paragraph(str(child)) + elif isinstance(child, docutils.nodes.field_body): + text = self._default_visit(child).strip() + assert text, str(child) field_data = field_name.split(" ") # Parameters if field_data[0] in self.parameter_roles: if field_data[2] not in self.args_set: self.data["args"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.args_set.add(field_data[2]) # Query Parameters if field_data[0] in self.query_parameter_roles: if field_data[2] not in self.params_set: self.data["params"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.params_set.add(field_data[2]) # Request data type if ( field_data[0] in self.request_json_array_roles or field_data[0] in self.request_json_object_roles ): # array if field_data[0] in self.request_json_array_roles: self.data["input_type"] = "array" # object else: self.data["input_type"] = "object" # input object field if field_data[2] not in self.inputs_set: self.data["inputs"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.inputs_set.add(field_data[2]) self.current_json_obj = self.data["inputs"][-1] # Response type if ( field_data[0] in self.response_json_array_roles or field_data[0] in self.response_json_object_roles ): # array if field_data[0] in self.response_json_array_roles: self.data["return_type"] = "array" # object else: self.data["return_type"] = "object" # returned object field if field_data[2] not in self.returns_set: self.data["returns"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.returns_set.add(field_data[2]) self.current_json_obj = self.data["returns"][-1] # Status Codes if field_data[0] in self.status_code_roles: if field_data[1] not in self.status_codes_set: self.data["status_codes"].append( {"code": field_data[1], "doc": text} ) self.status_codes_set.add(field_data[1]) # Request Headers if field_data[0] in self.request_header_roles: if field_data[1] not in self.reqheaders_set: self.data["reqheaders"].append( {"name": field_data[1], "doc": text} ) self.reqheaders_set.add(field_data[1]) # Response Headers if field_data[0] in self.response_header_roles: if field_data[1] not in self.resheaders_set: resheader = {"name": field_data[1], "doc": text} self.data["resheaders"].append(resheader) self.resheaders_set.add(field_data[1]) if ( resheader["name"] == "Content-Type" and resheader["doc"] == "application/octet-stream" ): self.data["return_type"] = "octet stream" - def visit_paragraph(self, node): + # Don't return anything in the description; these nodes only add text + # to other fields + return "" + + # visit_field_list collects and handles these with a more global view: + visit_field = visit_field_name = visit_field_body = _default_visit + + def visit_paragraph(self, node: docutils.nodes.paragraph) -> str: """ Visit relevant paragraphs to parse """ # only parsed top level paragraphs - if isinstance(node.parent, docutils.nodes.block_quote): - text = self.process_paragraph(str(node)) - # endpoint description - if not text.startswith("**") and text not in self.data["description"]: - self.data["description"] += "\n\n" if self.data["description"] else "" - self.data["description"] += text - - def visit_literal_block(self, node): + text = self._default_visit(node) + + return "\n\n" + text + + def visit_literal_block(self, node: docutils.nodes.literal_block) -> str: """ Visit literal blocks """ text = node.astext() - # literal block in endpoint description - if not self.field_list_visited: - self.data["description"] += ":\n\n%s\n" % textwrap.indent(text, "\t") + + return f"\n\n::\n\n{textwrap.indent(text, ' ')}\n" + + def visit_bullet_list(self, node: docutils.nodes.bullet_list) -> str: + parts = ["\n\n"] + for child in node.traverse(): + # process list item + if isinstance(child, docutils.nodes.paragraph): + line_text = self.dispatch_visit(child) + parts.append("\t* %s\n" % textwrap.indent(line_text, "\t ").strip()) + return "".join(parts) + + # visit_bullet_list collects and handles this with a more global view: + visit_list_item = _default_visit + + def visit_warning(self, node: docutils.nodes.warning) -> str: + text = self._default_visit(node) + return "\n\n.. warning::\n%s\n" % textwrap.indent(text, "\t") + + def visit_Text(self, node: docutils.nodes.Text) -> str: + """Leaf node""" + return str(node).replace("\n", " ") # Prettier in generated HTML + + def visit_problematic(self, node: docutils.nodes.problematic) -> str: + # api urls cleanup to generate valid links afterwards + text = self._default_visit(node) + subs_made = 1 + while subs_made: + (text, subs_made) = re.subn(r"(:http:.*)(\(\w+\))", r"\1", text) + subs_made = 1 + while subs_made: + (text, subs_made) = re.subn(r"(:http:.*)(\[.*\])", r"\1", text) + text = re.sub(r"([^:])//", r"\1/", text) + # transform references to api endpoints doc into valid rst links + text = re.sub(":http:get:`([^,`]*)`", r"`\1 <\1doc/>`_", text) + # transform references to some elements into bold text + text = re.sub(":http:header:`(.*)`", r"**\1**", text) + text = re.sub(":func:`(.*)`", r"**\1**", text) + # extract example urls if ":swh_web_api:" in text: - examples_str = re.sub(".*`(.+)`.*", r"/api/1/\1", text) + # Extract examples to their own section + examples_str = re.sub(":swh_web_api:`(.+)`.*", r"/api/1/\1", text) self.data["examples"] += examples_str.split("\n") + return text - def visit_bullet_list(self, node): - # bullet list in endpoint description - if not self.field_list_visited: - self.data["description"] += "\n\n" - for child in node.traverse(): - # process list item - if isinstance(child, docutils.nodes.paragraph): - line_text = self.process_paragraph(str(child)) - self.data["description"] += "\t* %s\n" % line_text - elif self.current_json_obj: - self.current_json_obj["doc"] += "\n\n" - for child in node.traverse(): - # process list item - if isinstance(child, docutils.nodes.paragraph): - line_text = self.process_paragraph(str(child)) - self.current_json_obj["doc"] += "\t\t* %s\n" % line_text - self.current_json_obj = None - - def visit_warning(self, node): - text = self.process_paragraph(str(node)) - rst_warning = "\n\n.. warning::\n%s\n" % textwrap.indent(text, "\t") - if rst_warning not in self.data["description"]: - self.data["description"] += rst_warning - - def unknown_visit(self, node): - pass + def visit_block_quote(self, node: docutils.nodes.block_quote) -> str: + return self._default_visit(node) + return ( + f".. code-block::\n" + f"{textwrap.indent(self._default_visit(node), ' ')}\n" + ) + + def visit_title_reference(self, node: docutils.nodes.title_reference) -> str: + text = self._default_visit(node) + raise Exception( + f"Unexpected title reference. " + f"Possible cause: you used `{text}` instead of ``{text}``" + ) + + def visit_document(self, node: docutils.nodes.document) -> None: + text = self._default_visit(node) + + # Strip examples; they are displayed separately + text = re.split("\n\\*\\*Examples?:\\*\\*\n", text)[0] + + self.data["description"] = text.strip() + + def unknown_visit(self, node) -> str: + raise NotImplementedError( + f"Unknown node type: {node.__class__.__name__}. Value: {node}" + ) def unknown_departure(self, node): pass def _parse_httpdomain_doc(doc, data): doc_lines = doc.split("\n") doc_lines_filtered = [] urls = defaultdict(list) default_http_methods = ["HEAD", "OPTIONS"] # httpdomain is a sphinx extension that is unknown to docutils but # fortunately we can still parse its directives' content, # so remove lines with httpdomain directives before executing the # rst parser from docutils for doc_line in doc_lines: if ".. http" not in doc_line: doc_lines_filtered.append(doc_line) else: url = doc_line[doc_line.find("/") :] # emphasize url arguments for html rendering url = re.sub(r"\((\w+)\)", r" **\(\1\)** ", url) method = re.search(r"http:(\w+)::", doc_line).group(1) urls[url].append(method.upper()) for url, methods in urls.items(): data["urls"].append({"rule": url, "methods": methods + default_http_methods}) # parse the rst docstring and do not print system messages about # unknown httpdomain roles document = parse_rst("\n".join(doc_lines_filtered), report_level=5) # remove the system_message nodes from the parsed document for node in document.traverse(docutils.nodes.system_message): node.parent.remove(node) # visit the document nodes to extract relevant endpoint info visitor = _HTTPDomainDocVisitor(document, data) document.walkabout(visitor) class APIDocException(Exception): """ Custom exception to signal errors in the use of the APIDoc decorators """ def api_doc( route: str, noargs: bool = False, tags: List[str] = [], api_version: str = "1", ): """ Decorator for an API endpoint implementation used to generate a dedicated view displaying its HTML documentation. The documentation will be generated from the endpoint docstring based on sphinxcontrib-httpdomain format. Args: route: documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested. Default to False tags: Further information on api endpoints. Two values are possibly expected: * hidden: remove the entry points from the listing * upcoming: display the entry point but it is not followable api_version: api version string """ tags_set = set(tags) # @api_doc() Decorator call def decorator(f): # if the route is not hidden, add it to the index if "hidden" not in tags_set: doc_data = get_doc_data(f, route, noargs) doc_desc = doc_data["description"] - first_dot_pos = doc_desc.find(".") APIUrls.add_doc_route( route, - doc_desc[: first_dot_pos + 1], + re.split(r"\.\s", doc_desc)[0], noargs=noargs, api_version=api_version, tags=tags_set, ) # create a dedicated view to display endpoint HTML doc @api_view(["GET", "HEAD"]) @wraps(f) def doc_view(request): doc_data = get_doc_data(f, route, noargs) return make_api_response(request, None, doc_data) route_name = "%s-doc" % route[1:-1].replace("/", "-") urlpattern = f"^{api_version}{route}doc/$" view_name = "api-%s-%s" % (api_version, route_name) APIUrls.add_url_pattern(urlpattern, doc_view, view_name) @wraps(f) def documented_view(request, **kwargs): doc_data = get_doc_data(f, route, noargs) try: return {"data": f(request, **kwargs), "doc_data": doc_data} except Exception as exc: exc.doc_data = doc_data raise exc return documented_view return decorator @functools.lru_cache(maxsize=32) def get_doc_data(f, route, noargs): """ Build documentation data for the decorated api endpoint function """ data = { "description": "", "response_data": None, "urls": [], "args": [], "params": [], "input_type": "", "inputs": [], "resheaders": [], "reqheaders": [], "return_type": "", "returns": [], "status_codes": [], "examples": [], "route": route, "noargs": noargs, } if not f.__doc__: raise APIDocException( "apidoc: expected a docstring" " for function %s" % (f.__name__,) ) # use raw docstring as endpoint documentation if sphinx # httpdomain is not used if ".. http" not in f.__doc__: data["description"] = f.__doc__ # else parse the sphinx httpdomain docstring with docutils # (except when building the swh-web documentation through autodoc # sphinx extension, not needed and raise errors with sphinx >= 1.7) elif "SWH_DOC_BUILD" not in os.environ: _parse_httpdomain_doc(f.__doc__, data) # process input/returned object info for nicer html display inputs_list = "" returns_list = "" for inp in data["inputs"]: # special case for array of non object type, for instance # :jsonarr string -: an array of string if ret["name"] != "-": returns_list += "\t* **%s (%s)**: %s\n" % ( ret["name"], ret["type"], - ret["doc"], + textwrap.indent(ret["doc"], "\t "), ) data["inputs_list"] = inputs_list data["returns_list"] = returns_list return data DOC_COMMON_HEADERS = """ :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request""" DOC_RESHEADER_LINK = """ :resheader Link: indicates that a subsequent result page is available and contains the url pointing to it """ DEFAULT_SUBSTITUTIONS = { "common_headers": DOC_COMMON_HEADERS, "resheader_link": DOC_RESHEADER_LINK, } def format_docstring(**substitutions): def decorator(f): f.__doc__ = f.__doc__.format(**{**DEFAULT_SUBSTITUTIONS, **substitutions}) return f return decorator diff --git a/swh/web/api/views/content.py b/swh/web/api/views/content.py index aade2fe9..5f3e3d9a 100644 --- a/swh/web/api/views/content.py +++ b/swh/web/api/views/content.py @@ -1,409 +1,409 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from django.http import HttpResponse from swh.web.api import utils from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup from swh.web.common import archive from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse @api_route( r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/filetype/", "api-1-content-filetype", checksum_args=["q"], ) @api_doc("/content/filetype/") @format_docstring() def api_content_filetype(request, q): """ .. http:get:: /api/1/content/[(hash_type):](hash)/filetype/ Get information about the detected MIME type of a content object. :param string hash_type: optional parameter specifying which hashing algorithm has been used to compute the content checksum. It can be either ``sha1``, ``sha1_git``, ``sha256`` or ``blake2s256``. If that parameter is not - provided, it is assumed that the hashing algorithm used is `sha1`. + provided, it is assumed that the hashing algorithm used is ``sha1``. :param string hash: hexadecimal representation of the checksum value computed with the specified hashing algorithm. :>json object content_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/` for getting information about the content :>json string encoding: the detected content encoding :>json string id: the **sha1** identifier of the content :>json string mimetype: the detected MIME type of the content :>json object tool: information about the tool used to detect the content filetype {common_headers} :statuscode 200: no error :statuscode 400: an invalid **hash_type** or **hash** has been provided :statuscode 404: requested content can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`content/sha1:dc2830a9e72f23c1dfebef4413003221baa5fb62/filetype/` """ return api_lookup( archive.lookup_content_filetype, q, notfound_msg="No filetype information found for content {}.".format(q), enrich_fn=utils.enrich_metadata_endpoint, request=request, ) @api_route( r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/language/", "api-1-content-language", checksum_args=["q"], ) @api_doc("/content/language/") @format_docstring() def api_content_language(request, q): """ .. http:get:: /api/1/content/[(hash_type):](hash)/language/ Get information about the programming language used in a content object. Note: this endpoint currently returns no data. :param string hash_type: optional parameter specifying which hashing algorithm has been used to compute the content checksum. It can be either ``sha1``, ``sha1_git``, ``sha256`` or ``blake2s256``. If that parameter is not provided, it is assumed that the hashing algorithm used is ``sha1``. :param string hash: hexadecimal representation of the checksum value computed with the specified hashing algorithm. :>json object content_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/` for getting information about the content :>json string id: the **sha1** identifier of the content :>json string lang: the detected programming language if any :>json object tool: information about the tool used to detect the programming language {common_headers} :statuscode 200: no error :statuscode 400: an invalid **hash_type** or **hash** has been provided :statuscode 404: requested content can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`content/sha1:dc2830a9e72f23c1dfebef4413003221baa5fb62/language/` """ return api_lookup( archive.lookup_content_language, q, notfound_msg="No language information found for content {}.".format(q), enrich_fn=utils.enrich_metadata_endpoint, request=request, ) @api_route( r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/license/", "api-1-content-license", checksum_args=["q"], ) @api_doc("/content/license/") @format_docstring() def api_content_license(request, q): """ .. http:get:: /api/1/content/[(hash_type):](hash)/license/ Get information about the license of a content object. :param string hash_type: optional parameter specifying which hashing algorithm has been used to compute the content checksum. It can be either ``sha1``, ``sha1_git``, ``sha256`` or ``blake2s256``. If that parameter is not provided, it is assumed that the hashing algorithm used is ``sha1``. :param string hash: hexadecimal representation of the checksum value computed with the specified hashing algorithm. :>json object content_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/` for getting information about the content :>json string id: the **sha1** identifier of the content :>json array licenses: array of strings containing the detected license names :>json object tool: information about the tool used to detect the license {common_headers} :statuscode 200: no error :statuscode 400: an invalid **hash_type** or **hash** has been provided :statuscode 404: requested content can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`content/sha1:dc2830a9e72f23c1dfebef4413003221baa5fb62/license/` """ return api_lookup( archive.lookup_content_license, q, notfound_msg="No license information found for content {}.".format(q), enrich_fn=utils.enrich_metadata_endpoint, request=request, ) @api_route(r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/ctags/", "api-1-content-ctags") @api_doc("/content/ctags/", tags=["hidden"]) def api_content_ctags(request, q): """ Get information about all `Ctags `_-style symbols defined in a content object. """ return api_lookup( archive.lookup_content_ctags, q, notfound_msg="No ctags symbol found for content {}.".format(q), enrich_fn=utils.enrich_metadata_endpoint, request=request, ) @api_route( r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/raw/", "api-1-content-raw", checksum_args=["q"], ) @api_doc("/content/raw/") def api_content_raw(request, q): """ .. http:get:: /api/1/content/[(hash_type):](hash)/raw/ Get the raw content of a content object (aka a "blob"), as a byte sequence. :param string hash_type: optional parameter specifying which hashing algorithm has been used to compute the content checksum. It can be either ``sha1``, ``sha1_git``, ``sha256`` or ``blake2s256``. If that parameter is not provided, it is assumed that the hashing algorithm used is ``sha1``. :param string hash: hexadecimal representation of the checksum value computed with the specified hashing algorithm. :query string filename: if provided, the downloaded content will get that filename :resheader Content-Type: application/octet-stream :statuscode 200: no error :statuscode 400: an invalid **hash_type** or **hash** has been provided :statuscode 404: requested content can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`content/sha1:dc2830a9e72f23c1dfebef4413003221baa5fb62/raw/` """ def generate(content): yield content["data"] content_raw = archive.lookup_content_raw(q) if not content_raw: raise NotFoundExc("Content %s is not found." % q) filename = request.query_params.get("filename") if not filename: filename = "content_%s_raw" % q.replace(":", "_") response = HttpResponse( generate(content_raw), content_type="application/octet-stream" ) response["Content-disposition"] = "attachment; filename=%s" % filename return response @api_route(r"/content/symbol/(?P.+)/", "api-1-content-symbol") @api_doc("/content/symbol/", tags=["hidden"]) def api_content_symbol(request, q=None): """Search content objects by `Ctags `_-style symbol (e.g., function name, data type, method, ...). """ result = {} last_sha1 = request.query_params.get("last_sha1", None) per_page = int(request.query_params.get("per_page", "10")) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): exp = list(archive.lookup_expression(exp, last_sha1, per_page)) return exp if exp else None symbols = api_lookup( lookup_exp, q, notfound_msg="No indexed raw content match expression '{}'.".format(q), enrich_fn=functools.partial(utils.enrich_content, top_url=True), request=request, ) if symbols: nb_symbols = len(symbols) if nb_symbols == per_page: query_params = {} new_last_sha1 = symbols[-1]["sha1"] query_params["last_sha1"] = new_last_sha1 if request.query_params.get("per_page"): query_params["per_page"] = per_page result["headers"] = { "link-next": reverse( "api-1-content-symbol", url_args={"q": q}, query_params=query_params, request=request, ) } result.update({"results": symbols}) return result @api_route(r"/content/known/search/", "api-1-content-known", methods=["POST"]) @api_route(r"/content/known/(?P(?!search).*)/", "api-1-content-known") @api_doc("/content/known/", tags=["hidden"]) @format_docstring() def api_check_content_known(request, q=None): """ .. http:get:: /api/1/content/known/(sha1)[,(sha1), ...,(sha1)]/ Check whether some content(s) (aka "blob(s)") is present in the archive based on its **sha1** checksum. :param string sha1: hexadecimal representation of the **sha1** checksum value for the content to check existence. Multiple values can be provided separated by ','. {common_headers} :>json array search_res: array holding the search result for each provided **sha1** :>json object search_stats: some statistics regarding the number of **sha1** provided and the percentage of those found in the archive :statuscode 200: no error :statuscode 400: an invalid **sha1** has been provided **Example:** .. parsed-literal:: :swh_web_api:`content/known/dc2830a9e72f23c1dfebef4413003221baa5fb62,0c3f19cb47ebfbe643fb19fa94c874d18fa62d12/` """ response = {"search_res": None, "search_stats": None} search_stats = {"nbfiles": 0, "pct": 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(",") for v in hashes: queries.append({"filename": None, "sha1": v}) # POST: Many hash requests in post form submission elif request.method == "POST": data = request.data # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == "q" and len(v) > 0: queries.append({"filename": None, "sha1": v}) elif v != "": queries.append({"filename": k, "sha1": v}) if queries: lookup = archive.lookup_multiple_hashes(queries) result = [] nb_queries = len(queries) for el in lookup: res_d = {"sha1": el["sha1"], "found": el["found"]} if "filename" in el and el["filename"]: res_d["filename"] = el["filename"] result.append(res_d) search_res = result nbfound = len([x for x in lookup if x["found"]]) search_stats["nbfiles"] = nb_queries search_stats["pct"] = (nbfound / nb_queries) * 100 response["search_res"] = search_res response["search_stats"] = search_stats return response @api_route( r"/content/(?P[0-9a-z_:]*[0-9a-f]+)/", "api-1-content", checksum_args=["q"] ) @api_doc("/content/") @format_docstring() def api_content_metadata(request, q): """ .. http:get:: /api/1/content/[(hash_type):](hash)/ Get information about a content (aka a "blob") object. In the archive, a content object is identified based on checksum values computed using various hashing algorithms. :param string hash_type: optional parameter specifying which hashing algorithm has been used to compute the content checksum. It can be either ``sha1``, ``sha1_git``, ``sha256`` or ``blake2s256``. If that parameter is not provided, it is assumed that the hashing algorithm used is ``sha1``. :param string hash: hexadecimal representation of the checksum value computed with the specified hashing algorithm. {common_headers} :>json object checksums: object holding the computed checksum values for the requested content :>json string data_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/raw/` for downloading the content raw bytes :>json string filetype_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/filetype/` for getting information about the content MIME type :>json string language_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/language/` for getting information about the programming language used in the content :>json number length: length of the content in bytes :>json string license_url: link to :http:get:`/api/1/content/[(hash_type):](hash)/license/` for getting information about the license of the content :statuscode 200: no error :statuscode 400: an invalid **hash_type** or **hash** has been provided :statuscode 404: requested content can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`content/sha1_git:fe95a46679d128ff167b7c55df5d02356c5a1ae1/` """ return api_lookup( archive.lookup_content, q, notfound_msg="Content with {} not found.".format(q), enrich_fn=functools.partial(utils.enrich_content, query_string=q), request=request, ) diff --git a/swh/web/api/views/metadata.py b/swh/web/api/views/metadata.py index 0597f511..4161d337 100644 --- a/swh/web/api/views/metadata.py +++ b/swh/web/api/views/metadata.py @@ -1,253 +1,249 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import re import iso8601 from django.http import HttpResponse from swh.model import hashutil, identifiers from swh.model.model import MetadataAuthority, MetadataAuthorityType from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.common import archive, converters from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.common.utils import reverse SWHID_RE = "swh:1:[a-z]{3}:[0-9a-z]{40}" @api_route( f"/raw-extrinsic-metadata/swhid/(?P{SWHID_RE})/", "api-1-raw-extrinsic-metadata-swhid", ) @api_doc("/raw-extrinsic-metadata/swhid/") @format_docstring() def api_raw_extrinsic_metadata_swhid(request, target): """ .. http:get:: /api/1/raw-extrinsic-metadata/swhid/(target) - Returns raw `extrinsic metadata`_ collected on a given object. - - .. _extrinsic metadata: https://docs.softwareheritage.org/devel/glossary.html#term-extrinsic-metadata + Returns raw `extrinsic metadata `__ collected on a given object. :param string target: The SWHID of the object whose metadata should be returned :query string authority: A metadata authority identifier, formatted as - ` `. Required. + `` ``. Required. :query string after: An ISO representation of the minimum timestamp of metadata to fetch. Defaults to allowing all metadata. :query int limit: Maximum number of metadata objects to return. {common_headers} :>jsonarr string target: SWHID of the object described by this metadata :>jsonarr string discovery_date: ISO8601 timestamp of the moment this metadata was collected. :>jsonarr object authority: authority this metadata is coming from :>jsonarr object fetcher: tool used to fetch the metadata :>jsonarr string format: short identifier of the format of the metadata :>jsonarr string metadata_url: link to download the metadata "blob" itself :>jsonarr string origin: URL of the origin in which context's the metadata is valid, if any :>jsonarr int visit: identifier of the visit in which context's the metadata is valid, if any :>jsonarr string snapshot: SWHID of the snapshot in which context's the metadata is valid, if any :>jsonarr string release: SWHID of the release in which context's the metadata is valid, if any :>jsonarr string revision: SWHID of the revision in which context's the metadata is valid, if any :>jsonarr string path: SWHID of the path in which context's is valid if any, relative to a release or revision as anchor :>jsonarr string directory: SWHID of the directory in which context's the metadata is valid, if any :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`raw-extrinsic-metadata/swhid/swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307/?authority=forge%20https://pypi.org/` """ # noqa authority_str: str = request.query_params.get("authority") after_str: str = request.query_params.get("after") limit_str: str = request.query_params.get("limit", "100") page_token_str: str = request.query_params.get("page_token") if not authority_str: raise BadInputExc("The 'authority' query parameter is required.") if " " not in authority_str.strip(): raise BadInputExc("The 'authority' query parameter should contain a space.") (authority_type_str, authority_url) = authority_str.split(" ", 1) try: authority_type = MetadataAuthorityType(authority_type_str) except ValueError: raise BadInputExc( f"Invalid 'authority' type, should be one of: " f"{', '.join(member.value for member in MetadataAuthorityType)}" ) authority = MetadataAuthority(authority_type, authority_url) if after_str: try: after = iso8601.parse_date(after_str) except iso8601.ParseError: raise BadInputExc("Invalid format for 'after' parameter.") from None else: after = None try: limit = int(limit_str) except ValueError: raise BadInputExc("'limit' parameter must be an integer.") from None limit = min(limit, 10000) try: target = identifiers.CoreSWHID.from_string(target).to_extended() except identifiers.ValidationError as e: raise BadInputExc(f"Invalid target SWHID: {e.args[0]}") from None if page_token_str: page_token = base64.urlsafe_b64decode(page_token_str) else: page_token = None result_page = archive.storage.raw_extrinsic_metadata_get( target=target, authority=authority, after=after, page_token=page_token, limit=limit, ) results = [] for metadata in result_page.results: result = converters.from_raw_extrinsic_metadata(metadata) # We can't reliably send metadata directly, because it is a bytestring, # and we have to return JSON documents. result["metadata_url"] = reverse( "api-1-raw-extrinsic-metadata-get", url_args={"id": hashutil.hash_to_hex(metadata.id)}, query_params={"filename": f"{target}_metadata"}, request=request, ) results.append(result) response = { "results": results, "headers": {}, } if result_page.next_page_token is not None: response["headers"]["link-next"] = reverse( "api-1-raw-extrinsic-metadata", query_params=dict( authority=authority_str, after=after_str, limit=limit_str, page_token=base64.urlsafe_b64encode(result_page.next_page_token), ), request=request, ) return response @api_route( "/raw-extrinsic-metadata/get/(?P[0-9a-z]+)/", "api-1-raw-extrinsic-metadata-get", ) def api_raw_extrinsic_metadata_get(request, id): # This is an internal endpoint that should only be accessed via URLs given # by /raw-extrinsic-metadata/swhid/; so it is not documented. metadata = archive.storage.raw_extrinsic_metadata_get_by_ids( [hashutil.hash_to_bytes(id)] ) if not metadata: raise NotFoundExc( "Metadata not found. Use /raw-extrinsic-metadata/swhid/ to access metadata." ) response = HttpResponse( metadata[0].metadata, content_type="application/octet-stream" ) filename = request.query_params.get("filename") if filename and re.match("[a-zA-Z0-9:._-]+", filename): response["Content-disposition"] = f'attachment; filename="{filename}"' else: # It should always be not-None and match the regexp if the URL was created by # /raw-extrinsic-metadata/swhid/, but we're better safe than sorry. response["Content-disposition"] = "attachment" return response @api_route( f"/raw-extrinsic-metadata/swhid/(?P{SWHID_RE})/authorities/", "api-1-raw-extrinsic-metadata-swhid-authorities", ) @api_doc("/raw-extrinsic-metadata/swhid/authorities/") @format_docstring() def api_raw_extrinsic_metadata_swhid_authorities(request, target): """ .. http:get:: /api/1/raw-extrinsic-metadata/swhid/(target)/authorities/ Returns a list of metadata authorities that provided metadata on the given target. - They can then be used to get the raw `extrinsic metadata`_ collected on + They can then be used to get the raw `extrinsic metadata `__ collected on that object from each of the authorities. - .. _extrinsic metadata: https://docs.softwareheritage.org/devel/glossary.html#term-extrinsic-metadata - :param string target: The SWHID of the object whose metadata-providing authorities should be returned {common_headers} :>jsonarr string type: Type of authority (deposit_client, forge, registry) :>jsonarr string url: Unique IRI identifying the authority :>jsonarr object metadata_list_url: URL to get the list of metadata objects on the given object from this authority :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`raw-extrinsic-metadata/swhid/swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307/authorities/` """ # noqa target_str = target try: target = identifiers.CoreSWHID.from_string(target_str).to_extended() except identifiers.ValidationError as e: raise BadInputExc(f"Invalid target SWHID: {e.args[0]}") from None authorities = archive.storage.raw_extrinsic_metadata_get_authorities(target=target) results = [ { **authority.to_dict(), "metadata_list_url": reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": target_str}, query_params={"authority": f"{authority.type.value} {authority.url}"}, request=request, ), } for authority in authorities ] return { "results": results, "headers": {}, } diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index 98196106..b3a0800d 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,475 +1,475 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from functools import partial from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.utils import ( enrich_origin, enrich_origin_search_result, enrich_origin_visit, ) from swh.web.api.views.utils import api_lookup from swh.web.common import archive from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse DOC_RETURN_ORIGIN = """ :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string url: the origin canonical url """ DOC_RETURN_ORIGIN_ARRAY = DOC_RETURN_ORIGIN.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT = """ :>json string date: ISO representation of the visit date (in UTC) :>json str origin: the origin canonical url :>json string origin_url: link to get information about the origin :>jsonarr string snapshot: the snapshot identifier of the visit (may be null if status is not **full**). :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit (may be null if status is not **full**). :>json string status: status of the visit (either **full**, **partial** or **ongoing**) :>json number visit: the unique identifier of the visit """ DOC_RETURN_ORIGIN_VISIT_ARRAY = DOC_RETURN_ORIGIN_VISIT.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT_ARRAY += """ :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/` in order to get information about the visit """ @api_route(r"/origins/", "api-1-origins") @api_doc("/origins/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origins(request): """ .. http:get:: /api/1/origins/ Get list of archived software origins. .. warning:: - This endpoint used to provide an `origin_from` query parameter, + This endpoint used to provide an ``origin_from`` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_count=500` """ old_param_origin_from = request.query_params.get("origin_from") if old_param_origin_from: raise BadInputExc("Please use the Link header to browse through result") page_token = request.query_params.get("page_token", None) limit = min(int(request.query_params.get("origin_count", "100")), 10000) page_result = archive.lookup_origins(page_token, limit) origins = [enrich_origin(o, request=request) for o in page_result.results] next_page_token = page_result.next_page_token response = {"results": origins, "headers": {}} if next_page_token is not None: response["headers"]["link-next"] = reverse( "api-1-origins", query_params={"page_token": next_page_token, "origin_count": limit}, request=request, ) return response @api_route(r"/origin/(?P.+)/get/", "api-1-origin") @api_doc("/origin/") @format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/get/ Get information about a software origin. :param string origin_url: the origin url {return_origin} {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/get/` """ ori_dict = {"url": origin_url} error_msg = "Origin with url %s not found." % ori_dict["url"] return api_lookup( archive.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=enrich_origin, request=request, ) @api_route( r"/origin/search/(?P.+)/", "api-1-origin-search", throttle_scope="swh_api_origin_search", ) @api_doc("/origin/search/") @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. .. warning:: - This endpoint used to provide an `offset` query parameter, + This endpoint used to provide an ``offset`` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :param string url_pattern: a string pattern :query int limit: the maximum number of found origins to return (bounded to 1000) :query boolean with_visit: if true, only return origins with at least one visit by Software heritage {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ result = {} limit = min(int(request.query_params.get("limit", "70")), 1000) page_token = request.query_params.get("page_token") with_visit = request.query_params.get("with_visit", "false") visit_type = request.query_params.get("visit_type") (results, page_token) = api_lookup( archive.search_origin, url_pattern, limit, bool(strtobool(with_visit)), [visit_type] if visit_type else None, page_token, enrich_fn=enrich_origin_search_result, request=request, ) if page_token is not None: query_params = {} query_params["limit"] = limit query_params["page_token"] = page_token query_params["visit_type"] = visit_type result["headers"] = { "link-next": reverse( "api-1-origin-search", url_args={"url_pattern": url_pattern}, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route(r"/origin/metadata-search/", "api-1-origin-metadata-search") @api_doc("/origin/metadata-search/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_metadata_search(request): """ .. http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. :query int limit: the maximum number of found origins to return (bounded to 100) {return_origin_array} {common_headers} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ fulltext = request.query_params.get("fulltext", None) limit = min(int(request.query_params.get("limit", "70")), 100) if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup( archive.search_origin_metadata, fulltext, limit, request=request ) return { "results": results, } @api_route(r"/origin/(?P.*)/visits/", "api-1-origin-visits") @api_doc("/origin/visits/") @format_docstring(return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY) def api_origin_visits(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. :param str origin_url: a software origin URL :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} {return_origin_visit_array} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visits/` """ result = {} origin_query = {"url": origin_url} notfound_msg = "No origin {} found".format(origin_url) url_args_next = {"origin_url": origin_url} per_page = int(request.query_params.get("per_page", "10")) last_visit = request.query_params.get("last_visit") if last_visit: last_visit = int(last_visit) def _lookup_origin_visits(origin_query, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits(origin_query) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v["visit"] == last_visit: visits = all_visits[i + 1 : i + 1 + per_page] break for v in visits: yield v results = api_lookup( _lookup_origin_visits, origin_query, notfound_msg=notfound_msg, enrich_fn=partial( enrich_origin_visit, with_origin_link=False, with_origin_visit_link=True ), request=request, ) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]["visit"] query_params = {} query_params["last_visit"] = new_last_visit if request.query_params.get("per_page"): query_params["per_page"] = per_page result["headers"] = { "link-next": reverse( "api-1-origin-visits", url_args=url_args_next, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route( r"/origin/(?P.*)/visit/latest/", "api-1-origin-visit-latest", throttle_scope="swh_api_origin_visit_latest", ) @api_doc("/origin/visit/latest/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit_latest(request, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/visit/latest/ Get information about the latest visit of a software origin. :param str origin_url: a software origin URL :query boolean require_snapshot: if true, only return a visit with a snapshot {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/latest/` """ require_snapshot = request.query_params.get("require_snapshot", "false") return api_lookup( archive.lookup_origin_visit_latest, origin_url, bool(strtobool(require_snapshot)), notfound_msg=("No visit for origin {} found".format(origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.*)/visit/(?P[0-9]+)/", "api-1-origin-visit" ) @api_doc("/origin/visit/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit(request, visit_id, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :param int visit_id: a visit identifier {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` """ return api_lookup( archive.lookup_origin_visit, origin_url, int(visit_id), notfound_msg=("No visit {} for origin {} found".format(visit_id, origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.+)" "/intrinsic-metadata", "api-origin-intrinsic-metadata" ) @api_doc("/origin/intrinsic-metadata/") @format_docstring() def api_origin_intrinsic_metadata(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/intrinsic-metadata` """ return api_lookup( archive.lookup_origin_intrinsic_metadata, origin_url, notfound_msg=f"Origin with url {origin_url} not found", enrich_fn=enrich_origin, request=request, ) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 1115f970..48491564 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,354 +1,356 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os import re from typing import Any, Dict, Optional from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from pkg_resources import get_distribution from prometheus_client.registry import CollectorRegistry from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import ORIGIN_VISIT_TYPES, get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "alias": "mdi mdi-star", "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "directory": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" site_base_url = request.build_absolute_uri("/") return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, "keycloak": config["keycloak"], "site_base_url": site_base_url, "DJANGO_SETTINGS_MODULE": os.environ["DJANGO_SETTINGS_MODULE"], "status": config["status"], "swh_web_dev": "localhost" in site_base_url, "swh_web_staging": any( [ server_name in site_base_url for server_name in config["staging_server_names"] ] ), "swh_web_version": get_distribution("swh.web").version, "visit_types": ORIGIN_VISIT_TYPES, } def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, + "halt_level": 4, + "traceback": True, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() diff --git a/swh/web/tests/api/test_apidoc.py b/swh/web/tests/api/test_apidoc.py index c2e5d554..5c6e27d7 100644 --- a/swh/web/tests/api/test_apidoc.py +++ b/swh/web/tests/api/test_apidoc.py @@ -1,488 +1,488 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import textwrap import pytest from rest_framework.response import Response from swh.storage.exc import StorageAPIError, StorageDBError from swh.web.api.apidoc import _parse_httpdomain_doc, api_doc from swh.web.api.apiurls import api_route from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc from swh.web.common.utils import prettify_html, reverse from swh.web.tests.utils import check_api_get_responses, check_html_get_response _httpdomain_doc = """ .. http:get:: /api/1/revision/(sha1_git)/ Get information about a revision in the archive. Revisions are identified by **sha1** checksums, compatible with Git commit identifiers. See :func:`swh.model.identifiers.revision_identifier` in our data model module for details about how they are computed. :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request :json object author: information about the author of the revision :>json object committer: information about the committer of the revision :>json string committer_date: ISO representation of the commit date (in UTC) :>json string date: ISO representation of the revision date (in UTC) :>json string directory: the unique identifier that revision points to :>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]` to get information about the directory associated to the revision :>json string id: the revision unique identifier :>json boolean merge: whether or not the revision corresponds to a merge commit :>json string message: the message associated to the revision :>json array parents: the parents of the revision, i.e. the previous revisions that head directly to it, each entry of that array contains an unique parent revision identifier but also a link to :http:get:`/api/1/revision/(sha1_git)/` to get more information about it :>json string type: the type of the revision :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested revision can not be found in the archive - **Request:** + **Example:** .. parsed-literal:: :swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/` """ _exception_http_code = { BadInputExc: 400, ForbiddenExc: 403, NotFoundExc: 404, Exception: 500, StorageAPIError: 503, StorageDBError: 503, } def test_apidoc_nodoc_failure(): with pytest.raises(Exception): @api_doc("/my/nodoc/url/") def apidoc_nodoc_tester(request, arga=0, argb=0): return Response(arga + argb) @api_route(r"/some/(?P[0-9]+)/(?P[0-9]+)/", "api-1-some-doc-route") @api_doc("/some/doc/route/") def apidoc_route(request, myarg, myotherarg, akw=0): """ Sample doc """ return {"result": int(myarg) + int(myotherarg) + akw} def test_apidoc_route_doc(client): url = reverse("api-1-some-doc-route-doc") check_html_get_response( client, url, status_code=200, template_used="api/apidoc.html" ) def test_apidoc_route_fn(api_client): url = reverse("api-1-some-doc-route", url_args={"myarg": 1, "myotherarg": 1}) check_api_get_responses(api_client, url, status_code=200) @api_route(r"/test/error/(?P.+)/", "api-1-test-error") @api_doc("/test/error/") def apidoc_test_error_route(request, exc_name): """ Sample doc """ for e in _exception_http_code.keys(): if e.__name__ == exc_name: raise e("Error") def test_apidoc_error(api_client): for exc, code in _exception_http_code.items(): url = reverse("api-1-test-error", url_args={"exc_name": exc.__name__}) check_api_get_responses(api_client, url, status_code=code) @api_route( r"/some/full/(?P[0-9]+)/(?P[0-9]+)/", "api-1-some-complete-doc-route", ) @api_doc("/some/complete/doc/route/") def apidoc_full_stack(request, myarg, myotherarg, akw=0): """ Sample doc """ return {"result": int(myarg) + int(myotherarg) + akw} def test_apidoc_full_stack_doc(client): url = reverse("api-1-some-complete-doc-route-doc") check_html_get_response( client, url, status_code=200, template_used="api/apidoc.html" ) def test_apidoc_full_stack_fn(api_client): url = reverse( "api-1-some-complete-doc-route", url_args={"myarg": 1, "myotherarg": 1} ) check_api_get_responses(api_client, url, status_code=200) @api_route(r"/test/post/only/", "api-1-test-post-only", methods=["POST"]) @api_doc("/test/post/only/") def apidoc_test_post_only(request, exc_name): """ Sample doc """ return {"result": "some data"} def test_apidoc_post_only(client): # a dedicated view accepting GET requests should have # been created to display the HTML documentation url = reverse("api-1-test-post-only-doc") check_html_get_response( client, url, status_code=200, template_used="api/apidoc.html" ) def test_api_doc_parse_httpdomain(): doc_data = { "description": "", "urls": [], "args": [], "params": [], "resheaders": [], "reqheaders": [], "input_type": "", "inputs": [], "return_type": "", "returns": [], "status_codes": [], "examples": [], } _parse_httpdomain_doc(_httpdomain_doc, doc_data) expected_urls = [ { "rule": "/api/1/revision/ **\\(sha1_git\\)** /", "methods": ["GET", "HEAD", "OPTIONS"], } ] assert "urls" in doc_data assert doc_data["urls"] == expected_urls expected_description = ( "Get information about a revision in the archive. " "Revisions are identified by **sha1** checksums, " "compatible with Git commit identifiers. See " "**swh.model.identifiers.revision_identifier** in " "our data model module for details about how they " "are computed." ) assert "description" in doc_data assert doc_data["description"] == expected_description expected_args = [ { "name": "sha1_git", "type": "string", "doc": ( "hexadecimal representation of the revision " "**sha1_git** identifier" ), } ] assert "args" in doc_data assert doc_data["args"] == expected_args expected_params = [] assert "params" in doc_data assert doc_data["params"] == expected_params expected_reqheaders = [ { "doc": ( "the requested response content type, either " "``application/json`` (default) or ``application/yaml``" ), "name": "Accept", } ] assert "reqheaders" in doc_data assert doc_data["reqheaders"] == expected_reqheaders expected_resheaders = [ {"doc": "this depends on **Accept** header of request", "name": "Content-Type"} ] assert "resheaders" in doc_data assert doc_data["resheaders"] == expected_resheaders expected_statuscodes = [ {"code": "200", "doc": "no error"}, {"code": "400", "doc": "an invalid **sha1_git** value has been provided"}, {"code": "404", "doc": "requested revision can not be found in the archive"}, ] assert "status_codes" in doc_data assert doc_data["status_codes"] == expected_statuscodes expected_input_type = "object" assert "input_type" in doc_data assert doc_data["input_type"] == expected_input_type expected_inputs = [ {"name": "n", "type": "int", "doc": "sample input integer"}, {"name": "s", "type": "string", "doc": "sample input string"}, {"name": "a", "type": "array", "doc": "sample input array"}, ] assert "inputs" in doc_data assert doc_data["inputs"] == expected_inputs expected_return_type = "object" assert "return_type" in doc_data assert doc_data["return_type"] == expected_return_type expected_returns = [ { "name": "author", "type": "object", "doc": "information about the author of the revision", }, { "name": "committer", "type": "object", "doc": "information about the committer of the revision", }, { "name": "committer_date", "type": "string", "doc": "ISO representation of the commit date (in UTC)", }, { "name": "date", "type": "string", "doc": "ISO representation of the revision date (in UTC)", }, { "name": "directory", "type": "string", "doc": "the unique identifier that revision points to", }, { "name": "directory_url", "type": "string", "doc": ( "link to `/api/1/directory/ `_ " "to get information about the directory associated to " "the revision" ), }, {"name": "id", "type": "string", "doc": "the revision unique identifier"}, { "name": "merge", "type": "boolean", "doc": "whether or not the revision corresponds to a merge commit", }, { "name": "message", "type": "string", "doc": "the message associated to the revision", }, { "name": "parents", "type": "array", "doc": ( "the parents of the revision, i.e. the previous revisions " "that head directly to it, each entry of that array " "contains an unique parent revision identifier but also a " "link to `/api/1/revision/ `_ " "to get more information about it" ), }, {"name": "type", "type": "string", "doc": "the type of the revision"}, ] assert "returns" in doc_data assert doc_data["returns"] == expected_returns expected_examples = ["/api/1/revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/"] assert "examples" in doc_data assert doc_data["examples"] == expected_examples @api_route(r"/post/endpoint/", "api-1-post-endpoint", methods=["POST"]) @api_doc("/post/endpoint/") def apidoc_test_post_endpoint(request): """ .. http:post:: /api/1/post/endpoint/ Endpoint documentation :json object : an object whose keys are input SWHIDs and values objects with the following keys: * **known (bool)**: whether the object was found """ pass def test_apidoc_input_output_doc(client): url = reverse("api-1-post-endpoint-doc") rv = check_html_get_response( client, url, status_code=200, template_used="api/apidoc.html" ) input_html_doc = textwrap.indent( ( '
\n' '
\n' " array\n" "
\n" '
\n' "

\n" " Input array of SWHIDs\n" "

\n" "
\n" "
\n" ), " " * 7, ) output_html_doc = textwrap.indent( ( '
\n' '
\n' " object\n" "
\n" '
\n' "

\n" " an object containing the following keys:\n" "

\n" '
\n' "
\n" "
    \n" "
  • \n" "

    \n" " \n" " <swhid> (object)\n" " \n" - " : an object whose keys are input SWHIDs" + " : an object whose keys are input SWHIDs" " and values objects with the following keys:\n" "

    \n" "
    \n" '
      \n' "
    • \n" "

      \n" " \n" " known (bool)\n" " \n" " : whether the object was found\n" "

      \n" "
    • \n" "
    \n" "
    \n" "
  • \n" "
\n" "
\n" "
\n" "
\n" "
\n" ), " " * 7, ) html = prettify_html(rv.content) assert input_html_doc in html assert output_html_doc in html @api_route(r"/endpoint/links/in/doc/", "api-1-endpoint-links-in-doc") @api_doc("/endpoint/links/in/doc/") def apidoc_test_endpoint_with_links_in_doc(request): """ .. http:get:: /api/1/post/endpoint/ Endpoint documentation with links to :http:get:`/api/1/content/[(hash_type):](hash)/`, :http:get:`/api/1/directory/(sha1_git)/[(path)/]` and `archive `_. """ pass def test_apidoc_with_links(client): url = reverse("api-1-endpoint-links-in-doc") rv = check_html_get_response( client, url, status_code=200, template_used="api/apidoc.html" ) html = prettify_html(rv.content) first_link = textwrap.indent( ( '\n' " /api/1/content/\n" "" ), " " * 9, ) second_link = textwrap.indent( ( '\n' " /api/1/directory/\n" "" ), " " * 9, ) third_link = textwrap.indent( ( '\n' " archive\n" "" ), " " * 9, ) assert first_link in html assert second_link in html assert third_link in html