Changeset View
Changeset View
Standalone View
Standalone View
swh/web/api/apidoc.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | |||||
import functools | import functools | ||||
from functools import wraps | from functools import wraps | ||||
import os | import os | ||||
import re | import re | ||||
import textwrap | import textwrap | ||||
from typing import List | from typing import List | ||||
import docutils.nodes | import docutils.nodes | ||||
Show All 30 Lines | class _HTTPDomainDocVisitor(docutils.nodes.NodeVisitor): | ||||
query_parameter_roles = ('queryparameter', 'queryparam', 'qparam', 'query') | query_parameter_roles = ('queryparameter', 'queryparam', 'qparam', 'query') | ||||
request_header_roles = ('<header', 'reqheader', 'requestheader') | request_header_roles = ('<header', 'reqheader', 'requestheader') | ||||
response_header_roles = ('>header', 'resheader', 'responseheader') | response_header_roles = ('>header', 'resheader', 'responseheader') | ||||
status_code_roles = ('statuscode', 'status', 'code') | status_code_roles = ('statuscode', 'status', 'code') | ||||
def __init__(self, document, urls, data): | def __init__(self, document, data): | ||||
super().__init__(document) | super().__init__(document) | ||||
self.urls = urls | |||||
self.url_idx = 0 | |||||
self.data = data | self.data = data | ||||
self.args_set = set() | self.args_set = set() | ||||
self.params_set = set() | self.params_set = set() | ||||
self.inputs_set = set() | self.inputs_set = set() | ||||
self.returns_set = set() | self.returns_set = set() | ||||
self.status_codes_set = set() | self.status_codes_set = set() | ||||
self.reqheaders_set = set() | self.reqheaders_set = set() | ||||
self.resheaders_set = set() | self.resheaders_set = set() | ||||
▲ Show 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def visit_paragraph(self, node): | ||||
# only parsed top level paragraphs | # only parsed top level paragraphs | ||||
if isinstance(node.parent, docutils.nodes.block_quote): | if isinstance(node.parent, docutils.nodes.block_quote): | ||||
text = self.process_paragraph(str(node)) | text = self.process_paragraph(str(node)) | ||||
# endpoint description | # endpoint description | ||||
if (not text.startswith('**') and | if (not text.startswith('**') and | ||||
text not in self.data['description']): | text not in self.data['description']): | ||||
self.data['description'] += '\n\n' if self.data['description'] else '' # noqa | self.data['description'] += '\n\n' if self.data['description'] else '' # noqa | ||||
self.data['description'] += text | self.data['description'] += text | ||||
# http methods | |||||
elif text.startswith('**Allowed HTTP Methods:**'): | |||||
text = text.replace('**Allowed HTTP Methods:**', '') | |||||
http_methods = text.strip().split(',') | |||||
http_methods = [m[m.find('`')+1:-1].upper() | |||||
for m in http_methods] | |||||
self.data['urls'].append({'rule': self.urls[self.url_idx], | |||||
'methods': http_methods}) | |||||
self.url_idx += 1 | |||||
def visit_literal_block(self, node): | def visit_literal_block(self, node): | ||||
""" | """ | ||||
Visit literal blocks | Visit literal blocks | ||||
""" | """ | ||||
text = node.astext() | text = node.astext() | ||||
# literal block in endpoint description | # literal block in endpoint description | ||||
if not self.field_list_visited: | if not self.field_list_visited: | ||||
Show All 26 Lines | def visit_warning(self, node): | ||||
text = self.process_paragraph(str(node)) | text = self.process_paragraph(str(node)) | ||||
rst_warning = '\n\n.. warning::\n%s\n' % textwrap.indent(text, '\t') | rst_warning = '\n\n.. warning::\n%s\n' % textwrap.indent(text, '\t') | ||||
if rst_warning not in self.data['description']: | if rst_warning not in self.data['description']: | ||||
self.data['description'] += rst_warning | self.data['description'] += rst_warning | ||||
def unknown_visit(self, node): | def unknown_visit(self, node): | ||||
pass | pass | ||||
def depart_document(self, node): | |||||
""" | |||||
End of parsing extra processing | |||||
""" | |||||
default_methods = ['GET', 'HEAD', 'OPTIONS'] | |||||
# ensure urls info is present and set default http methods | |||||
if not self.data['urls']: | |||||
for url in self.urls: | |||||
self.data['urls'].append({'rule': url, | |||||
'methods': default_methods}) | |||||
def unknown_departure(self, node): | def unknown_departure(self, node): | ||||
pass | pass | ||||
def _parse_httpdomain_doc(doc, data): | def _parse_httpdomain_doc(doc, data): | ||||
doc_lines = doc.split('\n') | doc_lines = doc.split('\n') | ||||
doc_lines_filtered = [] | doc_lines_filtered = [] | ||||
urls = [] | urls = defaultdict(list) | ||||
default_http_methods = ['HEAD', 'OPTIONS'] | |||||
# httpdomain is a sphinx extension that is unknown to docutils but | # httpdomain is a sphinx extension that is unknown to docutils but | ||||
# fortunately we can still parse its directives' content, | # fortunately we can still parse its directives' content, | ||||
# so remove lines with httpdomain directives before executing the | # so remove lines with httpdomain directives before executing the | ||||
# rst parser from docutils | # rst parser from docutils | ||||
for doc_line in doc_lines: | for doc_line in doc_lines: | ||||
if '.. http' not in doc_line: | if '.. http' not in doc_line: | ||||
doc_lines_filtered.append(doc_line) | doc_lines_filtered.append(doc_line) | ||||
else: | else: | ||||
url = doc_line[doc_line.find('/'):] | url = doc_line[doc_line.find('/'):] | ||||
# emphasize url arguments for html rendering | # emphasize url arguments for html rendering | ||||
url = re.sub(r'\((\w+)\)', r' **\(\1\)** ', url) | url = re.sub(r'\((\w+)\)', r' **\(\1\)** ', url) | ||||
urls.append(url) | method = re.search(r'http:(\w+)::', doc_line).group(1) | ||||
urls[url].append(method.upper()) | |||||
for url, methods in urls.items(): | |||||
data['urls'].append({'rule': url, | |||||
'methods': methods + default_http_methods}) | |||||
# parse the rst docstring and do not print system messages about | # parse the rst docstring and do not print system messages about | ||||
# unknown httpdomain roles | # unknown httpdomain roles | ||||
document = parse_rst('\n'.join(doc_lines_filtered), report_level=5) | document = parse_rst('\n'.join(doc_lines_filtered), report_level=5) | ||||
# remove the system_message nodes from the parsed document | # remove the system_message nodes from the parsed document | ||||
for node in document.traverse(docutils.nodes.system_message): | for node in document.traverse(docutils.nodes.system_message): | ||||
node.parent.remove(node) | node.parent.remove(node) | ||||
# visit the document nodes to extract relevant endpoint info | # visit the document nodes to extract relevant endpoint info | ||||
visitor = _HTTPDomainDocVisitor(document, urls, data) | visitor = _HTTPDomainDocVisitor(document, data) | ||||
document.walkabout(visitor) | document.walkabout(visitor) | ||||
class APIDocException(Exception): | class APIDocException(Exception): | ||||
""" | """ | ||||
Custom exception to signal errors in the use of the APIDoc decorators | Custom exception to signal errors in the use of the APIDoc decorators | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 161 Lines • Show Last 20 Lines |