def filter_field_keys(data, field_keys):
    """Keep only the entries of ``data`` whose key is listed in ``field_keys``.

    Dictionaries are filtered on their keys; lists and ``map`` objects are
    filtered element by element (a ``map`` stays lazy). Any other kind of
    object is returned untouched.

    Args:
        data: one object (dictionary, list, map...) to filter.
        field_keys: keys to keep, either as a collection (set, list...) or as
            a comma-separated string such as ``"directory,link"``.

    Returns:
        ``data`` filtered on ``field_keys``

    """
    if isinstance(field_keys, str):
        # A CSV string must be split first: testing ``k in "directory,link"``
        # is a substring check, so "dir" or "link" fragments would wrongly
        # match.
        field_keys = {key.strip() for key in field_keys.split(",")}
    if isinstance(data, map):
        return map(lambda x: filter_field_keys(x, field_keys), data)
    if isinstance(data, list):
        return [filter_field_keys(x, field_keys) for x in data]
    if isinstance(data, dict):
        return {k: v for (k, v) in data.items() if k in field_keys}
    return data
def enrich_directory_entry(
    directory: Dict[str, str], request: Optional[HttpRequest] = None
) -> Dict[str, str]:
    """Add a ``target_url`` link to a directory entry.

    Entries without a ``type`` key are returned unchanged. ``file`` entries
    link to the content endpoint, ``dir`` entries to the directory endpoint
    and every other type to the revision endpoint.

    Args:
        directory: dict of data associated to a swh directory entry
        request: Absolute URIs will be generated if provided

    Returns:
        The same dict, updated in place with the additional url

    """
    if "type" not in directory:
        return directory

    entry_type = directory["type"]
    entry_target = directory["target"]

    # Pick the endpoint and its URL arguments first, then reverse once.
    if entry_type == "file":
        view_name = "api-1-content"
        view_args = {"q": "sha1_git:%s" % entry_target}
    elif entry_type == "dir":
        view_name = "api-1-directory"
        view_args = {"sha1_git": entry_target}
    else:
        view_name = "api-1-revision"
        view_args = {"sha1_git": entry_target}

    directory["target_url"] = reverse(view_name, url_args=view_args, request=request)
    return directory
def enrich_content(
    content: Dict[str, Any],
    request: Optional[HttpRequest] = None,
    top_url: Optional[bool] = False,
    query_string: Optional[str] = None,
) -> Dict[str, str]:
    """Enrich content with links to:

        - data_url: its raw data
        - filetype_url: its filetype information
        - language_url: its programming language information
        - license_url: its licensing information

    and, when ``top_url`` is set, ``content_url``: the content itself.

    Links are only added when the selected hash algorithm (``sha1`` unless
    ``query_string`` says otherwise) is available in the content checksums.

    Args:
        content: dict of data associated to a swh content object
        top_url: whether or not to include the content url in
            the enriched data
        query_string: optional query string of the form ``<hash_algo>:<hash>``
            used when requesting the content, it acts as a hint
            for picking the same hash method when computing
            the urls listed above
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched content dict filled with additional urls

    """
    # Checksums are either nested under "checksums" or stored at top level.
    checksums = content
    if "checksums" in content:
        checksums = content["checksums"]

    hash_algo = "sha1"
    if query_string:
        hash_algo = parse_hash(query_string)[0]

    if hash_algo in checksums:
        q = "%s:%s" % (hash_algo, checksums[hash_algo])
        if top_url:
            # ``request`` must be forwarded here as well, otherwise this is
            # the only link of the payload that is never made absolute.
            content["content_url"] = reverse(
                "api-1-content", url_args={"q": q}, request=request
            )
        content["data_url"] = reverse(
            "api-1-content-raw", url_args={"q": q}, request=request
        )
        content["filetype_url"] = reverse(
            "api-1-content-filetype", url_args={"q": q}, request=request
        )
        content["language_url"] = reverse(
            "api-1-content-language", url_args={"q": q}, request=request
        )
        content["license_url"] = reverse(
            "api-1-content-license", url_args={"q": q}, request=request
        )

    return content
def enrich_snapshot(
    snapshot: Dict[str, Any], request: Optional[HttpRequest] = None
) -> Dict[str, Any]:
    """Add links to the targets of a snapshot's branches.

    Every non-empty branch goes through ``enrich_object``; empty branches
    become ``None``. Alias branches that can be resolved additionally get the
    ``target_url`` of the branch they point to.

    Args:
        snapshot: the snapshot as a dict
        request: Absolute URIs will be generated if provided

    Returns:
        The same snapshot dict, filled with additional urls

    """
    if "branches" not in snapshot:
        return snapshot

    enriched_branches = {}
    for branch_name, branch in snapshot["branches"].items():
        enriched_branches[branch_name] = (
            enrich_object(branch, request) if branch else None
        )
    snapshot["branches"] = enriched_branches

    # Second pass: aliases are resolved against the already enriched snapshot.
    for branch in enriched_branches.values():
        if not branch or branch["target_type"] != "alias":
            continue
        resolved = resolve_branch_alias(snapshot, branch)
        if resolved:
            resolved = enrich_object(resolved, request)
            branch["target_url"] = resolved["target_url"]

    return snapshot
def enrich_origin_search_result(
    origin_search_result: Tuple[List[Dict[str, Any]], Optional[str]],
    request: Optional[HttpRequest] = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Apply ``enrich_origin`` to every origin of a search result page.

    Args:
        origin_search_result: ``(origins, page_token)`` tuple returned when
            searching origins
        request: Absolute URIs will be generated if provided

    Returns:
        A ``(enriched origins, page_token)`` tuple; the page token is passed
        through unchanged

    """
    found_origins, next_page_token = origin_search_result
    enriched_origins = []
    for found_origin in found_origins:
        enriched_origins.append(enrich_origin(found_origin, request=request))
    return enriched_origins, next_page_token
reverse( "api-1-origin-visit", url_args={"origin_url": ov["origin"], "visit_id": ov["visit"]}, request=request, ) snapshot = ov["snapshot"] if snapshot: ov["snapshot_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, request=request ) else: ov["snapshot_url"] = None return ov diff --git a/swh/web/api/views/metadata.py b/swh/web/api/views/metadata.py index fdf3bc36..e66615f5 100644 --- a/swh/web/api/views/metadata.py +++ b/swh/web/api/views/metadata.py @@ -1,254 +1,286 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import re from typing import Dict, Optional import iso8601 from django.http import HttpResponse from rest_framework.request import Request from swh.model import hashutil, swhids from swh.model.model import MetadataAuthority, MetadataAuthorityType from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.common import archive, converters from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.common.utils import SWHID_RE, reverse @api_route( f"/raw-extrinsic-metadata/swhid/(?P{SWHID_RE})/", "api-1-raw-extrinsic-metadata-swhid", ) @api_doc("/raw-extrinsic-metadata/swhid/") @format_docstring() def api_raw_extrinsic_metadata_swhid(request: Request, target: str): """ .. http:get:: /api/1/raw-extrinsic-metadata/swhid/(target) Returns raw `extrinsic metadata `__ collected on a given object. :param string target: The core SWHID of the object whose metadata should be returned :query string authority: A metadata authority identifier, formatted as `` ``. Required. :query string after: ISO8601 representation of the minimum timestamp of metadata to fetch. Defaults to allowing all metadata. :query int limit: Maximum number of metadata objects to return. 
{common_headers} :>jsonarr string target: SWHID of the object described by this metadata + (absent when ``target`` is not a core SWHID, i.e. it does not have type + ``cnt``/``dir``/``rev``/``rel``/``snp``) :>jsonarr string discovery_date: ISO8601/RFC3339 timestamp of the moment this metadata was collected. :>jsonarr object authority: authority this metadata is coming from :>jsonarr object fetcher: tool used to fetch the metadata :>jsonarr string format: short identifier of the format of the metadata :>jsonarr string metadata_url: link to download the metadata "blob" itself :>jsonarr string origin: URL of the origin in which context's the metadata is valid, if any :>jsonarr int visit: identifier of the visit in which context's the metadata is valid, if any :>jsonarr string snapshot: SWHID of the snapshot in which context's the metadata is valid, if any :>jsonarr string release: SWHID of the release in which context's the metadata is valid, if any :>jsonarr string revision: SWHID of the revision in which context's the metadata is valid, if any :>jsonarr string path: SWHID of the path in which context's is valid if any, relative to a release or revision as anchor :>jsonarr string directory: SWHID of the directory in which context's the metadata is valid, if any :statuscode 200: no error **Example:** .. 
parsed-literal:: :swh_web_api:`raw-extrinsic-metadata/swhid/swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307/?authority=forge%20https://pypi.org/` """ # noqa authority_str: Optional[str] = request.query_params.get("authority") after_str: Optional[str] = request.query_params.get("after") limit_str: str = request.query_params.get("limit", "100") page_token_str: Optional[str] = request.query_params.get("page_token") if authority_str is None: raise BadInputExc("The 'authority' query parameter is required.") if " " not in authority_str.strip(): raise BadInputExc("The 'authority' query parameter should contain a space.") (authority_type_str, authority_url) = authority_str.split(" ", 1) try: authority_type = MetadataAuthorityType(authority_type_str) except ValueError: raise BadInputExc( f"Invalid 'authority' type, should be one of: " f"{', '.join(member.value for member in MetadataAuthorityType)}" ) authority = MetadataAuthority(authority_type, authority_url) if after_str: try: after = iso8601.parse_date(after_str) except iso8601.ParseError: raise BadInputExc("Invalid format for 'after' parameter.") from None else: after = None try: limit = int(limit_str) except ValueError: raise BadInputExc("'limit' parameter must be an integer.") from None limit = min(limit, 10000) try: - parsed_target = swhids.CoreSWHID.from_string(target).to_extended() + parsed_target = swhids.ExtendedSWHID.from_string(target) except swhids.ValidationError as e: - raise BadInputExc(f"Invalid target SWHID: {e.args[0]}") from None + raise BadInputExc(f"Invalid target SWHID: {e}") from None + + try: + swhids.CoreSWHID.from_string(target) + except swhids.ValidationError: + # Can be parsed as an extended SWHID, but not as a core SWHID + extended_swhid = True + else: + extended_swhid = False if page_token_str is not None: page_token = base64.urlsafe_b64decode(page_token_str) else: page_token = None result_page = archive.storage.raw_extrinsic_metadata_get( target=parsed_target, authority=authority, 
@api_route(
    "/raw-extrinsic-metadata/get/(?P<id>[0-9a-z]+)/",
    "api-1-raw-extrinsic-metadata-get",
)
def api_raw_extrinsic_metadata_get(request: Request, id: str):
    # This is an internal endpoint that should only be accessed via URLs given
    # by /raw-extrinsic-metadata/swhid/; so it is not documented.
    #
    # It looks up the metadata object whose hex identifier is ``id`` and
    # returns its raw bytes as an attachment, raising NotFoundExc when
    # storage returns nothing for that id.
    metadata = archive.storage.raw_extrinsic_metadata_get_by_ids(
        [hashutil.hash_to_bytes(id)]
    )
    if not metadata:
        raise NotFoundExc(
            "Metadata not found. Use /raw-extrinsic-metadata/swhid/ to access metadata."
        )

    response = HttpResponse(
        metadata[0].metadata, content_type="application/octet-stream"
    )

    filename = request.query_params.get("filename")
    # The filename comes from the query string and is interpolated into a
    # response header, so the *whole* value must be made of safe characters.
    # re.match() only anchors the beginning and would accept values such as
    # 'a"; foo=bar'.
    if filename and re.fullmatch("[a-zA-Z0-9:._-]+", filename):
        response["Content-disposition"] = f'attachment; filename="{filename}"'
    else:
        # It should always be not-None and match the regexp if the URL was created by
        # /raw-extrinsic-metadata/swhid/, but we're better safe than sorry.
        response["Content-disposition"] = "attachment"

    return response
# Response fields of a single origin, as produced by ``enrich_origin``.
DOC_RETURN_ORIGIN = """
        :>json string origin_visits_url: link to
            :http:get:`/api/1/origin/(origin_url)/visits/` in order to get
            information about the visits for that origin
        :>json string url: the origin canonical url
        :>json string metadata_authorities_url: link to
            :http:get:`/api/1/raw-extrinsic-metadata/swhid/(target)/authorities/`
            to get the list of metadata authorities providing extrinsic metadata
            on this origin (and, indirectly, to the origin's extrinsic metadata itself)
"""

DOC_RETURN_ORIGIN_ARRAY = DOC_RETURN_ORIGIN.replace(":>json", ":>jsonarr")

# Response fields of a single origin visit. Every field must use ``:>json``
# here: the array variant below is derived with a textual replace, which would
# turn an existing ``:>jsonarr`` into ``:>jsonarrarr``.
DOC_RETURN_ORIGIN_VISIT = """
        :>json string date: ISO8601/RFC3339 representation of the visit date (in UTC)
        :>json str origin: the origin canonical url
        :>json string origin_url: link to get information about the origin
        :>json string snapshot: the snapshot identifier of the visit
            (may be null if status is not **full**).
        :>json string snapshot_url: link to
            :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get
            information about the snapshot of the visit
            (may be null if status is not **full**).
        :>json string status: status of the visit (either **full**,
            **partial** or **ongoing**)
        :>json number visit: the unique identifier of the visit
"""

DOC_RETURN_ORIGIN_VISIT_ARRAY = DOC_RETURN_ORIGIN_VISIT.replace(":>json", ":>jsonarr")
DOC_RETURN_ORIGIN_VISIT_ARRAY += """
        :>jsonarr number id: the unique identifier of the origin
        :>jsonarr string origin_visit_url: link to
            :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/`
            in order to get information about the visit
"""
:query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_count=500` """ old_param_origin_from = request.query_params.get("origin_from") if old_param_origin_from: raise BadInputExc("Please use the Link header to browse through result") page_token = request.query_params.get("page_token", None) limit = min(int(request.query_params.get("origin_count", "100")), 10000) page_result = archive.lookup_origins(page_token, limit) origins = [enrich_origin(o, request=request) for o in page_result.results] next_page_token = page_result.next_page_token headers: Dict[str, str] = {} if next_page_token is not None: headers["link-next"] = reverse( "api-1-origins", query_params={"page_token": next_page_token, "origin_count": str(limit)}, request=request, ) return {"results": origins, "headers": headers} @api_route(r"/origin/(?P.+)/get/", "api-1-origin") @api_doc("/origin/") @format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request: Request, origin_url: str): """ .. http:get:: /api/1/origin/(origin_url)/get/ Get information about a software origin. :param string origin_url: the origin url {return_origin} {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/get/` """ ori_dict = {"url": origin_url} error_msg = "Origin with url %s not found." 
% ori_dict["url"] return api_lookup( archive.lookup_origin, ori_dict, lookup_similar_urls=False, notfound_msg=error_msg, enrich_fn=enrich_origin, request=request, ) def _visit_types() -> str: docstring = "" # available visit types are queried using swh-search so we do it in a try # block in case of failure (for instance in docker environment when # elasticsearch service is not available) try: visit_types = [f"**{visit_type}**" for visit_type in origin_visit_types()] docstring = ", ".join(visit_types[:-1]) + f", and {visit_types[-1]}" except Exception: docstring = "???" pass return docstring @api_route( r"/origin/search/(?P.*)/", "api-1-origin-search", throttle_scope="swh_api_origin_search", ) @api_doc("/origin/search/") @format_docstring( return_origin_array=DOC_RETURN_ORIGIN_ARRAY, visit_types=_visit_types() ) def api_origin_search(request: Request, url_pattern: str): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. .. warning:: This endpoint used to provide an ``offset`` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :param string url_pattern: a string pattern :query boolean use_ql: whether to use swh search query language or not :query int limit: the maximum number of found origins to return (bounded to 1000) :query boolean with_visit: if true, only return origins with at least one visit by Software heritage :query string visit_type: if provided, only return origins with that specific visit type (currently the supported types are {visit_types}) {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. 
@api_route(r"/origin/metadata-search/", "api-1-origin-metadata-search")
@api_doc("/origin/metadata-search/", noargs=True)
@format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY)
def api_origin_metadata_search(request: Request):
    """
    .. http:get:: /api/1/origin/metadata-search/

        Search for software origins whose metadata (expressed as a
        JSON-LD/CodeMeta dictionary) match the provided criteria.
        For now, only full-text search on this dictionary is supported.

        :query str fulltext: a string that will be matched against origin
            metadata; results are ranked and ordered starting with the best
            ones.
        :query int limit: the maximum number of found origins to return
            (bounded to 100)

        {return_origin_array}

        {common_headers}

        :statuscode 200: no error

        **Example:**

        .. parsed-literal::

            :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe`
    """
    fulltext = request.query_params.get("fulltext", None)

    # Report a malformed 'limit' as a bad input, like
    # /raw-extrinsic-metadata/swhid/ does, instead of letting a bare
    # ValueError escape the view.
    try:
        limit = int(request.query_params.get("limit", "70"))
    except ValueError:
        raise BadInputExc("'limit' parameter must be an integer.") from None
    limit = min(limit, 100)

    if not fulltext:
        content = '"fulltext" must be provided and non-empty.'
        raise BadInputExc(content)

    results = api_lookup(
        archive.search_origin_metadata, fulltext, limit, request=request
    )

    return {
        "results": results,
    }
parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visits/` """ result = {} origin_query = OriginInfo(url=origin_url) notfound_msg = "No origin {} found".format(origin_url) url_args_next = {"origin_url": origin_url} per_page = int(request.query_params.get("per_page", "10")) last_visit_str = request.query_params.get("last_visit") last_visit = int(last_visit_str) if last_visit_str else None def _lookup_origin_visits(origin_query, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits(origin_query, lookup_similar_urls=False) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v["visit"] == last_visit: visits = all_visits[i + 1 : i + 1 + per_page] break for v in visits: yield v results = api_lookup( _lookup_origin_visits, origin_query, notfound_msg=notfound_msg, enrich_fn=partial( enrich_origin_visit, with_origin_link=False, with_origin_visit_link=True ), request=request, ) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]["visit"] query_params = {} query_params["last_visit"] = new_last_visit if request.query_params.get("per_page"): query_params["per_page"] = per_page result["headers"] = { "link-next": reverse( "api-1-origin-visits", url_args=url_args_next, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route( r"/origin/(?P.+)/visit/latest/", "api-1-origin-visit-latest", throttle_scope="swh_api_origin_visit_latest", ) @api_doc("/origin/visit/latest/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit_latest(request: Request, origin_url: str): """ .. http:get:: /api/1/origin/(origin_url)/visit/latest/ Get information about the latest visit of a software origin. 
:param str origin_url: a software origin URL :query boolean require_snapshot: if true, only return a visit with a snapshot {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/latest/` """ require_snapshot = request.query_params.get("require_snapshot", "false") return api_lookup( archive.lookup_origin_visit_latest, origin_url, bool(strtobool(require_snapshot)), lookup_similar_urls=False, notfound_msg=("No visit for origin {} found".format(origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.+)/visit/(?P[0-9]+)/", "api-1-origin-visit" ) @api_doc("/origin/visit/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit(request: Request, visit_id: str, origin_url: str): """ .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :param int visit_id: a visit identifier {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` """ return api_lookup( archive.lookup_origin_visit, origin_url, int(visit_id), lookup_similar_urls=False, notfound_msg=("No visit {} for origin {} found".format(visit_id, origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.+)/intrinsic-metadata/", "api-origin-intrinsic-metadata" ) @api_doc("/origin/intrinsic-metadata/") @format_docstring() def api_origin_intrinsic_metadata(request: Request, origin_url: str): """ .. 
http:get:: /api/1/origin/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/intrinsic-metadata` """ return api_lookup( archive.lookup_origin_intrinsic_metadata, origin_url, notfound_msg=f"Origin with url {origin_url} not found", enrich_fn=enrich_origin, request=request, ) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index c47124c7..e95b12a5 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,597 +1,614 @@ -# Copyright (C) 2015-2021 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from swh.model.hashutil import DEFAULT_ALGORITHMS +from swh.model.model import Origin from swh.web.api import utils from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import resolve_branch_alias, reverse url_map = [ { "rule": "/other/", "methods": set(["GET", "POST", "HEAD"]), "endpoint": "foo", }, { "rule": "/some/old/url/", "methods": set(["GET", "POST"]), "endpoint": "blablafn", }, { "rule": "/other/old/url/", "methods": set(["GET", "HEAD"]), "endpoint": "bar", }, {"rule": "/other", "methods": set([]), "endpoint": None}, {"rule": "/other2", "methods": set([]), "endpoint": None}, ] def test_filter_field_keys_dict_unknown_keys(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory1", "file2"} ) assert actual_res == {} def 
test_filter_field_keys_dict(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory", "link"} ) assert actual_res == {"directory": 1, "link": 3} def test_filter_field_keys_list_unknown_keys(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"1": 1, "2": 2, "link": 3}], {"d"} ) assert actual_res == [{}, {}] def test_filter_field_keys_map(): actual_res = utils.filter_field_keys( map( lambda x: {"i": x["i"] + 1, "j": x["j"]}, [{"i": 1, "j": None}, {"i": 2, "j": None}, {"i": 3, "j": None}], ), {"i"}, ) assert list(actual_res) == [{"i": 2}, {"i": 3}, {"i": 4}] def test_filter_field_keys_list(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"dir": 1, "fil": 2, "lin": 3}], {"directory", "dir"}, ) assert actual_res == [{"directory": 1}, {"dir": 1}] def test_filter_field_keys_other(): input_set = {1, 2} actual_res = utils.filter_field_keys(input_set, {"a", "1"}) assert actual_res == input_set def test_person_to_string(): assert ( utils.person_to_string({"name": "raboof", "email": "foo@bar"}) == "raboof " ) def test_enrich_release_empty(): actual_release = utils.enrich_release({}) assert actual_release == {} def test_enrich_release_content_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "content" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{release_data["target"]}'}, request=request, ) assert actual_release == release_data def test_enrich_release_directory_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "directory" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) 
actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-directory", url_args={"sha1_git": release_data["target"]}, request=request, ) assert actual_release == release_data def test_enrich_release_revision_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "revision" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data def test_enrich_release_release_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "release" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-release", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data def test_enrich_directory_entry_no_type(): assert utils.enrich_directory_entry({"id": "dir-id"}) == {"id": "dir-id"} def test_enrich_directory_entry_with_type(api_request_factory, archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) url = reverse("api-1-directory", url_args={"sha1_git": directory}) request = api_request_factory.get(url) actual_directory = utils.enrich_directory_entry(dir_entry, request) if dir_entry["type"] == "file": dir_entry["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{dir_entry["target"]}'}, request=request, ) elif dir_entry["type"] == "dir": dir_entry["target_url"] = reverse( "api-1-directory", url_args={"sha1_git": dir_entry["target"]}, request=request, ) elif 
dir_entry["type"] == "rev": dir_entry["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": dir_entry["target"]}, request=request, ) assert actual_directory == dir_entry def test_enrich_content_without_hashes(): assert utils.enrich_content({"id": "123"}) == {"id": "123"} def test_enrich_content_with_hashes(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data def test_enrich_content_with_hashes_and_top_level_url(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, top_url=True, request=request ) content_data["content_url"] = reverse( "api-1-content", url_args={"q": query_string}, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) 
content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data def test_enrich_revision_without_children_or_parent( api_request_factory, archive_data, revision ): revision_data = archive_data.revision_get(revision) del revision_data["parents"] url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) assert actual_revision == revision_data def test_enrich_revision_with_children_and_parent_no_dir( api_request_factory, archive_data, revisions_list ): revision, parent_revision, child_revision = revisions_list(size=3) revision_data = archive_data.revision_get(revision) del revision_data["directory"] revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data def 
test_enrich_revisionno_context(api_request_factory, revisions_list): revision, parent_revision, child_revision = revisions_list(size=3) revision_data = { "id": revision, "parents": [parent_revision], "children": [child_revision], } url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": parent_revision, "url": reverse( "api-1-revision", url_args={"sha1_git": parent_revision}, request=request, ), } ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data def test_enrich_revision_with_no_message( api_request_factory, archive_data, revisions_list ): revision, parent_revision, child_revision = revisions_list(size=3) revision_data = archive_data.revision_get(revision) revision_data["message"] = None revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) 
revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data def test_enrich_revision_with_invalid_message( api_request_factory, archive_data, revisions_list ): revision, parent_revision, child_revision = revisions_list(size=3) revision_data = archive_data.revision_get(revision) revision_data["decoding_failures"] = ["message"] revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["message_url"] = reverse( "api-1-revision-raw-message", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data def test_enrich_snapshot(api_request_factory, archive_data, snapshot): snapshot_data = archive_data.snapshot_get(snapshot) url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) request = api_request_factory.get(url) actual_snapshot = utils.enrich_snapshot(snapshot_data, request) for _, b in snapshot_data["branches"].items(): if b["target_type"] in ("directory", "revision", "release"): b["target_url"] = reverse( f'api-1-{b["target_type"]}', url_args={"sha1_git": 
b["target"]}, request=request, ) elif b["target_type"] == "content": b["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{b["target"]}'}, request=request, ) for _, b in snapshot_data["branches"].items(): if b["target_type"] == "alias": target = resolve_branch_alias(snapshot_data, b) b["target_url"] = target["target_url"] assert actual_snapshot == snapshot_data def test_enrich_origin(api_request_factory, origin): url = reverse("api-1-origin", url_args={"origin_url": origin["url"]}) request = api_request_factory.get(url) origin_data = {"url": origin["url"]} actual_origin = utils.enrich_origin(origin_data, request) origin_data["origin_visits_url"] = reverse( "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request ) + origin_data["metadata_authorities_url"] = reverse( + "api-1-raw-extrinsic-metadata-swhid-authorities", + url_args={"target": Origin(url=origin["url"]).swhid()}, + request=request, + ) assert actual_origin == origin_data def test_enrich_origin_search_result(api_request_factory, origin): url = reverse("api-1-origin-search", url_args={"url_pattern": origin["url"]}) request = api_request_factory.get(url) origin_visits_url = reverse( "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request ) + metadata_authorities_url = reverse( + "api-1-raw-extrinsic-metadata-swhid-authorities", + url_args={"target": Origin(url=origin["url"]).swhid()}, + request=request, + ) origin_search_result_data = ( [{"url": origin["url"]}], None, ) enriched_origin_search_result = ( - [{"url": origin["url"], "origin_visits_url": origin_visits_url}], + [ + { + "url": origin["url"], + "origin_visits_url": origin_visits_url, + "metadata_authorities_url": metadata_authorities_url, + } + ], None, ) assert ( utils.enrich_origin_search_result(origin_search_result_data, request=request) == enriched_origin_search_result ) def test_enrich_origin_visit(api_request_factory, origin): origin_visit = random.choice(get_origin_visits(origin)) 
url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, ) request = api_request_factory.get(url) actual_origin_visit = utils.enrich_origin_visit( origin_visit, with_origin_link=True, with_origin_visit_link=True, request=request, ) origin_visit["origin_url"] = reverse( "api-1-origin", url_args={"origin_url": origin["url"]}, request=request ) origin_visit["origin_visit_url"] = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, request=request, ) origin_visit["snapshot_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": origin_visit["snapshot"]}, request=request, ) assert actual_origin_visit == origin_visit diff --git a/swh/web/tests/api/views/test_metadata.py b/swh/web/tests/api/views/test_metadata.py index a55ff1d6..c89b731b 100644 --- a/swh/web/tests/api/views/test_metadata.py +++ b/swh/web/tests/api/views/test_metadata.py @@ -1,203 +1,237 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import attr -from hypothesis import given -from hypothesis.strategies import composite, sampled_from, sets +from hypothesis import given, settings +from hypothesis.strategies import sets import pytest -from swh.model.hypothesis_strategies import raw_extrinsic_metadata, sha1_git -from swh.model.swhids import CoreSWHID, ObjectType +from swh.model.hypothesis_strategies import raw_extrinsic_metadata +from swh.model.model import Origin from swh.web.common.utils import reverse from swh.web.tests.api.views.utils import scroll_results from swh.web.tests.utils import check_api_get_responses, check_http_get_response -@composite -def core_swhids(draw): - object_type = draw(sampled_from(ObjectType)) - object_id = 
draw(sha1_git()) - return CoreSWHID(object_type=object_type, object_id=object_id).to_extended() - - -@given(raw_extrinsic_metadata(target=core_swhids())) +@given(raw_extrinsic_metadata()) def test_api_raw_extrinsic_metadata(api_client, subtest, metadata): # ensure archive_data fixture will be reset between each hypothesis # example test run @subtest def test_inner(archive_data): archive_data.metadata_authority_add([metadata.authority]) archive_data.metadata_fetcher_add([metadata.fetcher]) archive_data.raw_extrinsic_metadata_add([metadata]) authority = metadata.authority url = reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": str(metadata.target)}, query_params={"authority": f"{authority.type.value} {authority.url}"}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 expected_result = metadata.to_dict() del expected_result["id"] del expected_result["metadata"] metadata_url = rv.data[0]["metadata_url"] expected_result["metadata_url"] = metadata_url expected_result["discovery_date"] = expected_result[ "discovery_date" ].isoformat() + if expected_result["target"].startswith(("swh:1:ori:", "swh:1:emd:")): + # non-core SWHID are hidden from the API + del expected_result["target"] assert rv.data == [expected_result] rv = check_http_get_response(api_client, metadata_url, status_code=200) assert rv["Content-Type"] == "application/octet-stream" assert ( rv["Content-Disposition"] == f'attachment; filename="{metadata.target}_metadata"' ) assert rv.content == metadata.metadata +@settings(max_examples=1) +@given(raw_extrinsic_metadata()) +def test_api_raw_extrinsic_metadata_origin_filename(api_client, subtest, metadata): + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + nonlocal metadata + origin = Origin(url="http://example.com/repo.git") + metadata = attr.evolve(metadata, target=origin.swhid()) + metadata = attr.evolve(metadata, 
id=metadata.compute_hash()) + archive_data.origin_add([origin]) + archive_data.metadata_authority_add([metadata.authority]) + archive_data.metadata_fetcher_add([metadata.fetcher]) + archive_data.raw_extrinsic_metadata_add([metadata]) + + authority = metadata.authority + url = reverse( + "api-1-raw-extrinsic-metadata-swhid", + url_args={"target": str(metadata.target)}, + query_params={"authority": f"{authority.type.value} {authority.url}"}, + ) + rv = check_api_get_responses(api_client, url, status_code=200) + + assert len(rv.data) == 1 + metadata_url = rv.data[0]["metadata_url"] + rv = check_http_get_response(api_client, metadata_url, status_code=200) + assert rv["Content-Type"] == "application/octet-stream" + assert ( + rv["Content-Disposition"] + == 'attachment; filename="http_example_com_repo_git_metadata"' + ) + assert rv.content == metadata.metadata + + @pytest.mark.parametrize("limit", [1, 2, 10, 100]) -@given(sets(raw_extrinsic_metadata(target=core_swhids()), min_size=1)) +@given(sets(raw_extrinsic_metadata(), min_size=1)) def test_api_raw_extrinsic_metadata_scroll(api_client, subtest, limit, meta): # ensure archive_data fixture will be reset between each hypothesis # example test run @subtest def test_inner(archive_data): # Make all metadata objects use the same authority and target metadata0 = next(iter(meta)) metadata = { attr.evolve(m, authority=metadata0.authority, target=metadata0.target) for m in meta } # Metadata ids must also be updated as they depend on authority and target metadata = {attr.evolve(m, id=m.compute_hash()) for m in metadata} authority = metadata0.authority archive_data.metadata_authority_add([authority]) archive_data.metadata_fetcher_add(list({m.fetcher for m in metadata})) archive_data.raw_extrinsic_metadata_add(metadata) url = reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": str(metadata0.target)}, query_params={ "authority": f"{authority.type.value} {authority.url}", "limit": limit, }, ) results = 
scroll_results(api_client, url) expected_results = [m.to_dict() for m in metadata] for expected_result in expected_results: del expected_result["id"] del expected_result["metadata"] expected_result["discovery_date"] = expected_result[ "discovery_date" ].isoformat() + if expected_result["target"].startswith(("swh:1:ori:", "swh:1:emd:")): + # non-core SWHID are hidden from the API + del expected_result["target"] assert len(results) == len(expected_results) for result in results: del result["metadata_url"] - assert result in expected_results + assert result in expected_results, str(expected_results) _swhid = "swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307" @pytest.mark.parametrize( "status_code,url_args,query_params", [ pytest.param( 200, {"target": _swhid}, {"authority": "forge http://example.org"}, id="minimal working", ), pytest.param( 200, {"target": _swhid}, { "authority": "forge http://example.org", "after": "2021-06-18T09:31:09", "limit": 100, }, id="maximal working", ), pytest.param( 400, {"target": _swhid}, {"authority": "foo http://example.org"}, id="invalid authority type", ), pytest.param( 400, {"target": _swhid}, { "authority": "forge http://example.org", "after": "yesterday", }, id="invalid 'after' format", ), pytest.param( 400, {"target": _swhid}, { "authority": "forge http://example.org", "limit": "abc", }, id="invalid 'limit'", ), ], ) def test_api_raw_extrinsic_metadata_check_params( api_client, archive_data, status_code, url_args, query_params ): url = reverse( "api-1-raw-extrinsic-metadata-swhid", url_args=url_args, query_params=query_params, ) check_api_get_responses(api_client, url, status_code=status_code) -@given(raw_extrinsic_metadata(target=core_swhids())) +@given(raw_extrinsic_metadata()) def test_api_raw_extrinsic_metadata_list_authorities(api_client, subtest, metadata): # ensure archive_data fixture will be reset between each hypothesis # example test run @subtest def test_inner(archive_data): 
archive_data.metadata_authority_add([metadata.authority]) archive_data.metadata_fetcher_add([metadata.fetcher]) archive_data.raw_extrinsic_metadata_add([metadata]) authority = metadata.authority url = reverse( "api-1-raw-extrinsic-metadata-swhid-authorities", url_args={"target": str(metadata.target)}, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_results = [ { "type": authority.type.value, "url": authority.url, "metadata_list_url": "http://testserver" + reverse( "api-1-raw-extrinsic-metadata-swhid", url_args={"target": str(metadata.target)}, query_params={ "authority": f"{authority.type.value} {authority.url}" }, ), } ] assert rv.data == expected_results