diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py index d9c62a5a..7ac1051f 100644 --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -1,390 +1,391 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from hypothesis import given from rest_framework.test import APITestCase from swh.web.common.utils import reverse +from swh.web.tests.data import random_content from swh.web.tests.strategies import ( - content, unknown_content, contents_with_ctags + content, contents_with_ctags ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ContentApiTestCase(WebTestCase, APITestCase): @given(content()) def test_api_content_filetype(self, content): self.content_add_mimetype(content['sha1']) url = reverse('api-content-filetype', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') content_url = reverse('api-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = self.content_get_mimetype(content['sha1']) expected_data['content_url'] = content_url self.assertEqual(rv.data, expected_data) - @given(unknown_content()) - def test_api_content_filetype_sha_not_found(self, unknown_content): + def test_api_content_filetype_sha_not_found(self): + unknown_content_ = random_content() url = reverse('api-content-filetype', - url_args={'q': 'sha1:%s' % unknown_content['sha1']}) + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No filetype information found for content ' - 'sha1:%s.' % unknown_content['sha1'] + 'sha1:%s.' % unknown_content_['sha1'] }) @pytest.mark.xfail # Language indexer is disabled @given(content()) def test_api_content_language(self, content): self.content_add_language(content['sha1']) url = reverse('api-content-language', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') content_url = reverse('api-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = self.content_get_language(content['sha1']) expected_data['content_url'] = content_url self.assertEqual(rv.data, expected_data) - @given(unknown_content()) - def test_api_content_language_sha_not_found(self, unknown_content): + def test_api_content_language_sha_not_found(self): + unknown_content_ = random_content() url = reverse('api-content-language', - url_args={'q': 'sha1:%s' % unknown_content['sha1']}) + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No language information found for content ' - 'sha1:%s.' % unknown_content['sha1'] + 'sha1:%s.' % unknown_content_['sha1'] }) @pytest.mark.xfail # Language indexer is disabled @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_api_content_symbol(self, contents_with_ctags): expected_data = {} for content_sha1 in contents_with_ctags['sha1s']: self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if ctag['name'] == contents_with_ctags['symbol_name']: expected_data[content_sha1] = ctag break url = reverse('api-content-symbol', url_args={'q': contents_with_ctags['symbol_name']}, query_params={'per_page': 100}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') for entry in rv.data: content_sha1 = entry['sha1'] expected_entry = expected_data[content_sha1] for key, view_name in (('content_url', 'api-content'), ('data_url', 'api-content-raw'), ('license_url', 'api-content-license'), ('language_url', 'api-content-language'), ('filetype_url', 'api-content-filetype')): expected_entry[key] = reverse(view_name, url_args={'q': 'sha1:%s' % content_sha1}) expected_entry['sha1'] = content_sha1 del expected_entry['id'] self.assertEqual(entry, expected_entry) self.assertFalse('Link' in rv) url = reverse('api-content-symbol', url_args={'q': contents_with_ctags['symbol_name']}, query_params={'per_page': 2}) rv = self.client.get(url) next_url = reverse('api-content-symbol', url_args={'q': contents_with_ctags['symbol_name']}, query_params={'last_sha1': rv.data[1]['sha1'], 'per_page': 2}) self.assertEqual(rv['Link'], '<%s>; rel="next"' % next_url) def test_api_content_symbol_not_found(self): url = reverse('api-content-symbol', url_args={'q': 'bar'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No indexed raw content match expression \'bar\'.' }) self.assertFalse('Link' in rv) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(content()) def test_api_content_ctags(self, content): self.content_add_ctags(content['sha1']) url = reverse('api-content-ctags', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') content_url = reverse('api-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = list(self.content_get_ctags(content['sha1'])) for e in expected_data: e['content_url'] = content_url self.assertEqual(rv.data, expected_data) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_api_content_license(self, content): self.content_add_license(content['sha1']) url = reverse('api-content-license', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') content_url = reverse('api-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = self.content_get_license(content['sha1']) expected_data['content_url'] = content_url self.assertEqual(rv.data, expected_data) - @given(unknown_content()) - def test_api_content_license_sha_not_found(self, unknown_content): + def test_api_content_license_sha_not_found(self): + unknown_content_ = random_content() url = reverse('api-content-license', - url_args={'q': 'sha1:%s' % unknown_content['sha1']}) + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No license information found for content ' - 'sha1:%s.' % unknown_content['sha1'] + 'sha1:%s.' % unknown_content_['sha1'] }) @given(content()) def test_api_content_metadata(self, content): url = reverse('api-content', {'q': 'sha1:%s' % content['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = self.content_get_metadata(content['sha1']) for key, view_name in (('data_url', 'api-content-raw'), ('license_url', 'api-content-license'), ('language_url', 'api-content-language'), ('filetype_url', 'api-content-filetype')): expected_data[key] = reverse(view_name, url_args={'q': 'sha1:%s' % content['sha1']}) self.assertEqual(rv.data, expected_data) - @given(unknown_content()) - def test_api_content_not_found_as_json(self, unknown_content): + def test_api_content_not_found_as_json(self): + unknown_content_ = random_content() url = reverse('api-content', - url_args={'q': 'sha1:%s' % unknown_content['sha1']}) + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha1 checksum equals to %s not found!' - % unknown_content['sha1'] + % unknown_content_['sha1'] }) - @given(unknown_content()) - def test_api_content_not_found_as_yaml(self, unknown_content): + def test_api_content_not_found_as_yaml(self): + unknown_content_ = random_content() url = reverse('api-content', - url_args={'q': 'sha256:%s' % unknown_content['sha256']}) + url_args={'q': 'sha256:%s' % unknown_content_['sha256']}) rv = self.client.get(url, HTTP_ACCEPT='application/yaml') self.assertEqual(rv.status_code, 404) self.assertTrue('application/yaml' in rv['Content-Type']) self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha256 checksum equals to %s not found!' % - unknown_content['sha256'] + unknown_content_['sha256'] }) - @given(unknown_content()) - def test_api_content_raw_ko_not_found(self, unknown_content): + def test_api_content_raw_ko_not_found(self): + unknown_content_ = random_content() url = reverse('api-content-raw', - url_args={'q': 'sha1:%s' % unknown_content['sha1']}) + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha1 checksum equals to %s not found!' % - unknown_content['sha1'] + unknown_content_['sha1'] }) @given(content()) def test_api_content_raw_text(self, content): url = reverse('api-content-raw', url_args={'q': 'sha1:%s' % content['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/octet-stream') self.assertEqual( rv['Content-disposition'], 'attachment; filename=content_sha1_%s_raw' % content['sha1']) self.assertEqual( rv['Content-Type'], 'application/octet-stream') expected_data = self.content_get(content['sha1']) self.assertEqual(rv.content, expected_data['data']) @given(content()) def test_api_content_raw_text_with_filename(self, content): url = reverse('api-content-raw', url_args={'q': 'sha1:%s' % content['sha1']}, query_params={'filename': 'filename.txt'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/octet-stream') self.assertEqual( rv['Content-disposition'], 'attachment; filename=filename.txt') self.assertEqual( rv['Content-Type'], 'application/octet-stream') expected_data = self.content_get(content['sha1']) self.assertEqual(rv.content, expected_data['data']) @given(content()) def test_api_check_content_known(self, content): url = reverse('api-content-known', url_args={'q': content['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} }) @given(content()) def test_api_check_content_known_as_yaml(self, content): url = reverse('api-content-known', url_args={'q': content['sha1']}) rv = self.client.get(url, HTTP_ACCEPT='application/yaml') self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/yaml') self.assertEqual(rv.data, { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} }) @given(content()) def test_api_check_content_known_post_as_yaml(self, content): url = reverse('api-content-known') rv = self.client.post( url, data={ 'q': content['sha1'] }, HTTP_ACCEPT='application/yaml' ) self.assertEqual(rv.status_code, 200) self.assertTrue('application/yaml' in rv['Content-Type']) self.assertEqual(rv.data, { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} }) - @given(unknown_content()) - def test_api_check_content_known_not_found(self, unknown_content): + def test_api_check_content_known_not_found(self): + unknown_content_ = random_content() url = reverse('api-content-known', - url_args={'q': unknown_content['sha1']}) + url_args={'q': unknown_content_['sha1']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'search_res': [ { 'found': False, - 'sha1': unknown_content['sha1'] + 'sha1': unknown_content_['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 0.0} }) @given(content()) def test_api_content_uppercase(self, content): url = reverse('api-content-uppercase-checksum', url_args={'q': content['sha1'].upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-content', url_args={'q': content['sha1']}) self.assertEqual(resp['location'], redirect_url) diff --git a/swh/web/tests/api/views/test_directory.py b/swh/web/tests/api/views/test_directory.py index 61d804a1..0d6af0f5 100644 --- a/swh/web/tests/api/views/test_directory.py +++ b/swh/web/tests/api/views/test_directory.py @@ -1,105 +1,106 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from swh.web.common.utils import reverse -from swh.web.tests.strategies import directory, unknown_directory +from swh.web.tests.data import random_sha1 +from swh.web.tests.strategies import directory from swh.web.tests.testcase import WebTestCase class DirectoryApiTestCase(WebTestCase, APITestCase): @given(directory()) def test_api_directory(self, directory): url = reverse('api-directory', url_args={'sha1_git': directory}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = list(map(self._enrich_dir_data, self.directory_ls(directory))) self.assertEqual(rv.data, expected_data) - @given(unknown_directory()) - def test_api_directory_not_found(self, unknown_directory): + def test_api_directory_not_found(self): + unknown_directory_ = random_sha1() url = reverse('api-directory', - url_args={'sha1_git': unknown_directory}) + url_args={'sha1_git': unknown_directory_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Directory with sha1_git %s not found' - % unknown_directory}) + % unknown_directory_}) @given(directory()) def test_api_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) path = random.choice(directory_content) url = reverse('api-directory', url_args={'sha1_git': directory, 'path': path['name']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, self._enrich_dir_data(path)) @given(directory()) def test_api_directory_with_path_not_found(self, directory): path = 'some/path/to/nonexistent/dir/' url = reverse('api-directory', url_args={'sha1_git': directory, 'path': path}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': ('Directory entry with path %s from %s not found' % (path, directory))}) @given(directory()) def test_api_directory_uppercase(self, directory): url = reverse('api-directory-uppercase-checksum', url_args={'sha1_git': directory.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-directory', url_args={'sha1_git': directory}) self.assertEqual(resp['location'], redirect_url) @classmethod def _enrich_dir_data(cls, dir_data): if dir_data['type'] == 'file': dir_data['target_url'] = \ reverse('api-content', url_args={'q': 'sha1_git:%s' % dir_data['target']}) elif dir_data['type'] == 'dir': dir_data['target_url'] = \ reverse('api-directory', url_args={'sha1_git': dir_data['target']}) elif dir_data['type'] == 'rev': dir_data['target_url'] = \ reverse('api-revision', url_args={'sha1_git': dir_data['target']}) return dir_data diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 0960c701..14307e3c 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,375 +1,375 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, new_origins, visit_dates, new_snapshots ) from swh.web.tests.testcase import WebTestCase class OriginApiTestCase(WebTestCase, APITestCase): @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_error( self, mock_get_origin_visits, ): err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = ValueError(err_msg) url = reverse('api-origin-visits', url_args={'origin_id': 2}) rv = self.client.get(url) self.assertEqual(rv.status_code, 400) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'ValueError', 'reason': err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_db( self, mock_get_origin_visits): err_msg = 'Storage exploded! Will be back online shortly!' mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse('api-origin-visits', url_args={'origin_id': 2}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_api( self, mock_get_origin_visits): err_msg = 'Storage API dropped dead! Will resurrect asap!' mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse('api-origin-visits', url_args={'origin_id': 2}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: %s' % err_msg }) - @given(new_origin(), visit_dates(4), new_snapshots(4)) + @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.storage.snapshot_add(origin_id, origin_visit['visit'], new_snapshots[i]) all_visits = list(reversed(get_origin_visits(new_origin))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-origin-visits', url_args={'origin_id': origin_id}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') for expected_visit in expected_visits: origin_visit_url = reverse( 'api-origin-visit', url_args={'origin_id': origin_id, 'visit_id': expected_visit['visit']}) snapshot_url = reverse( 'api-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin_visit_url'] = origin_visit_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visits) - @given(new_origin(), visit_dates(4), new_snapshots(4)) + @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add(origin_id, origin_visit['visit'], new_snapshots[i]) url = reverse('api-origin-visit', url_args={'origin_id': origin_id, 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-origin', url_args={'origin_id': origin_id}) snapshot_url = reverse( 'api-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(origin()) def test_api_lookup_origin_visit_not_found(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-origin-visit', url_args={'origin_id': origin['id'], 'visit_id': max_visit_id + 1}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin with id %s or its visit with id %s not found!' % (origin['id'], max_visit_id+1) }) @given(origin()) def test_api_origin_by_id(self, origin): url = reverse('api-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-origin-visits', url_args={'origin_id': origin['id']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(origin()) def test_api_origin_by_type_url(self, origin): url = reverse('api-origin', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-origin-visits', url_args={'origin_id': origin['id']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(new_origin()) def test_api_origin_not_found(self, new_origin): url = reverse('api-origin', url_args={'origin_type': new_origin['type'], 'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin with type %s and url %s not found!' % (new_origin['type'], new_origin['url']) }) @given(origin()) def test_api_origin_metadata_search(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ 'id': origin['id'], 'type': origin['type'], 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=100) @patch('swh.web.common.service.idx_storage') def test_api_origin_metadata_search_invalid(self, mock_idx_storage): url = reverse('api-origin-metadata-search') rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.content) mock_idx_storage.assert_not_called() - @given(new_origins(20)) + @given(new_origins(10)) def test_api_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_id = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_id - origin_from) url = reverse('api-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) start = origin_from_idx end = origin_from_idx + origin_count expected_origins = expected_origins[start:end] for expected_origin in expected_origins: expected_origin['origin_visits_url'] = reverse( 'api-origin-visits', url_args={'origin_id': expected_origin['id']}) self.assertEqual(rv.data, expected_origins) next_origin_id = expected_origins[-1]['id']+1 if self.storage.origin_get({'id': next_origin_id}): self.assertIn('Link', rv) next_url = reverse('api-origins', query_params={'origin_from': next_origin_id, 'origin_count': origin_count}) self.assertIn(next_url, rv['Link']) diff --git a/swh/web/tests/api/views/test_person.py b/swh/web/tests/api/views/test_person.py index df3d4fa3..aeb2d103 100644 --- a/swh/web/tests/api/views/test_person.py +++ b/swh/web/tests/api/views/test_person.py @@ -1,40 +1,42 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import random + from hypothesis import given from rest_framework.test import APITestCase from swh.web.common.utils import reverse -from swh.web.tests.strategies import person, unknown_person +from swh.web.tests.strategies import person from swh.web.tests.testcase import WebTestCase class PersonApiTestCase(WebTestCase, APITestCase): @given(person()) def test_api_person(self, person): url = reverse('api-person', url_args={'person_id': person}) rv = self.client.get(url) expected_person = self.person_get(person) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_person) - @given(unknown_person()) - def test_api_person_not_found(self, unknown_person): + def test_api_person_not_found(self): + unknown_person_ = random.randint(1000, 10000000) - url = reverse('api-person', url_args={'person_id': unknown_person}) + url = reverse('api-person', url_args={'person_id': unknown_person_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Person with id %s not found' % unknown_person}) + 'reason': 'Person with id %s not found' % unknown_person_}) diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/tests/api/views/test_release.py index 86608346..eff189b4 100644 --- a/swh/web/tests/api/views/test_release.py +++ b/swh/web/tests/api/views/test_release.py @@ -1,124 +1,125 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hypothesis import given from rest_framework.test import APITestCase from swh.model.hashutil import hash_to_bytes from swh.web.common.utils import reverse +from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( - release, unknown_release, sha1, content, directory + release, sha1, content, directory ) from swh.web.tests.testcase import WebTestCase class ReleaseApiTestCase(WebTestCase, APITestCase): @given(release()) def test_api_release(self, release): url = reverse('api-release', url_args={'sha1_git': release}) rv = self.client.get(url) expected_release = self.release_get(release) author_id = expected_release['author']['id'] target_revision = expected_release['target'] author_url = reverse('api-person', url_args={'person_id': author_id}) target_url = reverse('api-revision', url_args={'sha1_git': target_revision}) expected_release['author_url'] = author_url expected_release['target_url'] = target_url self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_release) @given(sha1(), sha1(), sha1(), content(), directory(), release()) def test_api_release_target_type_not_a_revision(self, new_rel1, new_rel2, new_rel3, content, directory, release): for new_rel_id, target_type, target in ( (new_rel1, 'content', content), (new_rel2, 'directory', directory), (new_rel3, 'release', release)): if target_type == 'content': target = target['sha1_git'] sample_release = { 'author': { 'email': b'author@company.org', 'fullname': b'author ', 'name': b'author' }, 'date': { 'timestamp': int(datetime.now().timestamp()), 'offset': 0, 'negative_utc': False, }, 'id': hash_to_bytes(new_rel_id), 'message': b'sample release message', 'name': b'sample release', 'synthetic': False, 'target': hash_to_bytes(target), 'target_type': target_type } self.storage.release_add([sample_release]) url = reverse('api-release', url_args={'sha1_git': new_rel_id}) rv = self.client.get(url) expected_release = self.release_get(new_rel_id) author_id = expected_release['author']['id'] author_url = reverse('api-person', url_args={'person_id': author_id}) if target_type == 'content': url_args = {'q': 'sha1_git:%s' % target} else: url_args = {'sha1_git': target} target_url = reverse('api-%s' % target_type, url_args=url_args) expected_release['author_url'] = author_url expected_release['target_url'] = target_url self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_release) - @given(unknown_release()) - def test_api_release_not_found(self, unknown_release): + def test_api_release_not_found(self): + unknown_release_ = random_sha1() - url = reverse('api-release', url_args={'sha1_git': unknown_release}) + url = reverse('api-release', url_args={'sha1_git': unknown_release_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Release with sha1_git %s not found.' % unknown_release + 'reason': 'Release with sha1_git %s not found.' % unknown_release_ }) @given(release()) def test_api_release_uppercase(self, release): url = reverse('api-release-uppercase-checksum', url_args={'sha1_git': release.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-release-uppercase-checksum', url_args={'sha1_git': release}) self.assertEqual(resp['location'], redirect_url) diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index c37e535e..306b7991 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,539 +1,539 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from unittest.mock import patch from swh.model.hashutil import hash_to_hex from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse, parse_timestamp +from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( - revision, unknown_revision, new_revision, - unknown_origin_id, origin, origin_with_multiple_visits + revision, new_revision, origin, origin_with_multiple_visits ) from swh.web.tests.testcase import WebTestCase class RevisionApiTestCase(WebTestCase, APITestCase): @given(revision()) def test_api_revision(self, revision): url = reverse('api-revision', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_revision = self.revision_get(revision) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) - @given(unknown_revision()) - def test_api_revision_not_found(self, unknown_revision): + def test_api_revision_not_found(self): + unknown_revision_ = random_sha1() - url = reverse('api-revision', url_args={'sha1_git': unknown_revision}) + url = reverse('api-revision', + url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision}) + unknown_revision_}) @given(revision()) def test_api_revision_raw_ok(self, revision): url = reverse('api-revision-raw-message', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_message = self.revision_get(revision)['message'] self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/octet-stream') self.assertEqual(rv.content, expected_message.encode()) @given(new_revision()) def test_api_revision_raw_ok_no_msg(self, new_revision): del new_revision['message'] self.storage.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision['id']) url = reverse('api-revision-raw-message', url_args={'sha1_git': new_revision_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No message for revision with sha1_git %s.' % new_revision_id}) - @given(unknown_revision()) - def test_api_revision_raw_ko_no_rev(self, unknown_revision): + def test_api_revision_raw_ko_no_rev(self): + unknown_revision_ = random_sha1() url = reverse('api-revision-raw-message', - url_args={'sha1_git': unknown_revision}) + url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision}) + unknown_revision_}) - @given(unknown_origin_id()) - def test_api_revision_with_origin_not_found(self, unknown_origin_id): + def test_api_revision_with_origin_not_found(self): + unknown_origin_id_ = random.randint(1000, 1000000) url = reverse('api-revision-origin', - url_args={'origin_id': unknown_origin_id}) + url_args={'origin_id': unknown_origin_id_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin with id %s not found!' % - unknown_origin_id}) + unknown_origin_id_}) @given(origin()) def test_api_revision_with_origin(self, origin): url = reverse('api-revision-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) expected_revision = self.revision_get( snapshot['branches']['HEAD']['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin()) def test_api_revision_with_origin_and_branch_name(self, origin): snapshot = self.snapshot_get_latest(origin['id']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin_with_multiple_visits()) def test_api_revision_with_origin_and_branch_name_and_ts(self, origin): visit = random.choice(self.origin_visit_get(origin['id'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': visit['date']}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin_with_multiple_visits()) def test_api_revision_with_origin_and_branch_name_and_ts_escapes(self, origin): visit = random.choice(self.origin_visit_get(origin['id'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) date = parse_timestamp(visit['date']) formatted_date = date.strftime('Today is %B %d, %Y at %X') url = reverse('api-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': formatted_date}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) - @given(unknown_origin_id()) - def test_api_directory_through_revision_origin_ko(self, - unknown_origin_id): + def test_api_directory_through_revision_origin_ko(self): + unknown_origin_id_ = random.randint(1000, 1000000) url = reverse('api-revision-origin-directory', - url_args={'origin_id': unknown_origin_id}) + url_args={'origin_id': unknown_origin_id_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin with id %s not found!' % - unknown_origin_id + unknown_origin_id_ }) @given(origin()) def test_api_directory_through_revision_origin(self, origin): url = reverse('api-revision-origin-directory', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) revision_id = snapshot['branches']['HEAD']['target'] revision = self.revision_get(revision_id) directory = self.directory_ls(revision['directory']) for entry in directory: if entry['type'] == 'dir': entry['target_url'] = reverse( 'api-directory', url_args={'sha1_git': entry['target']} ) entry['dir_url'] = reverse( 'api-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'file': entry['target_url'] = reverse( 'api-content', url_args={'q': 'sha1_git:%s' % entry['target']} ) entry['file_url'] = reverse( 'api-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'rev': entry['target_url'] = reverse( 'api-revision', url_args={'sha1_git': entry['target']} ) entry['rev_url'] = reverse( 'api-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) expected_result = { 'content': directory, 'path': '.', 'revision': revision_id, 'type': 'dir' } self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_result) @given(revision()) def test_api_revision_log(self, revision): per_page = 10 url = reverse('api-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = self.revision_log(revision, limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: self.assertIn('Link', rv) next_log_url = reverse( 'api-revision-log', url_args={'sha1_git': expected_log[-1]['id']}, query_params={'per_page': per_page}) self.assertIn(next_log_url, rv['Link']) - @given(unknown_revision()) - def test_api_revision_log_not_found(self, unknown_revision): + def test_api_revision_log_not_found(self): + unknown_revision_ = random_sha1() url = reverse('api-revision-log', - url_args={'sha1_git': unknown_revision}) + url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision}) + unknown_revision_}) self.assertFalse(rv.has_header('Link')) @given(revision()) def test_api_revision_log_context(self, revision): revisions = self.revision_log(revision, limit=4) prev_rev = revisions[0]['id'] rev = revisions[-1]['id'] per_page = 10 url = reverse('api-revision-log', url_args={'sha1_git': rev, 'prev_sha1s': prev_rev}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = self.revision_log(rev, limit=per_page) prev_revision = self.revision_get(prev_rev) expected_log.insert(0, prev_revision) expected_log = list(map(self._enrich_revision, expected_log)) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log) @given(origin()) def test_api_revision_log_by(self, origin): per_page = 10 url = reverse('api-revision-origin-log', url_args={'origin_id': origin['id']}, query_params={'per_page': per_page}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) expected_log = self.revision_log( snapshot['branches']['HEAD']['target'], limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: self.assertIn('Link', rv) next_log_url = reverse( 'api-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': 'HEAD'}, query_params={'per_page': per_page, 'sha1_git': expected_log[-1]['id']}) self.assertIn(next_log_url, rv['Link']) @given(origin()) def test_api_revision_log_by_ko(self, origin): invalid_branch_name = 'foobar' url = reverse('api-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': invalid_branch_name}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertFalse(rv.has_header('Link')) self.assertEqual( rv.data, {'exception': 'NotFoundExc', 'reason': 'Revision for origin %s and branch %s not found.' % (origin['id'], invalid_branch_name)}) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.client.get('/api/1/revision/999/directory/some/path/') self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) def _enrich_revision(self, revision): author_url = reverse( 'api-person', url_args={'person_id': revision['author']['id']}) committer_url = reverse( 'api-person', url_args={'person_id': revision['committer']['id']}) directory_url = reverse( 'api-directory', url_args={'sha1_git': revision['directory']}) history_url = reverse('api-revision-log', url_args={'sha1_git': revision['id']}) parents_id_url = [] for p in revision['parents']: parents_id_url.append({ 'id': p, 'url': reverse('api-revision', url_args={'sha1_git': p}) }) revision_url = reverse('api-revision', url_args={'sha1_git': revision['id']}) revision['author_url'] = author_url revision['committer_url'] = committer_url revision['directory_url'] = directory_url revision['history_url'] = history_url revision['url'] = revision_url revision['parents'] = parents_id_url return revision @given(revision()) def test_api_revision_uppercase(self, revision): url = reverse('api-revision-uppercase-checksum', url_args={'sha1_git': revision.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-revision', url_args={'sha1_git': revision}) self.assertEqual(resp['location'], redirect_url) diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py index c755239e..d8eb5348 100644 --- a/swh/web/tests/api/views/test_snapshot.py +++ b/swh/web/tests/api/views/test_snapshot.py @@ -1,189 +1,190 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from swh.model.hashutil import hash_to_hex from swh.web.common.utils import reverse +from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( - snapshot, unknown_snapshot, new_snapshot + snapshot, new_snapshot ) from swh.web.tests.testcase import WebTestCase class SnapshotApiTestCase(WebTestCase, APITestCase): @given(snapshot()) def test_api_snapshot(self, snapshot): url = reverse('api-snapshot', url_args={'snapshot_id': snapshot}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = self.snapshot_get(snapshot) expected_data = self._enrich_snapshot(expected_data) self.assertEqual(rv.data, expected_data) @given(snapshot()) def test_api_snapshot_paginated(self, snapshot): branches_offset = 0 branches_count = 2 snapshot_branches = [] for k, v in sorted(self.snapshot_get(snapshot)['branches'].items()): snapshot_branches.append({ 'name': k, 'target_type': v['target_type'], 'target': v['target'] }) whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None} while branches_offset < len(snapshot_branches): branches_from = snapshot_branches[branches_offset]['name'] url = reverse('api-snapshot', url_args={'snapshot_id': snapshot}, query_params={'branches_from': branches_from, 'branches_count': branches_count}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = self.snapshot_get_branches(snapshot, branches_from, branches_count) expected_data = self._enrich_snapshot(expected_data) branches_offset += branches_count if branches_offset < len(snapshot_branches): next_branch = snapshot_branches[branches_offset]['name'] expected_data['next_branch'] = next_branch else: expected_data['next_branch'] = None self.assertEqual(rv.data, expected_data) whole_snapshot['branches'].update(expected_data['branches']) if branches_offset < len(snapshot_branches): next_url = reverse( 'api-snapshot', url_args={'snapshot_id': snapshot}, query_params={'branches_from': next_branch, 'branches_count': branches_count}) self.assertEqual(rv['Link'], '<%s>; rel="next"' % next_url) else: self.assertFalse(rv.has_header('Link')) url = reverse('api-snapshot', url_args={'snapshot_id': snapshot}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, whole_snapshot) @given(snapshot()) def test_api_snapshot_filtered(self, snapshot): snapshot_branches = [] for k, v in sorted(self.snapshot_get(snapshot)['branches'].items()): snapshot_branches.append({ 'name': k, 'target_type': v['target_type'], 'target': v['target'] }) target_type = random.choice(snapshot_branches)['target_type'] url = reverse('api-snapshot', url_args={'snapshot_id': snapshot}, query_params={'target_types': target_type}) rv = self.client.get(url) expected_data = self.snapshot_get_branches( snapshot, target_types=target_type) expected_data = self._enrich_snapshot(expected_data) self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_data) - @given(unknown_snapshot()) - def test_api_snapshot_errors(self, unknown_snapshot): + def test_api_snapshot_errors(self): + unknown_snapshot_ = random_sha1() url = reverse('api-snapshot', url_args={'snapshot_id': '63ce369'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 400) url = reverse('api-snapshot', - url_args={'snapshot_id': unknown_snapshot}) + url_args={'snapshot_id': unknown_snapshot_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404) def _enrich_snapshot(self, snapshot): def _get_branch_url(target_type, target): url = None if target_type == 'revision': url = reverse('api-revision', url_args={'sha1_git': target}) if target_type == 'release': url = reverse('api-release', url_args={'sha1_git': target}) return url for branch in snapshot['branches'].keys(): target = snapshot['branches'][branch]['target'] target_type = snapshot['branches'][branch]['target_type'] snapshot['branches'][branch]['target_url'] = \ _get_branch_url(target_type, target) for branch in snapshot['branches'].keys(): target = snapshot['branches'][branch]['target'] target_type = snapshot['branches'][branch]['target_type'] if target_type == 'alias': if target in snapshot['branches']: snapshot['branches'][branch]['target_url'] = \ snapshot['branches'][target]['target_url'] else: snp = self.snapshot_get_branches(snapshot['id'], branches_from=target, branches_count=1) alias_target = snp['branches'][target]['target'] alias_target_type = snp['branches'][target]['target_type'] snapshot['branches'][branch]['target_url'] = \ _get_branch_url(alias_target_type, alias_target) return snapshot @given(snapshot()) def test_api_snapshot_uppercase(self, snapshot): url = reverse('api-snapshot-uppercase-checksum', url_args={'snapshot_id': snapshot.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-snapshot-uppercase-checksum', url_args={'snapshot_id': snapshot}) self.assertEqual(resp['location'], redirect_url) @given(new_snapshot(min_size=4)) def test_api_snapshot_null_branch(self, new_snapshot): snp_dict = new_snapshot.to_dict() snp_id = hash_to_hex(snp_dict['id']) for branch in snp_dict['branches'].keys(): snp_dict['branches'][branch] = None break self.storage.snapshot_add([snp_dict]) url = reverse('api-snapshot', url_args={'snapshot_id': snp_id}) rv = self.client.get(url) - self.assertEqual(rv.status_code, 200) + self.assertEqual(rv.status_code, 200, rv.data) diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_utils.py index ff76b875..72ebc60a 100644 --- a/swh/web/tests/browse/test_utils.py +++ b/swh/web/tests/browse/test_utils.py @@ -1,88 +1,90 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.web.browse import utils from swh.web.common.utils import reverse, format_utc_iso_date from swh.web.tests.strategies import origin_with_multiple_visits from swh.web.tests.testcase import WebTestCase class SwhBrowseUtilsTestCase(WebTestCase): def test_get_mimetype_and_encoding_for_content(self): text = b'Hello world!' self.assertEqual(utils.get_mimetype_and_encoding_for_content(text), ('text/plain', 'us-ascii')) @given(origin_with_multiple_visits()) - def test_get_origin_visit_snapshot(self, origin): + def test_get_origin_visit_snapshot_simple(self, origin): visits = self.origin_visit_get(origin['id']) for visit in visits: snapshot = self.snapshot_get(visit['snapshot']) branches = [] releases = [] for branch in sorted(snapshot['branches'].keys()): branch_data = snapshot['branches'][branch] if branch_data['target_type'] == 'revision': rev_data = self.revision_get(branch_data['target']) branches.append({ 'name': branch, 'revision': branch_data['target'], 'directory': rev_data['directory'], 'date': format_utc_iso_date(rev_data['date']), 'message': rev_data['message'] }) elif branch_data['target_type'] == 'release': rel_data = self.release_get(branch_data['target']) rev_data = self.revision_get(rel_data['target']) releases.append({ 'name': rel_data['name'], 'branch_name': branch, 'date': format_utc_iso_date(rel_data['date']), 'id': rel_data['id'], 'message': rel_data['message'], 'target_type': rel_data['target_type'], 'target': rel_data['target'], 'directory': rev_data['directory'] }) + assert branches and releases, 'Incomplete test data.' + origin_visit_branches = utils.get_origin_visit_snapshot( origin, visit_id=visit['visit']) self.assertEqual(origin_visit_branches, (branches, releases)) def test_gen_link(self): self.assertEqual( utils.gen_link('https://www.softwareheritage.org/', 'swh'), 'swh') def test_gen_person_link(self): person_id = 8221896 person_name = 'Antoine Lambert' person_url = reverse('browse-person', url_args={'person_id': person_id}) self.assertEqual(utils.gen_person_link(person_id, person_name, link_attrs=None), '%s' % (person_url, person_name)) def test_gen_revision_link(self): revision_id = '28a0bc4120d38a394499382ba21d6965a67a3703' revision_url = reverse('browse-revision', url_args={'sha1_git': revision_id}) self.assertEqual(utils.gen_revision_link(revision_id, link_text=None, link_attrs=None), '%s' % (revision_url, revision_id)) self.assertEqual( utils.gen_revision_link(revision_id, shorten_id=True, link_attrs=None), '%s' % (revision_url, revision_id[:7])) diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index b0c5588d..869120d5 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,875 +1,879 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc +from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( - content, contents, unknown_content, unknown_contents, + content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, - release, revision, unknown_revision, revisions, unknown_revisions, + release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, - revision_with_submodules, unknown_directory, empty_directory, + revision_with_submodules, empty_directory, new_revision, new_origins ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ServiceTestCase(WebTestCase): @given(contents()) def test_lookup_multiple_hashes_all_present(self, contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(self, contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) - @given(unknown_content()) - def test_lookup_hash_does_not_exist(self, unknown_content): + def test_lookup_hash_does_not_exist(self): + unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % - unknown_content['sha1_git']) + unknown_content_['sha1_git']) self.assertEqual(actual_lookup, {'found': None, 'algo': 'sha1_git'}) @given(content()) def test_lookup_hash_exist(self, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = self.content_get_metadata(content['sha1']) self.assertEqual({'found': content_metadata, 'algo': 'sha1'}, actual_lookup) - @given(unknown_content()) - def test_search_hash_does_not_exist(self, content): + def test_search_hash_does_not_exist(self): + unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % - content['sha1_git']) + unknown_content_['sha1_git']) self.assertEqual({'found': False}, actual_lookup) @given(content()) def test_search_hash_exist(self, content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) self.assertEqual({'found': True}, actual_lookup) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(self, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) self.content_add_ctags(content_sha1) actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(self.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 self.assertEqual(actual_ctags, expected_data) - @given(unknown_content()) - def test_lookup_content_ctags_no_hash(self, unknown_content): + def test_lookup_content_ctags_no_hash(self): + unknown_content_ = random_content() actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % - unknown_content['sha1'])) + unknown_content_['sha1'])) self.assertEqual(actual_ctags, []) @given(content()) def test_lookup_content_filetype(self, content): self.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = self.content_get_mimetype(content['sha1']) self.assertEqual(actual_filetype, expected_filetype) @pytest.mark.xfail # Language indexer is disabled. @given(content()) def test_lookup_content_language(self, content): self.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = self.content_get_language(content['sha1']) self.assertEqual(actual_language, expected_language) @given(contents_with_ctags()) def test_lookup_expression(self, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = \ list(service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) def test_lookup_expression_no_result(self): expected_ctags = [] actual_ctags = \ list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(self, content): self.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = self.content_get_license(content['sha1']) self.assertEqual(actual_license, expected_license) def test_stat_counters(self): actual_stats = service.stat_counters() self.assertEqual(actual_stats, self.storage.stat_counters()) @given(new_origin(), visit_dates()) def test_lookup_origin_visits(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) for ts in visit_dates: self.storage.origin_visit_add(origin_id, ts) actual_origin_visits = list( service.lookup_origin_visits(origin_id, per_page=100)) expected_visits = self.origin_visit_get(origin_id) self.assertEqual(actual_origin_visits, expected_visits) @given(new_origin(), visit_dates()) def test_lookup_origin_visit(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(self.storage.origin_visit_add(origin_id, ts)) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit(origin_id, visit) expected_visit = dict(self.storage.origin_visit_get_by(origin_id, visit)) expected_visit['date'] = expected_visit['date'].isoformat() expected_visit['metadata'] = {} self.assertEqual(actual_origin_visit, expected_visit) @given(new_origin()) def test_lookup_origin(self, new_origin): origin_id = self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'id': origin_id}) expected_origin = self.storage.origin_get({'id': origin_id}) self.assertEqual(actual_origin, expected_origin) actual_origin = service.lookup_origin({'type': new_origin['type'], 'url': new_origin['url']}) expected_origin = self.storage.origin_get({'type': new_origin['type'], 'url': new_origin['url']}) self.assertEqual(actual_origin, expected_origin) @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): with self.assertRaises(BadInputExc) as cm: service.lookup_release(invalid_sha1) self.assertIn('invalid checksum', cm.exception.args[0].lower()) @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(self, sha256): with self.assertRaises(BadInputExc) as cm: service.lookup_release(sha256) self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_not_found(self, directory): path = 'some/invalid/path/here' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_path(directory, path) self.assertEqual('Directory entry with path %s from %s ' 'not found' % (path, directory), cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) self.assertEqual(actual_result, directory_entry) @given(release()) def test_lookup_release(self, release): actual_release = service.lookup_release(release) self.assertEqual(actual_release, self.release_get(release)) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Invalid checksum query string', cm.exception.args[0]) sha1_git = sha256 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision root %s not found' % sha1_git_root, cm.exception.args[0]) @given(ancestor_revisions()) def test_lookup_revision_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = \ service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in self.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = self.revision_get(sha1_git) expected_revision['children'] = children self.assertEqual(actual_revision, expected_revision) @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(root_sha1_git, sha1_git) self.assertIn('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git), cm.exception.args[0]) - @given(unknown_revision()) - def test_lookup_directory_with_revision_not_found(self, unknown_revision): + def test_lookup_directory_with_revision_not_found(self): + unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: - service.lookup_directory_with_revision(unknown_revision) - self.assertIn('Revision %s not found' % unknown_revision, + service.lookup_directory_with_revision(unknown_revision_) + self.assertIn('Revision %s not found' % unknown_revision_, cm.exception.args[0]) - @given(unknown_content(), unknown_revision(), unknown_directory()) - def test_lookup_directory_with_revision_unknown_content( - self, unknown_content, unknown_revision, unknown_directory): + def test_lookup_directory_with_revision_unknown_content(self): + unknown_content_ = random_content() + unknown_revision_ = random_sha1() + unknown_directory_ = random_sha1() dir_path = 'README.md' # Create a revision that points to a directory # Which points to unknown content revision = { 'author': { 'name': b'abcd', 'email': b'abcd@company.org', 'fullname': b'abcd abcd' }, 'committer': { 'email': b'aaaa@company.org', 'fullname': b'aaaa aaa', 'name': b'aaa' }, 'committer_date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'message': b'bleh', 'metadata': [], 'parents': [], 'synthetic': False, 'type': 'file', - 'id': hash_to_bytes(unknown_revision), - 'directory': hash_to_bytes(unknown_directory) + 'id': hash_to_bytes(unknown_revision_), + 'directory': hash_to_bytes(unknown_directory_) } # A directory that points to unknown content dir = { - 'id': hash_to_bytes(unknown_directory), + 'id': hash_to_bytes(unknown_directory_), 'entries': [{ 'name': bytes(dir_path.encode('utf-8')), 'type': 'file', - 'target': hash_to_bytes(unknown_content['sha1_git']), + 'target': hash_to_bytes(unknown_content_['sha1_git']), 'perms': DentryPerms.content }] } # Add the directory and revision in mem self.storage.directory_add([dir]) self.storage.revision_add([revision]) with self.assertRaises(NotFoundExc) as cm: - service.lookup_directory_with_revision(unknown_revision, dir_path) - self.assertIn('Content not found for revision %s' % unknown_revision, + service.lookup_directory_with_revision( + unknown_revision_, dir_path) + self.assertIn('Content not found for revision %s' % + unknown_revision_, cm.exception.args[0]) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere( self, revision): invalid_path = 'path/to/something/unknown' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(revision, invalid_path) exception_text = cm.exception.args[0].lower() self.assertIn('directory or file', exception_text) self.assertIn(invalid_path, exception_text) self.assertIn('revision %s' % revision, exception_text) self.assertIn('not found', exception_text) @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules( self, revision_with_submodules): rev_sha1_git = revision_with_submodules['rev_sha1_git'] rev_dir_path = revision_with_submodules['rev_dir_rev_path'] actual_data = service.lookup_directory_with_revision( rev_sha1_git, rev_dir_path) revision = self.revision_get(revision_with_submodules['rev_sha1_git']) directory = self.directory_ls(revision['directory']) rev_entry = next(e for e in directory if e['name'] == rev_dir_path) expected_data = { 'content': self.revision_get(rev_entry['target']), 'path': rev_dir_path, 'revision': rev_sha1_git, 'type': 'rev' } self.assertEqual(actual_data, expected_data) @given(revision()) def test_lookup_directory_with_revision_without_path(self, revision): actual_directory_entries = \ service.lookup_directory_with_revision(revision) revision_data = self.revision_get(revision) expected_directory_entries = \ self.directory_ls(revision_data['directory']) self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['content'], expected_directory_entries) @given(revision()) def test_lookup_directory_with_revision_with_path(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name']) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) else: sub_dir_entries = self.directory_ls(expected_dir_entry['target']) self.assertEqual(actual_dir_entry['content'], sub_dir_entries) @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = \ self.content_get(expected_dir_entry['checksums']['sha1']) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name'], with_data=True) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) self.assertEqual(actual_dir_entry['content']['data'], expected_data['data']) @given(revision()) def test_lookup_revision(self, revision): actual_revision = service.lookup_revision(revision) self.assertEqual(actual_revision, self.revision_get(revision)) @given(new_revision()) def test_lookup_revision_invalid_msg(self, new_revision): new_revision['message'] = b'elegant fix for bug \xff' self.storage.revision_add([new_revision]) revision = service.lookup_revision(hash_to_hex(new_revision['id'])) self.assertEqual(revision['message'], None) self.assertEqual(revision['message_decoding_failed'], True) @given(new_revision()) def test_lookup_revision_msg_ok(self, new_revision): self.storage.revision_add([new_revision]) revision_message = service.lookup_revision_message( hash_to_hex(new_revision['id'])) self.assertEqual(revision_message, {'message': new_revision['message']}) @given(new_revision()) def test_lookup_revision_msg_absent(self, new_revision): del new_revision['message'] self.storage.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision['id']) with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(new_revision_id) self.assertEqual( cm.exception.args[0], 'No message for revision with sha1_git %s.' % new_revision_id ) - @given(unknown_revision()) - def test_lookup_revision_msg_no_rev(self, unknown_revision): + def test_lookup_revision_msg_no_rev(self): + unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: - service.lookup_revision_message(unknown_revision) + service.lookup_revision_message(unknown_revision_) self.assertEqual( cm.exception.args[0], - 'Revision with sha1_git %s not found.' % unknown_revision + 'Revision with sha1_git %s not found.' % unknown_revision_ ) @given(revisions()) def test_lookup_revision_multiple(self, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(self.revision_get(rev)) self.assertEqual(actual_revisions, expected_revisions) - @given(unknown_revisions()) - def test_lookup_revision_multiple_none_found(self, unknown_revisions): + def test_lookup_revision_multiple_none_found(self): + unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = \ - list(service.lookup_revision_multiple(unknown_revisions)) + list(service.lookup_revision_multiple(unknown_revisions_)) - self.assertEqual(actual_revisions, [None] * len(unknown_revisions)) + self.assertEqual(actual_revisions, [None] * len(unknown_revisions_)) @given(revision()) def test_lookup_revision_log(self, revision): actual_revision_log = \ list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = self.revision_log(revision, limit=25) self.assertEqual(actual_revision_log, expected_revision_log) def _get_origin_branches(self, origin): origin_visit = self.origin_visit_get(origin['id'])[-1] snapshot = self.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_log = \ list(service.lookup_revision_log_by(origin['id'], branch_name, None, limit=25)) expected_log = \ self.revision_log(branches[branch_name]['target'], limit=25) self.assertEqual(actual_log, expected_log) @given(origin()) def test_lookup_revision_log_by_notfound(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_log_by( origin['id'], 'unknown_branch_name', None, limit=100) - @given(unknown_content()) - def test_lookup_content_raw_not_found(self, unknown_content): + def test_lookup_content_raw_not_found(self): + unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: - service.lookup_content_raw('sha1:' + unknown_content['sha1']) + service.lookup_content_raw('sha1:' + unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % - ('sha1', unknown_content['sha1'])) + ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_raw(self, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = self.content_get(content['sha1']) self.assertEqual(actual_content, expected_content) - @given(unknown_content()) - def test_lookup_content_not_found(self, unknown_content): + def test_lookup_content_not_found(self): + unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: - service.lookup_content('sha1:%s' % unknown_content['sha1']) + service.lookup_content('sha1:%s' % unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % - ('sha1', unknown_content['sha1'])) + ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_with_sha1(self, content): actual_content = service.lookup_content( 'sha1:%s' % content['sha1']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(content()) def test_lookup_content_with_sha256(self, content): actual_content = service.lookup_content( 'sha256:%s' % content['sha256']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(revision()) def test_lookup_person(self, revision): rev_data = self.revision_get(revision) actual_person = service.lookup_person(rev_data['author']['id']) self.assertEqual(actual_person, rev_data['author']) def test_lookup_directory_bad_checksum(self): with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') - @given(unknown_directory()) - def test_lookup_directory_not_found(self, unknown_directory): + def test_lookup_directory_not_found(self): + unknown_directory_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: - service.lookup_directory(unknown_directory) + service.lookup_directory(unknown_directory_) self.assertIn('Directory with sha1_git %s not found' - % unknown_directory, cm.exception.args[0]) + % unknown_directory_, cm.exception.args[0]) @given(directory()) def test_lookup_directory(self, directory): actual_directory_ls = list(service.lookup_directory( directory)) expected_directory_ls = self.directory_ls(directory) self.assertEqual(actual_directory_ls, expected_directory_ls) @given(empty_directory()) def test_lookup_directory_empty(self, empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) self.assertEqual(actual_directory_ls, []) @given(origin()) def test_lookup_revision_by_nothing_found(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_by(origin['id'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_revision = \ service.lookup_revision_by(origin['id'], branch_name, None) expected_revision = \ self.revision_get(branches[branch_name]['target']) self.assertEqual(actual_revision, expected_revision) @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(self, origin, revision): with self.assertRaises(NotFoundExc): service.lookup_revision_with_context_by(origin['id'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) expected_root_rev = self.revision_get(root_rev) expected_rev = self.revision_get(rev) expected_rev['children'] = children[rev] self.assertEqual(actual_root_rev, expected_root_rev) self.assertEqual(actual_rev, expected_rev) def test_lookup_revision_through_ko_not_implemented(self): with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @given(origin()) def test_lookup_revision_through_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) rev = root_rev_log[-1]['id'] self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, 'sha1_git': rev }), service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) ) @given(origin()) def test_lookup_revision_through_with_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, }), service.lookup_revision_by( origin['id'], branch_name, None) ) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] self.assertEqual(service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }), service.lookup_revision_with_context( sha1_git_root, sha1_git) ) @given(revision()) def test_lookup_revision_through_with_revision(self, revision): self.assertEqual(service.lookup_revision_through({ 'sha1_git': revision }), service.lookup_revision(revision) ) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(self, revision): with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name']), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'])) ) @given(revision()) def test_lookup_directory_through_revision_ok_with_data(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name'], with_data=True), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'], with_data=True)) ) @given(new_origins(20)) def test_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_idx = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_idx - origin_from) actual_origins = list(service.lookup_origins(origin_from, origin_count)) expected_origins = list(self.storage.origin_get_range(origin_from, origin_count)) self.assertEqual(actual_origins, expected_origins) diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py index 41018ed0..265070c1 100644 --- a/swh/web/tests/data.py +++ b/swh/web/tests/data.py @@ -1,300 +1,322 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy import os +import random import time from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS from swh.model.identifiers import directory_identifier from swh.loader.git.from_disk import GitLoaderFromArchive from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( get_mimetype_and_encoding_for_content, prepare_content_for_display ) from swh.web.common import service # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { 'cls': 'memory', 'args': {} }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_size_limit': 100 * 1024 * 1024, 'content_packet_size': 10, 'content_packet_size_bytes': 100 * 1024 * 1024, 'directory_packet_size': 10, 'revision_packet_size': 10, 'release_packet_size': 10, 'save_data': False, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory', 'args': {}, }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } +def random_sha1(): + return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) + + +def random_sha256(): + return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) + + +def random_blake2s256(): + return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) + + +def random_content(): + return { + 'sha1': random_sha1(), + 'sha1_git': random_sha1(), + 'sha256': random_sha256(), + 'blake2s256': random_blake2s256(), + } + + # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } # CtagsIndexer with custom configuration for tests class _CtagsIndexer(CtagsIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.ctags', 'languages': {'c': 'c'}, 'tools': { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa '''--output-format=json ''' }, } } # Lightweight git repositories that will be loaded to generate # input data for tests _TEST_ORIGINS = [ { 'id': 1, 'type': 'git', 'url': 'https://github.com/wcoder/highlightjs-line-numbers.js', 'archives': ['highlightjs-line-numbers.js.zip', 'highlightjs-line-numbers.js_visit2.zip'] }, { 'id': 2, 'type': 'git', 'url': 'https://github.com/memononen/libtess2', 'archives': ['libtess2.zip'] }, { 'id': 3, 'type': 'git', 'url': 'repo_with_submodules', 'archives': ['repo_with_submodules.tgz'] } ] _contents = {} # Tests data initialization def _init_tests_data(): # Load git repositories from archives loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG) # Get reference to the memory storage storage = loader.storage for origin in _TEST_ORIGINS: nb_visits = len(origin['archives']) for i, archive in enumerate(origin['archives']): origin_repo_archive = \ os.path.join(os.path.dirname(__file__), 'resources/repos/%s' % archive) loader.load(origin['url'], origin_repo_archive, None) if nb_visits > 1 and i != nb_visits - 1: time.sleep(1) contents = set() directories = set() revisions = set() releases = set() snapshots = set() persons = set() content_path = {} # Get all objects loaded into the test archive for origin in _TEST_ORIGINS: snp = storage.snapshot_get_latest(origin['id']) snapshots.add(hash_to_hex(snp['id'])) for branch_name, branch_data in snp['branches'].items(): if branch_data['target_type'] == 'revision': revisions.add(branch_data['target']) elif branch_data['target_type'] == 'release': release = next(storage.release_get([branch_data['target']])) revisions.add(release['target']) releases.add(hash_to_hex(branch_data['target'])) persons.add(release['author']['id']) for rev_log in storage.revision_shortlog(set(revisions)): rev_id = rev_log[0] revisions.add(rev_id) for rev in storage.revision_get(revisions): dir_id = rev['directory'] persons.add(rev['author']['id']) persons.add(rev['committer']['id']) directories.add(hash_to_hex(dir_id)) for entry in dir_iterator(storage, dir_id): content_path[entry['sha1']] = '/'.join( [hash_to_hex(dir_id), entry['path'].decode('utf-8')]) if entry['type'] == 'file': contents.add(entry['sha1']) elif entry['type'] == 'dir': directories.add(hash_to_hex(entry['target'])) # Get all checksums for each content contents_metadata = storage.content_get_metadata(contents) contents = [] for content_metadata in contents_metadata: contents.append({ algo: hash_to_hex(content_metadata[algo]) for algo in DEFAULT_ALGORITHMS }) path = content_path[content_metadata['sha1']] cnt = next(storage.content_get([content_metadata['sha1']])) mimetype, encoding = get_mimetype_and_encoding_for_content(cnt['data']) content_display_data = prepare_content_for_display( cnt['data'], mimetype, path) contents[-1]['path'] = path contents[-1]['mimetype'] = mimetype contents[-1]['encoding'] = encoding contents[-1]['hljs_language'] = content_display_data['language'] contents[-1]['data'] = content_display_data['content_data'] _contents[contents[-1]['sha1']] = contents[-1] # Create indexer storage instance that will be shared by indexers idx_storage = get_indexer_storage('memory', {}) # Add the empty directory to the test archive empty_dir_id = directory_identifier({'entries': []}) empty_dir_id_bin = hash_to_bytes(empty_dir_id) storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}]) # Return tests data return { 'storage': storage, 'idx_storage': idx_storage, 'origins': _TEST_ORIGINS, 'contents': contents, 'directories': list(directories), 'persons': list(persons), 'releases': list(releases), 'revisions': list(map(hash_to_hex, revisions)), 'snapshots': list(snapshots), 'generated_checksums': set(), } def _init_indexers(tests_data): # Instantiate content indexers that will be used in tests # and force them to use the memory storages indexers = {} for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer), ('license_indexer', _FossologyLicenseIndexer), ('ctags_indexer', _CtagsIndexer)): idx = idx_class() idx.storage = tests_data['storage'] idx.objstorage = tests_data['storage'].objstorage idx.idx_storage = tests_data['idx_storage'] idx.register_tools(idx.config['tools']) indexers[idx_name] = idx return indexers def get_content(content_sha1): return _contents.get(content_sha1) _tests_data = None _current_tests_data = None _indexer_loggers = {} def get_tests_data(reset=False): """ Initialize tests data and return them in a dict. """ global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() indexers = _init_indexers(_tests_data) for (name, idx) in indexers.items(): # pytest makes the loggers use a temporary file; and deepcopy # requires serializability. So we remove them, and add them # back after the copy. _indexer_loggers[name] = idx.log del idx.log _tests_data.update(indexers) if reset or _current_tests_data is None: _current_tests_data = deepcopy(_tests_data) for (name, logger) in _indexer_loggers.items(): _current_tests_data[name].log = logger return _current_tests_data def override_storages(storage, idx_storage): """ Helper function to replace the storages from which archive data are fetched. """ swh_config = config.get_config() swh_config.update({'storage': storage}) service.storage = storage swh_config.update({'indexer_storage': idx_storage}) service.idx_storage = idx_storage