diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index b302d10f..0960c701 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,373 +1,375 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given
from rest_framework.test import APITestCase
from unittest.mock import patch

from swh.storage.exc import StorageDBError, StorageAPIError

from swh.web.common.utils import reverse
from swh.web.common.origin_visits import get_origin_visits
from swh.web.tests.strategies import (
    origin, new_origin, new_origins, visit_dates, new_snapshots
)
from swh.web.tests.testcase import WebTestCase


class OriginApiTestCase(WebTestCase, APITestCase):

    @patch('swh.web.api.views.origin.get_origin_visits')
    def test_api_lookup_origin_visits_raise_error(
            self, mock_get_origin_visits,
    ):
        err_msg = 'voluntary error to check the bad request middleware.'

        mock_get_origin_visits.side_effect = ValueError(err_msg)

        url = reverse('api-origin-visits', url_args={'origin_id': 2})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 400)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'ValueError',
            'reason': err_msg})

    @patch('swh.web.api.views.origin.get_origin_visits')
    def test_api_lookup_origin_visits_raise_swh_storage_error_db(
            self, mock_get_origin_visits):
        err_msg = 'Storage exploded! Will be back online shortly!'

        mock_get_origin_visits.side_effect = StorageDBError(err_msg)

        url = reverse('api-origin-visits', url_args={'origin_id': 2})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 503)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'StorageDBError',
            'reason':
            'An unexpected error occurred in the backend: %s' % err_msg})

    @patch('swh.web.api.views.origin.get_origin_visits')
    def test_api_lookup_origin_visits_raise_swh_storage_error_api(
            self, mock_get_origin_visits):
        err_msg = 'Storage API dropped dead! Will resurrect asap!'

        mock_get_origin_visits.side_effect = StorageAPIError(err_msg)

        url = reverse('api-origin-visits', url_args={'origin_id': 2})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 503)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'StorageAPIError',
            'reason':
            'An unexpected error occurred in the api backend: %s' % err_msg
        })

    @given(new_origin(), visit_dates(4), new_snapshots(4))
    def test_api_lookup_origin_visits(self, new_origin, visit_dates,
                                      new_snapshots):
        origin_id = self.storage.origin_add_one(new_origin)
        new_origin['id'] = origin_id
        for i, visit_date in enumerate(visit_dates):
            origin_visit = self.storage.origin_visit_add(origin_id, visit_date)
            self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                      new_snapshots[i])

        all_visits = list(reversed(get_origin_visits(new_origin)))

        for last_visit, expected_visits in (
                (None, all_visits[:2]),
                (all_visits[1]['visit'], all_visits[2:4])):

            url = reverse('api-origin-visits',
                          url_args={'origin_id': origin_id},
                          query_params={'per_page': 2,
                                        'last_visit': last_visit})

            rv = self.client.get(url)

            self.assertEqual(rv.status_code, 200)
            self.assertEqual(rv['Content-Type'], 'application/json')

            for expected_visit in expected_visits:
                origin_visit_url = reverse(
                    'api-origin-visit',
                    url_args={'origin_id': origin_id,
                              'visit_id': expected_visit['visit']})
                snapshot_url = reverse(
                    'api-snapshot',
                    url_args={'snapshot_id': expected_visit['snapshot']})
                expected_visit['origin_visit_url'] = origin_visit_url
                expected_visit['snapshot_url'] = snapshot_url

            self.assertEqual(rv.data, expected_visits)

    @given(new_origin(), visit_dates(4), new_snapshots(4))
    def test_api_lookup_origin_visit(self, new_origin, visit_dates,
                                     new_snapshots):
        origin_id = self.storage.origin_add_one(new_origin)
        new_origin['id'] = origin_id
        for i, visit_date in enumerate(visit_dates):
            origin_visit = self.storage.origin_visit_add(origin_id, visit_date)
            visit_id = origin_visit['visit']
            self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                      new_snapshots[i])

        url = reverse('api-origin-visit',
                      url_args={'origin_id': origin_id,
                                'visit_id': visit_id})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')

        expected_visit = self.origin_visit_get_by(origin_id, visit_id)

        origin_url = reverse('api-origin',
                             url_args={'origin_id': origin_id})
        snapshot_url = reverse(
            'api-snapshot',
            url_args={'snapshot_id': expected_visit['snapshot']})

        expected_visit['origin_url'] = origin_url
        expected_visit['snapshot_url'] = snapshot_url

        self.assertEqual(rv.data, expected_visit)

    @given(origin())
    def test_api_lookup_origin_visit_not_found(self, origin):

        all_visits = list(reversed(get_origin_visits(origin)))

        max_visit_id = max([v['visit'] for v in all_visits])

        url = reverse('api-origin-visit',
                      url_args={'origin_id': origin['id'],
                                'visit_id': max_visit_id + 1})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Origin with id %s or its visit with id %s not found!'
            % (origin['id'], max_visit_id+1)
        })

    @given(origin())
    def test_api_origin_by_id(self, origin):

        url = reverse('api-origin', url_args={'origin_id': origin['id']})

        rv = self.client.get(url)

        expected_origin = self.origin_get(origin)

        origin_visits_url = reverse('api-origin-visits',
                                    url_args={'origin_id': origin['id']})

        expected_origin['origin_visits_url'] = origin_visits_url

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_origin)

    @given(origin())
    def test_api_origin_by_type_url(self, origin):

        url = reverse('api-origin',
                      url_args={'origin_type': origin['type'],
                                'origin_url': origin['url']})

        rv = self.client.get(url)

        expected_origin = self.origin_get(origin)

        origin_visits_url = reverse('api-origin-visits',
                                    url_args={'origin_id': origin['id']})

        expected_origin['origin_visits_url'] = origin_visits_url

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_origin)

    @given(new_origin())
    def test_api_origin_not_found(self, new_origin):

        url = reverse('api-origin',
                      url_args={'origin_type': new_origin['type'],
                                'origin_url': new_origin['url']})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Origin with type %s and url %s not found!' %
            (new_origin['type'], new_origin['url'])
        })

-    @patch('swh.web.common.service.idx_storage')
    @given(origin())
-    def test_api_origin_metadata_search(self, mock_idx_storage, origin):
+    def test_api_origin_metadata_search(self, origin):
+        with patch('swh.web.common.service.idx_storage') as mock_idx_storage:
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .side_effect = lambda conjunction, limit: [{
+                    'from_revision': (
+                        b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
+                        b'\xf2U\xfa\x05B8'),
+                    'metadata': {'author': 'Jane Doe'},
+                    'id': origin['id'],
+                    'tool': {
+                        'configuration': {
+                            'context': ['NpmMapping', 'CodemetaMapping'],
+                            'type': 'local'
+                        },
+                        'id': 3,
+                        'name': 'swh-metadata-detector',
+                        'version': '0.0.1'
+                    }
+                }]
+
+            url = reverse('api-origin-metadata-search',
+                          query_params={'fulltext': 'Jane Doe'})
+            rv = self.client.get(url)

-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .side_effect = lambda conjunction, limit: [{
-                'from_revision':
-                b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8',
-                'metadata': {'author': 'Jane Doe'},
+            self.assertEqual(rv.status_code, 200, rv.content)
+            self.assertEqual(rv['Content-Type'], 'application/json')
+            expected_data = [{
                'id': origin['id'],
-                'tool': {
-                    'configuration': {
-                        'context': ['NpmMapping', 'CodemetaMapping'],
-                        'type': 'local'
-                    },
-                    'id': 3,
-                    'name': 'swh-metadata-detector',
-                    'version': '0.0.1'
+                'type': origin['type'],
+                'url': origin['url'],
+                'metadata': {
+                    'metadata': {'author': 'Jane Doe'},
+                    'from_revision': (
+                        '7026b7c1a2af56521e951c01ed20f255fa054238'),
+                    'tool': {
+                        'configuration': {
+                            'context': ['NpmMapping', 'CodemetaMapping'],
+                            'type': 'local'
+                        },
+                        'id': 3,
+                        'name': 'swh-metadata-detector',
+                        'version': '0.0.1',
+                    }
                }
            }]
+            self.assertEqual(rv.data, expected_data)
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .assert_called_with(conjunction=['Jane Doe'], limit=70)

-        url = reverse('api-origin-metadata-search',
-                      query_params={'fulltext': 'Jane Doe'})
-        rv = self.client.get(url)
-
-        self.assertEqual(rv.status_code, 200, rv.content)
-        self.assertEqual(rv['Content-Type'], 'application/json')
-        expected_data = [{
-            'id': origin['id'],
-            'type': origin['type'],
-            'url': origin['url'],
-            'metadata': {
-                'metadata': {'author': 'Jane Doe'},
-                'from_revision': '7026b7c1a2af56521e951c01ed20f255fa054238',
-                'tool': {
-                    'configuration': {
-                        'context': ['NpmMapping', 'CodemetaMapping'],
-                        'type': 'local'
-                    },
-                    'id': 3,
-                    'name': 'swh-metadata-detector',
-                    'version': '0.0.1',
-                }
-            }
-        }]
-        self.assertEqual(rv.data, expected_data)
-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .assert_called_with(conjunction=['Jane Doe'], limit=70)
-
-    @patch('swh.web.common.service.idx_storage')
    @given(origin())
-    def test_api_origin_metadata_search_limit(self, mock_idx_storage, origin):
-
-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .side_effect = lambda conjunction, limit: [{
-                'from_revision':
-                b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8',
-                'metadata': {'author': 'Jane Doe'},
-                'id': origin['id'],
-                'tool': {
-                    'configuration': {
-                        'context': ['NpmMapping', 'CodemetaMapping'],
-                        'type': 'local'
-                    },
-                    'id': 3,
-                    'name': 'swh-metadata-detector',
-                    'version': '0.0.1'
-                }
-            }]
-
-        url = reverse('api-origin-metadata-search',
-                      query_params={'fulltext': 'Jane Doe'})
-        rv = self.client.get(url)
+    def test_api_origin_metadata_search_limit(self, origin):
+
+        with patch('swh.web.common.service.idx_storage') as mock_idx_storage:
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .side_effect = lambda conjunction, limit: [{
+                    'from_revision': (
+                        b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
+                        b'\xf2U\xfa\x05B8'),
+                    'metadata': {'author': 'Jane Doe'},
+                    'id': origin['id'],
+                    'tool': {
+                        'configuration': {
+                            'context': ['NpmMapping', 'CodemetaMapping'],
+                            'type': 'local'
+                        },
+                        'id': 3,
+                        'name': 'swh-metadata-detector',
+                        'version': '0.0.1'
+                    }
+                }]
+
+            url = reverse('api-origin-metadata-search',
+                          query_params={'fulltext': 'Jane Doe'})
+            rv = self.client.get(url)

-        self.assertEqual(rv.status_code, 200, rv.content)
-        self.assertEqual(rv['Content-Type'], 'application/json')
-        self.assertEqual(len(rv.data), 1)
-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .assert_called_with(conjunction=['Jane Doe'], limit=70)
+            self.assertEqual(rv.status_code, 200, rv.content)
+            self.assertEqual(rv['Content-Type'], 'application/json')
+            self.assertEqual(len(rv.data), 1)
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .assert_called_with(conjunction=['Jane Doe'], limit=70)

-        url = reverse('api-origin-metadata-search',
-                      query_params={'fulltext': 'Jane Doe',
-                                    'limit': 10})
-        rv = self.client.get(url)
+            url = reverse('api-origin-metadata-search',
+                          query_params={'fulltext': 'Jane Doe',
+                                        'limit': 10})
+            rv = self.client.get(url)

-        self.assertEqual(rv.status_code, 200, rv.content)
-        self.assertEqual(rv['Content-Type'], 'application/json')
-        self.assertEqual(len(rv.data), 1)
-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .assert_called_with(conjunction=['Jane Doe'], limit=10)
+            self.assertEqual(rv.status_code, 200, rv.content)
+            self.assertEqual(rv['Content-Type'], 'application/json')
+            self.assertEqual(len(rv.data), 1)
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .assert_called_with(conjunction=['Jane Doe'], limit=10)

-        url = reverse('api-origin-metadata-search',
-                      query_params={'fulltext': 'Jane Doe',
-                                    'limit': 987})
-        rv = self.client.get(url)
+            url = reverse('api-origin-metadata-search',
+                          query_params={'fulltext': 'Jane Doe',
+                                        'limit': 987})
+            rv = self.client.get(url)

-        self.assertEqual(rv.status_code, 200, rv.content)
-        self.assertEqual(rv['Content-Type'], 'application/json')
-        self.assertEqual(len(rv.data), 1)
-        mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
-            .assert_called_with(conjunction=['Jane Doe'], limit=100)
+            self.assertEqual(rv.status_code, 200, rv.content)
+            self.assertEqual(rv['Content-Type'], 'application/json')
+            self.assertEqual(len(rv.data), 1)
+            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
+                .assert_called_with(conjunction=['Jane Doe'], limit=100)

    @patch('swh.web.common.service.idx_storage')
    def test_api_origin_metadata_search_invalid(self, mock_idx_storage):

        url = reverse('api-origin-metadata-search')
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 400, rv.content)
        mock_idx_storage.assert_not_called()

    @given(new_origins(20))
    def test_api_lookup_origins(self, new_origins):

        nb_origins = len(new_origins)

        expected_origins = self.storage.origin_add(new_origins)

        origin_from_idx = random.randint(1, nb_origins-1) - 1
        origin_from = expected_origins[origin_from_idx]['id']
        max_origin_id = expected_origins[-1]['id']
        origin_count = random.randint(1, max_origin_id - origin_from)

        url = reverse('api-origins',
                      query_params={'origin_from': origin_from,
                                    'origin_count': origin_count})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200)

        start = origin_from_idx
        end = origin_from_idx + origin_count
        expected_origins = expected_origins[start:end]

        for expected_origin in expected_origins:
            expected_origin['origin_visits_url'] = reverse(
                'api-origin-visits',
                url_args={'origin_id': expected_origin['id']})

        self.assertEqual(rv.data, expected_origins)

        next_origin_id = expected_origins[-1]['id']+1
        if self.storage.origin_get({'id': next_origin_id}):
            self.assertIn('Link', rv)
            next_url = reverse('api-origins',
                               query_params={'origin_from': next_origin_id,
                                             'origin_count': origin_count})
            self.assertIn(next_url, rv['Link'])
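
The changes above swap @patch decorators for patch() used as a context manager inside @given tests, presumably so a fresh mock is created for every example hypothesis generates rather than once for the whole test function. A minimal, self-contained sketch of that pattern (the test name and the patched target are illustrative, not taken from this patch):

import os
from unittest.mock import patch

from hypothesis import given
from hypothesis.strategies import text


@given(text())
def test_mock_scoped_per_example(fake_path):
    # Patching inside the test body scopes the mock to each generated
    # example, so call records cannot leak between hypothesis runs.
    with patch('os.getcwd') as mock_getcwd:
        mock_getcwd.return_value = fake_path
        assert os.getcwd() == fake_path
        mock_getcwd.assert_called_once_with()
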
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index 499a78fb..f54351f9 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,284 +1,304 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

+from copy import deepcopy
import os
import time

from swh.indexer.language import LanguageIndexer
from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
from swh.model.identifiers import directory_identifier
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.storage.algos.dir_iterators import dir_iterator
from swh.web.browse.utils import (
    get_mimetype_and_encoding_for_content, prepare_content_for_display
)

# Module used to initialize data that will be provided as tests input

# Configuration for git loader
_TEST_LOADER_CONFIG = {
    'storage': {
        'cls': 'memory',
        'args': {}
    },
    'send_contents': True,
    'send_directories': True,
    'send_revisions': True,
    'send_releases': True,
    'send_snapshot': True,
    'content_size_limit': 100 * 1024 * 1024,
    'content_packet_size': 10,
    'content_packet_size_bytes': 100 * 1024 * 1024,
    'directory_packet_size': 10,
    'revision_packet_size': 10,
    'release_packet_size': 10,
    'save_data': False,
}

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    'storage': {
        'cls': 'memory',
        'args': {},
    },
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
    'indexer_storage': {
        'cls': 'memory',
        'args': {},
    }
}


# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'tools': {
                'name': 'file',
                'version': '1:5.30-1+deb9u1',
                'configuration': {
                    "type": "library",
                    "debian-package": "python3-magic"
                }
            }
        }


# LanguageIndexer with custom configuration for tests
class _LanguageIndexer(LanguageIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'tools': {
                'name': 'pygments',
                'version': '2.0.1+dfsg-1.1+deb8u1',
                'configuration': {
                    'type': 'library',
                    'debian-package': 'python3-pygments',
                    'max_content_size': 10240,
                }
            }
        }


# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.fossology.license',
            'tools': {
                'name': 'nomos',
                'version': '3.1.0rc2-31-ga2cbb8c',
                'configuration': {
                    'command_line': 'nomossa ',
                },
            }
        }


# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.ctags',
            'languages': {'c': 'c'},
            'tools': {
                'name': 'universal-ctags',
                'version': '~git7859817b',
                'configuration': {
                    'command_line': '''ctags --fields=+lnz --sort=no --links=no '''  # noqa
                                    '''--output-format=json '''
                },
            }
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        'id': 1,
        'type': 'git',
        'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
        'archives': ['highlightjs-line-numbers.js.zip',
                     'highlightjs-line-numbers.js_visit2.zip']
    },
    {
        'id': 2,
        'type': 'git',
        'url': 'https://github.com/memononen/libtess2',
        'archives': ['libtess2.zip']
    },
    {
        'id': 3,
        'type': 'git',
        'url': 'repo_with_submodules',
        'archives': ['repo_with_submodules.tgz']
    }
]

_contents = {}


# Tests data initialization
def _init_tests_data():
    # Load git repositories from archives
    loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG)
+
+    # Get reference to the memory storage
+    storage = loader.storage
+
    for origin in _TEST_ORIGINS:
        nb_visits = len(origin['archives'])
        for i, archive in enumerate(origin['archives']):
            origin_repo_archive = \
                os.path.join(os.path.dirname(__file__),
                             'resources/repos/%s' % archive)
            loader.load(origin['url'], origin_repo_archive, None)
            if nb_visits > 1 and i != nb_visits - 1:
                time.sleep(1)

-    # Get reference to the memory storage
-    storage = loader.storage
-
    contents = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()
    persons = set()

    content_path = {}

    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
        snp = storage.snapshot_get_latest(origin['id'])
        snapshots.add(hash_to_hex(snp['id']))
        for branch_name, branch_data in snp['branches'].items():
            if branch_data['target_type'] == 'revision':
                revisions.add(branch_data['target'])
            elif branch_data['target_type'] == 'release':
                release = next(storage.release_get([branch_data['target']]))
                revisions.add(release['target'])
                releases.add(hash_to_hex(branch_data['target']))
                persons.add(release['author']['id'])

    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)
    for rev in storage.revision_get(revisions):
        dir_id = rev['directory']
        persons.add(rev['author']['id'])
        persons.add(rev['committer']['id'])
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            content_path[entry['sha1']] = '/'.join(
                [hash_to_hex(dir_id), entry['path'].decode('utf-8')])
            if entry['type'] == 'file':
                contents.add(entry['sha1'])
            elif entry['type'] == 'dir':
                directories.add(hash_to_hex(entry['target']))

    # Get all checksums for each content
    contents_metadata = storage.content_get_metadata(contents)
    contents = []
    for content_metadata in contents_metadata:
        contents.append({
            algo: hash_to_hex(content_metadata[algo])
            for algo in DEFAULT_ALGORITHMS
        })
        path = content_path[content_metadata['sha1']]
        cnt = next(storage.content_get([content_metadata['sha1']]))
        mimetype, encoding = get_mimetype_and_encoding_for_content(cnt['data'])
        content_display_data = prepare_content_for_display(
            cnt['data'], mimetype, path)
        contents[-1]['path'] = path
        contents[-1]['mimetype'] = mimetype
        contents[-1]['encoding'] = encoding
        contents[-1]['hljs_language'] = content_display_data['language']
        contents[-1]['data'] = content_display_data['content_data']
        _contents[contents[-1]['sha1']] = contents[-1]

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage('memory', {})

-    # Instantiate content indexers that will be used in tests
-    # and force them to use the memory storages
-    indexers = {}
-    for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
-                                ('language_indexer', _LanguageIndexer),
-                                ('license_indexer', _FossologyLicenseIndexer),
-                                ('ctags_indexer', _CtagsIndexer)):
-        idx = idx_class()
-        idx.storage = storage
-        idx.objstorage = storage.objstorage
-        idx.idx_storage = idx_storage
-        idx.register_tools(idx.config['tools'])
-        indexers[idx_name] = idx
-
    # Add the empty directory to the test archive
    empty_dir_id = directory_identifier({'entries': []})
    empty_dir_id_bin = hash_to_bytes(empty_dir_id)
    storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}])

    # Return tests data
    return {
        'storage': storage,
        'idx_storage': idx_storage,
-        **indexers,
        'origins': _TEST_ORIGINS,
        'contents': contents,
        'directories': list(directories),
        'persons': list(persons),
        'releases': list(releases),
        'revisions': list(map(hash_to_hex, revisions)),
-        'snapshots': list(snapshots)
+        'snapshots': list(snapshots),
+        'generated_checksums': set(),
    }


+def _init_indexers(tests_data):
+    # Instantiate content indexers that will be used in tests
+    # and force them to use the memory storages
+    indexers = {}
+    for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
+                                ('language_indexer', _LanguageIndexer),
+                                ('license_indexer', _FossologyLicenseIndexer),
+                                ('ctags_indexer', _CtagsIndexer)):
+        idx = idx_class()
+        idx.storage = tests_data['storage']
+        idx.objstorage = tests_data['storage'].objstorage
+        idx.idx_storage = tests_data['idx_storage']
+        idx.register_tools(idx.config['tools'])
+        indexers[idx_name] = idx
+
+    return indexers
+
+
def get_content(content_sha1):
    return _contents.get(content_sha1)


_tests_data = None
+_current_tests_data = None
+_indexer_loggers = {}


-def get_tests_data():
+def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.
""" - global _tests_data + global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() - return _tests_data + indexers = _init_indexers(_tests_data) + for (name, idx) in indexers.items(): + # pytest makes the loggers use a temporary file; and deepcopy + # requires serializability. So we remove them, and add them + # back after the copy. + _indexer_loggers[name] = idx.log + del idx.log + _tests_data.update(indexers) + if reset or _current_tests_data is None: + _current_tests_data = deepcopy(_tests_data) + for (name, logger) in _indexer_loggers.items(): + _current_tests_data[name].log = logger + return _current_tests_data diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py index 6505cd48..eff83f94 100644 --- a/swh/web/tests/strategies.py +++ b/swh/web/tests/strategies.py @@ -1,532 +1,536 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from collections import defaultdict from datetime import datetime from hypothesis import settings, assume from hypothesis.strategies import ( just, sampled_from, lists, composite, datetimes, integers, binary, text, characters ) from swh.model.hashutil import hash_to_hex, hash_to_bytes from swh.model.identifiers import directory_identifier from swh.storage.algos.revisions_walker import get_revisions_walker from swh.model.hypothesis_strategies import ( origins as new_origin_strategy, snapshots as new_snapshot ) from swh.web.tests.data import get_tests_data # Module dedicated to the generation of input data for tests through # the use of hypothesis. # Some of these data are sampled from a test archive created and populated # in the swh.web.tests.data module. # Set the swh-web hypothesis profile if none has been explicitly set hypothesis_default_settings = settings.get_profile('default') if repr(settings()) == repr(hypothesis_default_settings): settings.load_profile('swh-web') -# Import tests data -tests_data = get_tests_data() -storage = tests_data['storage'] - # The following strategies exploit the hypothesis capabilities -_generated_checksums = set() - def _filter_checksum(cs): + generated_checksums = get_tests_data()['generated_checksums'] if not int.from_bytes(cs, byteorder='little') or \ - cs in _generated_checksums: + cs in generated_checksums: return False - _generated_checksums.add(cs) + generated_checksums.add(cs) return True def _known_swh_object(object_type): - return sampled_from(tests_data[object_type]) + return sampled_from(get_tests_data()[object_type]) def sha1(): """ Hypothesis strategy returning a valid hexadecimal sha1 value. """ return binary( min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex) def invalid_sha1(): """ Hypothesis strategy returning an invalid sha1 representation. """ return binary( min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex) def sha256(): """ Hypothesis strategy returning a valid hexadecimal sha256 value. """ return binary( min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex) def content(): """ Hypothesis strategy returning a random content ingested into the test archive. """ return _known_swh_object('contents') def contents(): """ Hypothesis strategy returning random contents ingested into the test archive. 
""" return lists(content(), min_size=2, max_size=8) def content_text(): """ Hypothesis strategy returning random textual contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/')) def content_text_non_utf8(): """ Hypothesis strategy returning random textual contents not encoded to UTF-8 ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['encoding'] not in ('utf-8', 'us-ascii')) def content_text_no_highlight(): """ Hypothesis strategy returning random textual contents with no detected programming language to highlight ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['hljs_language'] == 'nohighlight') def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('image/')) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { 'blake2S256': blake2s256_hex, 'sha1': sha1_hex, 'sha1_git': sha1_git_hex, 'sha256': sha256_hex } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( - lambda c: next(storage.content_get( - [hash_to_bytes(c['sha1'])])) is None) + lambda c: next(get_tests_data()['storage'].content_get( + [hash_to_bytes(c['sha1'])])) is None) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object('directories') def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ + storage = get_tests_data()['storage'] return directory().filter( lambda d: any([e['type'] == 'dir' for e in list(storage.directory_ls(hash_to_bytes(d)))])) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({'entries': []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ + storage = get_tests_data()['storage'] return sha1().filter( lambda s: len(list(storage.directory_missing([hash_to_bytes(s)]))) > 0) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object('origins') def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] + tests_data = get_tests_data() for origin in tests_data['origins']: - visits = list(storage.origin_visit_get(origin['id'])) + visits = list(tests_data['storage'].origin_visit_get(origin['id'])) if len(visits) > 1: ret.append(origin) return sampled_from(ret) def origin_with_release(): """ Hypothesis strategy returning a random origin ingested into the test archive. 
""" ret = [] + tests_data = get_tests_data() for origin in tests_data['origins']: - snapshot = storage.snapshot_get_latest(origin['id']) + snapshot = tests_data['storage'].snapshot_get_latest(origin['id']) if any([b['target_type'] == 'release' for b in snapshot['branches'].values()]): ret.append(origin) return sampled_from(ret) def unknown_origin_id(): """ Hypothesis strategy returning a random origin id not ingested into the test archive. """ return integers(min_value=1000000) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ + storage = get_tests_data()['storage'] return new_origin_strategy().map(lambda origin: origin.to_dict()).filter( lambda origin: storage.origin_get([origin])[0] is None) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists(new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items()))) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)), min_size=min_size, max_size=max_size, unique=True).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object('releases') def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( - lambda s: next(storage.release_get([s])) is None) + lambda s: next(get_tests_data()['storage'].release_get([s])) is None) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object('revisions') def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ + storage = get_tests_data()['storage'] return sha1().filter( lambda s: next(storage.revision_get([hash_to_bytes(s)])) is None) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw(text(min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255))) email = '%s@company.org' % name return { 'name': name.encode(), 'email': email.encode(), 'fullname': ('%s <%s>' % (name, email)).encode() } @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)).map( lambda d: int(d.timestamp()))) return { 'timestamp': timestamp, 'offset': 0, 'negative_utc': False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ return { 'id': draw(unknown_revision().map(hash_to_bytes)), 'directory': draw(sha1().map(hash_to_bytes)), 'author': draw(new_person()), 'committer': draw(new_person()), 'message': draw( text(min_size=20, max_size=100).map(lambda t: t.encode())), 'date': draw(new_swh_date()), 'committer_date': draw(new_swh_date()), 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } def revisions(): """ Hypothesis strategy returning random revisions ingested into the test archive. 
""" return lists(revision(), min_size=2, max_size=8) def unknown_revisions(): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=2, max_size=8) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object('snapshots') def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists(new_snapshot(min_size=2, max_size=10, only_objects=True) .map(lambda snp: snp.to_dict()), min_size=min_size, max_size=max_size) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ + storage = get_tests_data()['storage'] return sha1().filter( lambda s: storage.snapshot_get(hash_to_bytes(s)) is None) def person(): """ Hypothesis strategy returning a random person ingested into the test archive. """ return _known_swh_object('persons') def unknown_person(): """ Hypothesis strategy returning a random person not ingested into the test archive. """ return integers(min_value=1000000) def _get_origin_dfs_revisions_walker(): + tests_data = get_tests_data() + storage = tests_data['storage'] origin = random.choice(tests_data['origins'][:-1]) snapshot = storage.snapshot_get_latest(origin['id']) head = snapshot['branches'][b'HEAD']['target'] return get_revisions_walker('dfs', storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev['parents']: children[rev_p].append(rev['id']) if not init_rev_found: master_revisions.append(rev) if not rev['parents']: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev['id']] return just({ 'sha1_git_root': hash_to_hex(root_rev['id']), 'sha1_git': hash_to_hex(ancestor_rev['id']), 'children': [hash_to_hex(r) for r in ancestor_child_revs] }) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev['parents']) > 1: merge_revs.append(rev) for rev_p in rev['parents']: children[rev_p].append(rev['id']) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev['parents']): selected_revs = merge_rev['parents'] return just({ 'sha1_git_root': hash_to_hex(selected_revs[0]), 'sha1_git': hash_to_hex(selected_revs[1]) }) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. 
def contents_with_ctags():
    """
    Hypothesis strategy returning contents ingested into the test
    archive. Those contents are ctags compatible, that is, running
    ctags on them yields results.
    """
    return just({
        'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351',
                  '15554cf7608dde6bfefac7e3d525596343a85b6f',
                  '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd',
                  '30acd0b47fc25e159e27a980102ddb1c4bea0b95',
                  '4f81f05aaea3efb981f9d90144f746d6b682285b',
                  '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682',
                  '59d08bafa6a749110dfb65ba43a61963d5a5bf9f',
                  '7568285b2d7f31ae483ae71617bd3db873deaa2c',
                  '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4',
                  '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03',
                  '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c',
                  '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd',
                  'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b',
                  'e89e55a12def4cd54d5bff58378a3b5119878eb7',
                  'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e',
                  'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'],
        'symbol_name': 'ABS'
    })


def revision_with_submodules():
    """
    Hypothesis strategy returning a revision that is known to
    point to a directory with revision entries (aka git submodule)
    """
    return just({
        'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234',
        'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2',
        'rev_dir_rev_path': 'libtess2'
    })
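
With the changes above, strategies.py no longer caches a module-level storage reference at import time; each strategy calls get_tests_data() when it is built, so it samples from whichever tests-data copy is active for the current test. A sketch of the same lazy-lookup style for a hypothetical extra strategy (not part of the patch):

from hypothesis.strategies import sampled_from

from swh.web.tests.data import get_tests_data


def known_origin_url():
    # Hypothetical strategy: resolve the tests data lazily so the sampled
    # origin urls always come from the currently installed copy.
    return sampled_from([o['url'] for o in get_tests_data()['origins']])
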
""" - @classmethod - def setUpClass(cls): - super().setUpClass() - tests_data = get_tests_data() - cls.storage = tests_data['storage'] - cls.idx_storage = tests_data['idx_storage'] - cls.mimetype_indexer = tests_data['mimetype_indexer'] - cls.language_indexer = tests_data['language_indexer'] - cls.license_indexer = tests_data['license_indexer'] - cls.ctags_indexer = tests_data['ctags_indexer'] + def _pre_setup(self): + cache.clear() + + tests_data = get_tests_data(reset=True) + self.storage = tests_data['storage'] + self.idx_storage = tests_data['idx_storage'] + self.mimetype_indexer = tests_data['mimetype_indexer'] + self.language_indexer = tests_data['language_indexer'] + self.license_indexer = tests_data['license_indexer'] + self.ctags_indexer = tests_data['ctags_indexer'] # Update swh-web configuration to use the in-memory storage # instantiated in the tests.data module swh_config = config.get_config() - swh_config.update({'storage': cls.storage}) - service.storage = cls.storage + swh_config.update({'storage': self.storage}) + service.storage = self.storage # Update swh-web configuration to use the in-memory indexer storage # instantiated in the tests.data modules - swh_config.update({'indexer_storage': cls.idx_storage}) - service.idx_storage = cls.idx_storage + swh_config.update({'indexer_storage': self.idx_storage}) + service.idx_storage = self.idx_storage - @classmethod - def content_add_mimetype(cls, cnt_id): - cls.mimetype_indexer.run([hash_to_bytes(cnt_id)], - 'update-dups') + super()._pre_setup() + + def content_add_mimetype(self, cnt_id): + self.mimetype_indexer.run([hash_to_bytes(cnt_id)], + 'update-dups') - @classmethod - def content_get_mimetype(cls, cnt_id): - mimetype = next(cls.idx_storage.content_mimetype_get( + def content_get_mimetype(self, cnt_id): + mimetype = next(self.idx_storage.content_mimetype_get( [hash_to_bytes(cnt_id)])) return converters.from_filetype(mimetype) - @classmethod - def content_add_language(cls, cnt_id): - cls.language_indexer.run([hash_to_bytes(cnt_id)], - 'update-dups') + def content_add_language(self, cnt_id): + self.language_indexer.run([hash_to_bytes(cnt_id)], + 'update-dups') - @classmethod - def content_get_language(cls, cnt_id): - lang = next(cls.idx_storage.content_language_get( + def content_get_language(self, cnt_id): + lang = next(self.idx_storage.content_language_get( [hash_to_bytes(cnt_id)])) return converters.from_swh(lang, hashess={'id'}) - @classmethod - def content_add_license(cls, cnt_id): - cls.license_indexer.run([hash_to_bytes(cnt_id)], - 'update-dups') + def content_add_license(self, cnt_id): + self.license_indexer.run([hash_to_bytes(cnt_id)], + 'update-dups') - @classmethod - def content_get_license(cls, cnt_id): + def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) - lic = next(cls.idx_storage.content_fossology_license_get( + lic = next(self.idx_storage.content_fossology_license_get( [cnt_id_bytes])) return converters.from_swh({'id': cnt_id_bytes, 'facts': lic[cnt_id_bytes]}, hashess={'id'}) - @classmethod - def content_add_ctags(cls, cnt_id): - cls.ctags_indexer.run([hash_to_bytes(cnt_id)], - 'update-dups') + def content_add_ctags(self, cnt_id): + self.ctags_indexer.run([hash_to_bytes(cnt_id)], + 'update-dups') - @classmethod - def content_get_ctags(cls, cnt_id): + def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) - ctags = cls.idx_storage.content_ctags_get([cnt_id_bytes]) + ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield 
            yield converters.from_swh(ctag, hashess={'id'})

-    @classmethod
-    def content_get_metadata(cls, cnt_id):
+    def content_get_metadata(self, cnt_id):
        cnt_id_bytes = hash_to_bytes(cnt_id)
-        metadata = next(cls.storage.content_get_metadata([cnt_id_bytes]))
+        metadata = next(self.storage.content_get_metadata([cnt_id_bytes]))
        return converters.from_swh(metadata,
                                   hashess={'sha1', 'sha1_git', 'sha256',
                                            'blake2s256'})

-    @classmethod
-    def content_get(cls, cnt_id):
+    def content_get(self, cnt_id):
        cnt_id_bytes = hash_to_bytes(cnt_id)
-        cnt = next(cls.storage.content_get([cnt_id_bytes]))
+        cnt = next(self.storage.content_get([cnt_id_bytes]))
        return converters.from_content(cnt)

-    @classmethod
-    def directory_ls(cls, dir_id):
+    def directory_ls(self, dir_id):
        cnt_id_bytes = hash_to_bytes(dir_id)
        dir_content = map(converters.from_directory_entry,
-                          cls.storage.directory_ls(cnt_id_bytes))
+                          self.storage.directory_ls(cnt_id_bytes))
        return list(dir_content)

-    @classmethod
-    def release_get(cls, rel_id):
+    def release_get(self, rel_id):
        rel_id_bytes = hash_to_bytes(rel_id)
-        rel_data = next(cls.storage.release_get([rel_id_bytes]))
+        rel_data = next(self.storage.release_get([rel_id_bytes]))
        return converters.from_release(rel_data)

-    @classmethod
-    def revision_get(cls, rev_id):
+    def revision_get(self, rev_id):
        rev_id_bytes = hash_to_bytes(rev_id)
-        rev_data = next(cls.storage.revision_get([rev_id_bytes]))
+        rev_data = next(self.storage.revision_get([rev_id_bytes]))
        return converters.from_revision(rev_data)

-    @classmethod
-    def revision_log(cls, rev_id, limit=None):
+    def revision_log(self, rev_id, limit=None):
        rev_id_bytes = hash_to_bytes(rev_id)
        return list(map(converters.from_revision,
-                        cls.storage.revision_log([rev_id_bytes], limit=limit)))
+                        self.storage.revision_log([rev_id_bytes], limit=limit)))

-    @classmethod
-    def snapshot_get_latest(cls, origin_id):
-        snp = cls.storage.snapshot_get_latest(origin_id)
+    def snapshot_get_latest(self, origin_id):
+        snp = self.storage.snapshot_get_latest(origin_id)
        return converters.from_snapshot(snp)

-    @classmethod
-    def origin_get(cls, origin_info):
-        origin = cls.storage.origin_get(origin_info)
+    def origin_get(self, origin_info):
+        origin = self.storage.origin_get(origin_info)
        return converters.from_origin(origin)

-    @classmethod
-    def origin_visit_get(cls, origin_id):
-        visits = cls.storage.origin_visit_get(origin_id)
+    def origin_visit_get(self, origin_id):
+        visits = self.storage.origin_visit_get(origin_id)
        return list(map(converters.from_origin_visit, visits))

-    @classmethod
-    def origin_visit_get_by(cls, origin_id, visit_id):
-        visit = cls.storage.origin_visit_get_by(origin_id, visit_id)
+    def origin_visit_get_by(self, origin_id, visit_id):
+        visit = self.storage.origin_visit_get_by(origin_id, visit_id)
        return converters.from_origin_visit(visit)

-    @classmethod
-    def snapshot_get(cls, snapshot_id):
-        snp = cls.storage.snapshot_get(hash_to_bytes(snapshot_id))
+    def snapshot_get(self, snapshot_id):
+        snp = self.storage.snapshot_get(hash_to_bytes(snapshot_id))
        return converters.from_snapshot(snp)

-    @classmethod
-    def snapshot_get_branches(cls, snapshot_id, branches_from='',
+    def snapshot_get_branches(self, snapshot_id, branches_from='',
                              branches_count=1000, target_types=None):
-        snp = cls.storage.snapshot_get_branches(hash_to_bytes(snapshot_id),
-                                                branches_from.encode(),
-                                                branches_count, target_types)
+        snp = self.storage.snapshot_get_branches(
+            hash_to_bytes(snapshot_id), branches_from.encode(),
+            branches_count, target_types)
        return converters.from_snapshot(snp)

-    @classmethod
-    def person_get(cls, person_id):
-        person = next(cls.storage.person_get([person_id]))
+    def person_get(self, person_id):
+        person = next(self.storage.person_get([person_id]))
        return converters.from_person(person)
-
-    def setUp(self):
-        cache.clear()
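
Moving the fixture wiring from setUpClass to _pre_setup means it now runs before every test method, which is what lets get_tests_data(reset=True) hand each test an isolated copy of the archive. A condensed sketch of that per-test lifecycle (assumes a configured Django test environment; the class and payload are stand-ins, not swh-web code):

from django.core.cache import cache
from hypothesis.extra.django import TestCase


class ExampleTestCase(TestCase):

    def _pre_setup(self):
        # Django invokes _pre_setup() before each test method, unlike
        # setUpClass which runs once per class, so state prepared here
        # cannot leak from one test to the next.
        cache.clear()
        self.payload = {'storage': object()}  # stand-in for get_tests_data(reset=True)
        super()._pre_setup()

    def test_payload_is_fresh(self):
        self.assertIn('storage', self.payload)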