diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 3eac0194..5b3e0920 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,672 +1,677 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given import pytest from requests.utils import parse_header_links from swh.model.model import Origin from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.api.utils import enrich_origin_visit, enrich_origin from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, visit_dates, new_snapshots ) def _scroll_results(api_client, url): """Iterates through pages of results, and returns them all.""" results = [] while True: rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' results.extend(rv.data) if 'Link' in rv: for link in parse_header_links(rv['Link']): if link['rel'] == 'next': # Found link to next page of results url = link['url'] break else: # No link with 'rel=next' break else: # No Link header break return results def test_api_lookup_origin_visits_raise_error(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = BadInputExc(err_msg) url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 400, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'BadInputExc', 'reason': err_msg } def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'Storage exploded! Will be back online shortly!' mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 503, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg } def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'Storage API dropped dead! Will resurrect asap!' mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 503, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: %s' % err_msg } @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') archive_data.snapshot_add([new_snapshots[i]]) archive_data.origin_visit_update( new_origin.url, origin_visit.visit, + status='full', snapshot=new_snapshots[i].id) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin.url}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') archive_data.snapshot_add([new_snapshots[i]]) archive_data.origin_visit_update( new_origin.url, origin_visit.visit, + status='full', snapshot=new_snapshots[i].id) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin.url}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') visit_id = origin_visit.visit archive_data.snapshot_add([new_snapshots[i]]) archive_data.origin_visit_update( new_origin.url, visit_id, + status='full', snapshot=new_snapshots[i].id) url = reverse('api-1-origin-visit', url_args={'origin_url': new_origin.url, 'visit_id': visit_id}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( new_origin.url, visit_id) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add_one(new_origin) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No visit for origin %s found' % new_origin.url } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( new_origin.url, visit_ids[0], + status='full', snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( new_origin.url, visit_ids[1]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( new_origin.url, visit_ids[0], + status='full', snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}, query_params={'require_snapshot': True}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( new_origin.url, visit_ids[0]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_url': origin['url'], 'visit_id': max_visit_id + 1}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' % (origin['url'], max_visit_id+1) } def test_api_origins(api_client, archive_data): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} # Get only one url = reverse('api-1-origins', query_params={'origin_count': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= origin_urls # Get all url = reverse('api-1-origins', query_params={'origin_count': len(origins)}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse('api-1-origins', query_params={'origin_count': len(origins)+10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls @pytest.mark.parametrize('origin_count', [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} url = reverse('api-1-origins', query_params={'origin_count': origin_count}) results = _scroll_results(api_client, url) assert len(results) == len(origins) assert {origin['url'] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): url = reverse('api-1-origin', url_args={'origin_url': origin['url']}) rv = api_client.get(url) expected_origin = archive_data.origin_get(origin) expected_origin = enrich_origin(expected_origin, rv.wsgi_request) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse('api-1-origin', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Origin with url %s not found!' % new_origin.url } @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } # Search for 'github.com', get only one url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= expected_origins # Search for 'github.com', get all url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins # Search for 'github.com', get more than available url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_words(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'com github'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'memononen libtess2'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} url = reverse('api-1-origin-search', url_args={'url_pattern': 'libtess2 memononen'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) @pytest.mark.parametrize('limit', [1, 2, 3, 10]) def test_api_origin_search_scroll( api_client, archive_data, mocker, limit, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': limit}) results = _scroll_results(api_client, url) assert {origin['url'] for origin in results} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_limit( api_client, archive_data, tests_data, mocker, backend): if backend == 'swh-search': tests_data['search'].origin_update([ {'url': 'http://foobar/{}'.format(i)} for i in range(2000) ]) else: # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) archive_data.origin_add([ Origin(url='http://foobar/{}'.format(i)) for i in range(2000) ]) url = reverse('api-1-origin-search', url_args={'url_pattern': 'foobar'}, query_params={'limit': 1050}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1000 @given(origin()) def test_api_origin_metadata_search(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = [{ 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] assert rv.data == expected_data oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimg = mock_idx_storage.origin_intrinsic_metadata_get oimg.side_effect = lambda origin_urls: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_url': origin['url']}) rv = api_client.get(url) oimg.assert_called_once_with([origin['url']]) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = {'author': 'Jane Doe'} assert rv.data == expected_data def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') url = reverse('api-1-origin-metadata-search') rv = api_client.get(url) assert rv.status_code == 400, rv.content mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py index 915e638b..cea1392f 100644 --- a/swh/web/tests/data.py +++ b/swh/web/tests/data.py @@ -1,334 +1,335 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import random from copy import deepcopy from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage from swh.model.hashutil import hash_to_hex, DEFAULT_ALGORITHMS from swh.model.model import Directory, Origin from swh.loader.git.from_disk import GitLoaderFromArchive from swh.search import get_search from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( get_mimetype_and_encoding_for_content, prepare_content_for_display, _re_encode_content ) from swh.web.common import service # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { 'cls': 'memory', }, 'save_data': False, 'max_content_size': 100 * 1024 * 1024, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory' }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } def random_sha1(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) def random_sha256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_blake2s256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_content(): return { 'sha1': random_sha1(), 'sha1_git': random_sha1(), 'sha256': random_sha256(), 'blake2s256': random_blake2s256(), } # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } # CtagsIndexer with custom configuration for tests class _CtagsIndexer(CtagsIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.ctags', 'languages': {'c': 'c'}, 'tools': { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa '''--output-format=json ''' }, } } # Lightweight git repositories that will be loaded to generate # input data for tests _TEST_ORIGINS = [ { 'type': 'git', 'url': 'https://github.com/wcoder/highlightjs-line-numbers.js', 'archives': ['highlightjs-line-numbers.js.zip', 'highlightjs-line-numbers.js_visit2.zip'], 'visit_date': ['Dec 1 2018, 01:00 UTC', 'Jan 20 2019, 15:00 UTC'] }, { 'type': 'git', 'url': 'https://github.com/memononen/libtess2', 'archives': ['libtess2.zip'], 'visit_date': ['May 25 2018, 01:00 UTC'] }, { 'type': 'git', 'url': 'repo_with_submodules', 'archives': ['repo_with_submodules.tgz'], 'visit_date': ['Jan 1 2019, 01:00 UTC'] } ] _contents = {} # Tests data initialization def _init_tests_data(): # To hold reference to the memory storage storage = None # Create search instance search = get_search('memory', {}) search.initialize() search.origin_update({'url': origin['url']} for origin in _TEST_ORIGINS) # Load git repositories from archives for origin in _TEST_ORIGINS: for i, archive in enumerate(origin['archives']): origin_repo_archive = \ os.path.join(os.path.dirname(__file__), 'resources/repos/%s' % archive) loader = GitLoaderFromArchive(origin['url'], archive_path=origin_repo_archive, config=_TEST_LOADER_CONFIG, visit_date=origin['visit_date'][i]) if storage is None: storage = loader.storage else: loader.storage = storage loader.load() origin.update(storage.origin_get(origin)) # add an 'id' key if enabled search.origin_update([{'url': origin['url'], 'has_visits': True}]) for i in range(250): url = 'https://many.origins/%d' % (i+1) # storage.origin_add([{'url': url}]) storage.origin_add([Origin(url=url)]) search.origin_update([{'url': url, 'has_visits': True}]) visit = storage.origin_visit_add(url, '2019-12-03 13:55:05', 'tar') storage.origin_visit_update( url, visit.visit, + status='full', snapshot='1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') contents = set() directories = set() revisions = set() releases = set() snapshots = set() content_path = {} # Get all objects loaded into the test archive for origin in _TEST_ORIGINS: snp = storage.snapshot_get_latest(origin['url']) snapshots.add(hash_to_hex(snp['id'])) for branch_name, branch_data in snp['branches'].items(): if branch_data['target_type'] == 'revision': revisions.add(branch_data['target']) elif branch_data['target_type'] == 'release': release = next(storage.release_get([branch_data['target']])) revisions.add(release['target']) releases.add(hash_to_hex(branch_data['target'])) for rev_log in storage.revision_shortlog(set(revisions)): rev_id = rev_log[0] revisions.add(rev_id) for rev in storage.revision_get(revisions): dir_id = rev['directory'] directories.add(hash_to_hex(dir_id)) for entry in dir_iterator(storage, dir_id): content_path[entry['sha1']] = '/'.join( [hash_to_hex(dir_id), entry['path'].decode('utf-8')]) if entry['type'] == 'file': contents.add(entry['sha1']) elif entry['type'] == 'dir': directories.add(hash_to_hex(entry['target'])) # Get all checksums for each content result = storage.content_get_metadata(contents) contents = [] for sha1, contents_metadata in result.items(): for content_metadata in contents_metadata: contents.append({ algo: hash_to_hex(content_metadata[algo]) for algo in DEFAULT_ALGORITHMS }) path = content_path[sha1] cnt = next(storage.content_get([sha1])) mimetype, encoding = get_mimetype_and_encoding_for_content( cnt['data']) _, _, cnt['data'] = _re_encode_content( mimetype, encoding, cnt['data']) content_display_data = prepare_content_for_display( cnt['data'], mimetype, path) contents[-1]['path'] = path contents[-1]['mimetype'] = mimetype contents[-1]['encoding'] = encoding contents[-1]['hljs_language'] = content_display_data['language'] contents[-1]['data'] = content_display_data['content_data'] _contents[contents[-1]['sha1']] = contents[-1] # Create indexer storage instance that will be shared by indexers idx_storage = get_indexer_storage('memory', {}) # Add the empty directory to the test archive storage.directory_add([Directory(entries=[])]) # Return tests data return { 'search': search, 'storage': storage, 'idx_storage': idx_storage, 'origins': _TEST_ORIGINS, 'contents': contents, 'directories': list(directories), 'releases': list(releases), 'revisions': list(map(hash_to_hex, revisions)), 'snapshots': list(snapshots), 'generated_checksums': set(), } def _init_indexers(tests_data): # Instantiate content indexers that will be used in tests # and force them to use the memory storages indexers = {} for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer), ('license_indexer', _FossologyLicenseIndexer), ('ctags_indexer', _CtagsIndexer)): idx = idx_class() idx.storage = tests_data['storage'] idx.objstorage = tests_data['storage'].objstorage idx.idx_storage = tests_data['idx_storage'] idx.register_tools(idx.config['tools']) indexers[idx_name] = idx return indexers def get_content(content_sha1): return _contents.get(content_sha1) _tests_data = None _current_tests_data = None _indexer_loggers = {} def get_tests_data(reset=False): """ Initialize tests data and return them in a dict. """ global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() indexers = _init_indexers(_tests_data) for (name, idx) in indexers.items(): # pytest makes the loggers use a temporary file; and deepcopy # requires serializability. So we remove them, and add them # back after the copy. _indexer_loggers[name] = idx.log del idx.log _tests_data.update(indexers) if reset or _current_tests_data is None: _current_tests_data = deepcopy(_tests_data) for (name, logger) in _indexer_loggers.items(): _current_tests_data[name].log = logger return _current_tests_data def override_storages(storage, idx_storage, search): """ Helper function to replace the storages from which archive data are fetched. """ swh_config = config.get_config() swh_config.update({ 'storage': storage, 'indexer_storage': idx_storage, 'search': search, }) service.storage = storage service.idx_storage = idx_storage service.search = search