diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py
index 38f0983f..9f22fab6 100644
--- a/swh/web/api/utils.py
+++ b/swh/web/api/utils.py
@@ -1,335 +1,335 @@
# Copyright (C) 2015-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Any, Dict, Optional

from django.http import HttpRequest

from swh.web.common.query import parse_hash
from swh.web.common.utils import resolve_branch_alias, reverse


def filter_field_keys(data, field_keys):
    """Given an object instance (dict, list or map) and a set of field keys,
    return the object with only those keys kept.

    Note: returns data as is if it is not a dict, list or map.

    Args:
        data: one object (dict, list, map...) to filter.
        field_keys: set of keys to filter the object on

    Returns:
        obj filtered on field_keys

    """
    if isinstance(data, map):
        return map(lambda x: filter_field_keys(x, field_keys), data)
    if isinstance(data, list):
        return [filter_field_keys(x, field_keys) for x in data]
    if isinstance(data, dict):
        return {k: v for (k, v) in data.items() if k in field_keys}
    return data


def person_to_string(person):
    """Map a person (author, committer, tagger, etc.) to a string.

    """
    return "".join([person["name"], " <", person["email"], ">"])


def enrich_object(
    object: Dict[str, str], request: Optional[HttpRequest] = None
) -> Dict[str, str]:
    """Enrich an object (revision, release) with a link to the 'target' of
    type 'target_type'.

    Args:
        object: An object with 'target' and 'target_type' keys
            (e.g. release, revision)
        request: Absolute URIs will be generated if provided

    Returns:
        Object enriched with target object url (revision, release, content,
        directory)

    """
    if "target" in object and "target_type" in object:
        if object["target_type"] in ("revision", "release", "directory"):
            object["target_url"] = reverse(
                "api-1-%s" % object["target_type"],
                url_args={"sha1_git": object["target"]},
                request=request,
            )
        elif object["target_type"] == "content":
            object["target_url"] = reverse(
                "api-1-content",
                url_args={"q": "sha1_git:" + object["target"]},
                request=request,
            )
        elif object["target_type"] == "snapshot":
            object["target_url"] = reverse(
                "api-1-snapshot",
                url_args={"snapshot_id": object["target"]},
                request=request,
            )

    return object


enrich_release = enrich_object


def enrich_directory(
    directory: Dict[str, str], request: Optional[HttpRequest] = None
) -> Dict[str, str]:
    """Enrich directory with url to content or directory.

    Args:
        directory: dict of data associated to a swh directory object
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched directory dict filled with additional urls

    """
    if "type" in directory:
        target_type = directory["type"]
        target = directory["target"]
        if target_type == "file":
            directory["target_url"] = reverse(
                "api-1-content", url_args={"q": "sha1_git:%s" % target}, request=request
            )
        elif target_type == "dir":
            directory["target_url"] = reverse(
                "api-1-directory", url_args={"sha1_git": target}, request=request
            )
        else:
            directory["target_url"] = reverse(
                "api-1-revision", url_args={"sha1_git": target}, request=request
            )

    return directory


def enrich_metadata_endpoint(
    content_metadata: Dict[str, str], request: Optional[HttpRequest] = None
) -> Dict[str, str]:
    """Enrich content metadata dict with link to the upper metadata endpoint.

    Args:
        content_metadata: dict of data associated to a swh content metadata
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched content metadata dict filled with an additional url

    """
    c = content_metadata
    c["content_url"] = reverse(
        "api-1-content", url_args={"q": "sha1:%s" % c["id"]}, request=request
    )
    return c


def enrich_content(
    content: Dict[str, Any],
    top_url: Optional[bool] = False,
    query_string: Optional[str] = None,
    request: Optional[HttpRequest] = None,
) -> Dict[str, str]:
    """Enrich content with links to:
        - data_url: its raw data
        - filetype_url: its filetype information
        - language_url: its programming language information
        - license_url: its licensing information

    Args:
        content: dict of data associated to a swh content object
        top_url: whether or not to include the content url in
            the enriched data
        query_string: optional query string of type '<algo>:<hash>'
            used when requesting the content, it acts as a hint
            for picking the same hash method when computing
            the url listed above
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched content dict filled with additional urls

    """
    checksums = content
    if "checksums" in content:
        checksums = content["checksums"]

    hash_algo = "sha1"
    if query_string:
        hash_algo = parse_hash(query_string)[0]

    if hash_algo in checksums:
        q = "%s:%s" % (hash_algo, checksums[hash_algo])
        if top_url:
            content["content_url"] = reverse("api-1-content", url_args={"q": q})
        content["data_url"] = reverse(
            "api-1-content-raw", url_args={"q": q}, request=request
        )
        content["filetype_url"] = reverse(
            "api-1-content-filetype", url_args={"q": q}, request=request
        )
        content["language_url"] = reverse(
            "api-1-content-language", url_args={"q": q}, request=request
        )
        content["license_url"] = reverse(
            "api-1-content-license", url_args={"q": q}, request=request
        )

    return content


def enrich_revision(
    revision: Dict[str, Any], request: Optional[HttpRequest] = None
) -> Dict[str, Any]:
    """Enrich revision with links where it makes sense (directory, parents).
    Keep track of the navigation breadcrumbs if they are specified.

    Args:
        revision: the revision as a dict
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched revision dict filled with additional urls

    """
    revision["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision["id"]}, request=request
    )
    revision["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision["id"]}, request=request
    )
    if "directory" in revision:
        revision["directory_url"] = reverse(
            "api-1-directory",
            url_args={"sha1_git": revision["directory"]},
            request=request,
        )
    if "parents" in revision:
        parents = []
        for parent in revision["parents"]:
            parents.append(
                {
                    "id": parent,
                    "url": reverse(
                        "api-1-revision", url_args={"sha1_git": parent}, request=request
                    ),
                }
            )
        revision["parents"] = tuple(parents)
    if "children" in revision:
        children = []
        for child in revision["children"]:
            children.append(
                reverse("api-1-revision", url_args={"sha1_git": child}, request=request)
            )
        revision["children_urls"] = children
-    if "message_decoding_failed" in revision:
+    if "decoding_failures" in revision and "message" in revision["decoding_failures"]:
        revision["message_url"] = reverse(
            "api-1-revision-raw-message",
            url_args={"sha1_git": revision["id"]},
            request=request,
        )

    return revision


def enrich_snapshot(
    snapshot: Dict[str, Any], request: Optional[HttpRequest] = None
) -> Dict[str, Any]:
    """Enrich snapshot with links to the branch targets

    Args:
        snapshot: the snapshot as a dict
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched snapshot dict filled with additional urls

    """
    if "branches" in snapshot:
        snapshot["branches"] = {
            k: enrich_object(v, request) if v else None
            for k, v in snapshot["branches"].items()
        }
        for k, v in snapshot["branches"].items():
            if v and v["target_type"] == "alias":
                branch = resolve_branch_alias(snapshot, v)
                if branch:
                    branch = enrich_object(branch, request)
                    v["target_url"] = branch["target_url"]
    return snapshot


def enrich_origin(
    origin: Dict[str, Any], request: Optional[HttpRequest] = None
) -> Dict[str, Any]:
    """Enrich origin dict with link to its visits

    Args:
        origin: the origin as a dict
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched origin dict filled with an additional url

    """
    if "url" in origin:
        origin["origin_visits_url"] = reverse(
            "api-1-origin-visits",
            url_args={"origin_url": origin["url"]},
            request=request,
        )

    return origin


def enrich_origin_visit(
    origin_visit: Dict[str, Any],
    *,
    with_origin_link: bool,
    with_origin_visit_link: bool,
    request: Optional[HttpRequest] = None,
) -> Dict[str, Any]:
    """Enrich origin visit dict with additional links

    Args:
        origin_visit: the origin visit as a dict
        with_origin_link: whether to add link to origin
        with_origin_visit_link: whether to add link to origin visit
        request: Absolute URIs will be generated if provided

    Returns:
        An enriched origin visit dict filled with additional urls

    """
    ov = origin_visit
    if with_origin_link:
        ov["origin_url"] = reverse(
            "api-1-origin", url_args={"origin_url": ov["origin"]}, request=request
        )
    if with_origin_visit_link:
        ov["origin_visit_url"] = reverse(
            "api-1-origin-visit",
            url_args={"origin_url": ov["origin"], "visit_id": ov["visit"]},
            request=request,
        )
    snapshot = ov["snapshot"]
    if snapshot:
        ov["snapshot_url"] = reverse(
            "api-1-snapshot", url_args={"snapshot_id": snapshot}, request=request
        )
    else:
        ov["snapshot_url"] = None
    return ov
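The only behavioral change in this file is the predicate guarding `message_url`: the ad-hoc `message_decoding_failed` flag gives way to the generic `decoding_failures` list that `from_swh` now produces (see the converters.py hunk below). A minimal sketch of the two checks side by side, on a hypothetical revision dict:

    # Hypothetical revision dict; the escaped message and the
    # 'decoding_failures' key mirror what from_swh now emits.
    revision = {
        "id": "18d8be353ed3480476f032475e7c233eff7371d5",
        "message": "invalid message \\xff",
        "decoding_failures": ["message"],
    }

    # Old check: a one-off boolean flag, set only for the message field.
    old_hit = "message_decoding_failed" in revision

    # New check: one generic list covers any field that failed to decode.
    new_hit = (
        "decoding_failures" in revision
        and "message" in revision["decoding_failures"]
    )

    assert not old_hit and new_hit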
diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py
index 7373e8fd..00f09bd0 100644
--- a/swh/web/common/converters.py
+++ b/swh/web/common/converters.py
@@ -1,389 +1,383 @@
# Copyright (C) 2015-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import json
from typing import Any, Dict, Union

from swh.core.utils import decode_with_escape
from swh.model import hashutil
from swh.model.model import Release, Revision
from swh.storage.interface import PartialBranches
from swh.web.common.typing import OriginInfo, OriginVisitInfo


def _group_checksums(data):
    """Groups checksums values computed from hash functions used in swh
    and stored in data dict under a single entry 'checksums'

    """
    if data:
        checksums = {}
        for hash in hashutil.ALGORITHMS:
            if hash in data and data[hash]:
                checksums[hash] = data[hash]
                del data[hash]
        if len(checksums) > 0:
            data["checksums"] = checksums


def fmap(f, data):
    """Map f to data at each level.

    This must keep the origin data structure type:
    - map -> map
    - dict -> dict
    - list -> list
    - None -> None

    Args:
        f: function that expects one argument.
        data: data to traverse to apply the f function.
              list, map, dict or bare value.

    Returns:
        The same data-structure with modified values by the f function.

    """
    if data is None:
        return data
    if isinstance(data, map):
        return map(lambda y: fmap(f, y), (x for x in data))
    if isinstance(data, list):
        return [fmap(f, x) for x in data]
    if isinstance(data, tuple):
        return tuple(fmap(f, x) for x in data)
    if isinstance(data, dict):
        return {k: fmap(f, v) for (k, v) in data.items()}
    return f(data)


def from_swh(
    dict_swh,
    hashess={},
    bytess={},
    dates={},
    blacklist={},
    removables_if_empty={},
    empty_dict={},
    empty_list={},
    convert={},
    convert_fn=lambda x: x,
):
    """Convert from a swh dictionary to something reasonably json
    serializable.

    Args:
        dict_swh: the origin dictionary needed to be transformed
        hashess: list/set of keys representing hashes values (sha1, sha256,
            sha1_git, etc...) as bytes. Those need to be transformed in
            hexadecimal string
        bytess: list/set of keys representing bytes values which need to be
            decoded
        blacklist: set of keys to filter out from the conversion
        convert: set of keys whose associated values need to be converted
            using convert_fn
        convert_fn: the conversion function to apply on the value of key in
            'convert'

    The remaining keys are copied as is in the output.

    Returns:
        dictionary equivalent as dict_swh only with its keys converted.

    """

    def convert_hashes_bytes(v):
        """v is supposedly a hash as bytes, returns it converted in hex.

        """
        if isinstance(v, bytes):
            return hashutil.hash_to_hex(v)
        return v

    def convert_bytes(v):
        """v is supposedly a bytes string, decode as utf-8.

        FIXME: Improve decoding policy.
        If not utf-8, break!

        """
        if isinstance(v, bytes):
            return v.decode("utf-8")
        return v

    def convert_date(v):
        """
        Args:
            v (dict or datetime): either:
                - a dict with three keys:
                    - timestamp (dict or integer timestamp)
                    - offset
                    - negative_utc
                - or, a datetime

        We convert it to a human-readable string

        """
        if not v:
            return v
        if isinstance(v, datetime.datetime):
            return v.isoformat()

        tz = datetime.timezone(datetime.timedelta(minutes=v["offset"]))
        swh_timestamp = v["timestamp"]
        if isinstance(swh_timestamp, dict):
            date = datetime.datetime.fromtimestamp(swh_timestamp["seconds"], tz=tz)
        else:
            date = datetime.datetime.fromtimestamp(swh_timestamp, tz=tz)

        datestr = date.isoformat()

        if v["offset"] == 0 and v["negative_utc"]:
            # remove the rightmost + and replace it with a -
            return "-".join(datestr.rsplit("+", 1))

        return datestr

    if not dict_swh:
        return dict_swh

    new_dict = {}
    for key, value in dict_swh.items():
        if key in blacklist or (key in removables_if_empty and not value):
            continue

        if key in dates:
            new_dict[key] = convert_date(value)
        elif key in convert:
            new_dict[key] = convert_fn(value)
        elif isinstance(value, dict):
            new_dict[key] = from_swh(
                value,
                hashess=hashess,
                bytess=bytess,
                dates=dates,
                blacklist=blacklist,
                removables_if_empty=removables_if_empty,
                empty_dict=empty_dict,
                empty_list=empty_list,
                convert=convert,
                convert_fn=convert_fn,
            )
        elif key in hashess:
            new_dict[key] = fmap(convert_hashes_bytes, value)
        elif key in bytess:
            try:
                new_dict[key] = fmap(convert_bytes, value)
            except UnicodeDecodeError:
                if "decoding_failures" not in new_dict:
                    new_dict["decoding_failures"] = [key]
                else:
                    new_dict["decoding_failures"].append(key)
                new_dict[key] = fmap(decode_with_escape, value)
        elif key in empty_dict and not value:
            new_dict[key] = {}
        elif key in empty_list and not value:
            new_dict[key] = []
        else:
            new_dict[key] = value

    _group_checksums(new_dict)

    return new_dict


def from_origin(origin: Dict[str, Any]) -> OriginInfo:
    """Convert from a swh origin to an origin dictionary.

    """
    return from_swh(origin)


def from_release(release: Release) -> Dict[str, Any]:
    """Convert from a swh release to a json serializable release dictionary.

    Args:
        release: A release model object

    Returns:
        release dictionary with the following keys

        - id: hexadecimal sha1 (string)
        - target: hexadecimal sha1 (string) of the release target
        - comment: release's comment message (string)
        - name: release's name (string)
        - author: release's author identifier (swh's id)
        - synthetic: the synthetic property (boolean)

    """
    return from_swh(
        release.to_dict(),
        hashess={"id", "target"},
        bytess={"message", "name", "fullname", "email"},
        dates={"date"},
    )


class SWHMetadataEncoder(json.JSONEncoder):
    """Special json encoder for metadata field which can contain bytes
    encoded value.

    """

    def default(self, obj):
        if isinstance(obj, bytes):
            try:
                return obj.decode("utf-8")
            except UnicodeDecodeError:
                # fallback to binary representation to avoid display errors
                return repr(obj)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)


def convert_revision_metadata(metadata):
    """Convert json specific dict to a json serializable one.

    """
    if not metadata:
        return {}
    return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder))


def from_revision(revision: Union[Dict[str, Any], Revision]) -> Dict[str, Any]:
    """Convert swh revision model object to a json serializable revision
    dictionary.

    Args:
        revision: revision model object

    Returns:
        dict: Revision dictionary with the same keys as inputs, except:

        - sha1s are in hexadecimal strings (id, directory)
        - bytes are decoded in string (author_name, committer_name,
          author_email, committer_email)

        Remaining keys are left as is

    """
    if isinstance(revision, Revision):
        revision_d = revision.to_dict()
    else:
        revision_d = revision
    revision_d = from_swh(
        revision_d,
        hashess={"id", "directory", "parents", "children"},
-        bytess={"name", "fullname", "email", "extra_headers"},
+        bytess={"name", "fullname", "email", "extra_headers", "message"},
        convert={"metadata"},
        convert_fn=convert_revision_metadata,
        dates={"date", "committer_date"},
    )

    if revision_d:
        if "parents" in revision_d:
            revision_d["merge"] = len(revision_d["parents"]) > 1
-        if "message" in revision_d:
-            try:
-                revision_d["message"] = revision_d["message"].decode("utf-8")
-            except UnicodeDecodeError:
-                revision_d["message_decoding_failed"] = True
-                revision_d["message"] = None

    return revision_d


def from_content(content):
    """Convert swh content to serializable content dictionary.

    """
    return from_swh(
        content,
        hashess={"sha1", "sha1_git", "sha256", "blake2s256"},
        blacklist={"ctime"},
        convert={"status"},
        convert_fn=lambda v: "absent" if v == "hidden" else v,
    )


def from_person(person):
    """Convert swh person to serializable person dictionary.

    """
    return from_swh(person, bytess={"name", "fullname", "email"})


def from_origin_visit(visit: Dict[str, Any]) -> OriginVisitInfo:
    """Convert swh origin_visit to serializable origin_visit dictionary.

    """
    ov = from_swh(
        visit,
        hashess={"target", "snapshot"},
        bytess={"branch"},
        dates={"date"},
        empty_dict={"metadata"},
    )

    return ov


def from_snapshot(snapshot):
    """Convert swh snapshot to serializable (partial) snapshot dictionary.

    """
    sv = from_swh(snapshot, hashess={"id", "target"}, bytess={"next_branch"})

    if sv and "branches" in sv:
        sv["branches"] = {decode_with_escape(k): v for k, v in sv["branches"].items()}
        for k, v in snapshot["branches"].items():
            # alias target existing branch names, not a sha1
            if v and v["target_type"] == "alias":
                branch = decode_with_escape(k)
                target = decode_with_escape(v["target"])
                sv["branches"][branch]["target"] = target

    return sv


def from_partial_branches(branches: PartialBranches):
    """Convert PartialBranches to serializable partial snapshot dictionary

    """
    return from_snapshot(
        {
            "id": branches["id"],
            "branches": {
                branch_name: branch.to_dict() if branch else None
                for (branch_name, branch) in branches["branches"].items()
            },
            "next_branch": branches["next_branch"],
        }
    )


def from_directory_entry(dir_entry):
    """Convert swh directory to serializable directory dictionary.

    """
    return from_swh(
        dir_entry,
        hashess={"dir_id", "sha1_git", "sha1", "sha256", "blake2s256", "target"},
        bytess={"name"},
        removables_if_empty={"sha1", "sha1_git", "sha256", "blake2s256", "status"},
        convert={"status"},
        convert_fn=lambda v: "absent" if v == "hidden" else v,
    )


def from_filetype(content_entry):
    """Convert swh content to serializable dictionary containing keys
    'id', 'encoding', and 'mimetype'.

    """
    return from_swh(content_entry, hashess={"id"})
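For reference, a short sketch of the `from_swh` decoding behavior the new code path relies on; the byte values are made up, and the expected escapes follow `decode_with_escape`:

    from swh.web.common import converters

    # Invalid utf-8 is escaped instead of nulled out, and the offending
    # keys are recorded under 'decoding_failures'.
    data = {"message": b"invalid message \xff", "name": b"ok"}
    result = converters.from_swh(data, bytess={"message", "name"})

    assert result["message"] == "invalid message \\xff"
    assert result["name"] == "ok"
    assert result["decoding_failures"] == ["message"]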
""" return from_swh(content_entry, hashess={"id"}) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index 2ccf07b7..530d19a2 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,601 +1,600 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.web.api import utils from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import resolve_branch_alias, reverse from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, ) url_map = [ { "rule": "/other/", "methods": set(["GET", "POST", "HEAD"]), "endpoint": "foo", }, { "rule": "/some/old/url/", "methods": set(["GET", "POST"]), "endpoint": "blablafn", }, { "rule": "/other/old/url/", "methods": set(["GET", "HEAD"]), "endpoint": "bar", }, {"rule": "/other", "methods": set([]), "endpoint": None}, {"rule": "/other2", "methods": set([]), "endpoint": None}, ] def test_filter_field_keys_dict_unknown_keys(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory1", "file2"} ) assert actual_res == {} def test_filter_field_keys_dict(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory", "link"} ) assert actual_res == {"directory": 1, "link": 3} def test_filter_field_keys_list_unknown_keys(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"1": 1, "2": 2, "link": 3}], {"d"} ) assert actual_res == [{}, {}] def test_filter_field_keys_map(): actual_res = utils.filter_field_keys( map( lambda x: {"i": x["i"] + 1, "j": x["j"]}, [{"i": 1, "j": None}, {"i": 2, "j": None}, {"i": 3, "j": None}], ), {"i"}, ) assert list(actual_res) == [{"i": 2}, {"i": 3}, {"i": 4}] def test_filter_field_keys_list(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"dir": 1, "fil": 2, "lin": 3}], {"directory", "dir"}, ) assert actual_res == [{"directory": 1}, {"dir": 1}] def test_filter_field_keys_other(): input_set = {1, 2} actual_res = utils.filter_field_keys(input_set, {"a", "1"}) assert actual_res == input_set def test_person_to_string(): assert ( utils.person_to_string({"name": "raboof", "email": "foo@bar"}) == "raboof " ) def test_enrich_release_empty(): actual_release = utils.enrich_release({}) assert actual_release == {} @given(release()) def test_enrich_release_content_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "content" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{release_data["target"]}'}, request=request, ) assert actual_release == release_data @given(release()) def test_enrich_release_directory_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "directory" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = 
reverse( "api-1-directory", url_args={"sha1_git": release_data["target"]}, request=request, ) assert actual_release == release_data @given(release()) def test_enrich_release_revision_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "revision" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data @given(release()) def test_enrich_release_release_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "release" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-release", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data def test_enrich_directory_no_type(): assert utils.enrich_directory({"id": "dir-id"}) == {"id": "dir-id"} @given(directory()) def test_enrich_directory_with_type(api_request_factory, archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) url = reverse("api-1-directory", url_args={"sha1_git": directory}) request = api_request_factory.get(url) actual_directory = utils.enrich_directory(dir_entry, request) if dir_entry["type"] == "file": dir_entry["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{dir_entry["target"]}'}, request=request, ) elif dir_entry["type"] == "dir": dir_entry["target_url"] = reverse( "api-1-directory", url_args={"sha1_git": dir_entry["target"]}, request=request, ) elif dir_entry["type"] == "rev": dir_entry["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": dir_entry["target"]}, request=request, ) assert actual_directory == dir_entry def test_enrich_content_without_hashes(): assert utils.enrich_content({"id": "123"}) == {"id": "123"} @given(content()) def test_enrich_content_with_hashes(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data @given(content()) def test_enrich_content_with_hashes_and_top_level_url(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, top_url=True, request=request ) 
content_data["content_url"] = reverse( "api-1-content", url_args={"q": query_string}, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data @given(revision()) def test_enrich_revision_without_children_or_parent( api_request_factory, archive_data, revision ): revision_data = archive_data.revision_get(revision) del revision_data["parents"] url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_with_children_and_parent_no_dir( api_request_factory, archive_data, revision, parent_revision, child_revision ): revision_data = archive_data.revision_get(revision) del revision_data["directory"] revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_no_context( api_request_factory, revision, parent_revision, child_revision ): revision_data = { "id": revision, "parents": [parent_revision], "children": [child_revision], } url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": parent_revision, "url": reverse( "api-1-revision", url_args={"sha1_git": parent_revision}, request=request, ), } ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_with_no_message( api_request_factory, archive_data, revision, parent_revision, child_revision ): 
revision_data = archive_data.revision_get(revision) revision_data["message"] = None revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_with_invalid_message( api_request_factory, archive_data, revision, parent_revision, child_revision ): revision_data = archive_data.revision_get(revision) - revision_data["message"] = None - revision_data["message_decoding_failed"] = (True,) + revision_data["decoding_failures"] = ["message"] revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["message_url"] = reverse( "api-1-revision-raw-message", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(snapshot()) def test_enrich_snapshot(api_request_factory, archive_data, snapshot): snapshot_data = archive_data.snapshot_get(snapshot) url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) request = api_request_factory.get(url) actual_snapshot = utils.enrich_snapshot(snapshot_data, request) for _, b in snapshot_data["branches"].items(): if b["target_type"] in ("directory", "revision", "release"): b["target_url"] = reverse( f'api-1-{b["target_type"]}', url_args={"sha1_git": b["target"]}, request=request, ) elif b["target_type"] == "content": b["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{b["target"]}'}, request=request, ) for _, b in snapshot_data["branches"].items(): if b["target_type"] == "alias": target = resolve_branch_alias(snapshot_data, b) b["target_url"] = target["target_url"] assert actual_snapshot == snapshot_data @given(origin()) def test_enrich_origin(api_request_factory, archive_data, origin): url = reverse("api-1-origin", url_args={"origin_url": origin["url"]}) request = 
api_request_factory.get(url) origin_data = {"url": origin["url"]} actual_origin = utils.enrich_origin(origin_data, request) origin_data["origin_visits_url"] = reverse( "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request ) assert actual_origin == origin_data @given(origin()) def test_enrich_origin_visit(api_request_factory, archive_data, origin): origin_visit = random.choice(get_origin_visits(origin)) url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, ) request = api_request_factory.get(url) actual_origin_visit = utils.enrich_origin_visit( origin_visit, with_origin_link=True, with_origin_visit_link=True, request=request, ) origin_visit["origin_url"] = reverse( "api-1-origin", url_args={"origin_url": origin["url"]}, request=request ) origin_visit["origin_visit_url"] = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, request=request, ) origin_visit["snapshot_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": origin_visit["snapshot"]}, request=request, ) assert actual_origin_visit == origin_visit diff --git a/swh/web/tests/common/test_converters.py b/swh/web/tests/common/test_converters.py index 750178fd..a4cc597c 100644 --- a/swh/web/tests/common/test_converters.py +++ b/swh/web/tests/common/test_converters.py @@ -1,757 +1,757 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.model import hashutil from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, Timestamp, TimestampWithTimezone, ) from swh.web.common import converters def test_fmap(): assert [2, 3, None, 4] == converters.fmap(lambda x: x + 1, [1, 2, None, 3]) assert [11, 12, 13] == list( converters.fmap(lambda x: x + 10, map(lambda x: x, [1, 2, 3])) ) assert {"a": 2, "b": 4} == converters.fmap(lambda x: x * 2, {"a": 1, "b": 2}) assert 100 == converters.fmap(lambda x: x * 10, 10) assert {"a": [2, 6], "b": 4} == converters.fmap( lambda x: x * 2, {"a": [1, 3], "b": 2} ) assert converters.fmap(lambda x: x, None) is None def test_from_swh(): some_input = { "a": "something", "b": "someone", "c": b"sharp-0.3.4.tgz", "d": hashutil.hash_to_bytes("b04caf10e9535160d90e874b45aa426de762f19f"), "e": b"sharp.html/doc_002dS_005fISREG.html", "g": [b"utf-8-to-decode", b"another-one"], "h": "something filtered", "i": {"e": b"something"}, "j": { "k": { "l": [b"bytes thing", b"another thingy", b""], "n": "don't care either", }, "m": "don't care", }, "o": "something", "p": b"foo", "q": {"extra-headers": [["a", b"intact"]]}, "w": None, "r": {"p": "also intact", "q": "bar"}, "s": {"timestamp": 42, "offset": -420, "negative_utc": None,}, "s1": { "timestamp": {"seconds": 42, "microseconds": 0}, "offset": -420, "negative_utc": None, }, "s2": datetime.datetime(2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), "t": None, "u": None, "v": None, "x": None, } expected_output = { "a": "something", "b": "someone", "c": "sharp-0.3.4.tgz", "d": "b04caf10e9535160d90e874b45aa426de762f19f", "e": "sharp.html/doc_002dS_005fISREG.html", "g": ["utf-8-to-decode", "another-one"], "i": {"e": "something"}, "j": {"k": {"l": ["bytes thing", "another thingy", ""]}}, "p": "foo", "q": {"extra-headers": [["a", "intact"]]}, "w": {}, "r": {"p": "also intact", "q": "bar"}, "s": 
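The content tests above loop over every hash algorithm because `enrich_content` derives its URLs from the optional query string via `parse_hash`; a small sketch of that selection logic, with a made-up checksums dict:

    from swh.web.common.query import parse_hash

    # enrich_content defaults to sha1 but honors the algorithm named in
    # the query string; the checksums below are illustrative values.
    checksums = {
        "sha1": "5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5",
        "sha1_git": "40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03",
    }

    query_string = "sha1_git:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03"
    hash_algo = parse_hash(query_string)[0]  # -> "sha1_git"

    # All the *_url values are then built from the same "<algo>:<hash>".
    q = "%s:%s" % (hash_algo, checksums[hash_algo])
    assert q == query_string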
"1969-12-31T17:00:42-07:00", "s1": "1969-12-31T17:00:42-07:00", "s2": "2013-07-01T20:00:00+00:00", "u": {}, "v": [], "x": None, } actual_output = converters.from_swh( some_input, hashess={"d", "o", "x"}, bytess={"c", "e", "g", "l"}, dates={"s", "s1", "s2"}, blacklist={"h", "m", "n", "o"}, removables_if_empty={"t"}, empty_dict={"u"}, empty_list={"v"}, convert={"p", "q", "w"}, convert_fn=converters.convert_revision_metadata, ) assert expected_output == actual_output def test_from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(): some_input = {"a": "something", "b": None, "c": "someone", "d": None, "e": None} expected_output = { "a": "something", "b": None, "c": "someone", "d": None, "e": None, } actual_output = converters.from_swh( some_input, hashess={"a", "b"}, bytess={"c", "d"}, dates={"e"} ) assert expected_output == actual_output def test_from_swh_edge_cases_convert_invalid_utf8_bytes(): some_input = { "a": "something", "b": "someone", "c": b"a name \xff", "d": b"an email \xff", } expected_output = { "a": "something", "b": "someone", "c": "a name \\xff", "d": "an email \\xff", "decoding_failures": ["c", "d"], } actual_output = converters.from_swh( some_input, hashess={"a", "b"}, bytess={"c", "d"} ) for v in ["a", "b", "c", "d"]: assert expected_output[v] == actual_output[v] assert len(expected_output["decoding_failures"]) == len( actual_output["decoding_failures"] ) for v in expected_output["decoding_failures"]: assert v in actual_output["decoding_failures"] def test_from_swh_empty(): assert {} == converters.from_swh({}) def test_from_swh_none(): assert converters.from_swh(None) is None def test_from_origin(): origin_input = { "id": 9, "type": "ftp", "url": "rsync://ftp.gnu.org/gnu/octave", } expected_origin = { "id": 9, "type": "ftp", "url": "rsync://ftp.gnu.org/gnu/octave", } actual_origin = converters.from_origin(origin_input) assert actual_origin == expected_origin def test_from_origin_visit(): snap_hash = "b5f0b7f716735ebffe38505c60145c4fd9da6ca3" for snap in [snap_hash, None]: visit = { "date": { "timestamp": datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ).timestamp(), "offset": 0, "negative_utc": False, }, "origin": 10, "visit": 100, "metadata": None, "status": "full", "snapshot": hashutil.hash_to_bytes(snap) if snap else snap, } expected_visit = { "date": "2015-01-01T22:00:00+00:00", "origin": 10, "visit": 100, "metadata": {}, "status": "full", "snapshot": snap_hash if snap else snap, } actual_visit = converters.from_origin_visit(visit) assert actual_visit == expected_visit def test_from_release(): """Convert release model object to a dict should be ok""" ts = int( datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ).timestamp() ) release_input = Release( id=hashutil.hash_to_bytes("aad23fa492a0c5fed0708a6703be875448c86884"), target=hashutil.hash_to_bytes("5e46d564378afc44b31bb89f99d5675195fbdf67"), target_type=ObjectType.REVISION, date=TimestampWithTimezone( timestamp=Timestamp(seconds=ts, microseconds=0), offset=0, negative_utc=False, ), author=Person( name=b"author name", fullname=b"Author Name author@email", email=b"author@email", ), name=b"v0.0.1", message=b"some comment on release", synthetic=True, ) expected_release = { "id": "aad23fa492a0c5fed0708a6703be875448c86884", "target": "5e46d564378afc44b31bb89f99d5675195fbdf67", "target_type": "revision", "date": "2015-01-01T22:00:00+00:00", "author": { "name": "author name", "fullname": "Author Name author@email", "email": "author@email", }, "name": "v0.0.1", "message": "some 
comment on release", "target_type": "revision", "synthetic": True, } actual_release = converters.from_release(release_input) assert actual_release == expected_release def test_from_revision_model_object(): ts = int( datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp() ) revision_input = Revision( directory=hashutil.hash_to_bytes("7834ef7e7c357ce2af928115c6c6a42b7e2a44e6"), author=Person( name=b"Software Heritage", fullname=b"robot robot@softwareheritage.org", email=b"robot@softwareheritage.org", ), committer=Person( name=b"Software Heritage", fullname=b"robot robot@softwareheritage.org", email=b"robot@softwareheritage.org", ), message=b"synthetic revision message", date=TimestampWithTimezone( timestamp=Timestamp(seconds=ts, microseconds=0), offset=0, negative_utc=False, ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=ts, microseconds=0), offset=0, negative_utc=False, ), synthetic=True, type=RevisionType.TAR, parents=tuple( [ hashutil.hash_to_bytes("29d8be353ed3480476f032475e7c244eff7371d5"), hashutil.hash_to_bytes("30d8be353ed3480476f032475e7c244eff7371d5"), ] ), extra_headers=((b"gpgsig", b"some-signature"),), metadata={ "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ], }, ) expected_revision = { "id": "a001358278a0d811fe7072463f805da601121c2a", "directory": "7834ef7e7c357ce2af928115c6c6a42b7e2a44e6", "author": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "committer": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "message": "synthetic revision message", "date": "2000-01-17T11:23:54+00:00", "committer_date": "2000-01-17T11:23:54+00:00", "parents": tuple( [ "29d8be353ed3480476f032475e7c244eff7371d5", "30d8be353ed3480476f032475e7c244eff7371d5", ] ), "type": "tar", "synthetic": True, "extra_headers": (("gpgsig", "some-signature"),), "metadata": { "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ], }, "merge": True, } actual_revision = converters.from_revision(revision_input) assert actual_revision == expected_revision def test_from_revision(): ts = datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp() revision_input = { "id": hashutil.hash_to_bytes("18d8be353ed3480476f032475e7c233eff7371d5"), "directory": hashutil.hash_to_bytes("7834ef7e7c357ce2af928115c6c6a42b7e2a44e6"), "author": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "committer": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "message": b"synthetic revision message", "date": {"timestamp": ts, "offset": 0, "negative_utc": False,}, "committer_date": {"timestamp": ts, "offset": 0, "negative_utc": False,}, "synthetic": True, "type": "tar", "parents": [ hashutil.hash_to_bytes("29d8be353ed3480476f032475e7c244eff7371d5"), hashutil.hash_to_bytes("30d8be353ed3480476f032475e7c244eff7371d5"), ], "children": [ 
hashutil.hash_to_bytes("123546353ed3480476f032475e7c244eff7371d5"), ], "metadata": { "extra_headers": [["gpgsig", b"some-signature"]], "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ], }, } expected_revision = { "id": "18d8be353ed3480476f032475e7c233eff7371d5", "directory": "7834ef7e7c357ce2af928115c6c6a42b7e2a44e6", "author": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "committer": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "message": "synthetic revision message", "date": "2000-01-17T11:23:54+00:00", "committer_date": "2000-01-17T11:23:54+00:00", "children": ["123546353ed3480476f032475e7c244eff7371d5"], "parents": [ "29d8be353ed3480476f032475e7c244eff7371d5", "30d8be353ed3480476f032475e7c244eff7371d5", ], "type": "tar", "synthetic": True, "metadata": { "extra_headers": [["gpgsig", "some-signature"]], "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ], }, "merge": True, } actual_revision = converters.from_revision(revision_input) assert actual_revision == expected_revision def test_from_revision_nomerge(): revision_input = { "id": hashutil.hash_to_bytes("18d8be353ed3480476f032475e7c233eff7371d5"), "parents": [hashutil.hash_to_bytes("29d8be353ed3480476f032475e7c244eff7371d5")], } expected_revision = { "id": "18d8be353ed3480476f032475e7c233eff7371d5", "parents": ["29d8be353ed3480476f032475e7c244eff7371d5"], "merge": False, } actual_revision = converters.from_revision(revision_input) assert actual_revision == expected_revision def test_from_revision_noparents(): revision_input = { "id": hashutil.hash_to_bytes("18d8be353ed3480476f032475e7c233eff7371d5"), "directory": hashutil.hash_to_bytes("7834ef7e7c357ce2af928115c6c6a42b7e2a44e6"), "author": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "committer": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "message": b"synthetic revision message", "date": { "timestamp": datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp(), "offset": 0, "negative_utc": False, }, "committer_date": { "timestamp": datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp(), "offset": 0, "negative_utc": False, }, "synthetic": True, "type": "tar", "children": [ hashutil.hash_to_bytes("123546353ed3480476f032475e7c244eff7371d5"), ], "metadata": { "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ] }, } expected_revision = { "id": "18d8be353ed3480476f032475e7c233eff7371d5", "directory": "7834ef7e7c357ce2af928115c6c6a42b7e2a44e6", "author": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, 
"committer": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "message": "synthetic revision message", "date": "2000-01-17T11:23:54+00:00", "committer_date": "2000-01-17T11:23:54+00:00", "children": ["123546353ed3480476f032475e7c244eff7371d5"], "type": "tar", "synthetic": True, "metadata": { "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ] }, } actual_revision = converters.from_revision(revision_input) assert actual_revision == expected_revision def test_from_revision_invalid(): revision_input = { "id": hashutil.hash_to_bytes("18d8be353ed3480476f032475e7c233eff7371d5"), "directory": hashutil.hash_to_bytes("7834ef7e7c357ce2af928115c6c6a42b7e2a44e6"), "author": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "committer": { "name": b"Software Heritage", "fullname": b"robot robot@softwareheritage.org", "email": b"robot@softwareheritage.org", }, "message": b"invalid message \xff", "date": { "timestamp": datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp(), "offset": 0, "negative_utc": False, }, "committer_date": { "timestamp": datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc ).timestamp(), "offset": 0, "negative_utc": False, }, "synthetic": True, "type": "tar", "parents": [ hashutil.hash_to_bytes("29d8be353ed3480476f032475e7c244eff7371d5"), hashutil.hash_to_bytes("30d8be353ed3480476f032475e7c244eff7371d5"), ], "children": [ hashutil.hash_to_bytes("123546353ed3480476f032475e7c244eff7371d5"), ], "metadata": { "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ] }, } expected_revision = { "id": "18d8be353ed3480476f032475e7c233eff7371d5", "directory": "7834ef7e7c357ce2af928115c6c6a42b7e2a44e6", "author": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, "committer": { "name": "Software Heritage", "fullname": "robot robot@softwareheritage.org", "email": "robot@softwareheritage.org", }, - "message": None, - "message_decoding_failed": True, + "message": "invalid message \\xff", + "decoding_failures": ["message"], "date": "2000-01-17T11:23:54+00:00", "committer_date": "2000-01-17T11:23:54+00:00", "children": ["123546353ed3480476f032475e7c244eff7371d5"], "parents": [ "29d8be353ed3480476f032475e7c244eff7371d5", "30d8be353ed3480476f032475e7c244eff7371d5", ], "type": "tar", "synthetic": True, "metadata": { "original_artifact": [ { "archive_type": "tar", "name": "webbase-5.7.0.tar.gz", "sha1": "147f73f369733d088b7a6fa9c4e0273dcd3c7ccd", "sha1_git": "6a15ea8b881069adedf11feceec35588f2cfe8f1", "sha256": "401d0df797110bea805d358b85bcc1ced29549d3d73f" "309d36484e7edf7bb912", } ] }, "merge": True, } actual_revision = converters.from_revision(revision_input) assert actual_revision == expected_revision def test_from_content_none(): assert converters.from_content(None) is None def test_from_content(): content_input = { "sha1": hashutil.hash_to_bytes("5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5"), "sha256": 
hashutil.hash_to_bytes( "39007420ca5de7cb3cfc15196335507e" "e76c98930e7e0afa4d2747d3bf96c926" ), "blake2s256": hashutil.hash_to_bytes( "49007420ca5de7cb3cfc15196335507e" "e76c98930e7e0afa4d2747d3bf96c926" ), "sha1_git": hashutil.hash_to_bytes("40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03"), "ctime": "something-which-is-filtered-out", "data": b"data in bytes", "length": 10, "status": "hidden", } # 'status' is filtered expected_content = { "checksums": { "sha1": "5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5", "sha256": "39007420ca5de7cb3cfc15196335507ee76c98" "930e7e0afa4d2747d3bf96c926", "blake2s256": "49007420ca5de7cb3cfc15196335507ee7" "6c98930e7e0afa4d2747d3bf96c926", "sha1_git": "40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03", }, "data": b"data in bytes", "length": 10, "status": "absent", } actual_content = converters.from_content(content_input) assert actual_content == expected_content def test_from_person(): person_input = { "id": 10, "anything": "else", "name": b"bob", "fullname": b"bob bob@alice.net", "email": b"bob@foo.alice", } expected_person = { "id": 10, "anything": "else", "name": "bob", "fullname": "bob bob@alice.net", "email": "bob@foo.alice", } actual_person = converters.from_person(person_input) assert actual_person == expected_person def test_from_directory_entries(): dir_entries_input = { "sha1": hashutil.hash_to_bytes("5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5"), "sha256": hashutil.hash_to_bytes( "39007420ca5de7cb3cfc15196335507e" "e76c98930e7e0afa4d2747d3bf96c926" ), "sha1_git": hashutil.hash_to_bytes("40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03"), "blake2s256": hashutil.hash_to_bytes( "685395c5dc57cada459364f0946d3dd45bad5fcbab" "c1048edb44380f1d31d0aa" ), "target": hashutil.hash_to_bytes("40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03"), "dir_id": hashutil.hash_to_bytes("40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03"), "name": b"bob", "type": 10, "status": "hidden", } expected_dir_entries = { "checksums": { "sha1": "5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5", "sha256": "39007420ca5de7cb3cfc15196335507ee76c98" "930e7e0afa4d2747d3bf96c926", "sha1_git": "40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03", "blake2s256": "685395c5dc57cada459364f0946d3dd45bad5f" "cbabc1048edb44380f1d31d0aa", }, "target": "40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03", "dir_id": "40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03", "name": "bob", "type": 10, "status": "absent", } actual_dir_entries = converters.from_directory_entry(dir_entries_input) assert actual_dir_entries == expected_dir_entries def test_from_filetype(): content_filetype = { "id": hashutil.hash_to_bytes("5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5"), "encoding": "utf-8", "mimetype": "text/plain", } expected_content_filetype = { "id": "5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5", "encoding": "utf-8", "mimetype": "text/plain", } actual_content_filetype = converters.from_filetype(content_filetype) assert actual_content_filetype == expected_content_filetype diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index 5d8ebf81..56dd63b2 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,995 +1,995 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import itertools import random from hypothesis import given import pytest from swh.model.from_disk import 
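A quick sketch of the date conversion these tests pin down, including the negative-utc corner case `from_swh` handles internally (input dicts are illustrative):

    import datetime

    from swh.web.common import converters

    ts = int(
        datetime.datetime(
            2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
        ).timestamp()
    )

    # A swh date dict becomes an ISO 8601 string.
    date = {"timestamp": ts, "offset": 0, "negative_utc": False}
    assert converters.from_swh({"date": date}, dates={"date"}) == {
        "date": "2015-01-01T22:00:00+00:00"
    }

    # With negative_utc and a zero offset, '+00:00' is rewritten to
    # '-00:00' to preserve the distinction git makes between the two.
    date_neg = {"timestamp": ts, "offset": 0, "negative_utc": True}
    assert converters.from_swh({"date": date_neg}, dates={"date"}) == {
        "date": "2015-01-01T22:00:00-00:00"
    }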
DentryPerms from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.conftest import ctags_json_missing, fossology_missing from swh.web.tests.data import random_content, random_sha1 from swh.web.tests.strategies import ( ancestor_revisions, content, contents, contents_with_ctags, directory, empty_directory, invalid_sha1, new_origin, new_revision, non_ancestor_revisions, origin, release, releases, revision, revision_with_submodules, revisions, sha256, snapshot, unknown_content, unknown_contents, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, visit_dates, ) @given(contents()) def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": True}) assert service.lookup_multiple_hashes(input_data) == expected_output @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert actual_lookup == {"found": None, "algo": "sha1_git"} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash("sha1:%s" % content["sha1"]) content_metadata = archive_data.content_get(content["sha1"]) assert {"found": content_metadata, "algo": "sha1"} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert {"found": False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash("sha1:%s" % content["sha1"]) assert {"found": True} == actual_lookup @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags["sha1s"]) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags("sha1:%s" % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag["id"] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list( service.lookup_content_ctags("sha1:%s" % unknown_content_["sha1"]) ) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) actual_filetype = service.lookup_content_filetype(content["sha1"]) expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) assert actual_filetype == expected_filetype @pytest.mark.skip # Language indexer is disabled. 
@pytest.mark.skip  # Language indexer is disabled.
@given(content())
def test_lookup_content_language(indexer_data, content):
    indexer_data.content_add_language(content["sha1"])
    actual_language = service.lookup_content_language(content["sha1"])

    expected_language = indexer_data.content_get_language(content["sha1"])
    assert actual_language == expected_language


@given(contents_with_ctags())
def test_lookup_expression(indexer_data, contents_with_ctags):
    per_page = 10
    expected_ctags = []

    for content_sha1 in contents_with_ctags["sha1s"]:
        if len(expected_ctags) == per_page:
            break
        indexer_data.content_add_ctags(content_sha1)
        for ctag in indexer_data.content_get_ctags(content_sha1):
            if len(expected_ctags) == per_page:
                break
            if ctag["name"] == contents_with_ctags["symbol_name"]:
                del ctag["id"]
                ctag["sha1"] = content_sha1
                expected_ctags.append(ctag)

    actual_ctags = list(
        service.lookup_expression(
            contents_with_ctags["symbol_name"], last_sha1=None, per_page=10
        )
    )

    assert actual_ctags == expected_ctags


def test_lookup_expression_no_result():
    expected_ctags = []

    actual_ctags = list(
        service.lookup_expression("barfoo", last_sha1=None, per_page=10)
    )
    assert actual_ctags == expected_ctags


@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(indexer_data, content):
    indexer_data.content_add_license(content["sha1"])
    actual_license = service.lookup_content_license(content["sha1"])

    expected_license = indexer_data.content_get_license(content["sha1"])
    assert actual_license == expected_license


def test_stat_counters(archive_data):
    actual_stats = service.stat_counters()
    assert actual_stats == archive_data.stat_counters()


@given(new_origin(), visit_dates())
def test_lookup_origin_visits(archive_data, new_origin, visit_dates):
    archive_data.origin_add([new_origin])

    archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=ts, type="git",) for ts in visit_dates]
    )

    actual_origin_visits = list(
        service.lookup_origin_visits(new_origin.url, per_page=100)
    )

    expected_visits = archive_data.origin_visit_get(new_origin.url)
    for expected_visit in expected_visits:
        expected_visit["origin"] = new_origin.url

    assert actual_origin_visits == expected_visits


@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    archive_data.origin_add([new_origin])
    visits = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=ts, type="git",) for ts in visit_dates]
    )

    visit = random.choice(visits).visit
    actual_origin_visit = service.lookup_origin_visit(new_origin.url, visit)

    expected_visit = dict(archive_data.origin_visit_get_by(new_origin.url, visit))

    assert actual_origin_visit == expected_visit


@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    archive_data.origin_add([new_origin])

    actual_origin = service.lookup_origin({"url": new_origin.url})
    expected_origin = archive_data.origin_get([new_origin.url])[0]

    assert actual_origin == expected_origin


@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(invalid_sha1)
    assert e.match("Invalid checksum")


@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(sha256):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(sha256)
    assert e.match("Only sha1_git is supported.")
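# The *_multiple lookups exercised below are expected to preserve the input
# order and to yield None for every hash missing from the archive.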
@given(releases())
def test_lookup_release_multiple(archive_data, releases):
    actual_releases = list(service.lookup_release_multiple(releases))

    expected_releases = []
    for release_id in releases:
        release_info = archive_data.release_get(release_id)
        expected_releases.append(release_info)

    assert actual_releases == expected_releases


def test_lookup_release_multiple_none_found():
    unknown_releases_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_releases = list(service.lookup_release_multiple(unknown_releases_))

    assert actual_releases == [None] * len(unknown_releases_)


@given(directory())
def test_lookup_directory_with_path_not_found(directory):
    path = "some/invalid/path/here"
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_path(directory, path)
    assert e.match("Directory entry with path %s from %s not found" % (path, directory))


@given(directory())
def test_lookup_directory_with_path_found(archive_data, directory):
    directory_content = archive_data.directory_ls(directory)
    directory_entry = random.choice(directory_content)
    path = directory_entry["name"]

    actual_result = service.lookup_directory_with_path(directory, path)

    assert actual_result == directory_entry


@given(release())
def test_lookup_release(archive_data, release):
    actual_release = service.lookup_release(release)

    assert actual_release == archive_data.release_get(release)


@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256):
    sha1_git_root = revision
    sha1_git = invalid_sha1

    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Invalid checksum query string")

    sha1_git = sha256

    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Only sha1_git is supported")


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = revision
    sha1_git = unknown_revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision %s not found" % sha1_git)


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = unknown_revision
    sha1_git = revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision root %s not found" % sha1_git_root)


@given(ancestor_revisions())
def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    root_sha1_git = ancestor_revisions["sha1_git_root"]
    for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}):
        actual_revision = service.lookup_revision_with_context(sha1_git_root, sha1_git)

        children = []
        for rev in archive_data.revision_log(root_sha1_git):
            for p_rev in rev["parents"]:
                p_rev_hex = hash_to_hex(p_rev)
                if p_rev_hex == sha1_git:
                    children.append(rev["id"])

        expected_revision = archive_data.revision_get(sha1_git)
        expected_revision["children"] = children

        assert actual_revision == expected_revision


@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    sha1_git = non_ancestor_revisions["sha1_git"]
    root_sha1_git = non_ancestor_revisions["sha1_git_root"]

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(root_sha1_git, sha1_git)
    assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git))
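# As the tests above show, lookup_revision_with_context walks the revision
# log from sha1_git_root, raises NotFoundExc when sha1_git is not an
# ancestor, and otherwise annotates the returned revision with its
# "children" in that log.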
def test_lookup_directory_with_revision_not_found():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(unknown_revision_)
    assert e.match("Revision %s not found" % unknown_revision_)


@given(new_revision())
def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision):
    unknown_content_ = random_content()

    dir_path = "README.md"

    # A directory that points to unknown content
    dir = Directory(
        entries=(
            DirectoryEntry(
                name=bytes(dir_path.encode("utf-8")),
                type="file",
                target=hash_to_bytes(unknown_content_["sha1_git"]),
                perms=DentryPerms.content,
            ),
        )
    )

    # Create a revision that points to a directory
    # Which points to unknown content
    new_revision = new_revision.to_dict()
    new_revision["directory"] = dir.id
    del new_revision["id"]
    new_revision = Revision.from_dict(new_revision)

    # Add the directory and revision in mem
    archive_data.directory_add([dir])
    archive_data.revision_add([new_revision])
    new_revision_id = hash_to_hex(new_revision.id)
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(new_revision_id, dir_path)
    assert e.match("Content not found for revision %s" % new_revision_id)


@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    invalid_path = "path/to/something/unknown"
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(revision, invalid_path)
    assert e.match("Directory or File")
    assert e.match(invalid_path)
    assert e.match("revision %s" % revision)
    assert e.match("not found")


@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(
    archive_data, revision_with_submodules
):
    rev_sha1_git = revision_with_submodules["rev_sha1_git"]
    rev_dir_path = revision_with_submodules["rev_dir_rev_path"]

    actual_data = service.lookup_directory_with_revision(rev_sha1_git, rev_dir_path)

    revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"])
    directory = archive_data.directory_ls(revision["directory"])
    rev_entry = next(e for e in directory if e["name"] == rev_dir_path)

    expected_data = {
        "content": archive_data.revision_get(rev_entry["target"]),
        "path": rev_dir_path,
        "revision": rev_sha1_git,
        "type": "rev",
    }

    assert actual_data == expected_data


@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    actual_directory_entries = service.lookup_directory_with_revision(revision)

    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(revision_data["directory"])

    assert actual_directory_entries["type"] == "dir"
    assert actual_directory_entries["content"] == expected_directory_entries


@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] in ("file", "dir")
    ]
    expected_dir_entry = random.choice(dir_entries)

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry["name"]
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
    assert actual_dir_entry["path"] == expected_dir_entry["name"]
    if actual_dir_entry["type"] == "file":
        del actual_dir_entry["content"]["checksums"]["blake2s256"]
        for key in ("checksums", "status", "length"):
            assert actual_dir_entry["content"][key] == expected_dir_entry[key]
    else:
        sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"])
        assert actual_dir_entry["content"] == sub_dir_entries
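# When called with with_data=True, lookup_directory_with_revision is also
# expected to inline the raw file bytes under result["content"]["data"], as
# the next test checks.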
@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
    archive_data, revision
):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    expected_dir_entry = random.choice(dir_entries)
    expected_data = archive_data.content_get_data(
        expected_dir_entry["checksums"]["sha1"]
    )

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry["name"], with_data=True
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
    assert actual_dir_entry["path"] == expected_dir_entry["name"]
    del actual_dir_entry["content"]["checksums"]["blake2s256"]
    for key in ("checksums", "status", "length"):
        assert actual_dir_entry["content"][key] == expected_dir_entry[key]
    assert actual_dir_entry["content"]["data"] == expected_data["data"]


@given(revision())
def test_lookup_revision(archive_data, revision):
    actual_revision = service.lookup_revision(revision)
    assert actual_revision == archive_data.revision_get(revision)


@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    new_revision = new_revision.to_dict()
    new_revision["message"] = b"elegant fix for bug \xff"
    archive_data.revision_add([Revision.from_dict(new_revision)])

    revision = service.lookup_revision(hash_to_hex(new_revision["id"]))
-    assert revision["message"] is None
-    assert revision["message_decoding_failed"] is True
+    assert revision["message"] == "elegant fix for bug \\xff"
+    assert revision["decoding_failures"] == ["message"]
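# Note on the change above: an undecodable revision message is no longer
# nulled out with a dedicated "message_decoding_failed" flag; instead the
# invalid bytes are backslash-escaped (b"\xff" -> "\\xff") and the affected
# field names are recorded in a "decoding_failures" list.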
@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    archive_data.revision_add([new_revision])

    revision_message = service.lookup_revision_message(hash_to_hex(new_revision.id))

    assert revision_message == {"message": new_revision.message}


def test_lookup_revision_msg_no_rev():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_message(unknown_revision_)
    assert e.match("Revision with sha1_git %s not found." % unknown_revision_)


@given(revisions())
def test_lookup_revision_multiple(archive_data, revisions):
    actual_revisions = list(service.lookup_revision_multiple(revisions))

    expected_revisions = []
    for rev in revisions:
        expected_revisions.append(archive_data.revision_get(rev))

    assert actual_revisions == expected_revisions


def test_lookup_revision_multiple_none_found():
    unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_revisions = list(service.lookup_revision_multiple(unknown_revisions_))

    assert actual_revisions == [None] * len(unknown_revisions_)


@given(revision())
def test_lookup_revision_log(archive_data, revision):
    actual_revision_log = list(service.lookup_revision_log(revision, limit=25))
    expected_revision_log = archive_data.revision_log(revision, limit=25)

    assert actual_revision_log == expected_revision_log


def _get_origin_branches(archive_data, origin):
    origin_visit = archive_data.origin_visit_get(origin["url"])[-1]
    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    branches = {
        k: v
        for (k, v) in snapshot["branches"].items()
        if v["target_type"] == "revision"
    }
    return branches


@given(origin())
def test_lookup_revision_log_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_log = list(
        service.lookup_revision_log_by(origin["url"], branch_name, None, limit=25)
    )

    expected_log = archive_data.revision_log(branches[branch_name]["target"], limit=25)

    assert actual_log == expected_log


@given(origin())
def test_lookup_revision_log_by_notfound(origin):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_log_by(
            origin["url"], "unknown_branch_name", None, limit=100
        )


def test_lookup_content_raw_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_content_raw("sha1:" + unknown_content_["sha1"])
    assert e.match(
        "Content with %s checksum equals to %s not found!"
        % ("sha1", unknown_content_["sha1"])
    )


@given(content())
def test_lookup_content_raw(archive_data, content):
    actual_content = service.lookup_content_raw("sha256:%s" % content["sha256"])

    expected_content = archive_data.content_get_data(content["sha1"])

    assert actual_content == expected_content


def test_lookup_content_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_content("sha1:%s" % unknown_content_["sha1"])
    assert e.match(
        "Content with %s checksum equals to %s not found!"
        % ("sha1", unknown_content_["sha1"])
    )
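# lookup_content accepts any supported checksum algorithm in its query
# string; the two tests below resolve the same archived content through its
# sha1 and its sha256.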
% ("sha1", unknown_content_["sha1"]) ) @given(content()) def test_lookup_content_with_sha1(archive_data, content): actual_content = service.lookup_content(f"sha1:{content['sha1']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content @given(content()) def test_lookup_content_with_sha256(archive_data, content): actual_content = service.lookup_content(f"sha256:{content['sha256']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content def test_lookup_directory_bad_checksum(): with pytest.raises(BadInputExc): service.lookup_directory("directory_id") def test_lookup_directory_not_found(): unknown_directory_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory(unknown_directory_) assert e.match("Directory with sha1_git %s not found" % unknown_directory_) @given(directory()) def test_lookup_directory(archive_data, directory): actual_directory_ls = list(service.lookup_directory(directory)) expected_directory_ls = archive_data.directory_ls(directory) assert actual_directory_ls == expected_directory_ls @given(empty_directory()) def test_lookup_directory_empty(empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) assert actual_directory_ls == [] @given(origin()) def test_lookup_revision_by_nothing_found(origin): with pytest.raises(NotFoundExc): service.lookup_revision_by(origin["url"], "invalid-branch-name") @given(origin()) def test_lookup_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_revision = service.lookup_revision_by(origin["url"], branch_name) expected_revision = archive_data.revision_get(branches[branch_name]["target"]) assert actual_revision == expected_revision @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(origin, revision): with pytest.raises(NotFoundExc): service.lookup_revision_with_context_by( origin["url"], "invalid-branch-name", None, revision ) @given(origin()) def test_lookup_revision_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) rev = root_rev_log[-1]["id"] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin["url"], branch_name, None, rev ) expected_root_rev = archive_data.revision_get(root_rev) expected_rev = archive_data.revision_get(rev) expected_rev["children"] = children[rev] assert actual_root_rev == expected_root_rev assert actual_rev == expected_rev def test_lookup_revision_through_ko_not_implemented(): with pytest.raises(NotImplementedError): service.lookup_revision_through({"something-unknown": 10}) @given(origin()) def test_lookup_revision_through_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) rev = root_rev_log[-1]["id"] assert service.lookup_revision_through( { "origin_url": origin["url"], "branch_name": branch_name, "ts": None, "sha1_git": rev, } ) == service.lookup_revision_with_context_by(origin["url"], branch_name, None, rev) @given(origin()) def 
@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    assert service.lookup_revision_through(
        {"origin_url": origin["url"], "branch_name": branch_name, "ts": None,}
    ) == service.lookup_revision_by(origin["url"], branch_name, None)


@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    sha1_git_root = ancestor_revisions["sha1_git_root"]

    assert service.lookup_revision_through(
        {"sha1_git_root": sha1_git_root, "sha1_git": sha1_git,}
    ) == service.lookup_revision_with_context(sha1_git_root, sha1_git)


@given(revision())
def test_lookup_revision_through_with_revision(revision):
    assert service.lookup_revision_through(
        {"sha1_git": revision}
    ) == service.lookup_revision(revision)


@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    with pytest.raises(NotFoundExc):
        service.lookup_directory_through_revision(
            {"sha1_git": revision}, "some/invalid/path"
        )


@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"]
    ) == (revision, service.lookup_directory_with_revision(revision, dir_entry["name"]))


@given(revision())
def test_lookup_directory_through_revision_ok_with_data(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"], with_data=True
    ) == (
        revision,
        service.lookup_directory_with_revision(
            revision, dir_entry["name"], with_data=True
        ),
    )


@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(
    archive_data, content, directory, release, revision, snapshot
):
    expected = archive_data.content_find(content)
    assert service.lookup_object(CONTENT, content["sha1_git"]) == expected

    expected = archive_data.directory_get(directory)
    assert service.lookup_object(DIRECTORY, directory) == expected

    expected = archive_data.release_get(release)
    assert service.lookup_object(RELEASE, release) == expected

    expected = archive_data.revision_get(revision)
    assert service.lookup_object(REVISION, revision) == expected

    expected = {**archive_data.snapshot_get(snapshot), "next_branch": None}
    assert service.lookup_object(SNAPSHOT, snapshot) == expected


@given(
    unknown_content(),
    unknown_directory(),
    unknown_release(),
    unknown_revision(),
    unknown_snapshot(),
)
def test_lookup_unknown_objects(
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
):
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(CONTENT, unknown_content["sha1_git"])
    assert e.match(r"Content.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(DIRECTORY, unknown_directory)
    assert e.match(r"Directory.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(RELEASE, unknown_release)
    assert e.match(r"Release.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(REVISION, unknown_revision)
    assert e.match(r"Revision.*not found")
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(SNAPSHOT, unknown_snapshot)
    assert e.match(r"Snapshot.*not found")


@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_object("foo", invalid_sha1)
    assert e.match("Invalid swh object type")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(CONTENT, invalid_sha1)
    assert e.match("Invalid hash")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(DIRECTORY, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(RELEASE, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(REVISION, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(SNAPSHOT, invalid_sha1)
    assert e.match("Invalid checksum")


def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        CONTENT: [hash_to_bytes(missing_cnt)],
        DIRECTORY: [hash_to_bytes(missing_dir)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {
        missing_cnt,
        missing_dir,
        missing_rev,
        missing_rel,
        missing_snp,
    }


@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        CONTENT: [hash_to_bytes(content["sha1_git"])],
        DIRECTORY: [hash_to_bytes(directory)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {missing_rev, missing_rel, missing_snp}


@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    origin_info = service.lookup_origin({"url": f"{origin['url']}/"})
    assert origin_info["url"] == origin["url"]


def test_lookup_origin_missing_trailing_slash(archive_data):
    deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/")
    archive_data.origin_add([deb_origin])
    origin_info = service.lookup_origin({"url": deb_origin.url[:-1]})
    assert origin_info["url"] == deb_origin.url


@given(snapshot())
def test_lookup_snapshot_branch_name_from_tip_revision(archive_data, snapshot_id):
    snapshot = archive_data.snapshot_get(snapshot_id)
    branches = [
        {"name": k, "revision": v["target"]}
        for k, v in snapshot["branches"].items()
        if v["target_type"] == "revision"
    ]

    branch_info = random.choice(branches)
    possible_results = [
        b["name"] for b in branches if b["revision"] == branch_info["revision"]
    ]

    assert (
        service.lookup_snapshot_branch_name_from_tip_revision(
            snapshot_id, branch_info["revision"]
        )
        in possible_results
    )
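# Several branches of a snapshot can point at the same tip revision, so the
# last test only asserts membership in the set of candidate branch names
# rather than equality with one specific name.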