diff --git a/requirements-swh.txt b/requirements-swh.txt index c30b5a81..3e818225 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,7 +1,7 @@ swh.core >= 0.0.94 swh.indexer >= 0.0.170 -swh.model >= 0.0.64 +swh.model >= 0.3.0 swh.scheduler >= 0.0.72 swh.search >= 0.0.4 swh.storage >= 0.0.182 swh.vault >= 0.0.33 \ No newline at end of file diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py index f5096ad7..dca7a166 100644 --- a/swh/web/api/utils.py +++ b/swh/web/api/utils.py @@ -1,335 +1,335 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Optional, Any from django.http import HttpRequest from swh.web.common.utils import reverse, resolve_branch_alias from swh.web.common.query import parse_hash def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return "".join([person["name"], " <", person["email"], ">"]) def enrich_object( object: Dict[str, str], request: Optional[HttpRequest] = None ) -> Dict[str, str]: """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) request: Absolute URIs will be generated if provided Returns: Object enriched with target object url (revision, release, content, directory) """ if "target" in object and "target_type" in object: if object["target_type"] in ("revision", "release", "directory"): object["target_url"] = reverse( "api-1-%s" % object["target_type"], url_args={"sha1_git": object["target"]}, request=request, ) elif object["target_type"] == "content": object["target_url"] = reverse( "api-1-content", url_args={"q": "sha1_git:" + object["target"]}, request=request, ) elif object["target_type"] == "snapshot": object["target_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": object["target"]}, request=request, ) return object enrich_release = enrich_object def enrich_directory( directory: Dict[str, str], request: Optional[HttpRequest] = None ) -> Dict[str, str]: """Enrich directory with url to content or directory. 
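# Illustrative sketch (annotation, not part of the patch): filter_field_keys in
# practice. It keeps only the requested keys of a dict, recurses into lists and
# maps, and returns any other value untouched. Assumes the swh.web Django
# settings are configured so the module imports cleanly.
from swh.web.api.utils import filter_field_keys

assert filter_field_keys({"name": "x", "target": "y"}, {"target"}) == {"target": "y"}
assert filter_field_keys([{"a": 1, "b": 2}, {"a": 3}], {"a"}) == [{"a": 1}, {"a": 3}]
assert filter_field_keys(42, {"a"}) == 42  # non-dict/list values pass through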
Args: directory: dict of data associated to a swh directory object request: Absolute URIs will be generated if provided Returns: An enriched directory dict filled with additional urls """ if "type" in directory: target_type = directory["type"] target = directory["target"] if target_type == "file": directory["target_url"] = reverse( "api-1-content", url_args={"q": "sha1_git:%s" % target}, request=request ) elif target_type == "dir": directory["target_url"] = reverse( "api-1-directory", url_args={"sha1_git": target}, request=request ) else: directory["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": target}, request=request ) return directory def enrich_metadata_endpoint( content_metadata: Dict[str, str], request: Optional[HttpRequest] = None ) -> Dict[str, str]: """Enrich content metadata dict with link to the upper metadata endpoint. Args: content_metadata: dict of data associated to a swh content metadata request: Absolute URIs will be generated if provided Returns: An enriched content metadata dict filled with an additional url """ c = content_metadata c["content_url"] = reverse( "api-1-content", url_args={"q": "sha1:%s" % c["id"]}, request=request ) return c def enrich_content( content: Dict[str, Any], top_url: Optional[bool] = False, query_string: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> Dict[str, str]: """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information - language_url: its programming language information - license_url: its licensing information Args: content: dict of data associated to a swh content object top_url: whether or not to include the content url in the enriched data query_string: optional query string of type ':' used when requesting the content, it acts as a hint for picking the same hash method when computing the url listed above request: Absolute URIs will be generated if provided Returns: An enriched content dict filled with additional urls """ checksums = content if "checksums" in content: checksums = content["checksums"] hash_algo = "sha1" if query_string: hash_algo = parse_hash(query_string)[0] if hash_algo in checksums: q = "%s:%s" % (hash_algo, checksums[hash_algo]) if top_url: content["content_url"] = reverse("api-1-content", url_args={"q": q}) content["data_url"] = reverse( "api-1-content-raw", url_args={"q": q}, request=request ) content["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": q}, request=request ) content["language_url"] = reverse( "api-1-content-language", url_args={"q": q}, request=request ) content["license_url"] = reverse( "api-1-content-license", url_args={"q": q}, request=request ) return content def enrich_revision( revision: Dict[str, Any], request: Optional[HttpRequest] = None ) -> Dict[str, Any]: """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. 
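# Illustrative sketch (annotation, not part of the patch): enrich_revision on a
# minimal revision dict. The sha1_git values are made-up placeholders and the
# generated URLs are relative because no request is passed; since swh.model >= 0.3.0
# exposes revision parents as tuples, the enriched "parents" value stays a tuple
# of {"id", "url"} dicts.
from swh.web.api.utils import enrich_revision

rev = {"id": "aa" * 20, "directory": "bb" * 20, "parents": ("cc" * 20,)}
enriched = enrich_revision(rev)
# "url", "history_url" and "directory_url" are filled in via reverse(), and each
# parent becomes {"id": <sha1_git>, "url": <revision endpoint url>}.
assert isinstance(enriched["parents"], tuple)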
Args: revision: the revision as a dict request: Absolute URIs will be generated if provided Returns: An enriched revision dict filled with additional urls """ revision["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision["id"]}, request=request ) revision["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision["id"]}, request=request ) if "directory" in revision: revision["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision["directory"]}, request=request, ) if "parents" in revision: parents = [] for parent in revision["parents"]: parents.append( { "id": parent, "url": reverse( "api-1-revision", url_args={"sha1_git": parent}, request=request ), } ) - revision["parents"] = parents + revision["parents"] = tuple(parents) if "children" in revision: children = [] for child in revision["children"]: children.append( reverse("api-1-revision", url_args={"sha1_git": child}, request=request) ) revision["children_urls"] = children if "message_decoding_failed" in revision: revision["message_url"] = reverse( "api-1-revision-raw-message", url_args={"sha1_git": revision["id"]}, request=request, ) return revision def enrich_snapshot( snapshot: Dict[str, Any], request: Optional[HttpRequest] = None ) -> Dict[str, Any]: """Enrich snapshot with links to the branch targets Args: snapshot: the snapshot as a dict request: Absolute URIs will be generated if provided Returns: An enriched snapshot dict filled with additional urls """ if "branches" in snapshot: snapshot["branches"] = { k: enrich_object(v, request) if v else None for k, v in snapshot["branches"].items() } for k, v in snapshot["branches"].items(): if v and v["target_type"] == "alias": branch = resolve_branch_alias(snapshot, v) if branch: branch = enrich_object(branch, request) v["target_url"] = branch["target_url"] return snapshot def enrich_origin( origin: Dict[str, Any], request: Optional[HttpRequest] = None ) -> Dict[str, Any]: """Enrich origin dict with link to its visits Args: origin: the origin as a dict request: Absolute URIs will be generated if provided Returns: An enriched origin dict filled with an additional url """ if "url" in origin: origin["origin_visits_url"] = reverse( "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request, ) return origin def enrich_origin_visit( origin_visit: Dict[str, Any], *, with_origin_link: bool, with_origin_visit_link: bool, request: Optional[HttpRequest] = None, ) -> Dict[str, Any]: """Enrich origin visit dict with additional links Args: origin_visit: the origin visit as a dict with_origin_link: whether to add link to origin with_origin_visit_link: whether to add link to origin visit request: Absolute URIs will be generated if provided Returns: An enriched origin visit dict filled with additional urls """ ov = origin_visit if with_origin_link: ov["origin_url"] = reverse( "api-1-origin", url_args={"origin_url": ov["origin"]}, request=request ) if with_origin_visit_link: ov["origin_visit_url"] = reverse( "api-1-origin-visit", url_args={"origin_url": ov["origin"], "visit_id": ov["visit"]}, request=request, ) snapshot = ov["snapshot"] if snapshot: ov["snapshot_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, request=request ) else: ov["snapshot_url"] = None return ov diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py index 94b668d9..12aa48f1 100644 --- a/swh/web/common/converters.py +++ b/swh/web/common/converters.py @@ -1,389 +1,391 @@ # Copyright (C) 2015-2019 The Software Heritage 
developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json from typing import Dict, Any from swh.core.utils import decode_with_escape from swh.model import hashutil from swh.web.common.typing import OriginInfo, OriginVisitInfo def _group_checksums(data): """Groups checksums values computed from hash functions used in swh and stored in data dict under a single entry 'checksums' """ if data: checksums = {} for hash in hashutil.ALGORITHMS: if hash in data and data[hash]: checksums[hash] = data[hash] del data[hash] if len(checksums) > 0: data["checksums"] = checksums def fmap(f, data): """Map f to data at each level. This must keep the origin data structure type: - map -> map - dict -> dict - list -> list - None -> None Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if data is None: return data if isinstance(data, map): return map(lambda y: fmap(f, y), (x for x in data)) if isinstance(data, list): return [fmap(f, x) for x in data] + if isinstance(data, tuple): + return tuple(fmap(f, x) for x in data) if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def from_swh( dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, removables_if_empty={}, empty_dict={}, empty_list={}, convert={}, convert_fn=lambda x: x, ): """Convert from a swh dictionary to something reasonably json serializable. Args: dict_swh: the origin dictionary needed to be transformed hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string bytess: list/set of keys representing bytes values which needs to be decoded blacklist: set of keys to filter out from the conversion convert: set of keys whose associated values need to be converted using convert_fn convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys converted. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! 
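# Illustrative sketch (annotation, not part of the patch): the new tuple branch of
# fmap. swh.model >= 0.3.0 hands out tuples (e.g. revision parents), and fmap now
# preserves that type alongside maps, lists, dicts and None.
from swh.web.common.converters import fmap

assert fmap(lambda v: v + 1, (1, [2, 3], {"x": 4})) == (2, [3, 4], {"x": 5})
assert fmap(str.upper, None) is None  # None is passed through untouched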
""" if isinstance(v, bytes): return v.decode("utf-8") return v def convert_date(v): """ Args: v (dict or datatime): either: - a dict with three keys: - timestamp (dict or integer timestamp) - offset - negative_utc - or, a datetime We convert it to a human-readable string """ if not v: return v if isinstance(v, datetime.datetime): return v.isoformat() tz = datetime.timezone(datetime.timedelta(minutes=v["offset"])) swh_timestamp = v["timestamp"] if isinstance(swh_timestamp, dict): date = datetime.datetime.fromtimestamp(swh_timestamp["seconds"], tz=tz) else: date = datetime.datetime.fromtimestamp(swh_timestamp, tz=tz) datestr = date.isoformat() if v["offset"] == 0 and v["negative_utc"]: # remove the rightmost + and replace it with a - return "-".join(datestr.rsplit("+", 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist or (key in removables_if_empty and not value): continue if key in dates: new_dict[key] = convert_date(value) elif key in convert: new_dict[key] = convert_fn(value) elif isinstance(value, dict): new_dict[key] = from_swh( value, hashess=hashess, bytess=bytess, dates=dates, blacklist=blacklist, removables_if_empty=removables_if_empty, empty_dict=empty_dict, empty_list=empty_list, convert=convert, convert_fn=convert_fn, ) elif key in hashess: new_dict[key] = fmap(convert_hashes_bytes, value) elif key in bytess: try: new_dict[key] = fmap(convert_bytes, value) except UnicodeDecodeError: if "decoding_failures" not in new_dict: new_dict["decoding_failures"] = [key] else: new_dict["decoding_failures"].append(key) new_dict[key] = fmap(decode_with_escape, value) elif key in empty_dict and not value: new_dict[key] = {} elif key in empty_list and not value: new_dict[key] = [] else: new_dict[key] = value _group_checksums(new_dict) return new_dict def from_origin(origin: Dict[str, Any]) -> OriginInfo: """Convert from a swh origin to an origin dictionary. """ return from_swh(origin) def from_release(release): """Convert from a swh release to a json serializable release dictionary. Args: release (dict): dictionary with keys: - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) comment: release's comment message (bytes) name: release's name (string) author: release's author identifier (swh's id) synthetic: the synthetic property (boolean) Returns: dict: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess={"id", "target"}, bytess={"message", "name", "fullname", "email"}, dates={"date"}, ) class SWHMetadataEncoder(json.JSONEncoder): """Special json encoder for metadata field which can contain bytes encoded value. """ def default(self, obj): if isinstance(obj, bytes): try: return obj.decode("utf-8") except UnicodeDecodeError: # fallback to binary representation to avoid display errors return repr(obj) # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) def convert_revision_metadata(metadata): """Convert json specific dict to a json serializable one. 
""" if not metadata: return {} return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder)) def from_revision(revision): """Convert from a swh revision to a json serializable revision dictionary. Args: revision (dict): dict with keys: - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: dict: Revision dictionary with the same keys as inputs, except: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) Remaining keys are left as is """ revision = from_swh( revision, hashess={"id", "directory", "parents", "children"}, bytess={"name", "fullname", "email"}, convert={"metadata"}, convert_fn=convert_revision_metadata, dates={"date", "committer_date"}, ) if revision: if "parents" in revision: revision["merge"] = len(revision["parents"]) > 1 if "message" in revision: try: revision["message"] = revision["message"].decode("utf-8") except UnicodeDecodeError: revision["message_decoding_failed"] = True revision["message"] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh( content, hashess={"sha1", "sha1_git", "sha256", "blake2s256"}, blacklist={"ctime"}, convert={"status"}, convert_fn=lambda v: "absent" if v == "hidden" else v, ) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, bytess={"name", "fullname", "email"}) def from_origin_visit(visit: Dict[str, Any]) -> OriginVisitInfo: """Convert swh origin_visit to serializable origin_visit dictionary. """ ov = from_swh( visit, hashess={"target", "snapshot"}, bytess={"branch"}, dates={"date"}, empty_dict={"metadata"}, ) return ov def from_snapshot(snapshot): """Convert swh snapshot to serializable snapshot dictionary. """ sv = from_swh(snapshot, hashess={"id", "target"}, bytess={"next_branch"}) if sv and "branches" in sv: sv["branches"] = {decode_with_escape(k): v for k, v in sv["branches"].items()} for k, v in snapshot["branches"].items(): # alias target existing branch names, not a sha1 if v and v["target_type"] == "alias": branch = decode_with_escape(k) target = decode_with_escape(v["target"]) sv["branches"][branch]["target"] = target return sv def from_directory_entry(dir_entry): """Convert swh directory to serializable directory dictionary. """ return from_swh( dir_entry, hashess={"dir_id", "sha1_git", "sha1", "sha256", "blake2s256", "target"}, bytess={"name"}, removables_if_empty={"sha1", "sha1_git", "sha256", "blake2s256", "status"}, convert={"status"}, convert_fn=lambda v: "absent" if v == "hidden" else v, ) def from_filetype(content_entry): """Convert swh content to serializable dictionary containing keys 'id', 'encoding', and 'mimetype'. 
""" return from_swh(content_entry, hashess={"id"}) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index 5033dde7..42761f7a 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,603 +1,603 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.web.api import utils from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse, resolve_branch_alias from swh.web.tests.strategies import ( release, directory, content, revision, snapshot, origin, ) url_map = [ { "rule": "/other/", "methods": set(["GET", "POST", "HEAD"]), "endpoint": "foo", }, { "rule": "/some/old/url/", "methods": set(["GET", "POST"]), "endpoint": "blablafn", }, { "rule": "/other/old/url/", "methods": set(["GET", "HEAD"]), "endpoint": "bar", }, {"rule": "/other", "methods": set([]), "endpoint": None}, {"rule": "/other2", "methods": set([]), "endpoint": None}, ] def test_filter_field_keys_dict_unknown_keys(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory1", "file2"} ) assert actual_res == {} def test_filter_field_keys_dict(): actual_res = utils.filter_field_keys( {"directory": 1, "file": 2, "link": 3}, {"directory", "link"} ) assert actual_res == {"directory": 1, "link": 3} def test_filter_field_keys_list_unknown_keys(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"1": 1, "2": 2, "link": 3}], {"d"} ) assert actual_res == [{}, {}] def test_filter_field_keys_map(): actual_res = utils.filter_field_keys( map( lambda x: {"i": x["i"] + 1, "j": x["j"]}, [{"i": 1, "j": None}, {"i": 2, "j": None}, {"i": 3, "j": None}], ), {"i"}, ) assert list(actual_res) == [{"i": 2}, {"i": 3}, {"i": 4}] def test_filter_field_keys_list(): actual_res = utils.filter_field_keys( [{"directory": 1, "file": 2, "link": 3}, {"dir": 1, "fil": 2, "lin": 3}], {"directory", "dir"}, ) assert actual_res == [{"directory": 1}, {"dir": 1}] def test_filter_field_keys_other(): input_set = {1, 2} actual_res = utils.filter_field_keys(input_set, {"a", "1"}) assert actual_res == input_set def test_person_to_string(): assert ( utils.person_to_string({"name": "raboof", "email": "foo@bar"}) == "raboof " ) def test_enrich_release_empty(): actual_release = utils.enrich_release({}) assert actual_release == {} @given(release()) def test_enrich_release_content_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "content" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{release_data["target"]}'}, request=request, ) assert actual_release == release_data @given(release()) def test_enrich_release_directory_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "directory" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = 
reverse( "api-1-directory", url_args={"sha1_git": release_data["target"]}, request=request, ) assert actual_release == release_data @given(release()) def test_enrich_release_revision_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "revision" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data @given(release()) def test_enrich_release_release_target(api_request_factory, archive_data, release): release_data = archive_data.release_get(release) release_data["target_type"] = "release" url = reverse("api-1-release", url_args={"sha1_git": release}) request = api_request_factory.get(url) actual_release = utils.enrich_release(release_data, request) release_data["target_url"] = reverse( "api-1-release", url_args={"sha1_git": release_data["target"]}, request=request ) assert actual_release == release_data def test_enrich_directory_no_type(): assert utils.enrich_directory({"id": "dir-id"}) == {"id": "dir-id"} @given(directory()) def test_enrich_directory_with_type(api_request_factory, archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) url = reverse("api-1-directory", url_args={"sha1_git": directory}) request = api_request_factory.get(url) actual_directory = utils.enrich_directory(dir_entry, request) if dir_entry["type"] == "file": dir_entry["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{dir_entry["target"]}'}, request=request, ) elif dir_entry["type"] == "dir": dir_entry["target_url"] = reverse( "api-1-directory", url_args={"sha1_git": dir_entry["target"]}, request=request, ) elif dir_entry["type"] == "rev": dir_entry["target_url"] = reverse( "api-1-revision", url_args={"sha1_git": dir_entry["target"]}, request=request, ) assert actual_directory == dir_entry def test_enrich_content_without_hashes(): assert utils.enrich_content({"id": "123"}) == {"id": "123"} @given(content()) def test_enrich_content_with_hashes(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data @given(content()) def test_enrich_content_with_hashes_and_top_level_url(api_request_factory, content): for algo in DEFAULT_ALGORITHMS: content_data = dict(content) query_string = "%s:%s" % (algo, content_data[algo]) url = reverse("api-1-content", url_args={"q": query_string}) request = api_request_factory.get(url) enriched_content = utils.enrich_content( content_data, query_string=query_string, top_url=True, request=request ) 
content_data["content_url"] = reverse( "api-1-content", url_args={"q": query_string}, request=request ) content_data["data_url"] = reverse( "api-1-content-raw", url_args={"q": query_string}, request=request ) content_data["filetype_url"] = reverse( "api-1-content-filetype", url_args={"q": query_string}, request=request ) content_data["language_url"] = reverse( "api-1-content-language", url_args={"q": query_string}, request=request ) content_data["license_url"] = reverse( "api-1-content-license", url_args={"q": query_string}, request=request ) assert enriched_content == content_data @given(revision()) def test_enrich_revision_without_children_or_parent( api_request_factory, archive_data, revision ): revision_data = archive_data.revision_get(revision) del revision_data["parents"] url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_with_children_and_parent_no_dir( api_request_factory, archive_data, revision, parent_revision, child_revision ): revision_data = archive_data.revision_get(revision) del revision_data["directory"] - revision_data["parents"].append(parent_revision) + revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) - revision_data["parents"] = [ + revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] - ] + ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_no_context( api_request_factory, revision, parent_revision, child_revision ): revision_data = { "id": revision, "parents": [parent_revision], "children": [child_revision], } url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) - revision_data["parents"] = [ + revision_data["parents"] = tuple( { "id": parent_revision, "url": reverse( "api-1-revision", url_args={"sha1_git": parent_revision}, request=request, ), } - ] + ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) 
def test_enrich_revision_with_no_message( api_request_factory, archive_data, revision, parent_revision, child_revision ): revision_data = archive_data.revision_get(revision) revision_data["message"] = None - revision_data["parents"].append(parent_revision) + revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) - revision_data["parents"] = [ + revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] - ] + ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(revision(), revision(), revision()) def test_enrich_revision_with_invalid_message( api_request_factory, archive_data, revision, parent_revision, child_revision ): revision_data = archive_data.revision_get(revision) revision_data["message"] = None revision_data["message_decoding_failed"] = (True,) - revision_data["parents"].append(parent_revision) + revision_data["parents"] = revision_data["parents"] + (parent_revision,) revision_data["children"] = child_revision url = reverse("api-1-revision", url_args={"sha1_git": revision}) request = api_request_factory.get(url) actual_revision = utils.enrich_revision(revision_data, request) revision_data["url"] = reverse( "api-1-revision", url_args={"sha1_git": revision}, request=request ) revision_data["message_url"] = reverse( "api-1-revision-raw-message", url_args={"sha1_git": revision}, request=request ) revision_data["directory_url"] = reverse( "api-1-directory", url_args={"sha1_git": revision_data["directory"]}, request=request, ) revision_data["history_url"] = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, request=request ) - revision_data["parents"] = [ + revision_data["parents"] = tuple( { "id": p["id"], "url": reverse( "api-1-revision", url_args={"sha1_git": p["id"]}, request=request ), } for p in revision_data["parents"] - ] + ) revision_data["children_urls"] = [ reverse( "api-1-revision", url_args={"sha1_git": child_revision}, request=request ) ] assert actual_revision == revision_data @given(snapshot()) def test_enrich_snapshot(api_request_factory, archive_data, snapshot): snapshot_data = archive_data.snapshot_get(snapshot) url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) request = api_request_factory.get(url) actual_snapshot = utils.enrich_snapshot(snapshot_data, request) for _, b in snapshot_data["branches"].items(): if b["target_type"] in ("directory", "revision", "release"): b["target_url"] = reverse( f'api-1-{b["target_type"]}', url_args={"sha1_git": b["target"]}, request=request, ) elif b["target_type"] == "content": b["target_url"] = reverse( "api-1-content", url_args={"q": f'sha1_git:{b["target"]}'}, request=request, ) for _, b in snapshot_data["branches"].items(): if b["target_type"] == "alias": target = resolve_branch_alias(snapshot_data, b) 
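# resolve_branch_alias (from swh.web.common.utils) is expected to return the
# concrete branch an alias points to, so its target_url can be copied onto the
# alias entry in the next statement.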
b["target_url"] = target["target_url"] assert actual_snapshot == snapshot_data @given(origin()) def test_enrich_origin(api_request_factory, archive_data, origin): url = reverse("api-1-origin", url_args={"origin_url": origin["url"]}) request = api_request_factory.get(url) origin_data = {"url": origin["url"]} actual_origin = utils.enrich_origin(origin_data, request) origin_data["origin_visits_url"] = reverse( "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request ) assert actual_origin == origin_data @given(origin()) def test_enrich_origin_visit(api_request_factory, archive_data, origin): origin_visit = random.choice(get_origin_visits(origin)) url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, ) request = api_request_factory.get(url) actual_origin_visit = utils.enrich_origin_visit( origin_visit, with_origin_link=True, with_origin_visit_link=True, request=request, ) origin_visit["origin_url"] = reverse( "api-1-origin", url_args={"origin_url": origin["url"]}, request=request ) origin_visit["origin_visit_url"] = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]}, request=request, ) origin_visit["snapshot_url"] = reverse( "api-1-snapshot", url_args={"snapshot_id": origin_visit["snapshot"]}, request=request, ) assert actual_origin_visit == origin_visit diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index 7a341907..33fd4df8 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,950 +1,950 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.model.model import Directory, DirectoryEntry, Origin, Revision from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, unknown_content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, unknown_directory, release, unknown_release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, snapshot, unknown_snapshot, ) from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(contents()) def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": True}) assert service.lookup_multiple_hashes(input_data) == expected_output @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": cnt in 
contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert actual_lookup == {"found": None, "algo": "sha1_git"} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash("sha1:%s" % content["sha1"]) content_metadata = archive_data.content_get_metadata(content["sha1"]) assert {"found": content_metadata, "algo": "sha1"} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert {"found": False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash("sha1:%s" % content["sha1"]) assert {"found": True} == actual_lookup @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags["sha1s"]) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags("sha1:%s" % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag["id"] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list( service.lookup_content_ctags("sha1:%s" % unknown_content_["sha1"]) ) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) actual_filetype = service.lookup_content_filetype(content["sha1"]) expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) assert actual_filetype == expected_filetype @pytest.mark.skip # Language indexer is disabled. 
@given(content()) def test_lookup_content_language(indexer_data, content): indexer_data.content_add_language(content["sha1"]) actual_language = service.lookup_content_language(content["sha1"]) expected_language = indexer_data.content_get_language(content["sha1"]) assert actual_language == expected_language @given(contents_with_ctags()) def test_lookup_expression(indexer_data, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags["sha1s"]: if len(expected_ctags) == per_page: break indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag["name"] == contents_with_ctags["symbol_name"]: del ctag["id"] ctag["sha1"] = content_sha1 expected_ctags.append(ctag) actual_ctags = list( service.lookup_expression( contents_with_ctags["symbol_name"], last_sha1=None, per_page=10 ) ) assert actual_ctags == expected_ctags def test_lookup_expression_no_result(): expected_ctags = [] actual_ctags = list( service.lookup_expression("barfoo", last_sha1=None, per_page=10) ) assert actual_ctags == expected_ctags @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(indexer_data, content): indexer_data.content_add_license(content["sha1"]) actual_license = service.lookup_content_license(content["sha1"]) expected_license = indexer_data.content_get_license(content["sha1"]) assert actual_license == expected_license def test_stat_counters(archive_data): actual_stats = service.stat_counters() assert actual_stats == archive_data.stat_counters() @given(new_origin(), visit_dates()) def test_lookup_origin_visits(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) for ts in visit_dates: archive_data.origin_visit_add(new_origin.url, ts, type="git") actual_origin_visits = list( service.lookup_origin_visits(new_origin.url, per_page=100) ) expected_visits = archive_data.origin_visit_get(new_origin.url) for expected_visit in expected_visits: expected_visit["origin"] = new_origin.url assert actual_origin_visits == expected_visits @given(new_origin(), visit_dates()) def test_lookup_origin_visit(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(archive_data.origin_visit_add(new_origin.url, ts, type="git")) visit = random.choice(visits).visit actual_origin_visit = service.lookup_origin_visit(new_origin.url, visit) expected_visit = dict(archive_data.origin_visit_get_by(new_origin.url, visit)) assert actual_origin_visit == expected_visit @given(new_origin()) def test_lookup_origin(archive_data, new_origin): archive_data.origin_add_one(new_origin) actual_origin = service.lookup_origin({"url": new_origin.url}) expected_origin = archive_data.origin_get({"url": new_origin.url}) assert actual_origin == expected_origin @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): with pytest.raises(BadInputExc) as e: service.lookup_release(invalid_sha1) assert e.match("Invalid checksum") @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(sha256): with pytest.raises(BadInputExc) as e: service.lookup_release(sha256) assert e.match("Only sha1_git is supported.") @given(directory()) def test_lookup_directory_with_path_not_found(directory): path = "some/invalid/path/here" with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_path(directory, path) assert e.match("Directory 
entry with path %s from %s not found" % (path, directory)) @given(directory()) def test_lookup_directory_with_path_found(archive_data, directory): directory_content = archive_data.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry["name"] actual_result = service.lookup_directory_with_path(directory, path) assert actual_result == directory_entry @given(release()) def test_lookup_release(archive_data, release): actual_release = service.lookup_release(release) assert actual_release == archive_data.release_get(release) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Invalid checksum query string") sha1_git = sha256 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Only sha1_git is supported") @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( revision, unknown_revision ): sha1_git_root = revision sha1_git = unknown_revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Revision %s not found" % sha1_git) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( revision, unknown_revision ): sha1_git_root = unknown_revision sha1_git = revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Revision root %s not found" % sha1_git_root) @given(ancestor_revisions()) def test_lookup_revision_with_context(archive_data, ancestor_revisions): sha1_git = ancestor_revisions["sha1_git"] root_sha1_git = ancestor_revisions["sha1_git_root"] for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}): actual_revision = service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in archive_data.revision_log(root_sha1_git): for p_rev in rev["parents"]: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev["id"]) expected_revision = archive_data.revision_get(sha1_git) expected_revision["children"] = children assert actual_revision == expected_revision @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(non_ancestor_revisions): sha1_git = non_ancestor_revisions["sha1_git"] root_sha1_git = non_ancestor_revisions["sha1_git_root"] with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(root_sha1_git, sha1_git) assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git)) def test_lookup_directory_with_revision_not_found(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(unknown_revision_) assert e.match("Revision %s not found" % unknown_revision_) @given(new_revision()) def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision): unknown_content_ = random_content() dir_path = "README.md" # A directory that points to unknown content dir = Directory( - entries=[ + entries=( DirectoryEntry( name=bytes(dir_path.encode("utf-8")), type="file", target=hash_to_bytes(unknown_content_["sha1_git"]), perms=DentryPerms.content, - ) - ] + ), + ) ) # Create a revision that points to a directory # Which points to unknown content 
new_revision = new_revision.to_dict() new_revision["directory"] = dir.id del new_revision["id"] new_revision = Revision.from_dict(new_revision) # Add the directory and revision in mem archive_data.directory_add([dir]) archive_data.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision.id) with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(new_revision_id, dir_path) assert e.match("Content not found for revision %s" % new_revision_id) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): invalid_path = "path/to/something/unknown" with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(revision, invalid_path) assert e.match("Directory or File") assert e.match(invalid_path) assert e.match("revision %s" % revision) assert e.match("not found") @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules( archive_data, revision_with_submodules ): rev_sha1_git = revision_with_submodules["rev_sha1_git"] rev_dir_path = revision_with_submodules["rev_dir_rev_path"] actual_data = service.lookup_directory_with_revision(rev_sha1_git, rev_dir_path) revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"]) directory = archive_data.directory_ls(revision["directory"]) rev_entry = next(e for e in directory if e["name"] == rev_dir_path) expected_data = { "content": archive_data.revision_get(rev_entry["target"]), "path": rev_dir_path, "revision": rev_sha1_git, "type": "rev", } assert actual_data == expected_data @given(revision()) def test_lookup_directory_with_revision_without_path(archive_data, revision): actual_directory_entries = service.lookup_directory_with_revision(revision) revision_data = archive_data.revision_get(revision) expected_directory_entries = archive_data.directory_ls(revision_data["directory"]) assert actual_directory_entries["type"] == "dir" assert actual_directory_entries["content"] == expected_directory_entries @given(revision()) def test_lookup_directory_with_revision_with_path(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] in ("file", "dir") ] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = service.lookup_directory_with_revision( revision, expected_dir_entry["name"] ) assert actual_dir_entry["type"] == expected_dir_entry["type"] assert actual_dir_entry["revision"] == revision assert actual_dir_entry["path"] == expected_dir_entry["name"] if actual_dir_entry["type"] == "file": del actual_dir_entry["content"]["checksums"]["blake2s256"] for key in ("checksums", "status", "length"): assert actual_dir_entry["content"][key] == expected_dir_entry[key] else: sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"]) assert actual_dir_entry["content"] == sub_dir_entries @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( archive_data, revision ): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] expected_dir_entry = random.choice(dir_entries) expected_data = archive_data.content_get(expected_dir_entry["checksums"]["sha1"]) actual_dir_entry = service.lookup_directory_with_revision( revision, expected_dir_entry["name"], with_data=True ) assert actual_dir_entry["type"] == expected_dir_entry["type"] assert actual_dir_entry["revision"] == revision assert actual_dir_entry["path"] == 
expected_dir_entry["name"] del actual_dir_entry["content"]["checksums"]["blake2s256"] for key in ("checksums", "status", "length"): assert actual_dir_entry["content"][key] == expected_dir_entry[key] assert actual_dir_entry["content"]["data"] == expected_data["data"] @given(revision()) def test_lookup_revision(archive_data, revision): actual_revision = service.lookup_revision(revision) assert actual_revision == archive_data.revision_get(revision) @given(new_revision()) def test_lookup_revision_invalid_msg(archive_data, new_revision): new_revision = new_revision.to_dict() new_revision["message"] = b"elegant fix for bug \xff" archive_data.revision_add([Revision.from_dict(new_revision)]) revision = service.lookup_revision(hash_to_hex(new_revision["id"])) assert revision["message"] is None assert revision["message_decoding_failed"] is True @given(new_revision()) def test_lookup_revision_msg_ok(archive_data, new_revision): archive_data.revision_add([new_revision]) revision_message = service.lookup_revision_message(hash_to_hex(new_revision.id)) assert revision_message == {"message": new_revision.message} def test_lookup_revision_msg_no_rev(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_revision_message(unknown_revision_) assert e.match("Revision with sha1_git %s not found." % unknown_revision_) @given(revisions()) def test_lookup_revision_multiple(archive_data, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(archive_data.revision_get(rev)) assert actual_revisions == expected_revisions def test_lookup_revision_multiple_none_found(): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = list(service.lookup_revision_multiple(unknown_revisions_)) assert actual_revisions == [None] * len(unknown_revisions_) @given(revision()) def test_lookup_revision_log(archive_data, revision): actual_revision_log = list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = archive_data.revision_log(revision, limit=25) assert actual_revision_log == expected_revision_log def _get_origin_branches(archive_data, origin): origin_visit = archive_data.origin_visit_get(origin["url"])[-1] snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) branches = { k: v for (k, v) in snapshot["branches"].items() if v["target_type"] == "revision" } return branches @given(origin()) def test_lookup_revision_log_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_log = list( service.lookup_revision_log_by(origin["url"], branch_name, None, limit=25) ) expected_log = archive_data.revision_log(branches[branch_name]["target"], limit=25) assert actual_log == expected_log @given(origin()) def test_lookup_revision_log_by_notfound(origin): with pytest.raises(NotFoundExc): service.lookup_revision_log_by( origin["url"], "unknown_branch_name", None, limit=100 ) def test_lookup_content_raw_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content_raw("sha1:" + unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" 
% ("sha1", unknown_content_["sha1"]) ) @given(content()) def test_lookup_content_raw(archive_data, content): actual_content = service.lookup_content_raw("sha256:%s" % content["sha256"]) expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content def test_lookup_content_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content("sha1:%s" % unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" % ("sha1", unknown_content_["sha1"]) ) @given(content()) def test_lookup_content_with_sha1(archive_data, content): actual_content = service.lookup_content("sha1:%s" % content["sha1"]) expected_content = archive_data.content_get_metadata(content["sha1"]) assert actual_content == expected_content @given(content()) def test_lookup_content_with_sha256(archive_data, content): actual_content = service.lookup_content("sha256:%s" % content["sha256"]) expected_content = archive_data.content_get_metadata(content["sha1"]) assert actual_content == expected_content def test_lookup_directory_bad_checksum(): with pytest.raises(BadInputExc): service.lookup_directory("directory_id") def test_lookup_directory_not_found(): unknown_directory_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory(unknown_directory_) assert e.match("Directory with sha1_git %s not found" % unknown_directory_) @given(directory()) def test_lookup_directory(archive_data, directory): actual_directory_ls = list(service.lookup_directory(directory)) expected_directory_ls = archive_data.directory_ls(directory) assert actual_directory_ls == expected_directory_ls @given(empty_directory()) def test_lookup_directory_empty(empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) assert actual_directory_ls == [] @given(origin()) def test_lookup_revision_by_nothing_found(origin): with pytest.raises(NotFoundExc): service.lookup_revision_by(origin["url"], "invalid-branch-name") @given(origin()) def test_lookup_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_revision = service.lookup_revision_by(origin["url"], branch_name) expected_revision = archive_data.revision_get(branches[branch_name]["target"]) assert actual_revision == expected_revision @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(origin, revision): with pytest.raises(NotFoundExc): service.lookup_revision_with_context_by( origin["url"], "invalid-branch-name", None, revision ) @given(origin()) def test_lookup_revision_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) rev = root_rev_log[-1]["id"] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin["url"], branch_name, None, rev ) expected_root_rev = archive_data.revision_get(root_rev) expected_rev = archive_data.revision_get(rev) expected_rev["children"] = children[rev] assert actual_root_rev == expected_root_rev assert actual_rev == expected_rev def test_lookup_revision_through_ko_not_implemented(): with pytest.raises(NotImplementedError): service.lookup_revision_through({"something-unknown": 10}) @given(origin()) def 
test_lookup_revision_through_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) rev = root_rev_log[-1]["id"] assert service.lookup_revision_through( { "origin_url": origin["url"], "branch_name": branch_name, "ts": None, "sha1_git": rev, } ) == service.lookup_revision_with_context_by(origin["url"], branch_name, None, rev) @given(origin()) def test_lookup_revision_through_with_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) assert service.lookup_revision_through( {"origin_url": origin["url"], "branch_name": branch_name, "ts": None,} ) == service.lookup_revision_by(origin["url"], branch_name, None) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(ancestor_revisions): sha1_git = ancestor_revisions["sha1_git"] sha1_git_root = ancestor_revisions["sha1_git_root"] assert service.lookup_revision_through( {"sha1_git_root": sha1_git_root, "sha1_git": sha1_git,} ) == service.lookup_revision_with_context(sha1_git_root, sha1_git) @given(revision()) def test_lookup_revision_through_with_revision(revision): assert service.lookup_revision_through( {"sha1_git": revision} ) == service.lookup_revision(revision) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(revision): with pytest.raises(NotFoundExc): service.lookup_directory_through_revision( {"sha1_git": revision}, "some/invalid/path" ) @given(revision()) def test_lookup_directory_through_revision_ok(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] dir_entry = random.choice(dir_entries) assert service.lookup_directory_through_revision( {"sha1_git": revision}, dir_entry["name"] ) == (revision, service.lookup_directory_with_revision(revision, dir_entry["name"])) @given(revision()) def test_lookup_directory_through_revision_ok_with_data(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] dir_entry = random.choice(dir_entries) assert service.lookup_directory_through_revision( {"sha1_git": revision}, dir_entry["name"], with_data=True ) == ( revision, service.lookup_directory_with_revision( revision, dir_entry["name"], with_data=True ), ) @given(content(), directory(), release(), revision(), snapshot()) def test_lookup_known_objects( archive_data, content, directory, release, revision, snapshot ): expected = archive_data.content_find(content) assert service.lookup_object(CONTENT, content["sha1_git"]) == expected expected = archive_data.directory_get(directory) assert service.lookup_object(DIRECTORY, directory) == expected expected = archive_data.release_get(release) assert service.lookup_object(RELEASE, release) == expected expected = archive_data.revision_get(revision) assert service.lookup_object(REVISION, revision) == expected expected = archive_data.snapshot_get(snapshot) assert service.lookup_object(SNAPSHOT, snapshot) == expected @given( unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot(), ) def test_lookup_unknown_objects( unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): with pytest.raises(NotFoundExc) 
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(CONTENT, unknown_content["sha1_git"])
    assert e.match(r"Content.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(DIRECTORY, unknown_directory)
    assert e.match(r"Directory.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(RELEASE, unknown_release)
    assert e.match(r"Release.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(REVISION, unknown_revision)
    assert e.match(r"Revision.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(SNAPSHOT, unknown_snapshot)
    assert e.match(r"Snapshot.*not found")


@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_object("foo", invalid_sha1)
    assert e.match("Invalid swh object type")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(CONTENT, invalid_sha1)
    assert e.match("Invalid hash")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(DIRECTORY, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(RELEASE, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(REVISION, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(SNAPSHOT, invalid_sha1)
    assert e.match("Invalid checksum")


def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_pids = {
        CONTENT: [hash_to_bytes(missing_cnt)],
        DIRECTORY: [hash_to_bytes(missing_dir)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_pids)

    assert actual_result == {
        missing_cnt,
        missing_dir,
        missing_rev,
        missing_rel,
        missing_snp,
    }


@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_pids = {
        CONTENT: [hash_to_bytes(content["sha1_git"])],
        DIRECTORY: [hash_to_bytes(directory)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_pids)

    assert actual_result == {missing_rev, missing_rel, missing_snp}


@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    origin_info = service.lookup_origin({"url": f"{origin['url']}/"})
    assert origin_info["url"] == origin["url"]


def test_lookup_origin_missing_trailing_slash(archive_data):
    deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/")
    archive_data.origin_add_one(deb_origin)
    origin_info = service.lookup_origin({"url": deb_origin.url[:-1]})
    assert origin_info["url"] == deb_origin.url
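Note: the lookup_object tests above exercise a dispatch on the object type constants plus hash validation. For readers unfamiliar with the service layer, here is a minimal sketch of what such a dispatcher looks like; the sketch name, routing and error message below are illustrative assumptions layered on top of the service helpers used elsewhere in this patch, not the exact swh.web.common.service implementation.

from typing import Any, Dict

from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.web.common import service
from swh.web.common.exc import BadInputExc


def lookup_object_sketch(object_type: str, object_id: str) -> Dict[str, Any]:
    # Route a generic lookup to the per-type service helper; those helpers are
    # expected to raise NotFoundExc / BadInputExc for missing or invalid hashes.
    if object_type == CONTENT:
        return service.lookup_content(f"sha1_git:{object_id}")
    elif object_type == DIRECTORY:
        return service.lookup_directory(object_id)
    elif object_type == RELEASE:
        return service.lookup_release(object_id)
    elif object_type == REVISION:
        return service.lookup_revision(object_id)
    elif object_type == SNAPSHOT:
        return service.lookup_snapshot(object_id)
    raise BadInputExc("Invalid swh object type (%s)" % object_type)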
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index b2f7c5fb..c5fb392a 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,353 +1,353 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import random

from copy import deepcopy

from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
from swh.model.model import Content, Directory, Origin
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.search import get_search
from swh.storage.algos.dir_iterators import dir_iterator
from swh.web import config
from swh.web.browse.utils import (
    get_mimetype_and_encoding_for_content,
    prepare_content_for_display,
    _re_encode_content,
)
from swh.web.common import service

# Module used to initialize data that will be provided as tests input

# Configuration for git loader
_TEST_LOADER_CONFIG = {
    "storage": {"cls": "memory",},
    "save_data": False,
    "max_content_size": 100 * 1024 * 1024,
}

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    "storage": {"cls": "memory"},
    "objstorage": {"cls": "memory", "args": {},},
    "indexer_storage": {"cls": "memory", "args": {},},
}


def random_sha1():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20)))


def random_sha256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_blake2s256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_content():
    return {
        "sha1": random_sha1(),
        "sha1_git": random_sha1(),
        "sha256": random_sha256(),
        "blake2s256": random_blake2s256(),
    }


# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            "tools": {
                "name": "file",
                "version": "1:5.30-1+deb9u1",
                "configuration": {"type": "library", "debian-package": "python3-magic"},
            },
        }


# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            "workdir": "/tmp/swh/indexer.fossology.license",
            "tools": {
                "name": "nomos",
                "version": "3.1.0rc2-31-ga2cbb8c",
                "configuration": {"command_line": "nomossa <filepath>",},
            },
        }


# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            "workdir": "/tmp/swh/indexer.ctags",
            "languages": {"c": "c"},
            "tools": {
                "name": "universal-ctags",
                "version": "~git7859817b",
                "configuration": {
                    "command_line": """ctags --fields=+lnz --sort=no --links=no """
                    """--output-format=json <filepath>"""
                },
            },
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        "type": "git",
        "url": "https://github.com/wcoder/highlightjs-line-numbers.js",
        "archives": [
            "highlightjs-line-numbers.js.zip",
            "highlightjs-line-numbers.js_visit2.zip",
        ],
        "visit_date": ["Dec 1 2018, 01:00 UTC", "Jan 20 2019, 15:00 UTC"],
    },
    {
        "type": "git",
        "url": "https://github.com/memononen/libtess2",
        "archives": ["libtess2.zip"],
        "visit_date": ["May 25 2018, 01:00 UTC"],
    },
    {
        "type": "git",
        "url": "repo_with_submodules",
        "archives": ["repo_with_submodules.tgz"],
        "visit_date": ["Jan 1 2019, 01:00 UTC"],
    },
]

_contents = {}
def _add_extra_contents(storage, contents):
    pbm_image_data = b"""P1
# PBM example
24 7
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0
0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 1 0
0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 1 0
0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0
0 1 0 0 0 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0"""

    # add file with mimetype image/x-portable-bitmap in the archive content
    pbm_content = Content.from_data(pbm_image_data)
    storage.content_add([pbm_content])
    contents.add(pbm_content.sha1)


# Tests data initialization
def _init_tests_data():
    # To hold reference to the memory storage
    storage = None

    # Create search instance
    search = get_search("memory", {})
    search.initialize()
    search.origin_update({"url": origin["url"]} for origin in _TEST_ORIGINS)

    # Load git repositories from archives
    for origin in _TEST_ORIGINS:
        for i, archive in enumerate(origin["archives"]):
            origin_repo_archive = os.path.join(
                os.path.dirname(__file__), "resources/repos/%s" % archive
            )
            loader = GitLoaderFromArchive(
                origin["url"],
                archive_path=origin_repo_archive,
                config=_TEST_LOADER_CONFIG,
                visit_date=origin["visit_date"][i],
            )
            if storage is None:
                storage = loader.storage
            else:
                loader.storage = storage
            loader.load()

        origin.update(storage.origin_get(origin))  # add an 'id' key if enabled
        search.origin_update([{"url": origin["url"], "has_visits": True}])

    for i in range(250):
        url = "https://many.origins/%d" % (i + 1)
        storage.origin_add([Origin(url=url)])
        search.origin_update([{"url": url, "has_visits": True}])
        visit = storage.origin_visit_add(url, "2019-12-03 13:55:05Z", "tar")
        storage.origin_visit_update(
            url,
            visit.visit,
            status="full",
            snapshot=hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
        )

    contents = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()

    content_path = {}

    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
        snp = storage.snapshot_get_latest(origin["url"])
        snapshots.add(hash_to_hex(snp["id"]))
        for branch_name, branch_data in snp["branches"].items():
            if branch_data["target_type"] == "revision":
                revisions.add(branch_data["target"])
            elif branch_data["target_type"] == "release":
                release = next(storage.release_get([branch_data["target"]]))
                revisions.add(release["target"])
                releases.add(hash_to_hex(branch_data["target"]))

    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)

    for rev in storage.revision_get(revisions):
        dir_id = rev["directory"]
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            if entry["type"] == "file":
                contents.add(entry["sha1"])
                content_path[entry["sha1"]] = "/".join(
                    [hash_to_hex(dir_id), entry["path"].decode("utf-8")]
                )
            elif entry["type"] == "dir":
                directories.add(hash_to_hex(entry["target"]))

    _add_extra_contents(storage, contents)

    # Get all checksums for each content
    result = storage.content_get_metadata(contents)
    contents = []
    for sha1, contents_metadata in result.items():
        sha1 = contents_metadata[0]["sha1"]
        content_metadata = {
            algo: hash_to_hex(contents_metadata[0][algo]) for algo in DEFAULT_ALGORITHMS
        }

        path = ""
        if sha1 in content_path:
            path = content_path[sha1]

        cnt = next(storage.content_get([sha1]))
        mimetype, encoding = get_mimetype_and_encoding_for_content(cnt["data"])
        _, _, cnt["data"] = _re_encode_content(mimetype, encoding, cnt["data"])
        content_display_data = prepare_content_for_display(cnt["data"], mimetype, path)

        content_metadata.update(
            {
                "path": path,
                "mimetype": mimetype,
                "encoding": encoding,
                "hljs_language": content_display_data["language"],
                "data": content_display_data["content_data"],
            }
        )
        _contents[hash_to_hex(sha1)] = content_metadata
        contents.append(content_metadata)

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage("memory", {})

    # Add the empty directory to the test archive
-    storage.directory_add([Directory(entries=[])])
+    storage.directory_add([Directory(entries=())])

    # Return tests data
    return {
        "search": search,
        "storage": storage,
        "idx_storage": idx_storage,
        "origins": _TEST_ORIGINS,
        "contents": contents,
        "directories": list(directories),
        "releases": list(releases),
        "revisions": list(map(hash_to_hex, revisions)),
        "snapshots": list(snapshots),
        "generated_checksums": set(),
    }


def _init_indexers(tests_data):
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class in (
        ("mimetype_indexer", _MimetypeIndexer),
        ("license_indexer", _FossologyLicenseIndexer),
        ("ctags_indexer", _CtagsIndexer),
    ):
        idx = idx_class()
        idx.storage = tests_data["storage"]
        idx.objstorage = tests_data["storage"].objstorage
        idx.idx_storage = tests_data["idx_storage"]
        idx.register_tools(idx.config["tools"])
        indexers[idx_name] = idx
    return indexers


def get_content(content_sha1):
    return _contents.get(content_sha1)


_tests_data = None
_current_tests_data = None
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage, search):
    """
    Helper function to replace the storages from which archive data
    are fetched.
    """
    swh_config = config.get_config()
    swh_config.update(
        {"storage": storage, "indexer_storage": idx_storage, "search": search,}
    )
    service.storage = storage
    service.idx_storage = idx_storage
    service.search = search
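Note: for context on how these helpers are consumed, a typical test setup builds the in-memory data once via get_tests_data() and redirects the service layer to it via override_storages(). Below is a minimal sketch of such a pytest fixture, assuming an autouse fixture named tests_data; the fixture name and wiring are assumptions, not necessarily what the project's conftest.py does.

import pytest

from swh.web.tests.data import get_tests_data, override_storages


@pytest.fixture(autouse=True)
def tests_data():
    # Reset the (deep-copied) test data for each test, then make the service
    # layer read from the in-memory storage, indexer storage and search.
    data = get_tests_data(reset=True)
    override_storages(data["storage"], data["idx_storage"], data["search"])
    return data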