diff --git a/swh/web/api/apiresponse.py b/swh/web/api/apiresponse.py index ab2d27ab..dc1f8143 100644 --- a/swh/web/api/apiresponse.py +++ b/swh/web/api/apiresponse.py @@ -1,190 +1,193 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import traceback from django.utils.html import escape from rest_framework.response import Response +from rest_framework.utils.encoders import JSONEncoder from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.api import utils from swh.web.common.exc import NotFoundExc, ForbiddenExc, BadInputExc, LargePayloadExc from swh.web.common.utils import shorten_path, gen_path_info from swh.web.config import get_config def compute_link_header(rv, options): """Add Link header in returned value results. Args: request: a DRF Request object rv (dict): dictionary with keys: - headers: potential headers with 'link-next' and 'link-prev' keys - results: containing the result to return options (dict): the initial dict to update with result if any Returns: dict: dictionary with optional keys 'link-next' and 'link-prev' """ link_headers = [] if "headers" not in rv: return {} rv_headers = rv["headers"] if "link-next" in rv_headers: link_headers.append('<%s>; rel="next"' % rv_headers["link-next"]) if "link-prev" in rv_headers: link_headers.append('<%s>; rel="previous"' % rv_headers["link-prev"]) if link_headers: link_header_str = ",".join(link_headers) headers = options.get("headers", {}) headers.update({"Link": link_header_str}) return headers return {} def filter_by_fields(request, data): """Extract a request parameter 'fields' if it exists to permit the filtering on the data dict's keys. If such field is not provided, returns the data as is. """ fields = request.query_params.get("fields") if fields: fields = set(fields.split(",")) data = utils.filter_field_keys(data, fields) return data def transform(rv): """Transform an eventual returned value with multiple layer of information with only what's necessary. If the returned value rv contains the 'results' key, this is the associated value which is returned. Otherwise, return the initial dict without the potential 'headers' key. """ if "results" in rv: return rv["results"] if "headers" in rv: rv.pop("headers") return rv def make_api_response(request, data, doc_data={}, options={}): """Generates an API response based on the requested mimetype. Args: request: a DRF Request object data: raw data to return in the API response doc_data: documentation data for HTML response options: optional data that can be used to generate the response Returns: a DRF Response a object """ if data: options["headers"] = compute_link_header(data, options) data = transform(data) data = filter_by_fields(request, data) doc_env = doc_data headers = {} if "headers" in options: doc_env["headers_data"] = options["headers"] headers = options["headers"] # get request status code doc_env["status_code"] = options.get("status", 200) response_args = { "status": doc_env["status_code"], "headers": headers, "content_type": request.accepted_media_type, } # when requesting HTML, typically when browsing the API through its # documented views, we need to enrich the input data with documentation # related ones and inform DRF that we request HTML template rendering if request.accepted_media_type == "text/html": if data: - data = json.dumps(data, sort_keys=True, indent=4, separators=(",", ": ")) + data = json.dumps( + data, cls=JSONEncoder, sort_keys=True, indent=4, separators=(",", ": ") + ) doc_env["response_data"] = data doc_env["heading"] = shorten_path(str(request.path)) # generate breadcrumbs data if "route" in doc_env: doc_env["endpoint_path"] = gen_path_info(doc_env["route"]) for i in range(len(doc_env["endpoint_path"]) - 1): doc_env["endpoint_path"][i]["path"] += "/doc/" if not doc_env["noargs"]: doc_env["endpoint_path"][-1]["path"] += "/doc/" response_args["data"] = doc_env response_args["template_name"] = "api/apidoc.html" # otherwise simply return the raw data and let DRF picks # the correct renderer (JSON or YAML) else: response_args["data"] = data return Response(**response_args) def error_response(request, error, doc_data): """Private function to create a custom error response. Args: request: a DRF Request object error: the exception that caused the error doc_data: documentation data for HTML response """ error_code = 500 if isinstance(error, BadInputExc): error_code = 400 elif isinstance(error, NotFoundExc): error_code = 404 elif isinstance(error, ForbiddenExc): error_code = 403 elif isinstance(error, LargePayloadExc): error_code = 413 elif isinstance(error, StorageDBError): error_code = 503 elif isinstance(error, StorageAPIError): error_code = 503 error_opts = {"status": error_code} error_data = { "exception": error.__class__.__name__, "reason": str(error), } if request.accepted_media_type == "text/html": error_data["reason"] = escape(error_data["reason"]) if get_config()["debug"]: error_data["traceback"] = traceback.format_exc() return make_api_response(request, error_data, doc_data, options=error_opts) diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py index 6bc610d7..0eb09d1a 100644 --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -1,188 +1,194 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT +from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ) @given(origin(), content(), directory(), release(), revision(), snapshot()) def test_swhid_resolve_success( - api_client, origin, content, directory, release, revision, snapshot + api_client, client, origin, content, directory, release, revision, snapshot ): - for obj_type_short, obj_type, obj_id in ( - ("cnt", CONTENT, content["sha1_git"]), - ("dir", DIRECTORY, directory), - ("rel", RELEASE, release), - ("rev", REVISION, revision), - ("snp", SNAPSHOT, snapshot), + for obj_type, obj_id in ( + (CONTENT, content["sha1_git"]), + (DIRECTORY, directory), + (RELEASE, release), + (REVISION, revision), + (SNAPSHOT, snapshot), ): - swhid = "swh:1:%s:%s;origin=%s" % (obj_type_short, obj_id, origin["url"]) + swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]}) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) resp = api_client.get(url) if obj_type == CONTENT: url_args = {"query_string": "sha1_git:%s" % obj_id} elif obj_type == SNAPSHOT: url_args = {"snapshot_id": obj_id} else: url_args = {"sha1_git": obj_id} browse_rev_url = reverse( "browse-%s" % obj_type, url_args=url_args, query_params={"origin_url": origin["url"]}, request=resp.wsgi_request, ) expected_result = { "browse_url": browse_rev_url, "metadata": {"origin": origin["url"]}, "namespace": "swh", "object_id": obj_id, "object_type": obj_type, "scheme_version": 1, } assert resp.status_code == 200, resp.data assert resp.data == expected_result + # also checks endpoint documented view + # TODO: remove that check once T2529 is implemented + resp = client.get(url, HTTP_ACCEPT="text/html") + assert resp.status_code == 200, resp.content + def test_swhid_resolve_invalid(api_client): rev_id_invalid = "96db9023b8_foo_50d6c108e9a3" swhid = "swh:1:rev:%s" % rev_id_invalid url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) resp = api_client.get(url) assert resp.status_code == 400, resp.data @given( unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot(), ) def test_swhid_resolve_not_found( api_client, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): - for obj_type_short, obj_id in ( - ("cnt", unknown_content["sha1_git"]), - ("dir", unknown_directory), - ("rel", unknown_release), - ("rev", unknown_revision), - ("snp", unknown_snapshot), + for obj_type, obj_id in ( + (CONTENT, unknown_content["sha1_git"]), + (DIRECTORY, unknown_directory), + (RELEASE, unknown_release), + (REVISION, unknown_revision), + (SNAPSHOT, unknown_snapshot), ): - swhid = "swh:1:%s:%s" % (obj_type_short, obj_id) + swhid = gen_swhid(obj_type, obj_id) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) resp = api_client.get(url) assert resp.status_code == 404, resp.data def test_swh_origin_id_not_resolvable(api_client): ori_swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb" url = reverse("api-1-resolve-swhid", url_args={"swhid": ori_swhid}) resp = api_client.get(url) assert resp.status_code == 400, resp.data @given(content(), directory()) def test_api_known_swhid_some_present(api_client, content, directory): - content_ = "swh:1:cnt:%s" % content["sha1_git"] - directory_ = "swh:1:dir:%s" % directory - unknown_revision_ = "swh:1:rev:%s" % random_sha1() - unknown_release_ = "swh:1:rel:%s" % random_sha1() - unknown_snapshot_ = "swh:1:snp:%s" % random_sha1() + content_ = gen_swhid(CONTENT, content["sha1_git"]) + directory_ = gen_swhid(DIRECTORY, directory) + unknown_revision_ = gen_swhid(REVISION, random_sha1()) + unknown_release_ = gen_swhid(RELEASE, random_sha1()) + unknown_snapshot_ = gen_swhid(SNAPSHOT, random_sha1()) input_swhids = [ content_, directory_, unknown_revision_, unknown_release_, unknown_snapshot_, ] url = reverse("api-1-known") resp = api_client.post( url, data=input_swhids, format="json", HTTP_ACCEPT="application/json" ) assert resp.status_code == 200, resp.data assert resp["Content-Type"] == "application/json" assert resp.data == { content_: {"known": True}, directory_: {"known": True}, unknown_revision_: {"known": False}, unknown_release_: {"known": False}, unknown_snapshot_: {"known": False}, } def test_api_known_invalid_swhid(api_client): invalid_swhid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"] invalid_swhid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"] url = reverse("api-1-known") resp = api_client.post( url, data=invalid_swhid_sha1, format="json", HTTP_ACCEPT="application/json" ) assert resp.status_code == 400, resp.data resp2 = api_client.post( url, data=invalid_swhid_type, format="json", HTTP_ACCEPT="application/json" ) assert resp2.status_code == 400, resp.data def test_api_known_raises_large_payload_error(api_client): random_swhid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb" limit = 10000 err_msg = "The maximum number of SWHIDs this endpoint can receive is 1000" swhids = [random_swhid for i in range(limit)] url = reverse("api-1-known") resp = api_client.post( url, data=swhids, format="json", HTTP_ACCEPT="application/json" ) assert resp.status_code == 413, resp.data assert resp["Content-Type"] == "application/json" assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg}