diff --git a/swh/web/api/apidoc.py b/swh/web/api/apidoc.py index 2cc48fc1..3ccc6417 100644 --- a/swh/web/api/apidoc.py +++ b/swh/web/api/apidoc.py @@ -1,465 +1,453 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import functools from functools import wraps import os import re import textwrap from typing import List import docutils.nodes import docutils.parsers.rst import docutils.utils from rest_framework.decorators import api_view import sentry_sdk from swh.web.common.utils import parse_rst from swh.web.api.apiurls import APIUrls from swh.web.api.apiresponse import make_api_response, error_response class _HTTPDomainDocVisitor(docutils.nodes.NodeVisitor): """ docutils visitor for walking on a parsed rst document containing sphinx httpdomain roles. Its purpose is to extract relevant info regarding swh api endpoints (for instance url arguments) from their docstring written using sphinx httpdomain. """ # httpdomain roles we want to parse (based on sphinxcontrib.httpdomain 1.6) parameter_roles = ("param", "parameter", "arg", "argument") request_json_object_roles = ("reqjsonobj", "reqjson", "jsonobj", ">json") response_json_array_roles = ("resjsonarr", ">jsonarr") query_parameter_roles = ("queryparameter", "queryparam", "qparam", "query") request_header_roles = ("header", "resheader", "responseheader") status_code_roles = ("statuscode", "status", "code") def __init__(self, document, data): super().__init__(document) self.data = data self.args_set = set() self.params_set = set() self.inputs_set = set() self.returns_set = set() self.status_codes_set = set() self.reqheaders_set = set() self.resheaders_set = set() self.field_list_visited = False self.current_json_obj = None def process_paragraph(self, par): """ Process extracted paragraph text before display. Cleanup document model markups and transform the paragraph into a valid raw rst string (as the apidoc documentation transform rst to html when rendering). """ par = par.replace("\n", " ") # keep emphasized, strong and literal text par = par.replace("", "*") par = par.replace("", "*") par = par.replace("", "**") par = par.replace("", "**") par = par.replace("", "``") par = par.replace("", "``") # keep links to web pages if "', r"`\1 <\2>`_", par, ) # remove parsed document markups but keep rst links par = re.sub(r"<[^<]+?>(?!`_)", "", par) # api urls cleanup to generate valid links afterwards subs_made = 1 while subs_made: (par, subs_made) = re.subn(r"(:http:.*)(\(\w+\))", r"\1", par) subs_made = 1 while subs_made: (par, subs_made) = re.subn(r"(:http:.*)(\[.*\])", r"\1", par) par = par.replace("//", "/") # transform references to api endpoints doc into valid rst links par = re.sub(":http:get:`([^,`]*)`", r"`\1 <\1doc/>`_", par) # transform references to some elements into bold text par = re.sub(":http:header:`(.*)`", r"**\1**", par) par = re.sub(":func:`(.*)`", r"**\1**", par) return par def visit_field_list(self, node): """ Visit parsed rst field lists to extract relevant info regarding api endpoint. """ self.field_list_visited = True for child in node.traverse(): # get the parsed field name if isinstance(child, docutils.nodes.field_name): field_name = child.astext() # parse field text elif isinstance(child, docutils.nodes.paragraph): text = self.process_paragraph(str(child)) field_data = field_name.split(" ") # Parameters if field_data[0] in self.parameter_roles: if field_data[2] not in self.args_set: self.data["args"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.args_set.add(field_data[2]) # Query Parameters if field_data[0] in self.query_parameter_roles: if field_data[2] not in self.params_set: self.data["params"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.params_set.add(field_data[2]) # Request data type if ( field_data[0] in self.request_json_array_roles or field_data[0] in self.request_json_object_roles ): # array if field_data[0] in self.request_json_array_roles: self.data["input_type"] = "array" # object else: self.data["input_type"] = "object" # input object field if field_data[2] not in self.inputs_set: self.data["inputs"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.inputs_set.add(field_data[2]) self.current_json_obj = self.data["inputs"][-1] # Response type if ( field_data[0] in self.response_json_array_roles or field_data[0] in self.response_json_object_roles ): # array if field_data[0] in self.response_json_array_roles: self.data["return_type"] = "array" # object else: self.data["return_type"] = "object" # returned object field if field_data[2] not in self.returns_set: self.data["returns"].append( {"name": field_data[2], "type": field_data[1], "doc": text} ) self.returns_set.add(field_data[2]) self.current_json_obj = self.data["returns"][-1] # Status Codes if field_data[0] in self.status_code_roles: if field_data[1] not in self.status_codes_set: self.data["status_codes"].append( {"code": field_data[1], "doc": text} ) self.status_codes_set.add(field_data[1]) # Request Headers if field_data[0] in self.request_header_roles: if field_data[1] not in self.reqheaders_set: self.data["reqheaders"].append( {"name": field_data[1], "doc": text} ) self.reqheaders_set.add(field_data[1]) # Response Headers if field_data[0] in self.response_header_roles: if field_data[1] not in self.resheaders_set: resheader = {"name": field_data[1], "doc": text} self.data["resheaders"].append(resheader) self.resheaders_set.add(field_data[1]) if ( resheader["name"] == "Content-Type" and resheader["doc"] == "application/octet-stream" ): self.data["return_type"] = "octet stream" def visit_paragraph(self, node): """ Visit relevant paragraphs to parse """ # only parsed top level paragraphs if isinstance(node.parent, docutils.nodes.block_quote): text = self.process_paragraph(str(node)) # endpoint description if not text.startswith("**") and text not in self.data["description"]: self.data["description"] += "\n\n" if self.data["description"] else "" self.data["description"] += text def visit_literal_block(self, node): """ Visit literal blocks """ text = node.astext() # literal block in endpoint description if not self.field_list_visited: self.data["description"] += ":\n\n%s\n" % textwrap.indent(text, "\t") # extract example url if ":swh_web_api:" in text: self.data["examples"].append("/api/1/" + re.sub(".*`(.*)`.*", r"\1", text)) def visit_bullet_list(self, node): # bullet list in endpoint description if not self.field_list_visited: self.data["description"] += "\n\n" for child in node.traverse(): # process list item if isinstance(child, docutils.nodes.paragraph): line_text = self.process_paragraph(str(child)) self.data["description"] += "\t* %s\n" % line_text elif self.current_json_obj: self.current_json_obj["doc"] += "\n\n" for child in node.traverse(): # process list item if isinstance(child, docutils.nodes.paragraph): line_text = self.process_paragraph(str(child)) self.current_json_obj["doc"] += "\t\t* %s\n" % line_text self.current_json_obj = None def visit_warning(self, node): text = self.process_paragraph(str(node)) rst_warning = "\n\n.. warning::\n%s\n" % textwrap.indent(text, "\t") if rst_warning not in self.data["description"]: self.data["description"] += rst_warning def unknown_visit(self, node): pass def unknown_departure(self, node): pass def _parse_httpdomain_doc(doc, data): doc_lines = doc.split("\n") doc_lines_filtered = [] urls = defaultdict(list) default_http_methods = ["HEAD", "OPTIONS"] # httpdomain is a sphinx extension that is unknown to docutils but # fortunately we can still parse its directives' content, # so remove lines with httpdomain directives before executing the # rst parser from docutils for doc_line in doc_lines: if ".. http" not in doc_line: doc_lines_filtered.append(doc_line) else: url = doc_line[doc_line.find("/") :] # emphasize url arguments for html rendering url = re.sub(r"\((\w+)\)", r" **\(\1\)** ", url) method = re.search(r"http:(\w+)::", doc_line).group(1) urls[url].append(method.upper()) for url, methods in urls.items(): data["urls"].append({"rule": url, "methods": methods + default_http_methods}) # parse the rst docstring and do not print system messages about # unknown httpdomain roles document = parse_rst("\n".join(doc_lines_filtered), report_level=5) # remove the system_message nodes from the parsed document for node in document.traverse(docutils.nodes.system_message): node.parent.remove(node) # visit the document nodes to extract relevant endpoint info visitor = _HTTPDomainDocVisitor(document, data) document.walkabout(visitor) class APIDocException(Exception): """ Custom exception to signal errors in the use of the APIDoc decorators """ def api_doc( route: str, noargs: bool = False, - need_params: bool = False, tags: List[str] = [], handle_response: bool = False, api_version: str = "1", ): """ Decorator for an API endpoint implementation used to generate a dedicated view displaying its HTML documentation. The documentation will be generated from the endpoint docstring based on sphinxcontrib-httpdomain format. Args: route: documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested. Default to False - need_params: specify the route requires query parameters - otherwise errors will occur. It enables to avoid displaying the - invalid response in its HTML documentation. Default to False. tags: Further information on api endpoints. Two values are possibly expected: * hidden: remove the entry points from the listing * upcoming: display the entry point but it is not followable handle_response: indicate if the decorated function takes care of creating the HTTP response or delegates that task to the apiresponse module api_version: api version string """ tags_set = set(tags) # @api_doc() Decorator call def decorator(f): # if the route is not hidden, add it to the index if "hidden" not in tags_set: doc_data = get_doc_data(f, route, noargs) doc_desc = doc_data["description"] first_dot_pos = doc_desc.find(".") APIUrls.add_doc_route( route, doc_desc[: first_dot_pos + 1], noargs=noargs, api_version=api_version, tags=tags_set, ) # create a dedicated view to display endpoint HTML doc @api_view(["GET", "HEAD"]) @wraps(f) def doc_view(request): doc_data = get_doc_data(f, route, noargs) return make_api_response(request, None, doc_data) route_name = "%s-doc" % route[1:-1].replace("/", "-") urlpattern = f"^{api_version}{route}doc/$" view_name = "api-%s-%s" % (api_version, route_name) APIUrls.add_url_pattern(urlpattern, doc_view, view_name) @wraps(f) def documented_view(request, **kwargs): doc_data = get_doc_data(f, route, noargs) - try: response = f(request, **kwargs) except Exception as exc: sentry_sdk.capture_exception(exc) - if ( - request.accepted_media_type == "text/html" - and need_params - and not request.query_params - ): - response = None - else: - return error_response(request, exc, doc_data) + return error_response(request, exc, doc_data) if handle_response: return response else: return make_api_response(request, response, doc_data) return documented_view return decorator @functools.lru_cache(maxsize=32) def get_doc_data(f, route, noargs): """ Build documentation data for the decorated api endpoint function """ data = { "description": "", "response_data": None, "urls": [], "args": [], "params": [], "input_type": "", "inputs": [], "resheaders": [], "reqheaders": [], "return_type": "", "returns": [], "status_codes": [], "examples": [], "route": route, "noargs": noargs, } if not f.__doc__: raise APIDocException( "apidoc: expected a docstring" " for function %s" % (f.__name__,) ) # use raw docstring as endpoint documentation if sphinx # httpdomain is not used if ".. http" not in f.__doc__: data["description"] = f.__doc__ # else parse the sphinx httpdomain docstring with docutils # (except when building the swh-web documentation through autodoc # sphinx extension, not needed and raise errors with sphinx >= 1.7) elif "SWH_WEB_DOC_BUILD" not in os.environ: _parse_httpdomain_doc(f.__doc__, data) # process input/returned object info for nicer html display inputs_list = "" returns_list = "" for inp in data["inputs"]: # special case for array of non object type, for instance # :jsonarr string -: an array of string if ret["name"] != "-": returns_list += "\t* **%s (%s)**: %s\n" % ( ret["name"], ret["type"], ret["doc"], ) data["inputs_list"] = inputs_list data["returns_list"] = returns_list return data DOC_COMMON_HEADERS = """ :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request""" DOC_RESHEADER_LINK = """ :resheader Link: indicates that a subsequent result page is available and contains the url pointing to it """ DEFAULT_SUBSTITUTIONS = { "common_headers": DOC_COMMON_HEADERS, "resheader_link": DOC_RESHEADER_LINK, } def format_docstring(**substitutions): def decorator(f): f.__doc__ = f.__doc__.format(**{**DEFAULT_SUBSTITUTIONS, **substitutions}) return f return decorator diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index 4918d957..09e8e02d 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,470 +1,469 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from functools import partial from swh.web.common import service from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.utils import enrich_origin, enrich_origin_visit from swh.web.api.views.utils import api_lookup DOC_RETURN_ORIGIN = """ :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string url: the origin canonical url """ DOC_RETURN_ORIGIN_ARRAY = DOC_RETURN_ORIGIN.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT = """ :>json string date: ISO representation of the visit date (in UTC) :>json str origin: the origin canonical url :>json string origin_url: link to get information about the origin :>jsonarr string snapshot: the snapshot identifier of the visit (may be null if status is not **full**). :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit (may be null if status is not **full**). :>json string status: status of the visit (either **full**, **partial** or **ongoing**) :>json number visit: the unique identifier of the visit """ DOC_RETURN_ORIGIN_VISIT_ARRAY = DOC_RETURN_ORIGIN_VISIT.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT_ARRAY += """ :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/` in order to get information about the visit """ @api_route(r"/origins/", "api-1-origins") @api_doc("/origins/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origins(request): """ .. http:get:: /api/1/origins/ Get list of archived software origins. .. warning:: This endpoint used to provide an `origin_from` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_count=500` """ old_param_origin_from = request.query_params.get("origin_from") if old_param_origin_from: raise BadInputExc("Please use the Link header to browse through result") page_token = request.query_params.get("page_token", None) limit = min(int(request.query_params.get("origin_count", "100")), 10000) page_result = service.lookup_origins(page_token, limit) origins = [enrich_origin(o, request=request) for o in page_result.results] next_page_token = page_result.next_page_token response = {"results": origins, "headers": {}} if next_page_token is not None: response["headers"]["link-next"] = reverse( "api-1-origins", query_params={"page_token": next_page_token, "origin_count": limit}, request=request, ) return response @api_route(r"/origin/(?P.+)/get/", "api-1-origin") @api_doc("/origin/") @format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/get/ Get information about a software origin. :param string origin_url: the origin url {return_origin} {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/get/` """ ori_dict = {"url": origin_url} error_msg = "Origin with url %s not found." % ori_dict["url"] return api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=enrich_origin, request=request, ) @api_route( r"/origin/search/(?P.+)/", "api-1-origin-search", throttle_scope="swh_api_origin_search", ) @api_doc("/origin/search/") @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. .. warning:: This endpoint used to provide an `offset` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :param string url_pattern: a string pattern :query int limit: the maximum number of found origins to return (bounded to 1000) :query boolean with_visit: if true, only return origins with at least one visit by Software heritage {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ result = {} limit = min(int(request.query_params.get("limit", "70")), 1000) page_token = request.query_params.get("page_token") with_visit = request.query_params.get("with_visit", "false") (results, page_token) = api_lookup( service.search_origin, url_pattern, limit, bool(strtobool(with_visit)), page_token, enrich_fn=enrich_origin, request=request, ) if page_token is not None: query_params = {} query_params["limit"] = limit query_params["page_token"] = page_token result["headers"] = { "link-next": reverse( "api-1-origin-search", url_args={"url_pattern": url_pattern}, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route(r"/origin/metadata-search/", "api-1-origin-metadata-search") -@api_doc("/origin/metadata-search/", noargs=True, need_params=True) +@api_doc("/origin/metadata-search/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_metadata_search(request): """ .. http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. :query int limit: the maximum number of found origins to return (bounded to 100) {return_origin_array} {common_headers} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ fulltext = request.query_params.get("fulltext", None) limit = min(int(request.query_params.get("limit", "70")), 100) - if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup( service.search_origin_metadata, fulltext, limit, request=request ) return { "results": results, } @api_route(r"/origin/(?P.*)/visits/", "api-1-origin-visits") @api_doc("/origin/visits/") @format_docstring(return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY) def api_origin_visits(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. :param str origin_url: a software origin URL :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} {return_origin_visit_array} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visits/` """ result = {} origin_query = {"url": origin_url} notfound_msg = "No origin {} found".format(origin_url) url_args_next = {"origin_url": origin_url} per_page = int(request.query_params.get("per_page", "10")) last_visit = request.query_params.get("last_visit") if last_visit: last_visit = int(last_visit) def _lookup_origin_visits(origin_query, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits(origin_query) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v["visit"] == last_visit: visits = all_visits[i + 1 : i + 1 + per_page] break for v in visits: yield v results = api_lookup( _lookup_origin_visits, origin_query, notfound_msg=notfound_msg, enrich_fn=partial( enrich_origin_visit, with_origin_link=False, with_origin_visit_link=True ), request=request, ) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]["visit"] query_params = {} query_params["last_visit"] = new_last_visit if request.query_params.get("per_page"): query_params["per_page"] = per_page result["headers"] = { "link-next": reverse( "api-1-origin-visits", url_args=url_args_next, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route( r"/origin/(?P.*)/visit/latest/", "api-1-origin-visit-latest", throttle_scope="swh_api_origin_visit_latest", ) @api_doc("/origin/visit/latest/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit_latest(request, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/visit/latest/ Get information about the latest visit of a software origin. :param str origin_url: a software origin URL :query boolean require_snapshot: if true, only return a visit with a snapshot {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/latest/` """ require_snapshot = request.query_params.get("require_snapshot", "false") return api_lookup( service.lookup_origin_visit_latest, origin_url, bool(strtobool(require_snapshot)), notfound_msg=("No visit for origin {} found".format(origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.*)/visit/(?P[0-9]+)/", "api-1-origin-visit" ) @api_doc("/origin/visit/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit(request, visit_id, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :param int visit_id: a visit identifier {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` """ return api_lookup( service.lookup_origin_visit, origin_url, int(visit_id), notfound_msg=("No visit {} for origin {} found".format(visit_id, origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.+)" "/intrinsic-metadata", "api-origin-intrinsic-metadata" ) @api_doc("/origin/intrinsic-metadata/") @format_docstring() def api_origin_intrinsic_metadata(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/intrinsic-metadata` """ return api_lookup( service.lookup_origin_intrinsic_metadata, origin_url, notfound_msg=f"Origin with url {origin_url} not found", enrich_fn=enrich_origin, request=request, ) diff --git a/swh/web/tests/api/views/__init__.py b/swh/web/tests/api/views/__init__.py index e69de29b..77a7e05b 100644 --- a/swh/web/tests/api/views/__init__.py +++ b/swh/web/tests/api/views/__init__.py @@ -0,0 +1,71 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from rest_framework.test import APIClient +from rest_framework.response import Response + + +def check_api_get_responses( + api_client: APIClient, url: str, status_code: int +) -> Response: + """Helper function to check Web API responses to GET requests + for all accepted content types. + + Args: + api_client: DRF test client + url: Web API URL to check responses + status_code: expected HTTP status code + + Returns: + The Web API JSON response + """ + # check API Web UI + html_content_type = "text/html" + resp = api_client.get(url, HTTP_ACCEPT=html_content_type) + assert resp.status_code == status_code, resp.content + assert resp["Content-Type"] == html_content_type + + # check YAML response + yaml_content_type = "application/yaml" + resp = api_client.get(url, HTTP_ACCEPT=yaml_content_type) + assert resp.status_code == status_code, resp.data + assert resp["Content-Type"] == yaml_content_type + + # check JSON response + resp = api_client.get(url) + assert resp.status_code == status_code, resp.data + assert resp["Content-Type"] == "application/json" + + return resp + + +def check_api_post_responses( + api_client: APIClient, url: str, data: Optional[Dict[str, Any]], status_code: int +) -> Response: + """Helper function to check Web API responses to POST requests + for all accepted content types. + + Args: + api_client: DRF test client + url: Web API URL to check responses + status_code: expected HTTP status code + + Returns: + The Web API JSON response + """ + # check YAML response + yaml_content_type = "application/yaml" + resp = api_client.post(url, data=data, format="json", HTTP_ACCEPT=yaml_content_type) + assert resp.status_code == status_code, resp.data + assert resp["Content-Type"] == yaml_content_type + + # check JSON response + resp = api_client.post(url, data=data, format="json") + assert resp.status_code == status_code, resp.data + assert resp["Content-Type"] == "application/json" + + return resp diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py index 69f0e2d2..af91ceec 100644 --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -1,399 +1,329 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from hypothesis import given from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.data import random_content from swh.web.tests.strategies import content, contents_with_ctags from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(content()) def test_api_content_filetype(api_client, indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) url = reverse( "api-1-content-filetype", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_mimetype(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_filetype_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-filetype", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=404) - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == { "exception": "NotFoundExc", "reason": "No filetype information found for content " "sha1:%s." % unknown_content_["sha1"], } @pytest.mark.skip # Language indexer is disabled @given(content()) def test_api_content_language(api_client, indexer_data, content): indexer_data.content_add_language(content["sha1"]) url = reverse( "api-1-content-language", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_language(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_language_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-language", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No language information found for content " "sha1:%s." % unknown_content_["sha1"], } @pytest.mark.skip # Language indexer is disabled @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(contents_with_ctags()) def test_api_content_symbol(api_client, indexer_data, contents_with_ctags): expected_data = {} for content_sha1 in contents_with_ctags["sha1s"]: indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if ctag["name"] == contents_with_ctags["symbol_name"]: expected_data[content_sha1] = ctag break url = reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"per_page": 100}, ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" for entry in rv.data: content_sha1 = entry["sha1"] expected_entry = expected_data[content_sha1] for key, view_name in ( ("content_url", "api-1-content"), ("data_url", "api-1-content-raw"), ("license_url", "api-1-content-license"), ("language_url", "api-1-content-language"), ("filetype_url", "api-1-content-filetype"), ): expected_entry[key] = reverse( view_name, url_args={"q": "sha1:%s" % content_sha1}, request=rv.wsgi_request, ) expected_entry["sha1"] = content_sha1 del expected_entry["id"] assert entry == expected_entry assert "Link" not in rv url = reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"per_page": 2}, ) - rv = api_client.get(url) + + rv = check_api_get_responses(api_client, url, status_code=200) next_url = ( reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"last_sha1": rv.data[1]["sha1"], "per_page": 2}, request=rv.wsgi_request, ), ) assert rv["Link"] == '<%s>; rel="next"' % next_url def test_api_content_symbol_not_found(api_client): url = reverse("api-1-content-symbol", url_args={"q": "bar"}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No indexed raw content match expression 'bar'.", } assert "Link" not in rv @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(content()) def test_api_content_ctags(api_client, indexer_data, content): indexer_data.content_add_ctags(content["sha1"]) url = reverse( "api-1-content-ctags", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = list(indexer_data.content_get_ctags(content["sha1"])) for e in expected_data: e["content_url"] = content_url assert rv.data == expected_data @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_api_content_license(api_client, indexer_data, content): indexer_data.content_add_license(content["sha1"]) url = reverse( "api-1-content-license", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_license(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_license_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-license", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No license information found for content " "sha1:%s." % unknown_content_["sha1"], } @given(content()) def test_api_content_metadata(api_client, archive_data, content): url = reverse("api-1-content", {"q": "sha1:%s" % content["sha1"]}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.content_get(content["sha1"]) for key, view_name in ( ("data_url", "api-1-content-raw"), ("license_url", "api-1-content-license"), ("language_url", "api-1-content-language"), ("filetype_url", "api-1-content-filetype"), ): expected_data[key] = reverse( view_name, url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) assert rv.data == expected_data -def test_api_content_not_found_as_json(api_client): +def test_api_content_not_found(api_client): unknown_content_ = random_content() url = reverse("api-1-content", url_args={"q": "sha1:%s" % unknown_content_["sha1"]}) - rv = api_client.get(url) - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Content with sha1 checksum equals to %s not found!" % unknown_content_["sha1"], } -def test_api_content_not_found_as_yaml(api_client): - unknown_content_ = random_content() - - url = reverse( - "api-1-content", url_args={"q": "sha256:%s" % unknown_content_["sha256"]} - ) - rv = api_client.get(url, HTTP_ACCEPT="application/yaml") - - assert rv.status_code == 404, rv.data - assert "application/yaml" in rv["Content-Type"] - - assert rv.data == { - "exception": "NotFoundExc", - "reason": "Content with sha256 checksum equals to %s not found!" - % unknown_content_["sha256"], - } - - def test_api_content_raw_ko_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-raw", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Content with sha1 checksum equals to %s not found!" % unknown_content_["sha1"], } @given(content()) def test_api_content_raw_text(api_client, archive_data, content): url = reverse("api-1-content-raw", url_args={"q": "sha1:%s" % content["sha1"]}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv["Content-Type"] == "application/octet-stream" assert ( rv["Content-disposition"] == "attachment; filename=content_sha1_%s_raw" % content["sha1"] ) assert rv["Content-Type"] == "application/octet-stream" expected_data = archive_data.content_get_data(content["sha1"]) assert rv.content == expected_data["data"] @given(content()) def test_api_content_raw_text_with_filename(api_client, archive_data, content): url = reverse( "api-1-content-raw", url_args={"q": "sha1:%s" % content["sha1"]}, query_params={"filename": "filename.txt"}, ) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv["Content-Type"] == "application/octet-stream" assert rv["Content-disposition"] == "attachment; filename=filename.txt" assert rv["Content-Type"] == "application/octet-stream" expected_data = archive_data.content_get_data(content["sha1"]) assert rv.content == expected_data["data"] @given(content()) def test_api_check_content_known(api_client, content): url = reverse("api-1-content-known", url_args={"q": content["sha1"]}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" - - assert rv.data == { - "search_res": [{"found": True, "sha1": content["sha1"]}], - "search_stats": {"nbfiles": 1, "pct": 100.0}, - } - - -@given(content()) -def test_api_check_content_known_as_yaml(api_client, content): - url = reverse("api-1-content-known", url_args={"q": content["sha1"]}) - rv = api_client.get(url, HTTP_ACCEPT="application/yaml") - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/yaml" - + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == { "search_res": [{"found": True, "sha1": content["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 100.0}, } @given(content()) -def test_api_check_content_known_post_as_yaml(api_client, content): +def test_api_check_content_known_post(api_client, content): url = reverse("api-1-content-known") - rv = api_client.post( - url, data={"q": content["sha1"]}, HTTP_ACCEPT="application/yaml" + rv = check_api_post_responses( + api_client, url, data={"q": content["sha1"]}, status_code=200 ) - assert rv.status_code == 200, rv.data - assert "application/yaml" in rv["Content-Type"] assert rv.data == { "search_res": [{"found": True, "sha1": content["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 100.0}, } def test_api_check_content_known_not_found(api_client): unknown_content_ = random_content() url = reverse("api-1-content-known", url_args={"q": unknown_content_["sha1"]}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == { "search_res": [{"found": False, "sha1": unknown_content_["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 0.0}, } @given(content()) def test_api_content_uppercase(api_client, content): url = reverse( "api-1-content-uppercase-checksum", url_args={"q": content["sha1"].upper()} ) rv = api_client.get(url) assert rv.status_code == 302, rv.data redirect_url = reverse("api-1-content", url_args={"q": content["sha1"]}) assert rv["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_directory.py b/swh/web/tests/api/views/test_directory.py index ab4bc54e..cec21a5c 100644 --- a/swh/web/tests/api/views/test_directory.py +++ b/swh/web/tests/api/views/test_directory.py @@ -1,91 +1,80 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.web.api.utils import enrich_directory from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import directory @given(directory()) def test_api_directory(api_client, archive_data, directory): url = reverse("api-1-directory", url_args={"sha1_git": directory}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) dir_content = list(archive_data.directory_ls(directory)) expected_data = list( map(enrich_directory, dir_content, [rv.wsgi_request] * len(dir_content)) ) assert rv.data == expected_data def test_api_directory_not_found(api_client): unknown_directory_ = random_sha1() url = reverse("api-1-directory", url_args={"sha1_git": unknown_directory_}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Directory with sha1_git %s not found" % unknown_directory_, } @given(directory()) def test_api_directory_with_path_found(api_client, archive_data, directory): directory_content = archive_data.directory_ls(directory) path = random.choice(directory_content) url = reverse( "api-1-directory", url_args={"sha1_git": directory, "path": path["name"]} ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == enrich_directory(path, rv.wsgi_request) @given(directory()) def test_api_directory_with_path_not_found(api_client, directory): path = "some/path/to/nonexistent/dir/" url = reverse("api-1-directory", url_args={"sha1_git": directory, "path": path}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": ( "Directory entry with path %s from %s not found" % (path, directory) ), } @given(directory()) def test_api_directory_uppercase(api_client, directory): url = reverse( "api-1-directory-uppercase-checksum", url_args={"sha1_git": directory.upper()} ) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse("api-1-directory", url_args={"sha1_git": directory}) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py index 0eb09d1a..48a4b5d9 100644 --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -1,194 +1,167 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ) @given(origin(), content(), directory(), release(), revision(), snapshot()) def test_swhid_resolve_success( api_client, client, origin, content, directory, release, revision, snapshot ): for obj_type, obj_id in ( (CONTENT, content["sha1_git"]), (DIRECTORY, directory), (RELEASE, release), (REVISION, revision), (SNAPSHOT, snapshot), ): swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]}) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) - resp = api_client.get(url) + resp = check_api_get_responses(api_client, url, status_code=200) if obj_type == CONTENT: url_args = {"query_string": "sha1_git:%s" % obj_id} elif obj_type == SNAPSHOT: url_args = {"snapshot_id": obj_id} else: url_args = {"sha1_git": obj_id} browse_rev_url = reverse( "browse-%s" % obj_type, url_args=url_args, query_params={"origin_url": origin["url"]}, request=resp.wsgi_request, ) expected_result = { "browse_url": browse_rev_url, "metadata": {"origin": origin["url"]}, "namespace": "swh", "object_id": obj_id, "object_type": obj_type, "scheme_version": 1, } - assert resp.status_code == 200, resp.data assert resp.data == expected_result - # also checks endpoint documented view - # TODO: remove that check once T2529 is implemented - resp = client.get(url, HTTP_ACCEPT="text/html") - assert resp.status_code == 200, resp.content - def test_swhid_resolve_invalid(api_client): rev_id_invalid = "96db9023b8_foo_50d6c108e9a3" swhid = "swh:1:rev:%s" % rev_id_invalid url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) - - resp = api_client.get(url) - - assert resp.status_code == 400, resp.data + check_api_get_responses(api_client, url, status_code=400) @given( unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot(), ) def test_swhid_resolve_not_found( api_client, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): for obj_type, obj_id in ( (CONTENT, unknown_content["sha1_git"]), (DIRECTORY, unknown_directory), (RELEASE, unknown_release), (REVISION, unknown_revision), (SNAPSHOT, unknown_snapshot), ): swhid = gen_swhid(obj_type, obj_id) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) - resp = api_client.get(url) - - assert resp.status_code == 404, resp.data + check_api_get_responses(api_client, url, status_code=404) def test_swh_origin_id_not_resolvable(api_client): ori_swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb" url = reverse("api-1-resolve-swhid", url_args={"swhid": ori_swhid}) - resp = api_client.get(url) - assert resp.status_code == 400, resp.data + check_api_get_responses(api_client, url, status_code=400) @given(content(), directory()) def test_api_known_swhid_some_present(api_client, content, directory): content_ = gen_swhid(CONTENT, content["sha1_git"]) directory_ = gen_swhid(DIRECTORY, directory) unknown_revision_ = gen_swhid(REVISION, random_sha1()) unknown_release_ = gen_swhid(RELEASE, random_sha1()) unknown_snapshot_ = gen_swhid(SNAPSHOT, random_sha1()) input_swhids = [ content_, directory_, unknown_revision_, unknown_release_, unknown_snapshot_, ] url = reverse("api-1-known") - resp = api_client.post( - url, data=input_swhids, format="json", HTTP_ACCEPT="application/json" - ) + resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200) - assert resp.status_code == 200, resp.data - assert resp["Content-Type"] == "application/json" assert resp.data == { content_: {"known": True}, directory_: {"known": True}, unknown_revision_: {"known": False}, unknown_release_: {"known": False}, unknown_snapshot_: {"known": False}, } def test_api_known_invalid_swhid(api_client): invalid_swhid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"] invalid_swhid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"] url = reverse("api-1-known") - resp = api_client.post( - url, data=invalid_swhid_sha1, format="json", HTTP_ACCEPT="application/json" - ) - - assert resp.status_code == 400, resp.data - - resp2 = api_client.post( - url, data=invalid_swhid_type, format="json", HTTP_ACCEPT="application/json" - ) + check_api_post_responses(api_client, url, data=invalid_swhid_sha1, status_code=400) - assert resp2.status_code == 400, resp.data + check_api_post_responses(api_client, url, data=invalid_swhid_type, status_code=400) def test_api_known_raises_large_payload_error(api_client): random_swhid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb" limit = 10000 err_msg = "The maximum number of SWHIDs this endpoint can receive is 1000" swhids = [random_swhid for i in range(limit)] url = reverse("api-1-known") - resp = api_client.post( - url, data=swhids, format="json", HTTP_ACCEPT="application/json" - ) + resp = check_api_post_responses(api_client, url, data=swhids, status_code=413) - assert resp.status_code == 413, resp.data - assert resp["Content-Type"] == "application/json" assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg} diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 4ac4e907..64c258e9 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,757 +1,684 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from hypothesis import given import pytest from requests.utils import parse_header_links from swh.model.model import Origin, OriginVisit, OriginVisitStatus from swh.storage.exc import StorageDBError, StorageAPIError from swh.storage.utils import now from swh.web.api.utils import enrich_origin_visit, enrich_origin from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits +from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.strategies import origin, new_origin, visit_dates, new_snapshots def _scroll_results(api_client, url): """Iterates through pages of results, and returns them all.""" results = [] while True: rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv["Content-Type"] == "application/json" results.extend(rv.data) if "Link" in rv: for link in parse_header_links(rv["Link"]): if link["rel"] == "next": # Found link to next page of results url = link["url"] break else: # No link with 'rel=next' break else: # No Link header break return results def test_api_lookup_origin_visits_raise_error(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "voluntary error to check the bad request middleware." mock_get_origin_visits.side_effect = BadInputExc(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) - rv = api_client.get(url) - - assert rv.status_code == 400, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == {"exception": "BadInputExc", "reason": err_msg} def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage exploded! Will be back online shortly!" mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) - rv = api_client.get(url) - - assert rv.status_code == 503, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageDBError", "reason": "An unexpected error occurred in the backend: %s" % err_msg, } def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage API dropped dead! Will resurrect asap!" mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) - rv = api_client.get(url) - - assert rv.status_code == 503, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageAPIError", "reason": "An unexpected error occurred in the api backend: %s" % err_msg, } @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=now(), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]["visit"], all_visits[2:]), ): url = reverse( "api-1-origin-visits", url_args={"origin_url": new_origin.url}, query_params={"per_page": 2, "last_visit": last_visit}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request, ) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=now(), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]["visit"], all_visits[2:4]), ): url = reverse( "api-1-origin-visits", url_args={"origin_url": new_origin.url}, query_params={"per_page": 2, "last_visit": last_visit}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request, ) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_id = origin_visit.visit archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=visit_date + timedelta(minutes=5), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "api-1-origin-visit", url_args={"origin_url": new_origin.url, "visit_id": visit_id}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add([new_origin]) url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) - rv = api_client.get(url) - assert rv.status_code == 404, rv.data + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No visit for origin %s found" % new_origin.url, } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit_ids[0], date=now(), status="full", snapshot=new_snapshots[0].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_ids[1]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) # Add snapshot to the latest visit visit_id = visit_ids[-1] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit_id, date=now(), status="full", snapshot=new_snapshots[0].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}, query_params={"require_snapshot": True}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_status_get_latest( new_origin.url, type="git", require_snapshot=True ) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v["visit"] for v in all_visits]) url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": max_visit_id + 1}, ) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin %s or its visit with id %s not found!" % (origin["url"], max_visit_id + 1), } def test_api_origins_wrong_input(api_client, archive_data): """Should fail with 400 if the input is deprecated. """ # fail if wrong input url = reverse("api-1-origins", query_params={"origin_from": 1}) - rv = api_client.get(url) - - assert rv.status_code == 400, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": "Please use the Link header to browse through result", } def test_api_origins(api_client, archive_data): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} # Get only one url = reverse("api-1-origins", query_params={"origin_count": 1}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= origin_urls # Get all url = reverse("api-1-origins", query_params={"origin_count": len(origins)}) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse("api-1-origins", query_params={"origin_count": len(origins) + 10}) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls @pytest.mark.parametrize("origin_count", [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} url = reverse("api-1-origins", query_params={"origin_count": origin_count}) results = _scroll_results(api_client, url) assert len(results) == len(origins) assert {origin["url"] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): origin_url = origin["url"] url = reverse("api-1-origin", url_args={"origin_url": origin_url}) - rv = api_client.get(url) - + rv = check_api_get_responses(api_client, url, status_code=200) expected_origin = archive_data.origin_get([origin_url])[0] expected_origin = enrich_origin(expected_origin, rv.wsgi_request) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse("api-1-origin", url_args={"origin_url": new_origin.url}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin with url %s not found!" % new_origin.url, } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.service.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } # Search for 'github.com', get only one url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 1}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= expected_origins # Search for 'github.com', get all url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 2}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins # Search for 'github.com', get more than available url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 10}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_words(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.service.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github com"}, query_params={"limit": 2}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "com github"}, query_params={"limit": 2}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "memononen libtess2"}, query_params={"limit": 2}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } url = reverse( "api-1-origin-search", url_args={"url_pattern": "libtess2 memononen"}, query_params={"limit": 2}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) @pytest.mark.parametrize("limit", [1, 2, 3, 10]) def test_api_origin_search_scroll(api_client, archive_data, mocker, limit, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.service.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": limit}, ) results = _scroll_results(api_client, url) assert {origin["url"] for origin in results} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_limit(api_client, archive_data, tests_data, mocker, backend): if backend == "swh-search": tests_data["search"].origin_update( [{"url": "http://foobar/{}".format(i)} for i in range(2000)] ) else: # equivalent to not configuring search in the config mocker.patch("swh.web.common.service.search", None) archive_data.origin_add( [Origin(url="http://foobar/{}".format(i)) for i in range(2000)] ) url = reverse( "api-1-origin-search", url_args={"url_pattern": "foobar"}, query_params={"limit": 1050}, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1000 @given(origin()) def test_api_origin_metadata_search(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.service.idx_storage") oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse("api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe"}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.content - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_data = [ { "url": origin["url"], "metadata": { "metadata": {"author": "Jane Doe"}, "from_revision": ("7026b7c1a2af56521e951c01ed20f255fa054238"), "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, }, } ] assert rv.data == expected_data oimsft.assert_called_with(conjunction=["Jane Doe"], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.service.idx_storage") oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse("api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe"}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.content - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=70) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe", "limit": 10}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.content - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=10) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe", "limit": 987}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.content - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.service.idx_storage") oimg = mock_idx_storage.origin_intrinsic_metadata_get oimg.side_effect = lambda origin_urls: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse( "api-origin-intrinsic-metadata", url_args={"origin_url": origin["url"]} ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) + + oimg.assert_called_with([origin["url"]]) - oimg.assert_called_once_with([origin["url"]]) - assert rv.status_code == 200, rv.content - assert rv["Content-Type"] == "application/json" expected_data = {"author": "Jane Doe"} assert rv.data == expected_data def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch("swh.web.common.service.idx_storage") url = reverse("api-1-origin-metadata-search") - rv = api_client.get(url) - - assert rv.status_code == 400, rv.content + check_api_get_responses(api_client, url, status_code=400) mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/api/views/test_origin_save.py b/swh/web/tests/api/views/test_origin_save.py index f881f067..66e3fa1f 100644 --- a/swh/web/tests/api/views/test_origin_save.py +++ b/swh/web/tests/api/views/test_origin_save.py @@ -1,328 +1,320 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from datetime import datetime, timedelta from django.utils import timezone from swh.web.common.utils import reverse from swh.web.common.models import ( SaveUnauthorizedOrigin, SaveOriginRequest, SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, -) -from swh.web.common.models import ( SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_FAILED, SAVE_TASK_SUCCEED, ) +from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): SaveUnauthorizedOrigin.objects.create(url="https://github.com/user/illegal_repo") SaveUnauthorizedOrigin.objects.create(url="https://gitlab.com/user_to_exclude") def test_invalid_visit_type(api_client): url = reverse( "api-1-save-origin", url_args={ "visit_type": "foo", "origin_url": "https://github.com/torvalds/linux", }, ) - - response = api_client.post(url) - assert response.status_code == 400 + check_api_get_responses(api_client, url, status_code=400) def test_invalid_origin_url(api_client): url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": "bar"} ) - - response = api_client.post(url) - assert response.status_code == 400 + check_api_get_responses(api_client, url, status_code=400) def check_created_save_request_status( api_client, mocker, origin_url, scheduler_task_status, expected_request_status, expected_task_status=None, visit_date=None, ): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") if not scheduler_task_status: mock_scheduler.get_tasks.return_value = [] else: mock_scheduler.get_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": scheduler_task_status, "id": 1, } ] mock_scheduler.create_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": "next_run_not_scheduled", "id": 1, } ] url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save." "_get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, None) - response = api_client.post(url) if expected_request_status != SAVE_REQUEST_REJECTED: - assert response.status_code == 200, response.data + response = check_api_post_responses(api_client, url, data=None, status_code=200) assert response.data["save_request_status"] == expected_request_status assert response.data["save_task_status"] == expected_task_status else: - assert response.status_code == 403, response.data + check_api_post_responses(api_client, url, data=None, status_code=403) def check_save_request_status( api_client, mocker, origin_url, expected_request_status, expected_task_status, scheduler_task_status="next_run_not_scheduled", visit_date=None, ): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") mock_scheduler.get_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": scheduler_task_status, "id": 1, } ] url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save." "_get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, None) - response = api_client.get(url) - assert response.status_code == 200, response.data + response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_request_status"] == expected_request_status assert save_request_data["save_task_status"] == expected_task_status # Check that save task status is still available when # the scheduler task has been archived mock_scheduler.get_tasks.return_value = [] - response = api_client.get(url) - assert response.status_code == 200 + response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_task_status"] == expected_task_status def test_save_request_rejected(api_client, mocker): origin_url = "https://github.com/user/illegal_repo" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_REJECTED ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_CREATED ) def test_save_request_pending(api_client, mocker): origin_url = "https://unkwownforge.com/user/repo" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_PENDING, SAVE_TASK_NOT_CREATED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_PENDING, SAVE_TASK_NOT_CREATED ) def test_save_request_succeed(api_client, mocker): origin_url = "https://github.com/Kitware/CMake" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEED, scheduler_task_status="completed", visit_date=None, ) visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEED, scheduler_task_status="completed", visit_date=visit_date, ) def test_save_request_failed(api_client, mocker): origin_url = "https://gitlab.com/inkscape/inkscape" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_FAILED, scheduler_task_status="disabled", ) def test_create_save_request_only_when_needed(api_client, mocker): origin_url = "https://github.com/webpack/webpack" SaveOriginRequest.objects.create( visit_type="git", origin_url=origin_url, status=SAVE_REQUEST_ACCEPTED, loading_task_id=56, ) check_created_save_request_status( api_client, mocker, origin_url, "next_run_not_scheduled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 check_created_save_request_status( api_client, mocker, origin_url, "next_run_scheduled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) check_created_save_request_status( api_client, mocker, origin_url, "completed", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, visit_date=visit_date, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) - assert len(sors) == 2 + # check_api_post_responses sends two POST requests to check YAML and JSON response + assert len(sors) == 3 check_created_save_request_status( api_client, mocker, origin_url, "disabled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) - assert len(sors) == 3 + assert len(sors) == 5 def test_get_save_requests_unknown_origin(api_client): unknown_origin_url = "https://gitlab.com/foo/bar" url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": unknown_origin_url}, ) - response = api_client.get(url) - assert response.status_code == 404 + response = check_api_get_responses(api_client, url, status_code=404) assert response.data == { "exception": "NotFoundExc", "reason": ( "No save requests found for visit of type " "git on origin with url %s." ) % unknown_origin_url, } diff --git a/swh/web/tests/api/views/test_ping.py b/swh/web/tests/api/views/test_ping.py index 36408bef..89a035d4 100644 --- a/swh/web/tests/api/views/test_ping.py +++ b/swh/web/tests/api/views/test_ping.py @@ -1,16 +1,13 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses def test_api_1_ping(api_client): url = reverse("api-1-ping") - - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == "pong" diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/tests/api/views/test_release.py index 3c7b259b..ad3313c0 100644 --- a/swh/web/tests/api/views/test_release.py +++ b/swh/web/tests/api/views/test_release.py @@ -1,128 +1,122 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import ( ObjectType, Person, Release, Timestamp, TimestampWithTimezone, ) from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import release, content, directory @given(release()) def test_api_release(api_client, archive_data, release): url = reverse("api-1-release", url_args={"sha1_git": release}) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) expected_release = archive_data.release_get(release) target_revision = expected_release["target"] target_url = reverse( "api-1-revision", url_args={"sha1_git": target_revision}, request=rv.wsgi_request, ) expected_release["target_url"] = target_url - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_release @given(content(), directory(), release()) def test_api_release_target_type_not_a_revision( api_client, archive_data, content, directory, release ): for target_type, target in ( (ObjectType.CONTENT, content), (ObjectType.DIRECTORY, directory), (ObjectType.RELEASE, release), ): if target_type == ObjectType.CONTENT: target = target["sha1_git"] sample_release = Release( author=Person( email=b"author@company.org", fullname=b"author ", name=b"author", ), date=TimestampWithTimezone( timestamp=Timestamp( seconds=int(datetime.now().timestamp()), microseconds=0 ), offset=0, negative_utc=False, ), message=b"sample release message", name=b"sample release", synthetic=False, target=hash_to_bytes(target), target_type=target_type, ) archive_data.release_add([sample_release]) new_release_id = hash_to_hex(sample_release.id) url = reverse("api-1-release", url_args={"sha1_git": new_release_id}) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) expected_release = archive_data.release_get(new_release_id) if target_type == ObjectType.CONTENT: url_args = {"q": "sha1_git:%s" % target} else: url_args = {"sha1_git": target} target_url = reverse( "api-1-%s" % target_type.value, url_args=url_args, request=rv.wsgi_request ) expected_release["target_url"] = target_url - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_release def test_api_release_not_found(api_client): unknown_release_ = random_sha1() url = reverse("api-1-release", url_args={"sha1_git": unknown_release_}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Release with sha1_git %s not found." % unknown_release_, } @given(release()) def test_api_release_uppercase(api_client, release): url = reverse( "api-1-release-uppercase-checksum", url_args={"sha1_git": release.upper()} ) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse( "api-1-release-uppercase-checksum", url_args={"sha1_git": release} ) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index e3a7747b..f09f34da 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,222 +1,200 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.web.api.utils import enrich_revision from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import revision @given(revision()) def test_api_revision(api_client, archive_data, revision): url = reverse("api-1-revision", url_args={"sha1_git": revision}) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) expected_revision = archive_data.revision_get(revision) enrich_revision(expected_revision, rv.wsgi_request) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_revision def test_api_revision_not_found(api_client): unknown_revision_ = random_sha1() url = reverse("api-1-revision", url_args={"sha1_git": unknown_revision_}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } @given(revision()) def test_api_revision_raw_ok(api_client, archive_data, revision): url = reverse("api-1-revision-raw-message", url_args={"sha1_git": revision}) rv = api_client.get(url) expected_message = archive_data.revision_get(revision)["message"] assert rv.status_code == 200 assert rv["Content-Type"] == "application/octet-stream" + assert rv.content == expected_message.encode() def test_api_revision_raw_ko_no_rev(api_client): unknown_revision_ = random_sha1() url = reverse( "api-1-revision-raw-message", url_args={"sha1_git": unknown_revision_} ) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } @given(revision()) def test_api_revision_log(api_client, archive_data, revision): limit = 10 url = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, query_params={"limit": limit}, ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) expected_log = archive_data.revision_log(revision, limit=limit) expected_log = list( map(enrich_revision, expected_log, [rv.wsgi_request] * len(expected_log)) ) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_log def test_api_revision_log_not_found(api_client): unknown_revision_ = random_sha1() url = reverse("api-1-revision-log", url_args={"sha1_git": unknown_revision_}) - rv = api_client.get(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } assert not rv.has_header("Link") def test_api_revision_directory_ko_not_found(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") mock_rev_dir.side_effect = NotFoundExc("Not found") - rv = api_client.get("/api/1/revision/999/directory/some/path/to/dir/") + url = "/api/1/revision/999/directory/some/path/to/dir/" + rv = check_api_get_responses(api_client, url, status_code=404) - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == {"exception": "NotFoundExc", "reason": "Not found"} - mock_rev_dir.assert_called_once_with( - {"sha1_git": "999"}, - "some/path/to/dir", - "/api/1/revision/999/directory/some/path/to/dir/", - with_data=False, + mock_rev_dir.assert_called_with( + {"sha1_git": "999"}, "some/path/to/dir", url, with_data=False, ) def test_api_revision_directory_ok_returns_dir_entries(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") stub_dir = { "type": "dir", "revision": "999", "content": [ { "sha1_git": "789", "type": "file", "target": "101", "target_url": "/api/1/content/sha1_git:101/", "name": "somefile", "file_url": "/api/1/revision/999/directory/some/path/" "somefile/", }, { "sha1_git": "123", "type": "dir", "target": "456", "target_url": "/api/1/directory/456/", "name": "to-subdir", "dir_url": "/api/1/revision/999/directory/some/path/" "to-subdir/", }, ], } mock_rev_dir.return_value = stub_dir - - rv = api_client.get("/api/1/revision/999/directory/some/path/") + url = "/api/1/revision/999/directory/some/path/" + rv = check_api_get_responses(api_client, url, status_code=200) stub_dir["content"][0]["target_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][0]["target_url"] ) stub_dir["content"][0]["file_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][0]["file_url"] ) stub_dir["content"][1]["target_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][1]["target_url"] ) stub_dir["content"][1]["dir_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][1]["dir_url"] ) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == stub_dir - mock_rev_dir.assert_called_once_with( - {"sha1_git": "999"}, - "some/path", - "/api/1/revision/999/directory/some/path/", - with_data=False, + mock_rev_dir.assert_called_with( + {"sha1_git": "999"}, "some/path", url, with_data=False, ) def test_api_revision_directory_ok_returns_content(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") stub_content = { "type": "file", "revision": "999", "content": { "sha1_git": "789", "sha1": "101", "data_url": "/api/1/content/101/raw/", }, } mock_rev_dir.return_value = stub_content url = "/api/1/revision/666/directory/some/other/path/" - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) stub_content["content"]["data_url"] = rv.wsgi_request.build_absolute_uri( stub_content["content"]["data_url"] ) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == stub_content - mock_rev_dir.assert_called_once_with( + mock_rev_dir.assert_called_with( {"sha1_git": "666"}, "some/other/path", url, with_data=False ) @given(revision()) def test_api_revision_uppercase(api_client, revision): url = reverse( "api-1-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()} ) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse("api-1-revision", url_args={"sha1_git": revision}) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py index 9096e264..a7877f15 100644 --- a/swh/web/tests/api/views/test_snapshot.py +++ b/swh/web/tests/api/views/test_snapshot.py @@ -1,165 +1,153 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.model.hashutil import hash_to_hex from swh.model.model import Snapshot from swh.web.api.utils import enrich_snapshot from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import snapshot, new_snapshot @given(snapshot()) def test_api_snapshot(api_client, archive_data, snapshot): url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_data = {**archive_data.snapshot_get(snapshot), "next_branch": None} expected_data = enrich_snapshot(expected_data, rv.wsgi_request) assert rv.data == expected_data @given(snapshot()) def test_api_snapshot_paginated(api_client, archive_data, snapshot): branches_offset = 0 branches_count = 2 snapshot_branches = [] for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()): snapshot_branches.append( {"name": k, "target_type": v["target_type"], "target": v["target"]} ) whole_snapshot = {"id": snapshot, "branches": {}, "next_branch": None} while branches_offset < len(snapshot_branches): branches_from = snapshot_branches[branches_offset]["name"] url = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={ "branches_from": branches_from, "branches_count": branches_count, }, ) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.snapshot_get_branches( snapshot, branches_from, branches_count ) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) branches_offset += branches_count if branches_offset < len(snapshot_branches): next_branch = snapshot_branches[branches_offset]["name"] expected_data["next_branch"] = next_branch else: expected_data["next_branch"] = None assert rv.data == expected_data whole_snapshot["branches"].update(expected_data["branches"]) if branches_offset < len(snapshot_branches): next_url = rv.wsgi_request.build_absolute_uri( reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={ "branches_from": next_branch, "branches_count": branches_count, }, ) ) assert rv["Link"] == '<%s>; rel="next"' % next_url else: assert not rv.has_header("Link") url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == whole_snapshot @given(snapshot()) def test_api_snapshot_filtered(api_client, archive_data, snapshot): snapshot_branches = [] for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()): snapshot_branches.append( {"name": k, "target_type": v["target_type"], "target": v["target"]} ) target_type = random.choice(snapshot_branches)["target_type"] url = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={"target_types": target_type}, ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.snapshot_get_branches( snapshot, target_types=target_type ) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data == expected_data def test_api_snapshot_errors(api_client): unknown_snapshot_ = random_sha1() url = reverse("api-1-snapshot", url_args={"snapshot_id": "63ce369"}) - rv = api_client.get(url) - assert rv.status_code == 400, rv.data + check_api_get_responses(api_client, url, status_code=400) url = reverse("api-1-snapshot", url_args={"snapshot_id": unknown_snapshot_}) - rv = api_client.get(url) - assert rv.status_code == 404, rv.data + check_api_get_responses(api_client, url, status_code=404) @given(snapshot()) def test_api_snapshot_uppercase(api_client, snapshot): url = reverse( "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot.upper()} ) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse( "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot} ) assert resp["location"] == redirect_url @given(new_snapshot(min_size=4)) def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot): snp_dict = new_snapshot.to_dict() snp_id = hash_to_hex(snp_dict["id"]) for branch in snp_dict["branches"].keys(): snp_dict["branches"][branch] = None break archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) url = reverse("api-1-snapshot", url_args={"snapshot_id": snp_id}) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data + check_api_get_responses(api_client, url, status_code=200) diff --git a/swh/web/tests/api/views/test_stat.py b/swh/web/tests/api/views/test_stat.py index c6541a96..32bc676e 100644 --- a/swh/web/tests/api/views/test_stat.py +++ b/swh/web/tests/api/views/test_stat.py @@ -1,72 +1,60 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses def test_api_1_stat_counters_raise_error(api_client, mocker): mock_service = mocker.patch("swh.web.api.views.stat.service") mock_service.stat_counters.side_effect = BadInputExc( "voluntary error to check the bad request middleware." ) url = reverse("api-1-stat-counters") - rv = api_client.get(url) - - assert rv.status_code == 400, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": "voluntary error to check the bad request middleware.", } def test_api_1_stat_counters_raise_from_db(api_client, mocker): mock_service = mocker.patch("swh.web.api.views.stat.service") mock_service.stat_counters.side_effect = StorageDBError( "Storage exploded! Will be back online shortly!" ) url = reverse("api-1-stat-counters") - rv = api_client.get(url) - - assert rv.status_code == 503, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageDBError", "reason": "An unexpected error occurred in the backend: " "Storage exploded! Will be back online shortly!", } def test_api_1_stat_counters_raise_from_api(api_client, mocker): mock_service = mocker.patch("swh.web.api.views.stat.service") mock_service.stat_counters.side_effect = StorageAPIError( "Storage API dropped dead! Will resurrect from its ashes asap!" ) url = reverse("api-1-stat-counters") - rv = api_client.get(url) - - assert rv.status_code == 503, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageAPIError", "reason": "An unexpected error occurred in the api backend: " "Storage API dropped dead! Will resurrect from its ashes asap!", } def test_api_1_stat_counters(api_client, archive_data): url = reverse("api-1-stat-counters") - - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == archive_data.stat_counters() diff --git a/swh/web/tests/api/views/test_vault.py b/swh/web/tests/api/views/test_vault.py index efb63ddf..9a939679 100644 --- a/swh/web/tests/api/views/test_vault.py +++ b/swh/web/tests/api/views/test_vault.py @@ -1,173 +1,166 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model import hashutil from swh.vault.exc import NotFoundExc from swh.web.common.utils import reverse +from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.strategies import ( directory, revision, unknown_directory, unknown_revision, ) @given(directory(), revision()) def test_api_vault_cook(api_client, mocker, directory, revision): mock_service = mocker.patch("swh.web.api.views.vault.service") for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): fetch_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) stub_cook = { "fetch_url": fetch_url, "obj_id": obj_id, "obj_type": obj_type, "progress_message": None, "status": "done", "task_uuid": "de75c902-5ee5-4739-996e-448376a93eff", } stub_fetch = b"content" mock_service.vault_cook.return_value = stub_cook mock_service.vault_fetch.return_value = stub_fetch + email = "test@test.mail" url = reverse( - f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} + f"api-1-vault-cook-{obj_type}", + url_args={f"{obj_type[:3]}_id": obj_id}, + query_params={"email": email}, ) - rv = api_client.post(url, {"email": "test@test.mail"}) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_post_responses(api_client, url, data=None, status_code=200) stub_cook["fetch_url"] = rv.wsgi_request.build_absolute_uri( stub_cook["fetch_url"] ) assert rv.data == stub_cook mock_service.vault_cook.assert_called_with( - obj_type, hashutil.hash_to_bytes(obj_id), "test@test.mail" + obj_type, hashutil.hash_to_bytes(obj_id), email ) rv = api_client.get(fetch_url) assert rv.status_code == 200 assert rv["Content-Type"] == "application/gzip" assert rv.content == stub_fetch mock_service.vault_fetch.assert_called_with( obj_type, hashutil.hash_to_bytes(obj_id) ) @given(directory(), revision()) def test_api_vault_cook_uppercase_hash(api_client, directory, revision): for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): url = reverse( f"api-1-vault-cook-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) rv = api_client.post(url, {"email": "test@test.mail"}) assert rv.status_code == 302 redirect_url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) assert rv["location"] == redirect_url fetch_url = reverse( f"api-1-vault-fetch-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) rv = api_client.get(fetch_url) assert rv.status_code == 302 redirect_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) assert rv["location"] == redirect_url @given(directory(), revision(), unknown_directory(), unknown_revision()) def test_api_vault_cook_notfound( api_client, mocker, directory, revision, unknown_directory, unknown_revision ): mock_vault = mocker.patch("swh.web.common.service.vault") mock_vault.cook.side_effect = NotFoundExc("object not found") mock_vault.fetch.side_effect = NotFoundExc("cooked archive not found") mock_vault.progress.side_effect = NotFoundExc("cooking request not found") for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): obj_name = obj_type.split("_")[0] url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) - rv = api_client.get(url) + rv = check_api_get_responses(api_client, url, status_code=404) - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" assert rv.data["exception"] == "NotFoundExc" assert ( rv.data["reason"] == f"Cooking of {obj_name} '{obj_id}' was never requested." ) mock_vault.progress.assert_called_with(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in ( ("directory", unknown_directory), ("revision_gitfast", unknown_revision), ): obj_name = obj_type.split("_")[0] url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) - rv = api_client.post(url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_post_responses(api_client, url, data=None, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"{obj_name.title()} '{obj_id}' not found." mock_vault.cook.assert_called_with( obj_type, hashutil.hash_to_bytes(obj_id), email=None ) fetch_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) - rv = api_client.get(fetch_url) - - assert rv.status_code == 404, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, fetch_url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert ( rv.data["reason"] == f"Cooked archive for {obj_name} '{obj_id}' not found." ) mock_vault.fetch.assert_called_with(obj_type, hashutil.hash_to_bytes(obj_id))