diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index 8f3439d6..01d4c1b5 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,681 +1,686 @@
# Copyright (C) 2015-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import timedelta
import json

from hypothesis import given
import pytest

from swh.indexer.storage.model import OriginIntrinsicMetadataRow
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Origin, OriginVisit, OriginVisitStatus
from swh.storage.exc import StorageAPIError, StorageDBError
from swh.storage.utils import now
from swh.web.api.utils import enrich_origin, enrich_origin_visit
from swh.web.common.exc import BadInputExc
from swh.web.common.origin_visits import get_origin_visits
from swh.web.common.utils import reverse
from swh.web.tests.api.views.utils import scroll_results
from swh.web.tests.data import (
    INDEXER_TOOL,
    ORIGIN_MASTER_REVISION,
    ORIGIN_METADATA_KEY,
    ORIGIN_METADATA_VALUE,
)
from swh.web.tests.strategies import new_origin, new_snapshots, origin, visit_dates
from swh.web.tests.utils import check_api_get_responses


def test_api_lookup_origin_visits_raise_error(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "voluntary error to check the bad request middleware."
    mock_get_origin_visits.side_effect = BadInputExc(err_msg)
    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=400)
    assert rv.data == {"exception": "BadInputExc", "reason": err_msg}


def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "Storage exploded! Will be back online shortly!"
    mock_get_origin_visits.side_effect = StorageDBError(err_msg)
    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=503)
    assert rv.data == {
        "exception": "StorageDBError",
        "reason": "An unexpected error occurred in the backend: %s" % err_msg,
    }


def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "Storage API dropped dead! Will resurrect asap!"
    mock_get_origin_visits.side_effect = StorageAPIError(err_msg)
    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=503)
    assert rv.data == {
        "exception": "StorageAPIError",
        "reason": "An unexpected error occurred in the api backend: %s" % err_msg,
    }


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits(
    api_client, archive_data, new_origin, visit_dates, new_snapshots
):
    archive_data.origin_add([new_origin])
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
        )[0]
        archive_data.snapshot_add([new_snapshots[i]])
        visit_status = OriginVisitStatus(
            origin=new_origin.url,
            visit=origin_visit.visit,
            date=now(),
            status="full",
            snapshot=new_snapshots[i].id,
        )
        archive_data.origin_visit_status_add([visit_status])

    all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

    for last_visit, expected_visits in (
        (None, all_visits[:2]),
        (all_visits[1]["visit"], all_visits[2:]),
    ):
        url = reverse(
            "api-1-origin-visits",
            url_args={"origin_url": new_origin.url},
            query_params={"per_page": 2, "last_visit": last_visit},
        )
        rv = check_api_get_responses(api_client, url, status_code=200)
        for i in range(len(expected_visits)):
            expected_visits[i] = enrich_origin_visit(
                expected_visits[i],
                with_origin_link=False,
                with_origin_visit_link=True,
                request=rv.wsgi_request,
            )
        assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits_by_id(
    api_client, archive_data, new_origin, visit_dates, new_snapshots
):
    archive_data.origin_add([new_origin])
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
        )[0]
        archive_data.snapshot_add([new_snapshots[i]])
        visit_status = OriginVisitStatus(
            origin=new_origin.url,
            visit=origin_visit.visit,
            date=now(),
            status="full",
            snapshot=new_snapshots[i].id,
        )
        archive_data.origin_visit_status_add([visit_status])

    all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

    for last_visit, expected_visits in (
        (None, all_visits[:2]),
        (all_visits[1]["visit"], all_visits[2:4]),
    ):
        url = reverse(
            "api-1-origin-visits",
            url_args={"origin_url": new_origin.url},
            query_params={"per_page": 2, "last_visit": last_visit},
        )
        rv = check_api_get_responses(api_client, url, status_code=200)
        for i in range(len(expected_visits)):
            expected_visits[i] = enrich_origin_visit(
                expected_visits[i],
                with_origin_link=False,
                with_origin_visit_link=True,
                request=rv.wsgi_request,
            )
        assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visit(
    api_client, archive_data, new_origin, visit_dates, new_snapshots
):
    archive_data.origin_add([new_origin])
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
        )[0]
        visit_id = origin_visit.visit
        archive_data.snapshot_add([new_snapshots[i]])
        visit_status = OriginVisitStatus(
            origin=new_origin.url,
            visit=origin_visit.visit,
            date=visit_date + timedelta(minutes=5),
            status="full",
            snapshot=new_snapshots[i].id,
        )
        archive_data.origin_visit_status_add([visit_status])

    url = reverse(
        "api-1-origin-visit",
        url_args={"origin_url": new_origin.url, "visit_id": visit_id},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id)
    expected_visit = enrich_origin_visit(
        expected_visit,
        with_origin_link=True,
        with_origin_visit_link=False,
        request=rv.wsgi_request,
    )
    assert rv.data == expected_visit


@given(new_origin())
def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin):
    archive_data.origin_add([new_origin])
    url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url})
    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "No visit for origin %s found" % new_origin.url,
    }


@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest(
    api_client, archive_data, new_origin, visit_dates, new_snapshots
):
    archive_data.origin_add([new_origin])
    visit_dates.sort()
    visit_ids = []
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
        )[0]
        visit_ids.append(origin_visit.visit)

    archive_data.snapshot_add([new_snapshots[0]])

    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit_ids[0],
        date=now(),
        status="full",
        snapshot=new_snapshots[0].id,
    )
    archive_data.origin_visit_status_add([visit_status])

    url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url})
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_ids[1])
    expected_visit = enrich_origin_visit(
        expected_visit,
        with_origin_link=True,
        with_origin_visit_link=False,
        request=rv.wsgi_request,
    )
    assert rv.data == expected_visit


@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest_with_snapshot(
    api_client, archive_data, new_origin, visit_dates, new_snapshots
):
    archive_data.origin_add([new_origin])
    visit_dates.sort()
    visit_ids = []
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
        )[0]
        visit_ids.append(origin_visit.visit)

    archive_data.snapshot_add([new_snapshots[0]])

    # Add snapshot to the latest visit
    visit_id = visit_ids[-1]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit_id,
        date=now(),
        status="full",
        snapshot=new_snapshots[0].id,
    )
    archive_data.origin_visit_status_add([visit_status])

    url = reverse(
        "api-1-origin-visit-latest",
        url_args={"origin_url": new_origin.url},
        query_params={"require_snapshot": True},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_visit = archive_data.origin_visit_status_get_latest(
        new_origin.url, type="git", require_snapshot=True
    )
    expected_visit = enrich_origin_visit(
        expected_visit,
        with_origin_link=True,
        with_origin_visit_link=False,
        request=rv.wsgi_request,
    )
    assert rv.data == expected_visit


@given(origin())
def test_api_lookup_origin_visit_not_found(api_client, origin):
    all_visits = list(reversed(get_origin_visits(origin)))
    max_visit_id = max([v["visit"] for v in all_visits])
    url = reverse(
        "api-1-origin-visit",
        url_args={"origin_url": origin["url"], "visit_id": max_visit_id + 1},
    )
    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "Origin %s or its visit with id %s not found!"
        % (origin["url"], max_visit_id + 1),
    }


def test_api_origins_wrong_input(api_client, archive_data):
    """Should fail with 400 if the input is deprecated.

    """
    # fail if wrong input
    url = reverse("api-1-origins", query_params={"origin_from": 1})
    rv = check_api_get_responses(api_client, url, status_code=400)
    assert rv.data == {
        "exception": "BadInputExc",
        "reason": "Please use the Link header to browse through result",
    }


def test_api_origins(api_client, archive_data):
    page_result = archive_data.origin_list(limit=10000)
    origins = page_result.results
    origin_urls = {origin.url for origin in origins}

    # Get only one
    url = reverse("api-1-origins", query_params={"origin_count": 1})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} <= origin_urls

    # Get all
    url = reverse("api-1-origins", query_params={"origin_count": len(origins)})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(origins)
    assert {origin["url"] for origin in rv.data} == origin_urls

    # Get "all + 10"
    url = reverse("api-1-origins", query_params={"origin_count": len(origins) + 10})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(origins)
    assert {origin["url"] for origin in rv.data} == origin_urls


@pytest.mark.parametrize("origin_count", [1, 2, 10, 100])
def test_api_origins_scroll(api_client, archive_data, origin_count):
    page_result = archive_data.origin_list(limit=10000)
    origins = page_result.results
    origin_urls = {origin.url for origin in origins}

    url = reverse("api-1-origins", query_params={"origin_count": origin_count})

    results = scroll_results(api_client, url)

    assert len(results) == len(origins)
    assert {origin["url"] for origin in results} == origin_urls


@given(origin())
def test_api_origin_by_url(api_client, archive_data, origin):
    origin_url = origin["url"]
    url = reverse("api-1-origin", url_args={"origin_url": origin_url})
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_origin = archive_data.origin_get([origin_url])[0]
    expected_origin = enrich_origin(expected_origin, rv.wsgi_request)
    assert rv.data == expected_origin


@given(new_origin())
def test_api_origin_not_found(api_client, new_origin):
    url = reverse("api-1-origin", url_args={"origin_url": new_origin.url})
    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "Origin with url %s not found!" % new_origin.url,
    }


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    # Search for 'github.com', get only one
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 1},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} <= expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]

    # Search for 'github.com', get all
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]

    # Search for 'github.com', get more than available
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 10},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_words(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "com github"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "memononen libtess2"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} == {
        "https://github.com/memononen/libtess2"
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "libtess2 memononen"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} == {
        "https://github.com/memononen/libtess2"
    }


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_visit_type(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com",},
        query_params={"visit_type": "git"},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com",},
        query_params={"visit_type": "foo"},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert rv.data == []


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
@pytest.mark.parametrize("limit", [1, 2, 3, 10])
def test_api_origin_search_scroll(api_client, archive_data, mocker, limit, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": limit},
    )

    results = scroll_results(api_client, url)

    assert {origin["url"] for origin in results} == expected_origins


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_limit(api_client, archive_data, tests_data, mocker, backend):
    if backend == "swh-search":
        tests_data["search"].origin_update(
            [{"url": "http://foobar/{}".format(i)} for i in range(2000)]
        )
    else:
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)
        archive_data.origin_add(
            [Origin(url="http://foobar/{}".format(i)) for i in range(2000)]
        )

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "foobar"},
        query_params={"limit": 1050},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1000


@pytest.mark.parametrize("backend", ["swh-search", "swh-indexer-storage"])
def test_api_origin_metadata_search(api_client, mocker, backend):
    mock_config = mocker.patch("swh.web.common.archive.config")
    mock_config.get_config.return_value = {"metadata_search_backend": backend}

    url = reverse(
        "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
-
-    expected_data = [
-        {
-            "url": origin_url,
-            "metadata": {
-                "from_revision": master_rev,
-                "tool": {
-                    "name": INDEXER_TOOL["tool_name"],
-                    "version": INDEXER_TOOL["tool_version"],
-                    "configuration": INDEXER_TOOL["tool_configuration"],
-                    "id": INDEXER_TOOL["id"],
+    rv.data = sorted(rv.data, key=lambda d: d["url"])
+
+    expected_data = sorted(
+        [
+            {
+                "url": origin_url,
+                "metadata": {
+                    "from_revision": ORIGIN_MASTER_REVISION[origin_url],
+                    "tool": {
+                        "name": INDEXER_TOOL["tool_name"],
+                        "version": INDEXER_TOOL["tool_version"],
+                        "configuration": INDEXER_TOOL["tool_configuration"],
+                        "id": INDEXER_TOOL["id"],
+                    },
+                    "mappings": [],
                },
-                "mappings": [],
-            },
-        }
-        for origin_url, master_rev in ORIGIN_MASTER_REVISION.items()
-    ]
+            }
+            for origin_url in sorted(ORIGIN_MASTER_REVISION.keys())
+        ],
+        key=lambda d: d["url"],
+    )

    for i in range(len(expected_data)):
        expected = expected_data[i]
        response = rv.data[i]
        metadata = response["metadata"].pop("metadata")
        assert any(
            [ORIGIN_METADATA_VALUE in json.dumps(val) for val in metadata.values()]
        )
+        assert response == expected


def test_api_origin_metadata_search_limit(api_client, mocker):
    mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage")
    oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext

    oimsft.side_effect = lambda conjunction, limit: [
        OriginIntrinsicMetadataRow(
            id=origin_url,
            from_revision=hash_to_bytes(master_rev),
            indexer_configuration_id=INDEXER_TOOL["id"],
            metadata={ORIGIN_METADATA_KEY: ORIGIN_METADATA_VALUE},
            mappings=[],
        )
        for origin_url, master_rev in ORIGIN_MASTER_REVISION.items()
    ]

    url = reverse(
        "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=70)

    url = reverse(
        "api-1-origin-metadata-search",
        query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 10},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=10)

    url = reverse(
        "api-1-origin-metadata-search",
        query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 987},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=100)


@given(origin())
def test_api_origin_intrinsic_metadata(api_client, origin):
    url = reverse(
        "api-origin-intrinsic-metadata", url_args={"origin_url": origin["url"]}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
-    expected_data = {ORIGIN_METADATA_KEY: ORIGIN_METADATA_VALUE}
-    assert rv.data == expected_data
+    assert ORIGIN_METADATA_KEY in rv.data
+    assert rv.data[ORIGIN_METADATA_KEY] == ORIGIN_METADATA_VALUE


def test_api_origin_metadata_search_invalid(api_client, mocker):
    mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage")
    url = reverse("api-1-origin-metadata-search")
    check_api_get_responses(api_client, url, status_code=400)
    mock_idx_storage.assert_not_called()


@pytest.mark.parametrize("backend", ["swh-counters", "swh-storage"])
def test_api_stat_counters(api_client, mocker, backend):
    mock_config = mocker.patch("swh.web.common.archive.config")
    mock_config.get_config.return_value = {"counters_backend": backend}
    url = reverse("api-1-stat-counters")
    rv = check_api_get_responses(api_client, url, status_code=200)
    counts = json.loads(rv.content)
    for obj in ["content", "origin", "release", "directory", "revision"]:
        assert counts.get(obj, 0) > 0
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index 57995abf..3c45ed02 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,466 +1,476 @@
# Copyright (C) 2018-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from copy import deepcopy
from datetime import timedelta
import os
import random
import time
from typing import Dict, List, Optional, Set

from swh.core.config import merge_configs
from swh.counters import get_counters
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.storage import get_indexer_storage
from swh.indexer.storage.model import OriginIntrinsicMetadataRow
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_hex
from swh.model.model import (
    Content,
    Directory,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Snapshot,
)
from swh.search import get_search
from swh.storage import get_storage
from swh.storage.algos.dir_iterators import dir_iterator
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import Sha1
from swh.storage.utils import now
from swh.web import config
from swh.web.browse.utils import (
    _re_encode_content,
    get_mimetype_and_encoding_for_content,
    prepare_content_for_display,
)
from swh.web.common import archive

# Module used to initialize data that will be provided as tests input

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    "storage": {"cls": "memory"},
    "objstorage": {"cls": "memory", "args": {},},
    "indexer_storage": {"cls": "memory", "args": {},},
}


def random_sha1():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20)))


def random_sha256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_blake2s256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_content():
    return {
        "sha1": random_sha1(),
        "sha1_git": random_sha1(),
        "sha256": random_sha256(),
        "blake2s256": random_blake2s256(),
    }


_TEST_MIMETYPE_INDEXER_CONFIG = merge_configs(
    _TEST_INDEXER_BASE_CONFIG,
    {
        "tools": {
            "name": "file",
            "version": "1:5.30-1+deb9u1",
            "configuration": {"type": "library", "debian-package": "python3-magic"},
        }
    },
)

_TEST_LICENSE_INDEXER_CONFIG = merge_configs(
    _TEST_INDEXER_BASE_CONFIG,
    {
        "workdir": "/tmp/swh/indexer.fossology.license",
        "tools": {
            "name": "nomos",
            "version": "3.1.0rc2-31-ga2cbb8c",
            "configuration": {"command_line": "nomossa ",},
        },
    },
)

_TEST_CTAGS_INDEXER_CONFIG = merge_configs(
    _TEST_INDEXER_BASE_CONFIG,
    {
        "workdir": "/tmp/swh/indexer.ctags",
        "languages": {"c": "c"},
        "tools": {
            "name": "universal-ctags",
            "version": "~git7859817b",
            "configuration": {
                "command_line": """ctags --fields=+lnz --sort=no --links=no """
                """--output-format=json """
            },
        },
    },
)

# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        "type": "git",
        "url": "https://github.com/memononen/libtess2",
        "archives": ["libtess2.zip"],
+        "metadata": {
+            "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+            "description": (
+                "Game and tools oriented refactored version of GLU tessellator."
+            ),
+        },
    },
    {
        "type": "git",
        "url": "https://github.com/wcoder/highlightjs-line-numbers.js",
        "archives": [
            "highlightjs-line-numbers.js.zip",
            "highlightjs-line-numbers.js_visit2.zip",
        ],
-        "metadata": {"description": "Line numbering plugin for Highlight.js",},
+        "metadata": {
+            "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+            "description": "Line numbering plugin for Highlight.js",
+        },
    },
    {
        "type": "git",
        "url": "repo_with_submodules",
        "archives": ["repo_with_submodules.tgz"],
        "metadata": {
+            "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
            "description": "This is just a sample repository with submodules",
        },
    },
]

_contents = {}


def _add_extra_contents(storage, contents):
    pbm_image_data = b"""P1
# PBM example
24 7
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0
0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 1 0
0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 1 0
0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0
0 1 0 0 0 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0"""

    # add file with mimetype image/x-portable-bitmap in the archive content
    pbm_content = Content.from_data(pbm_image_data)
    storage.content_add([pbm_content])
    contents.add(pbm_content.sha1)


INDEXER_TOOL = {
    "tool_name": "swh-web tests",
    "tool_version": "1.0",
    "tool_configuration": {},
}

-ORIGIN_METADATA_KEY = "vcs"
+ORIGIN_METADATA_KEY = "keywords"
ORIGIN_METADATA_VALUE = "git"

ORIGIN_MASTER_REVISION = {}


def _add_origin(
    storage, search, counters, origin_url, visit_type="git", snapshot_branches={}
):
    storage.origin_add([Origin(url=origin_url)])
    search.origin_update(
        [{"url": origin_url, "has_visits": True, "visit_types": [visit_type]}]
    )
    counters.add("origin", [origin_url])
    date = now()
    visit = OriginVisit(origin=origin_url, date=date, type=visit_type)
    visit = storage.origin_visit_add([visit])[0]
    counters.add("origin_visit", [f"{visit.unique_key()}"])
    snapshot = Snapshot.from_dict({"branches": snapshot_branches})
    storage.snapshot_add([snapshot])
    counters.add("snapshot", [snapshot.id])
    visit_status = OriginVisitStatus(
        origin=origin_url,
        visit=visit.visit,
        date=date + timedelta(minutes=1),
        type=visit.type,
        status="full",
        snapshot=snapshot.id,
    )
    storage.origin_visit_status_add([visit_status])
    counters.add("origin_visit_status", [f"{visit_status.unique_key()}"])


# Tests data initialization
def _init_tests_data():
    # To hold reference to the memory storage
    storage = get_storage("memory")

    # Create search instance
    search = get_search("memory")
    search.initialize()
    search.origin_update({"url": origin["url"]} for origin in _TEST_ORIGINS)

    # create the counters instance
    counters = get_counters("memory")

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage("memory")

    # Declare a test tool for origin intrinsic metadata tests
    idx_tool = idx_storage.indexer_configuration_add([INDEXER_TOOL])[0]
    INDEXER_TOOL["id"] = idx_tool["id"]

    # Load git repositories from archives
    for origin in _TEST_ORIGINS:
        for i, archive_ in enumerate(origin["archives"]):
            if i > 0:
                # ensure visit dates will be different when simulating
                # multiple visits of an origin
                time.sleep(1)
            origin_repo_archive = os.path.join(
                os.path.dirname(__file__), "resources/repos/%s" % archive_
            )
            loader = GitLoaderFromArchive(
                storage, origin["url"], archive_path=origin_repo_archive,
            )
            result = loader.load()
            assert result["status"] == "eventful"

        ori = storage.origin_get([origin["url"]])[0]
        origin.update(ori.to_dict())  # add an 'id' key if enabled
        search.origin_update(
            [{"url": origin["url"], "has_visits": True, "visit_types": ["git"]}]
        )

    for i in range(250):
        _add_origin(
            storage,
            search,
            counters,
            origin_url=f"https://many.origins/{i+1}",
            visit_type="tar",
        )

    sha1s: Set[Sha1] = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()

    content_path = {}

    # Get all objects loaded into the test archive
    common_metadata = {ORIGIN_METADATA_KEY: ORIGIN_METADATA_VALUE}
    for origin in _TEST_ORIGINS:
        snp = snapshot_get_latest(storage, origin["url"])
        snapshots.add(hash_to_hex(snp.id))
        for branch_name, branch_data in snp.branches.items():
            target_type = branch_data.target_type.value
            if target_type == "revision":
                revisions.add(branch_data.target)
                if b"master" in branch_name:
                    # Add some origin intrinsic metadata for tests
                    metadata = common_metadata
                    metadata.update(origin.get("metadata", {}))
                    origin_metadata = OriginIntrinsicMetadataRow(
                        id=origin["url"],
                        from_revision=branch_data.target,
                        indexer_configuration_id=idx_tool["id"],
                        metadata=metadata,
                        mappings=[],
                    )
                    idx_storage.origin_intrinsic_metadata_add([origin_metadata])
                    search.origin_update(
                        [{"url": origin["url"], "intrinsic_metadata": metadata}]
                    )
                    ORIGIN_MASTER_REVISION[origin["url"]] = hash_to_hex(
                        branch_data.target
                    )
            elif target_type == "release":
                release = storage.release_get([branch_data.target])[0]
                revisions.add(release.target)
                releases.add(hash_to_hex(branch_data.target))

    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)

    for rev in storage.revision_get(revisions):
        if rev is None:
            continue
        dir_id = rev.directory
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            if entry["type"] == "file":
                sha1s.add(entry["sha1"])
                content_path[entry["sha1"]] = "/".join(
                    [hash_to_hex(dir_id), entry["path"].decode("utf-8")]
                )
            elif entry["type"] == "dir":
                directories.add(hash_to_hex(entry["target"]))

    _add_extra_contents(storage, sha1s)

    # Get all checksums for each content
    result: List[Optional[Content]] = storage.content_get(list(sha1s))

    contents: List[Dict] = []
    for content in result:
        assert content is not None
        sha1 = hash_to_hex(content.sha1)
        content_metadata = {
            algo: hash_to_hex(getattr(content, algo)) for algo in DEFAULT_ALGORITHMS
        }

        path = ""
        if content.sha1 in content_path:
            path = content_path[content.sha1]

        cnt_data = storage.content_get_data(content.sha1)
        assert cnt_data is not None
        mimetype, encoding = get_mimetype_and_encoding_for_content(cnt_data)
        _, _, cnt_data = _re_encode_content(mimetype, encoding, cnt_data)
        content_display_data = prepare_content_for_display(cnt_data, mimetype, path)

        content_metadata.update(
            {
                "path": path,
                "mimetype": mimetype,
                "encoding": encoding,
                "hljs_language": content_display_data["language"],
                "data": content_display_data["content_data"],
            }
        )
        _contents[sha1] = content_metadata
        contents.append(content_metadata)

    # Add the empty directory to the test archive
    storage.directory_add([Directory(entries=())])

    # Add empty content to the test archive
    storage.content_add([Content.from_data(data=b"")])

    # Add fake git origin with pull request branches
    _add_origin(
        storage,
        search,
        counters,
        origin_url="https://git.example.org/project",
        snapshot_branches={
            b"refs/heads/master": {
                "target_type": "revision",
                "target": next(iter(revisions)),
            },
            **{
                f"refs/pull/{i}".encode(): {
                    "target_type": "revision",
                    "target": next(iter(revisions)),
                }
                for i in range(300)
            },
        },
    )

    counters.add("revision", revisions)
    counters.add("release", releases)
    counters.add("directory", directories)
    counters.add("content", [content["sha1"] for content in contents])

    # Return tests data
    return {
        "search": search,
        "storage": storage,
        "idx_storage": idx_storage,
        "counters": counters,
        "origins": _TEST_ORIGINS,
        "contents": contents,
        "directories": list(directories),
        "releases": list(releases),
        "revisions": list(map(hash_to_hex, revisions)),
        "snapshots": list(snapshots),
        "generated_checksums": set(),
    }


def _init_indexers(tests_data):
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class, idx_config in (
        ("mimetype_indexer", MimetypeIndexer, _TEST_MIMETYPE_INDEXER_CONFIG),
        ("license_indexer", FossologyLicenseIndexer, _TEST_LICENSE_INDEXER_CONFIG),
        ("ctags_indexer", CtagsIndexer, _TEST_CTAGS_INDEXER_CONFIG),
    ):
        idx = idx_class(config=idx_config)
        idx.storage = tests_data["storage"]
        idx.objstorage = tests_data["storage"].objstorage
        idx.idx_storage = tests_data["idx_storage"]
        idx.register_tools(idx.config["tools"])
        indexers[idx_name] = idx
    return indexers


def get_content(content_sha1):
    return _contents.get(content_sha1)


_tests_data = None
_current_tests_data = None
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage, search, counters):
    """
    Helper function to replace the storages from which archive data
    are fetched.
    """
    swh_config = config.get_config()
    swh_config.update(
        {
            "storage": storage,
            "indexer_storage": idx_storage,
            "search": search,
            "counters": counters,
        }
    )
    archive.storage = storage
    archive.idx_storage = idx_storage
    archive.search = search
    archive.counters = counters