diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py
index 25f6e035..9096e264 100644
--- a/swh/web/tests/api/views/test_snapshot.py
+++ b/swh/web/tests/api/views/test_snapshot.py
@@ -1,165 +1,165 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given

from swh.model.hashutil import hash_to_hex
from swh.model.model import Snapshot
from swh.web.api.utils import enrich_snapshot
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import snapshot, new_snapshot


@given(snapshot())
def test_api_snapshot(api_client, archive_data, snapshot):
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot})

    rv = api_client.get(url)

    assert rv.status_code == 200, rv.data
    assert rv["Content-Type"] == "application/json"
-    expected_data = archive_data.snapshot_get(snapshot)
+    expected_data = {**archive_data.snapshot_get(snapshot), "next_branch": None}
    expected_data = enrich_snapshot(expected_data, rv.wsgi_request)
    assert rv.data == expected_data


@given(snapshot())
def test_api_snapshot_paginated(api_client, archive_data, snapshot):
    branches_offset = 0
    branches_count = 2

    snapshot_branches = []

    for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()):
        snapshot_branches.append(
            {"name": k, "target_type": v["target_type"], "target": v["target"]}
        )

    whole_snapshot = {"id": snapshot, "branches": {}, "next_branch": None}

    while branches_offset < len(snapshot_branches):
        branches_from = snapshot_branches[branches_offset]["name"]
        url = reverse(
            "api-1-snapshot",
            url_args={"snapshot_id": snapshot},
            query_params={
                "branches_from": branches_from,
                "branches_count": branches_count,
            },
        )

        rv = api_client.get(url)

        assert rv.status_code == 200, rv.data
        assert rv["Content-Type"] == "application/json"

        expected_data = archive_data.snapshot_get_branches(
            snapshot, branches_from, branches_count
        )

        expected_data = enrich_snapshot(expected_data, rv.wsgi_request)

        branches_offset += branches_count
        if branches_offset < len(snapshot_branches):
            next_branch = snapshot_branches[branches_offset]["name"]
            expected_data["next_branch"] = next_branch
        else:
            expected_data["next_branch"] = None

        assert rv.data == expected_data

        whole_snapshot["branches"].update(expected_data["branches"])

        if branches_offset < len(snapshot_branches):
            next_url = rv.wsgi_request.build_absolute_uri(
                reverse(
                    "api-1-snapshot",
                    url_args={"snapshot_id": snapshot},
                    query_params={
                        "branches_from": next_branch,
                        "branches_count": branches_count,
                    },
                )
            )
            assert rv["Link"] == '<%s>; rel="next"' % next_url
        else:
            assert not rv.has_header("Link")

    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot})

    rv = api_client.get(url)

    assert rv.status_code == 200, rv.data
    assert rv["Content-Type"] == "application/json"
    assert rv.data == whole_snapshot


@given(snapshot())
def test_api_snapshot_filtered(api_client, archive_data, snapshot):
    snapshot_branches = []

    for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()):
        snapshot_branches.append(
            {"name": k, "target_type": v["target_type"], "target": v["target"]}
        )

    target_type = random.choice(snapshot_branches)["target_type"]

    url = reverse(
        "api-1-snapshot",
        url_args={"snapshot_id": snapshot},
        query_params={"target_types": target_type},
    )
    rv = api_client.get(url)

    expected_data = archive_data.snapshot_get_branches(
        snapshot, target_types=target_type
    )
    expected_data = enrich_snapshot(expected_data, rv.wsgi_request)

    assert rv.status_code == 200, rv.data
    assert rv["Content-Type"] == "application/json"
    assert rv.data == expected_data


def test_api_snapshot_errors(api_client):
    unknown_snapshot_ = random_sha1()

    url = reverse("api-1-snapshot", url_args={"snapshot_id": "63ce369"})
    rv = api_client.get(url)
    assert rv.status_code == 400, rv.data

    url = reverse("api-1-snapshot", url_args={"snapshot_id": unknown_snapshot_})
    rv = api_client.get(url)
    assert rv.status_code == 404, rv.data


@given(snapshot())
def test_api_snapshot_uppercase(api_client, snapshot):
    url = reverse(
        "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot.upper()}
    )

    resp = api_client.get(url)
    assert resp.status_code == 302

    redirect_url = reverse(
        "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot}
    )

    assert resp["location"] == redirect_url


@given(new_snapshot(min_size=4))
def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot):
    snp_dict = new_snapshot.to_dict()
    snp_id = hash_to_hex(snp_dict["id"])
    for branch in snp_dict["branches"].keys():
        snp_dict["branches"][branch] = None
        break
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snp_id})
    rv = api_client.get(url)
    assert rv.status_code == 200, rv.data
diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py
index 63205ffd..2bd0c2b9 100644
--- a/swh/web/tests/common/test_service.py
+++ b/swh/web/tests/common/test_service.py
@@ -1,975 +1,975 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import itertools
import pytest
import random

from collections import defaultdict

from hypothesis import given

from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.from_disk import DentryPerms
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
from swh.web.common import service
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.tests.data import random_sha1, random_content
from swh.web.tests.strategies import (
    content,
    unknown_content,
    contents,
    unknown_contents,
    contents_with_ctags,
    origin,
    new_origin,
    visit_dates,
    directory,
    unknown_directory,
    release,
    unknown_release,
    revision,
    unknown_revision,
    revisions,
    ancestor_revisions,
    non_ancestor_revisions,
    invalid_sha1,
    sha256,
    revision_with_submodules,
    empty_directory,
    new_revision,
    snapshot,
    unknown_snapshot,
)
from swh.web.tests.conftest import ctags_json_missing, fossology_missing


@given(contents())
def test_lookup_multiple_hashes_all_present(contents):
    input_data = []
    expected_output = []
    for cnt in contents:
        input_data.append({"sha1": cnt["sha1"]})
        expected_output.append({"sha1": cnt["sha1"], "found": True})

    assert service.lookup_multiple_hashes(input_data) == expected_output


@given(contents(), unknown_contents())
def test_lookup_multiple_hashes_some_missing(contents, unknown_contents):
    input_contents = list(itertools.chain(contents, unknown_contents))
    random.shuffle(input_contents)

    input_data = []
    expected_output = []
    for cnt in input_contents:
        input_data.append({"sha1": cnt["sha1"]})
cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert actual_lookup == {"found": None, "algo": "sha1_git"} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash("sha1:%s" % content["sha1"]) content_metadata = archive_data.content_get(content["sha1"]) assert {"found": content_metadata, "algo": "sha1"} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert {"found": False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash("sha1:%s" % content["sha1"]) assert {"found": True} == actual_lookup @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags["sha1s"]) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags("sha1:%s" % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag["id"] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list( service.lookup_content_ctags("sha1:%s" % unknown_content_["sha1"]) ) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) actual_filetype = service.lookup_content_filetype(content["sha1"]) expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) assert actual_filetype == expected_filetype @pytest.mark.skip # Language indexer is disabled. 
@given(content())
def test_lookup_content_language(indexer_data, content):
    indexer_data.content_add_language(content["sha1"])

    actual_language = service.lookup_content_language(content["sha1"])

    expected_language = indexer_data.content_get_language(content["sha1"])

    assert actual_language == expected_language


@given(contents_with_ctags())
def test_lookup_expression(indexer_data, contents_with_ctags):
    per_page = 10
    expected_ctags = []

    for content_sha1 in contents_with_ctags["sha1s"]:
        if len(expected_ctags) == per_page:
            break
        indexer_data.content_add_ctags(content_sha1)
        for ctag in indexer_data.content_get_ctags(content_sha1):
            if len(expected_ctags) == per_page:
                break
            if ctag["name"] == contents_with_ctags["symbol_name"]:
                del ctag["id"]
                ctag["sha1"] = content_sha1
                expected_ctags.append(ctag)

    actual_ctags = list(
        service.lookup_expression(
            contents_with_ctags["symbol_name"], last_sha1=None, per_page=10
        )
    )

    assert actual_ctags == expected_ctags


def test_lookup_expression_no_result():
    expected_ctags = []

    actual_ctags = list(
        service.lookup_expression("barfoo", last_sha1=None, per_page=10)
    )

    assert actual_ctags == expected_ctags


@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(indexer_data, content):
    indexer_data.content_add_license(content["sha1"])

    actual_license = service.lookup_content_license(content["sha1"])

    expected_license = indexer_data.content_get_license(content["sha1"])

    assert actual_license == expected_license


def test_stat_counters(archive_data):
    actual_stats = service.stat_counters()
    assert actual_stats == archive_data.stat_counters()


@given(new_origin(), visit_dates())
def test_lookup_origin_visits(archive_data, new_origin, visit_dates):
    archive_data.origin_add([new_origin])
    archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=ts, type="git",) for ts in visit_dates]
    )

    actual_origin_visits = list(
        service.lookup_origin_visits(new_origin.url, per_page=100)
    )

    expected_visits = archive_data.origin_visit_get(new_origin.url)
    for expected_visit in expected_visits:
        expected_visit["origin"] = new_origin.url

    assert actual_origin_visits == expected_visits


@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    archive_data.origin_add([new_origin])
    visits = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=ts, type="git",) for ts in visit_dates]
    )
    visit = random.choice(visits).visit

    actual_origin_visit = service.lookup_origin_visit(new_origin.url, visit)

    expected_visit = dict(archive_data.origin_visit_get_by(new_origin.url, visit))

    assert actual_origin_visit == expected_visit


@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    archive_data.origin_add([new_origin])

    actual_origin = service.lookup_origin({"url": new_origin.url})
    expected_origin = archive_data.origin_get([new_origin.url])[0]

    assert actual_origin == expected_origin


@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(invalid_sha1)
    assert e.match("Invalid checksum")


@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(sha256):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(sha256)
    assert e.match("Only sha1_git is supported.")


@given(directory())
def test_lookup_directory_with_path_not_found(directory):
    path = "some/invalid/path/here"
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_path(directory, path)
    assert e.match("Directory entry with path %s from %s not found" % (path, directory))


@given(directory())
def test_lookup_directory_with_path_found(archive_data, directory):
    directory_content = archive_data.directory_ls(directory)
    directory_entry = random.choice(directory_content)
    path = directory_entry["name"]

    actual_result = service.lookup_directory_with_path(directory, path)

    assert actual_result == directory_entry


@given(release())
def test_lookup_release(archive_data, release):
    actual_release = service.lookup_release(release)

    assert actual_release == archive_data.release_get(release)


@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256):
    sha1_git_root = revision
    sha1_git = invalid_sha1

    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Invalid checksum query string")

    sha1_git = sha256
    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Only sha1_git is supported")


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = revision
    sha1_git = unknown_revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision %s not found" % sha1_git)


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = unknown_revision
    sha1_git = revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision root %s not found" % sha1_git_root)


@given(ancestor_revisions())
def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    root_sha1_git = ancestor_revisions["sha1_git_root"]
    for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}):
        actual_revision = service.lookup_revision_with_context(sha1_git_root, sha1_git)

        children = []
        for rev in archive_data.revision_log(root_sha1_git):
            for p_rev in rev["parents"]:
                p_rev_hex = hash_to_hex(p_rev)
                if p_rev_hex == sha1_git:
                    children.append(rev["id"])

        expected_revision = archive_data.revision_get(sha1_git)
        expected_revision["children"] = children

        assert actual_revision == expected_revision


@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    sha1_git = non_ancestor_revisions["sha1_git"]
    root_sha1_git = non_ancestor_revisions["sha1_git_root"]

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(root_sha1_git, sha1_git)
    assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git))


def test_lookup_directory_with_revision_not_found():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(unknown_revision_)
    assert e.match("Revision %s not found" % unknown_revision_)


@given(new_revision())
def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision):
    unknown_content_ = random_content()

    dir_path = "README.md"

    # A directory that points to unknown content
    dir = Directory(
        entries=(
            DirectoryEntry(
                name=bytes(dir_path.encode("utf-8")),
                type="file",
                target=hash_to_bytes(unknown_content_["sha1_git"]),
                perms=DentryPerms.content,
            ),
        )
    )

    # Create a revision that points to a directory
    # Which points to unknown content
    new_revision = new_revision.to_dict()
    new_revision["directory"] = dir.id
    del new_revision["id"]
    new_revision = Revision.from_dict(new_revision)

    # Add the directory and revision in mem
    archive_data.directory_add([dir])
    archive_data.revision_add([new_revision])
    new_revision_id = hash_to_hex(new_revision.id)
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(new_revision_id, dir_path)
    assert e.match("Content not found for revision %s" % new_revision_id)


@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    invalid_path = "path/to/something/unknown"
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(revision, invalid_path)
    assert e.match("Directory or File")
    assert e.match(invalid_path)
    assert e.match("revision %s" % revision)
    assert e.match("not found")


@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(
    archive_data, revision_with_submodules
):
    rev_sha1_git = revision_with_submodules["rev_sha1_git"]
    rev_dir_path = revision_with_submodules["rev_dir_rev_path"]

    actual_data = service.lookup_directory_with_revision(rev_sha1_git, rev_dir_path)

    revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"])
    directory = archive_data.directory_ls(revision["directory"])
    rev_entry = next(e for e in directory if e["name"] == rev_dir_path)

    expected_data = {
        "content": archive_data.revision_get(rev_entry["target"]),
        "path": rev_dir_path,
        "revision": rev_sha1_git,
        "type": "rev",
    }

    assert actual_data == expected_data


@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    actual_directory_entries = service.lookup_directory_with_revision(revision)

    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(revision_data["directory"])

    assert actual_directory_entries["type"] == "dir"
    assert actual_directory_entries["content"] == expected_directory_entries


@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] in ("file", "dir")
    ]
    expected_dir_entry = random.choice(dir_entries)

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry["name"]
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
    assert actual_dir_entry["path"] == expected_dir_entry["name"]
    if actual_dir_entry["type"] == "file":
        del actual_dir_entry["content"]["checksums"]["blake2s256"]
        for key in ("checksums", "status", "length"):
            assert actual_dir_entry["content"][key] == expected_dir_entry[key]
    else:
        sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"])
        assert actual_dir_entry["content"] == sub_dir_entries


@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
    archive_data, revision
):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    expected_dir_entry = random.choice(dir_entries)
    expected_data = archive_data.content_get_data(
        expected_dir_entry["checksums"]["sha1"]
    )

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry["name"], with_data=True
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
actual_dir_entry["path"] == expected_dir_entry["name"] del actual_dir_entry["content"]["checksums"]["blake2s256"] for key in ("checksums", "status", "length"): assert actual_dir_entry["content"][key] == expected_dir_entry[key] assert actual_dir_entry["content"]["data"] == expected_data["data"] @given(revision()) def test_lookup_revision(archive_data, revision): actual_revision = service.lookup_revision(revision) assert actual_revision == archive_data.revision_get(revision) @given(new_revision()) def test_lookup_revision_invalid_msg(archive_data, new_revision): new_revision = new_revision.to_dict() new_revision["message"] = b"elegant fix for bug \xff" archive_data.revision_add([Revision.from_dict(new_revision)]) revision = service.lookup_revision(hash_to_hex(new_revision["id"])) assert revision["message"] is None assert revision["message_decoding_failed"] is True @given(new_revision()) def test_lookup_revision_msg_ok(archive_data, new_revision): archive_data.revision_add([new_revision]) revision_message = service.lookup_revision_message(hash_to_hex(new_revision.id)) assert revision_message == {"message": new_revision.message} def test_lookup_revision_msg_no_rev(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_revision_message(unknown_revision_) assert e.match("Revision with sha1_git %s not found." % unknown_revision_) @given(revisions()) def test_lookup_revision_multiple(archive_data, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(archive_data.revision_get(rev)) assert actual_revisions == expected_revisions def test_lookup_revision_multiple_none_found(): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = list(service.lookup_revision_multiple(unknown_revisions_)) assert actual_revisions == [None] * len(unknown_revisions_) @given(revision()) def test_lookup_revision_log(archive_data, revision): actual_revision_log = list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = archive_data.revision_log(revision, limit=25) assert actual_revision_log == expected_revision_log def _get_origin_branches(archive_data, origin): origin_visit = archive_data.origin_visit_get(origin["url"])[-1] snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) branches = { k: v for (k, v) in snapshot["branches"].items() if v["target_type"] == "revision" } return branches @given(origin()) def test_lookup_revision_log_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_log = list( service.lookup_revision_log_by(origin["url"], branch_name, None, limit=25) ) expected_log = archive_data.revision_log(branches[branch_name]["target"], limit=25) assert actual_log == expected_log @given(origin()) def test_lookup_revision_log_by_notfound(origin): with pytest.raises(NotFoundExc): service.lookup_revision_log_by( origin["url"], "unknown_branch_name", None, limit=100 ) def test_lookup_content_raw_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content_raw("sha1:" + unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" 
% ("sha1", unknown_content_["sha1"]) ) @given(content()) def test_lookup_content_raw(archive_data, content): actual_content = service.lookup_content_raw("sha256:%s" % content["sha256"]) expected_content = archive_data.content_get_data(content["sha1"]) assert actual_content == expected_content def test_lookup_content_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content("sha1:%s" % unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" % ("sha1", unknown_content_["sha1"]) ) @given(content()) def test_lookup_content_with_sha1(archive_data, content): actual_content = service.lookup_content(f"sha1:{content['sha1']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content @given(content()) def test_lookup_content_with_sha256(archive_data, content): actual_content = service.lookup_content(f"sha256:{content['sha256']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content def test_lookup_directory_bad_checksum(): with pytest.raises(BadInputExc): service.lookup_directory("directory_id") def test_lookup_directory_not_found(): unknown_directory_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory(unknown_directory_) assert e.match("Directory with sha1_git %s not found" % unknown_directory_) @given(directory()) def test_lookup_directory(archive_data, directory): actual_directory_ls = list(service.lookup_directory(directory)) expected_directory_ls = archive_data.directory_ls(directory) assert actual_directory_ls == expected_directory_ls @given(empty_directory()) def test_lookup_directory_empty(empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) assert actual_directory_ls == [] @given(origin()) def test_lookup_revision_by_nothing_found(origin): with pytest.raises(NotFoundExc): service.lookup_revision_by(origin["url"], "invalid-branch-name") @given(origin()) def test_lookup_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_revision = service.lookup_revision_by(origin["url"], branch_name) expected_revision = archive_data.revision_get(branches[branch_name]["target"]) assert actual_revision == expected_revision @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(origin, revision): with pytest.raises(NotFoundExc): service.lookup_revision_with_context_by( origin["url"], "invalid-branch-name", None, revision ) @given(origin()) def test_lookup_revision_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) rev = root_rev_log[-1]["id"] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin["url"], branch_name, None, rev ) expected_root_rev = archive_data.revision_get(root_rev) expected_rev = archive_data.revision_get(rev) expected_rev["children"] = children[rev] assert actual_root_rev == expected_root_rev assert actual_rev == expected_rev def test_lookup_revision_through_ko_not_implemented(): with pytest.raises(NotImplementedError): service.lookup_revision_through({"something-unknown": 10}) @given(origin()) def 
def test_lookup_revision_through_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]["target"]
    root_rev_log = archive_data.revision_log(root_rev)

    rev = root_rev_log[-1]["id"]

    assert service.lookup_revision_through(
        {
            "origin_url": origin["url"],
            "branch_name": branch_name,
            "ts": None,
            "sha1_git": rev,
        }
    ) == service.lookup_revision_with_context_by(origin["url"], branch_name, None, rev)


@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    assert service.lookup_revision_through(
        {"origin_url": origin["url"], "branch_name": branch_name, "ts": None,}
    ) == service.lookup_revision_by(origin["url"], branch_name, None)


@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    sha1_git_root = ancestor_revisions["sha1_git_root"]

    assert service.lookup_revision_through(
        {"sha1_git_root": sha1_git_root, "sha1_git": sha1_git,}
    ) == service.lookup_revision_with_context(sha1_git_root, sha1_git)


@given(revision())
def test_lookup_revision_through_with_revision(revision):
    assert service.lookup_revision_through(
        {"sha1_git": revision}
    ) == service.lookup_revision(revision)


@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    with pytest.raises(NotFoundExc):
        service.lookup_directory_through_revision(
            {"sha1_git": revision}, "some/invalid/path"
        )


@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"]
    ) == (revision, service.lookup_directory_with_revision(revision, dir_entry["name"]))


@given(revision())
def test_lookup_directory_through_revision_ok_with_data(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"], with_data=True
    ) == (
        revision,
        service.lookup_directory_with_revision(
            revision, dir_entry["name"], with_data=True
        ),
    )


@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(
    archive_data, content, directory, release, revision, snapshot
):
    expected = archive_data.content_find(content)
    assert service.lookup_object(CONTENT, content["sha1_git"]) == expected

    expected = archive_data.directory_get(directory)
    assert service.lookup_object(DIRECTORY, directory) == expected

    expected = archive_data.release_get(release)
    assert service.lookup_object(RELEASE, release) == expected

    expected = archive_data.revision_get(revision)
    assert service.lookup_object(REVISION, revision) == expected

-    expected = archive_data.snapshot_get(snapshot)
+    expected = {**archive_data.snapshot_get(snapshot), "next_branch": None}
    assert service.lookup_object(SNAPSHOT, snapshot) == expected


@given(
    unknown_content(),
    unknown_directory(),
    unknown_release(),
    unknown_revision(),
    unknown_snapshot(),
)
def test_lookup_unknown_objects(
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
):
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(CONTENT, unknown_content["sha1_git"])
    assert e.match(r"Content.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(DIRECTORY, unknown_directory)
    assert e.match(r"Directory.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(RELEASE, unknown_release)
    assert e.match(r"Release.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(REVISION, unknown_revision)
    assert e.match(r"Revision.*not found")

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(SNAPSHOT, unknown_snapshot)
    assert e.match(r"Snapshot.*not found")


@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_object("foo", invalid_sha1)
    assert e.match("Invalid swh object type")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(CONTENT, invalid_sha1)
    assert e.match("Invalid hash")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(DIRECTORY, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(RELEASE, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(REVISION, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(SNAPSHOT, invalid_sha1)
    assert e.match("Invalid checksum")


def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        CONTENT: [hash_to_bytes(missing_cnt)],
        DIRECTORY: [hash_to_bytes(missing_dir)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {
        missing_cnt,
        missing_dir,
        missing_rev,
        missing_rel,
        missing_snp,
    }


@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        CONTENT: [hash_to_bytes(content["sha1_git"])],
        DIRECTORY: [hash_to_bytes(directory)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {missing_rev, missing_rel, missing_snp}


@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    origin_info = service.lookup_origin({"url": f"{origin['url']}/"})
    assert origin_info["url"] == origin["url"]


def test_lookup_origin_missing_trailing_slash(archive_data):
    deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/")
    archive_data.origin_add([deb_origin])
    origin_info = service.lookup_origin({"url": deb_origin.url[:-1]})
    assert origin_info["url"] == deb_origin.url


@given(snapshot())
def test_lookup_snapshot_branch_name_from_tip_revision(archive_data, snapshot_id):
    snapshot = archive_data.snapshot_get(snapshot_id)
    branches = [
        {"name": k, "revision": v["target"]}
        for k, v in snapshot["branches"].items()
        if v["target_type"] == "revision"
    ]
    branch_info = random.choice(branches)
    possible_results = [
        b["name"] for b in branches if b["revision"] == branch_info["revision"]
    ]

    assert (
        service.lookup_snapshot_branch_name_from_tip_revision(
branch_info["revision"] ) in possible_results ) diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index ef4d7604..7b2ff863 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,362 +1,362 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import sys from subprocess import run, PIPE from typing import Any, Dict, List, Optional import pytest from django.core.cache import cache from hypothesis import settings, HealthCheck from rest_framework.test import APIClient, APIRequestFactory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.web.common import converters from swh.web.common.typing import OriginVisitInfo from swh.web.tests.data import get_tests_data, override_storages from swh.storage.algos.origin import origin_get_latest_visit_status -from swh.storage.algos.snapshot import snapshot_get_latest +from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) settings.register_profile( "swh-web", settings( deadline=None, suppress_health_check=[HealthCheck.too_slow, HealthCheck.filter_too_much], ), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=1, suppress_health_check=[HealthCheck.too_slow, HealthCheck.filter_too_much], ), ) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. 
    test_dir = os.path.dirname(__file__)
    # location of the static folder when running tests through tox
    static_dir = os.path.join(sys.prefix, "share/swh/web/static")

    if not os.path.exists(static_dir):
        # location of the static folder when running tests locally with pytest
        static_dir = os.path.join(test_dir, "../../../static")

    webpack_stats = os.path.join(static_dir, "webpack-stats.json")
    if os.path.exists(webpack_stats):
        return

    bundles_dir = os.path.join(test_dir, "../assets/src/bundles")
    _, dirs, _ = next(os.walk(bundles_dir))

    mock_webpack_stats = {"status": "done", "publicPath": "/static", "chunks": {}}
    for bundle in dirs:
        asset = "js/%s.js" % bundle
        mock_webpack_stats["chunks"][bundle] = [
            {
                "name": asset,
                "publicPath": "/static/%s" % asset,
                "path": os.path.join(static_dir, asset),
            }
        ]

    with open(webpack_stats, "w") as outfile:
        json.dump(mock_webpack_stats, outfile)


# Clear Django cache before each test
@pytest.fixture(autouse=True)
def django_cache_cleared():
    cache.clear()


# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
    return rf


# Fixture to get test client from Django REST Framework
@pytest.fixture(scope="module")
def api_client():
    return APIClient()


# Fixture to get API request factory from Django REST Framework
@pytest.fixture(scope="module")
def api_request_factory():
    return APIRequestFactory()


# Initialize tests data
@pytest.fixture(scope="session", autouse=True)
def tests_data():
    data = get_tests_data(reset=True)
    # Update swh-web configuration to use the in-memory storages
    # instantiated in the tests.data module
    override_storages(data["storage"], data["idx_storage"], data["search"])
    return data


# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture(scope="session")
def archive_data(tests_data):
    return _ArchiveData(tests_data)


# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture(scope="session")
def indexer_data(tests_data):
    return _IndexerData(tests_data)


# Custom data directory for requests_mock
@pytest.fixture
def datadir():
    return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources")


class _ArchiveData:
    """
    Helper class to manage data from a sample test archive.

    It is initialized with a reference to an in-memory storage
    containing raw tests data.

    It is basically a proxy to Storage interface but it overrides some methods
    to retrieve those tests data in a json serializable format in order to ease
    tests implementation.
""" def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id): rel_id_bytes = hash_to_bytes(rel_id) rel_data = next(self.storage.release_get([rel_id_bytes])) return converters.from_release(rel_data) def revision_get(self, rev_id): rev_id_bytes = hash_to_bytes(rev_id) rev_data = next(self.storage.revision_get([rev_id_bytes])) return converters.from_revision(rev_data) def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_and_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit( {**visit_and_status[0].to_dict(), **visit_and_status[1].to_dict()} ) if visit_and_status else None 
        )

    def snapshot_get(self, snapshot_id):
-        snp = self.storage.snapshot_get(hash_to_bytes(snapshot_id))
-        return converters.from_snapshot(snp)
+        snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id))
+        return converters.from_snapshot(snp.to_dict())

    def snapshot_get_branches(
        self, snapshot_id, branches_from="", branches_count=1000, target_types=None
    ):
        partial_branches = self.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id),
            branches_from.encode(),
            branches_count,
            target_types,
        )
        return converters.from_partial_branches(partial_branches)

    def snapshot_get_head(self, snapshot):
        if snapshot["branches"]["HEAD"]["target_type"] == "alias":
            target = snapshot["branches"]["HEAD"]["target"]
            head = snapshot["branches"][target]["target"]
        else:
            head = snapshot["branches"]["HEAD"]["target"]
        return head


class _IndexerData:
    """
    Helper class to manage indexer tests data

    It is initialized with a reference to an in-memory indexer storage
    containing raw tests data.

    It also defines class methods to retrieve those tests data in
    a json serializable format in order to ease tests implementation.

    """

    def __init__(self, tests_data):
        self.idx_storage = tests_data["idx_storage"]
        self.mimetype_indexer = tests_data["mimetype_indexer"]
        self.license_indexer = tests_data["license_indexer"]
        self.ctags_indexer = tests_data["ctags_indexer"]

    def content_add_mimetype(self, cnt_id):
        self.mimetype_indexer.run([hash_to_bytes(cnt_id)], "update-dups")

    def content_get_mimetype(self, cnt_id):
        mimetype = next(self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)]))
        return converters.from_filetype(mimetype)

    def content_add_language(self, cnt_id):
        raise NotImplementedError("Language indexer is disabled.")
        self.language_indexer.run([hash_to_bytes(cnt_id)], "update-dups")

    def content_get_language(self, cnt_id):
        lang = next(self.idx_storage.content_language_get([hash_to_bytes(cnt_id)]))
        return converters.from_swh(lang, hashess={"id"})

    def content_add_license(self, cnt_id):
        self.license_indexer.run([hash_to_bytes(cnt_id)], "update-dups")

    def content_get_license(self, cnt_id):
        cnt_id_bytes = hash_to_bytes(cnt_id)
        lic = next(self.idx_storage.content_fossology_license_get([cnt_id_bytes]))
        return converters.from_swh(
            {"id": cnt_id_bytes, "facts": lic[cnt_id_bytes]}, hashess={"id"}
        )

    def content_add_ctags(self, cnt_id):
        self.ctags_indexer.run([hash_to_bytes(cnt_id)], "update-dups")

    def content_get_ctags(self, cnt_id):
        cnt_id_bytes = hash_to_bytes(cnt_id)
        ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
        for ctag in ctags:
            yield converters.from_swh(ctag, hashess={"id"})
diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py
index 197a5f14..21d266d7 100644
--- a/swh/web/tests/strategies.py
+++ b/swh/web/tests/strategies.py
@@ -1,594 +1,595 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from collections import defaultdict
from datetime import datetime

from hypothesis import settings, assume
from hypothesis.extra.dateutil import timezones
from hypothesis.strategies import (
    just,
    sampled_from,
    lists,
    composite,
    datetimes,
    binary,
    text,
    characters,
)

from swh.model.hashutil import hash_to_hex, hash_to_bytes
from swh.model.identifiers import directory_identifier
from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.model.hypothesis_strategies import (
    origins as new_origin_strategy,
    snapshots as new_snapshot,
)
from swh.web.common.utils import browsers_supported_image_mimes
from swh.web.tests.data import get_tests_data

# Module dedicated to the generation of input data for tests through
# the use of hypothesis.
# Some of these data are sampled from a test archive created and populated
# in the swh.web.tests.data module.

# Set the swh-web hypothesis profile if none has been explicitly set
hypothesis_default_settings = settings.get_profile("default")
if repr(settings()) == repr(hypothesis_default_settings):
    settings.load_profile("swh-web")


# The following strategies exploit the hypothesis capabilities


def _filter_checksum(cs):
    generated_checksums = get_tests_data()["generated_checksums"]
    if not int.from_bytes(cs, byteorder="little") or cs in generated_checksums:
        return False
    generated_checksums.add(cs)
    return True


def _known_swh_object(object_type):
    return sampled_from(get_tests_data()[object_type])


def sha1():
    """
    Hypothesis strategy returning a valid hexadecimal sha1 value.
    """
    return binary(min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex)


def invalid_sha1():
    """
    Hypothesis strategy returning an invalid sha1 representation.
    """
    return binary(min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex)


def sha256():
    """
    Hypothesis strategy returning a valid hexadecimal sha256 value.
    """
    return binary(min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex)


def content():
    """
    Hypothesis strategy returning a random content ingested
    into the test archive.
    """
    return _known_swh_object("contents")


def contents():
    """
    Hypothesis strategy returning random contents ingested
    into the test archive.
    """
    return lists(content(), min_size=2, max_size=8)


def content_text():
    """
    Hypothesis strategy returning random textual contents ingested
    into the test archive.
    """
    return content().filter(lambda c: c["mimetype"].startswith("text/"))


def content_text_non_utf8():
    """
    Hypothesis strategy returning random textual contents not encoded
    to UTF-8 ingested into the test archive.
    """
    return content().filter(
        lambda c: c["mimetype"].startswith("text/")
        and c["encoding"] not in ("utf-8", "us-ascii")
    )


def content_text_no_highlight():
    """
    Hypothesis strategy returning random textual contents with no detected
    programming language to highlight ingested into the test archive.
    """
    return content().filter(
        lambda c: c["mimetype"].startswith("text/")
        and c["hljs_language"] == "nohighlight"
    )


def content_image_type():
    """
    Hypothesis strategy returning random image contents ingested
    into the test archive.
    """
    return content().filter(lambda c: c["mimetype"] in browsers_supported_image_mimes)


def content_unsupported_image_type_rendering():
    """
    Hypothesis strategy returning random image contents ingested into
    the test archive that can not be rendered by browsers.
    """
    return content().filter(
        lambda c: c["mimetype"].startswith("image/")
        and c["mimetype"] not in browsers_supported_image_mimes
    )


def content_utf8_detected_as_binary():
    """
    Hypothesis strategy returning random textual contents detected as binary
    by libmagic while they are valid UTF-8 encoded files.
""" def utf8_binary_detected(content): if content["encoding"] != "binary": return False try: content["data"].decode("utf-8") except Exception: return False else: return True return content().filter(utf8_binary_detected) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { "blake2S256": blake2s256_hex, "sha1": sha1_hex, "sha1_git": sha1_git_hex, "sha256": sha256_hex, } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: get_tests_data()["storage"].content_get_data(hash_to_bytes(c["sha1"])) is None ) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object("directories") def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ return directory().filter( lambda d: any( [ e["type"] == "dir" for e in list( get_tests_data()["storage"].directory_ls(hash_to_bytes(d)) ) ] ) ) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({"entries": []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ return sha1().filter( lambda s: len( list(get_tests_data()["storage"].directory_missing([hash_to_bytes(s)])) ) > 0 ) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object("origins") def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() storage = tests_data["storage"] for origin in tests_data["origins"]: visit_page = storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: ret.append(origin) return sampled_from(ret) def origin_with_releases(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): ret.append(origin) return sampled_from(ret) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ return new_origin_strategy().filter( lambda origin: get_tests_data()["storage"].origin_get([origin.url])[0] is None ) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists( new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items())), ) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. 
""" min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0), timezones=timezones(), ), min_size=min_size, max_size=max_size, unique=True, ).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object("releases") def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()["storage"].release_get([s])) is None ) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object("revisions") def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()["storage"].revision_get([hash_to_bytes(s)])) is None ) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw( text( min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255), ) ) email = "%s@company.org" % name return Person( name=name.encode(), email=email.encode(), fullname=("%s <%s>" % (name, email)).encode(), ) @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0) ).map(lambda d: int(d.timestamp())) ) return { "timestamp": timestamp, "offset": 0, "negative_utc": False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ return Revision( directory=draw(sha1().map(hash_to_bytes)), author=draw(new_person()), committer=draw(new_person()), message=draw(text(min_size=20, max_size=100).map(lambda t: t.encode())), date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), committer_date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), synthetic=False, type=RevisionType.GIT, ) def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object("snapshots") def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists( new_snapshot(min_size=2, max_size=10, only_objects=True), min_size=min_size, max_size=max_size, ) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. 
""" return sha1().filter( - lambda s: get_tests_data()["storage"].snapshot_get(hash_to_bytes(s)) is None + lambda s: get_tests_data()["storage"].snapshot_get_branches(hash_to_bytes(s)) + is None ) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return just( { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } ) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return just( { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } ) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. 
""" return just( { "sha1s": [ "0ab37c02043ebff946c1937523f60aadd0844351", "15554cf7608dde6bfefac7e3d525596343a85b6f", "2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd", "30acd0b47fc25e159e27a980102ddb1c4bea0b95", "4f81f05aaea3efb981f9d90144f746d6b682285b", "5153aa4b6e4455a62525bc4de38ed0ff6e7dd682", "59d08bafa6a749110dfb65ba43a61963d5a5bf9f", "7568285b2d7f31ae483ae71617bd3db873deaa2c", "7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4", "8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03", "9b3557f1ab4111c8607a4f2ea3c1e53c6992916c", "9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd", "c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b", "e89e55a12def4cd54d5bff58378a3b5119878eb7", "e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e", "eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5", ], "symbol_name": "ABS", } ) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just( { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } )