diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index a88840da..9a8c9c47 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,671 +1,674 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given import pytest from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, new_origins, visit_dates, new_snapshots ) from swh.web.tests.testcase import WebTestCase class OriginApiTestCase(WebTestCase, APITestCase): @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_error( self, mock_get_origin_visits, ): err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = ValueError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'ValueError', 'reason': err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_db( self, mock_get_origin_visits): err_msg = 'Storage exploded! Will be back online shortly!' 
mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_api( self, mock_get_origin_visits): err_msg = 'Storage API dropped dead! Will resurrect asap!' mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: %s' % err_msg }) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) all_visits = list(reversed(get_origin_visits(new_origin))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin['url']}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') for expected_visit in expected_visits: origin_visit_url = reverse( 
'api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': expected_visit['visit']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_visit_url'] = origin_visit_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visits) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) all_visits = list(reversed(get_origin_visits(new_origin))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin['url']}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') for expected_visit in expected_visits: origin_visit_url = reverse( 'api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': expected_visit['visit']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_visit_url'] = origin_visit_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visits) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in 
enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) url = reverse('api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest( self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_ids.append(origin_visit['visit']) self.storage.snapshot_add([new_snapshots[0]]) self.storage.origin_visit_update( origin_id, visit_ids[0], snapshot=new_snapshots[0]['id']) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_ids[1]) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = None self.assertEqual(rv.data, expected_visit) 
@given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot( self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_ids.append(origin_visit['visit']) self.storage.snapshot_add([new_snapshots[0]]) self.storage.origin_visit_update( origin_id, visit_ids[0], snapshot=new_snapshots[0]['id']) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin['url']}) url += '?require_snapshot=true' rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_ids[0]) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @pytest.mark.origin_id @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit_by_id(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) url = reverse('api-1-origin-visit', url_args={'origin_id': origin_id, 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') 
expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(origin()) def test_api_lookup_origin_visit_not_found(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_url': origin['url'], 'visit_id': max_visit_id + 1}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' % (origin['url'], max_visit_id+1) }) @pytest.mark.origin_id @given(origin()) def test_api_lookup_origin_visit_not_found_by_id(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_id': origin['id'], 'visit_id': max_visit_id + 1}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' 
% (origin['url'], max_visit_id+1) }) @pytest.mark.origin_id @given(origin()) def test_api_origin_by_id(self, origin): url = reverse('api-1-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(origin()) def test_api_origin_by_url(self, origin): url = reverse('api-1-origin', url_args={'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(origin()) def test_api_origin_by_type_url(self, origin): url = reverse('api-1-origin', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(new_origin()) def test_api_origin_not_found(self, new_origin): url = reverse('api-1-origin', url_args={'origin_type': new_origin['type'], 'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s not found!' 
% new_origin['url'] }) @given(origin()) def test_api_origin_metadata_search(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ 'type': origin['type'], 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] actual_data = rv.data for d in actual_data: if 'id' in d: del d['id'] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) @pytest.mark.origin_id @given(origin()) def test_api_origin_metadata_search_missing_url(self, origin): """indexer-storage with outdated db will return origin_url: None.""" with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'id': origin['id'], 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': None, 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 
'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ 'type': origin['type'], 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] actual_data = rv.data for d in actual_data: if 'id' in d: del d['id'] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') 
self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_get \ .side_effect = lambda origin_ids: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) mock_idx_storage.origin_intrinsic_metadata_get \ .assert_called_once_with([origin['url']]) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = {'author': 'Jane Doe'} self.assertEqual(rv.data, expected_data) @patch('swh.web.common.service.idx_storage') def test_api_origin_metadata_search_invalid(self, mock_idx_storage): url = reverse('api-1-origin-metadata-search') rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.content) mock_idx_storage.assert_not_called() + # in-memory storage does not handle different origin types for the + # same origin url but hypothesis will generate such kind of examples + @pytest.mark.xfail @pytest.mark.origin_id @given(new_origins(10)) def 
test_api_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) expected_origins.sort(key=lambda orig: orig['id']) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_id = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_id - origin_from) url = reverse('api-1-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) start = origin_from_idx end = origin_from_idx + origin_count expected_origins = expected_origins[start:end] for expected_origin in expected_origins: expected_origin['origin_visits_url'] = reverse( 'api-1-origin-visits', url_args={'origin_url': expected_origin['url']}) self.assertEqual(rv.data, expected_origins) next_origin_id = expected_origins[-1]['id']+1 if self.storage.origin_get({'id': next_origin_id}): self.assertIn('Link', rv) next_url = reverse('api-1-origins', query_params={'origin_from': next_origin_id, 'origin_count': origin_count}) self.assertIn(next_url, rv['Link']) diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index 9280566c..a3ede02c 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,560 +1,538 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given import pytest from rest_framework.test import APITestCase from unittest.mock import patch -from swh.model.hashutil import hash_to_hex - from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse, parse_timestamp from swh.web.tests.data import random_sha1 from 
swh.web.tests.strategies import ( - revision, new_revision, origin, origin_with_multiple_visits + revision, origin, origin_with_multiple_visits ) from swh.web.tests.testcase import WebTestCase class RevisionApiTestCase(WebTestCase, APITestCase): @given(revision()) def test_api_revision(self, revision): url = reverse('api-1-revision', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_revision = self.revision_get(revision) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) def test_api_revision_not_found(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) @given(revision()) def test_api_revision_raw_ok(self, revision): url = reverse('api-1-revision-raw-message', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_message = self.revision_get(revision)['message'] self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/octet-stream') self.assertEqual(rv.content, expected_message.encode()) - @given(new_revision()) - def test_api_revision_raw_ok_no_msg(self, new_revision): - - del new_revision['message'] - self.storage.revision_add([new_revision]) - - new_revision_id = hash_to_hex(new_revision['id']) - - url = reverse('api-1-revision-raw-message', - url_args={'sha1_git': new_revision_id}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No message for revision with sha1_git %s.' 
% - new_revision_id}) - def test_api_revision_raw_ko_no_rev(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision-raw-message', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) @pytest.mark.origin_id def test_api_revision_with_origin_id_not_found(self): unknown_origin_id = random.randint(1000, 1000000) url = reverse('api-1-revision-origin', url_args={'origin_id': unknown_origin_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['content-type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s not found!' % unknown_origin_id}) @pytest.mark.origin_id @given(origin()) def test_api_revision_with_origin_id(self, origin): url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['url']) expected_revision = self.revision_get( snapshot['branches']['HEAD']['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @pytest.mark.origin_id @given(origin()) def test_api_revision_with_origin_id_and_branch_name(self, origin): snapshot = self.snapshot_get_latest(origin['url']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, 
rv.data) self.assertEqual(rv['content-type'], 'application/json') self.assertEqual(rv.data, expected_revision) @pytest.mark.origin_id @given(origin_with_multiple_visits()) def test_api_revision_with_origin_id_and_branch_name_and_ts(self, origin): visit = random.choice(self.origin_visit_get(origin['url'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': visit['date']}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @pytest.mark.origin_id @given(origin_with_multiple_visits()) def test_api_revision_with_origin_id_and_branch_name_and_ts_escapes( self, origin): visit = random.choice(self.origin_visit_get(origin['url'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) date = parse_timestamp(visit['date']) formatted_date = date.strftime('Today is %B %d, %Y at %X') url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': formatted_date}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @pytest.mark.origin_id def test_api_directory_through_revision_origin_id_ko(self): unknown_origin_id_ = random.randint(1000, 1000000) url = 
reverse('api-1-revision-origin-directory', url_args={'origin_id': unknown_origin_id_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s not found!' % unknown_origin_id_ }) @pytest.mark.origin_id @given(origin()) def test_api_directory_through_revision_origin_id(self, origin): url = reverse('api-1-revision-origin-directory', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) revision_id = snapshot['branches']['HEAD']['target'] revision = self.revision_get(revision_id) directory = self.directory_ls(revision['directory']) for entry in directory: if entry['type'] == 'dir': entry['target_url'] = reverse( 'api-1-directory', url_args={'sha1_git': entry['target']} ) entry['dir_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'file': entry['target_url'] = reverse( 'api-1-content', url_args={'q': 'sha1_git:%s' % entry['target']} ) entry['file_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'rev': entry['target_url'] = reverse( 'api-1-revision', url_args={'sha1_git': entry['target']} ) entry['rev_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) expected_result = { 'content': directory, 'path': '.', 'revision': revision_id, 'type': 'dir' } self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_result) @given(revision()) def test_api_revision_log(self, revision): per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = 
self.revision_log(revision, limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: self.assertIn('Link', rv) next_log_url = reverse( 'api-1-revision-log', url_args={'sha1_git': expected_log[-1]['id']}, query_params={'per_page': per_page}) self.assertIn(next_log_url, rv['Link']) def test_api_revision_log_not_found(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision-log', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) self.assertFalse(rv.has_header('Link')) @given(revision()) def test_api_revision_log_context(self, revision): revisions = self.revision_log(revision, limit=4) prev_rev = revisions[0]['id'] rev = revisions[-1]['id'] per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': rev, 'prev_sha1s': prev_rev}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = self.revision_log(rev, limit=per_page) prev_revision = self.revision_get(prev_rev) expected_log.insert(0, prev_revision) expected_log = list(map(self._enrich_revision, expected_log)) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log) @pytest.mark.origin_id @given(origin()) def test_api_revision_log_by_origin_id(self, origin): per_page = 10 url = reverse('api-1-revision-origin-log', url_args={'origin_id': origin['id']}, query_params={'per_page': per_page}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['url']) expected_log = 
self.revision_log( snapshot['branches']['HEAD']['target'], limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: self.assertIn('Link', rv) next_log_url = reverse( 'api-1-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': 'HEAD'}, query_params={'per_page': per_page, 'sha1_git': expected_log[-1]['id']}) self.assertIn(next_log_url, rv['Link']) @pytest.mark.origin_id @given(origin()) def test_api_revision_log_by_ko(self, origin): invalid_branch_name = 'foobar' url = reverse('api-1-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': invalid_branch_name}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertFalse(rv.has_header('Link')) self.assertEqual( rv.data, {'exception': 'NotFoundExc', 'reason': 'Revision for origin %s and branch %s not found.' % (origin['id'], invalid_branch_name)}) @pytest.mark.origin_id @given(origin()) def test_api_revision_log_by_origin_id_ko(self, origin): invalid_branch_name = 'foobar' url = reverse('api-1-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': invalid_branch_name}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertFalse(rv.has_header('Link')) self.assertEqual( rv.data, {'exception': 'NotFoundExc', 'reason': 'Revision for origin %s and branch %s not found.' 
% (origin['id'], invalid_branch_name)}) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.client.get('/api/1/revision/999/directory/some/path/') self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = 
self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) def _enrich_revision(self, revision): directory_url = reverse( 'api-1-directory', url_args={'sha1_git': revision['directory']}) history_url = reverse('api-1-revision-log', url_args={'sha1_git': revision['id']}) parents_id_url = [] for p in revision['parents']: parents_id_url.append({ 'id': p, 'url': reverse('api-1-revision', url_args={'sha1_git': p}) }) revision_url = reverse('api-1-revision', url_args={'sha1_git': revision['id']}) revision['directory_url'] = directory_url revision['history_url'] = history_url revision['url'] = revision_url revision['parents'] = parents_id_url return revision @given(revision()) def test_api_revision_uppercase(self, revision): url = reverse('api-1-revision-uppercase-checksum', url_args={'sha1_git': revision.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-1-revision', url_args={'sha1_git': revision}) self.assertEqual(resp['location'], redirect_url) diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index 05f11fb1..cfa7de41 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,878 +1,862 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.web.common import service from swh.web.common.exc import BadInputExc, 
NotFoundExc from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, new_origins ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ServiceTestCase(WebTestCase): @given(contents()) def test_lookup_multiple_hashes_all_present(self, contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(self, contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) def test_lookup_hash_does_not_exist(self): unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content_['sha1_git']) self.assertEqual(actual_lookup, {'found': None, 'algo': 'sha1_git'}) @given(content()) def test_lookup_hash_exist(self, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = self.content_get_metadata(content['sha1']) self.assertEqual({'found': content_metadata, 'algo': 'sha1'}, actual_lookup) def test_search_hash_does_not_exist(self): unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % unknown_content_['sha1_git']) self.assertEqual({'found': 
False}, actual_lookup) @given(content()) def test_search_hash_exist(self, content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) self.assertEqual({'found': True}, actual_lookup) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(self, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) self.content_add_ctags(content_sha1) actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(self.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 self.assertEqual(actual_ctags, expected_data) def test_lookup_content_ctags_no_hash(self): unknown_content_ = random_content() actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % unknown_content_['sha1'])) self.assertEqual(actual_ctags, []) @given(content()) def test_lookup_content_filetype(self, content): self.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = self.content_get_mimetype(content['sha1']) self.assertEqual(actual_filetype, expected_filetype) @pytest.mark.xfail # Language indexer is disabled. 
@given(content()) def test_lookup_content_language(self, content): self.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = self.content_get_language(content['sha1']) self.assertEqual(actual_language, expected_language) @given(contents_with_ctags()) def test_lookup_expression(self, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = \ list(service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) def test_lookup_expression_no_result(self): expected_ctags = [] actual_ctags = \ list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(self, content): self.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = self.content_get_license(content['sha1']) self.assertEqual(actual_license, expected_license) def test_stat_counters(self): actual_stats = service.stat_counters() self.assertEqual(actual_stats, self.storage.stat_counters()) @given(new_origin(), visit_dates()) def test_lookup_origin_visits(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) for ts in visit_dates: self.storage.origin_visit_add(origin_id, ts) actual_origin_visits = list( service.lookup_origin_visits(origin_id, per_page=100)) expected_visits = self.origin_visit_get(origin_id) 
self.assertEqual(actual_origin_visits, expected_visits) @given(new_origin(), visit_dates()) def test_lookup_origin_visit(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(self.storage.origin_visit_add(origin_id, ts)) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit(origin_id, visit) expected_visit = dict(self.storage.origin_visit_get_by(origin_id, visit)) expected_visit['date'] = expected_visit['date'].isoformat() expected_visit['metadata'] = {} self.assertEqual(actual_origin_visit, expected_visit) @pytest.mark.origin_id @given(new_origin()) def test_lookup_origin_by_id(self, new_origin): origin_id = self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'id': origin_id}) expected_origin = self.storage.origin_get({'id': origin_id}) self.assertEqual(actual_origin, expected_origin) @given(new_origin()) def test_lookup_origin(self, new_origin): self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'type': new_origin['type'], 'url': new_origin['url']}) expected_origin = self.storage.origin_get({'type': new_origin['type'], 'url': new_origin['url']}) self.assertEqual(actual_origin, expected_origin) @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): with self.assertRaises(BadInputExc) as cm: service.lookup_release(invalid_sha1) self.assertIn('invalid checksum', cm.exception.args[0].lower()) @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(self, sha256): with self.assertRaises(BadInputExc) as cm: service.lookup_release(sha256) self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_not_found(self, directory): path = 'some/invalid/path/here' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_path(directory, path) self.assertEqual('Directory entry with path 
%s from %s ' 'not found' % (path, directory), cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) self.assertEqual(actual_result, directory_entry) @given(release()) def test_lookup_release(self, release): actual_release = service.lookup_release(release) self.assertEqual(actual_release, self.release_get(release)) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Invalid checksum query string', cm.exception.args[0]) sha1_git = sha256 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision root %s not found' % sha1_git_root, cm.exception.args[0]) @given(ancestor_revisions()) def test_lookup_revision_with_context(self, ancestor_revisions): sha1_git = 
ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = \ service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in self.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = self.revision_get(sha1_git) expected_revision['children'] = children self.assertEqual(actual_revision, expected_revision) @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(root_sha1_git, sha1_git) self.assertIn('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git), cm.exception.args[0]) def test_lookup_directory_with_revision_not_found(self): unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(unknown_revision_) self.assertIn('Revision %s not found' % unknown_revision_, cm.exception.args[0]) def test_lookup_directory_with_revision_unknown_content(self): unknown_content_ = random_content() unknown_revision_ = random_sha1() unknown_directory_ = random_sha1() dir_path = 'README.md' # Create a revision that points to a directory # Which points to unknown content revision = { 'author': { 'name': b'abcd', 'email': b'abcd@company.org', 'fullname': b'abcd abcd' }, 'committer': { 'email': b'aaaa@company.org', 'fullname': b'aaaa aaa', 'name': b'aaa' }, 'committer_date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'message': b'bleh', 'metadata': [], 'parents': [], 'synthetic': False, - 'type': 'file', + 'type': 'git', 'id': 
hash_to_bytes(unknown_revision_), 'directory': hash_to_bytes(unknown_directory_) } # A directory that points to unknown content dir = { 'id': hash_to_bytes(unknown_directory_), 'entries': [{ 'name': bytes(dir_path.encode('utf-8')), 'type': 'file', 'target': hash_to_bytes(unknown_content_['sha1_git']), 'perms': DentryPerms.content }] } # Add the directory and revision in mem self.storage.directory_add([dir]) self.storage.revision_add([revision]) with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( unknown_revision_, dir_path) self.assertIn('Content not found for revision %s' % unknown_revision_, cm.exception.args[0]) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere( self, revision): invalid_path = 'path/to/something/unknown' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(revision, invalid_path) exception_text = cm.exception.args[0].lower() self.assertIn('directory or file', exception_text) self.assertIn(invalid_path, exception_text) self.assertIn('revision %s' % revision, exception_text) self.assertIn('not found', exception_text) @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules( self, revision_with_submodules): rev_sha1_git = revision_with_submodules['rev_sha1_git'] rev_dir_path = revision_with_submodules['rev_dir_rev_path'] actual_data = service.lookup_directory_with_revision( rev_sha1_git, rev_dir_path) revision = self.revision_get(revision_with_submodules['rev_sha1_git']) directory = self.directory_ls(revision['directory']) rev_entry = next(e for e in directory if e['name'] == rev_dir_path) expected_data = { 'content': self.revision_get(rev_entry['target']), 'path': rev_dir_path, 'revision': rev_sha1_git, 'type': 'rev' } self.assertEqual(actual_data, expected_data) @given(revision()) def test_lookup_directory_with_revision_without_path(self, revision): actual_directory_entries = \ service.lookup_directory_with_revision(revision) 
revision_data = self.revision_get(revision) expected_directory_entries = \ self.directory_ls(revision_data['directory']) self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['content'], expected_directory_entries) @given(revision()) def test_lookup_directory_with_revision_with_path(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name']) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) else: sub_dir_entries = self.directory_ls(expected_dir_entry['target']) self.assertEqual(actual_dir_entry['content'], sub_dir_entries) @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = \ self.content_get(expected_dir_entry['checksums']['sha1']) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name'], with_data=True) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): 
self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) self.assertEqual(actual_dir_entry['content']['data'], expected_data['data']) @given(revision()) def test_lookup_revision(self, revision): actual_revision = service.lookup_revision(revision) self.assertEqual(actual_revision, self.revision_get(revision)) @given(new_revision()) def test_lookup_revision_invalid_msg(self, new_revision): new_revision['message'] = b'elegant fix for bug \xff' self.storage.revision_add([new_revision]) revision = service.lookup_revision(hash_to_hex(new_revision['id'])) self.assertEqual(revision['message'], None) self.assertEqual(revision['message_decoding_failed'], True) @given(new_revision()) def test_lookup_revision_msg_ok(self, new_revision): self.storage.revision_add([new_revision]) revision_message = service.lookup_revision_message( hash_to_hex(new_revision['id'])) self.assertEqual(revision_message, {'message': new_revision['message']}) - @given(new_revision()) - def test_lookup_revision_msg_absent(self, new_revision): - - del new_revision['message'] - self.storage.revision_add([new_revision]) - - new_revision_id = hash_to_hex(new_revision['id']) - - with self.assertRaises(NotFoundExc) as cm: - service.lookup_revision_message(new_revision_id) - - self.assertEqual( - cm.exception.args[0], - 'No message for revision with sha1_git %s.' % new_revision_id - ) - def test_lookup_revision_msg_no_rev(self): unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(unknown_revision_) self.assertEqual( cm.exception.args[0], 'Revision with sha1_git %s not found.' 
% unknown_revision_ ) @given(revisions()) def test_lookup_revision_multiple(self, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(self.revision_get(rev)) self.assertEqual(actual_revisions, expected_revisions) def test_lookup_revision_multiple_none_found(self): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = \ list(service.lookup_revision_multiple(unknown_revisions_)) self.assertEqual(actual_revisions, [None] * len(unknown_revisions_)) @given(revision()) def test_lookup_revision_log(self, revision): actual_revision_log = \ list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = self.revision_log(revision, limit=25) self.assertEqual(actual_revision_log, expected_revision_log) def _get_origin_branches(self, origin): origin_visit = self.origin_visit_get(origin['url'])[-1] snapshot = self.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_log = \ list(service.lookup_revision_log_by(origin['url'], branch_name, None, limit=25)) expected_log = \ self.revision_log(branches[branch_name]['target'], limit=25) self.assertEqual(actual_log, expected_log) @given(origin()) def test_lookup_revision_log_by_notfound(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_log_by( origin['url'], 'unknown_branch_name', None, limit=100) def test_lookup_content_raw_not_found(self): unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: service.lookup_content_raw('sha1:' + unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' 
% ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_raw(self, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = self.content_get(content['sha1']) self.assertEqual(actual_content, expected_content) def test_lookup_content_not_found(self): unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: service.lookup_content('sha1:%s' % unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_with_sha1(self, content): actual_content = service.lookup_content( 'sha1:%s' % content['sha1']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(content()) def test_lookup_content_with_sha256(self, content): actual_content = service.lookup_content( 'sha256:%s' % content['sha256']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) def test_lookup_directory_bad_checksum(self): with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') def test_lookup_directory_not_found(self): unknown_directory_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_directory(unknown_directory_) self.assertIn('Directory with sha1_git %s not found' % unknown_directory_, cm.exception.args[0]) @given(directory()) def test_lookup_directory(self, directory): actual_directory_ls = list(service.lookup_directory( directory)) expected_directory_ls = self.directory_ls(directory) self.assertEqual(actual_directory_ls, expected_directory_ls) @given(empty_directory()) def test_lookup_directory_empty(self, empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) self.assertEqual(actual_directory_ls, []) @given(origin()) def test_lookup_revision_by_nothing_found(self, origin): with 
self.assertRaises(NotFoundExc): service.lookup_revision_by( origin['url'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_revision = \ service.lookup_revision_by(origin['url'], branch_name, None) expected_revision = \ self.revision_get(branches[branch_name]['target']) self.assertEqual(actual_revision, expected_revision) @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(self, origin, revision): with self.assertRaises(NotFoundExc): service.lookup_revision_with_context_by(origin['url'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['url'], branch_name, None, rev) expected_root_rev = self.revision_get(root_rev) expected_rev = self.revision_get(rev) expected_rev['children'] = children[rev] self.assertEqual(actual_root_rev, expected_root_rev) self.assertEqual(actual_rev, expected_rev) def test_lookup_revision_through_ko_not_implemented(self): with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @given(origin()) def test_lookup_revision_through_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) rev = root_rev_log[-1]['id'] self.assertEqual(service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': 
None, 'sha1_git': rev }), service.lookup_revision_with_context_by( origin['url'], branch_name, None, rev) ) @given(origin()) def test_lookup_revision_through_with_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) self.assertEqual(service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': None, }), service.lookup_revision_by( origin['url'], branch_name, None) ) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] self.assertEqual(service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }), service.lookup_revision_with_context( sha1_git_root, sha1_git) ) @given(revision()) def test_lookup_revision_through_with_revision(self, revision): self.assertEqual(service.lookup_revision_through({ 'sha1_git': revision }), service.lookup_revision(revision) ) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(self, revision): with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name']), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'])) ) @given(revision()) def test_lookup_directory_through_revision_ok_with_data(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( 
service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name'], with_data=True), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'], with_data=True)) ) @pytest.mark.origin_id @given(new_origins(20)) def test_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) expected_origins.sort(key=lambda orig: orig['id']) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_idx = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_idx - origin_from) actual_origins = list(service.lookup_origins(origin_from, origin_count)) expected_origins = list(self.storage.origin_get_range(origin_from, origin_count)) self.assertEqual(actual_origins, expected_origins) diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py index 604e6940..bb651049 100644 --- a/swh/web/tests/data.py +++ b/swh/web/tests/data.py @@ -1,464 +1,466 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy import os import random from rest_framework.decorators import api_view from rest_framework.response import Response from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage from swh.model.from_disk import Directory from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS from swh.model.identifiers import directory_identifier from swh.loader.git.from_disk import GitLoaderFromArchive from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( 
get_mimetype_and_encoding_for_content, prepare_content_for_display ) from swh.web.common import service from swh.web.common.highlightjs import get_hljs_language_from_filename # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { 'cls': 'memory', 'args': {} }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_size_limit': 100 * 1024 * 1024, 'content_packet_size': 10, 'content_packet_size_bytes': 100 * 1024 * 1024, 'directory_packet_size': 10, 'revision_packet_size': 10, 'release_packet_size': 10, 'save_data': False, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory', 'args': {}, }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } def random_sha1(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) def random_sha256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_blake2s256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_content(): return { 'sha1': random_sha1(), 'sha1_git': random_sha1(), 'sha256': random_sha256(), 'blake2s256': random_blake2s256(), } # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } 
# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        # Bypass on-disk configuration: return a fixed in-memory config
        # so tests do not depend on the environment.
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.ctags',
            'languages': {'c': 'c'},
            'tools': {
                'name': 'universal-ctags',
                'version': '~git7859817b',
                'configuration': {
                    'command_line': '''ctags --fields=+lnz --sort=no --links=no '''  # noqa
                                    '''--output-format=json '''
                },
            }
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        'type': 'git',
        'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
        'archives': ['highlightjs-line-numbers.js.zip',
                     'highlightjs-line-numbers.js_visit2.zip'],
        'visit_date': ['Dec 1 2018, 01:00 UTC',
                       'Jan 20 2019, 15:00 UTC']
    },
    {
        'type': 'git',
        'url': 'https://github.com/memononen/libtess2',
        'archives': ['libtess2.zip'],
        'visit_date': ['May 25 2018, 01:00 UTC']
    },
    {
        'type': 'git',
        'url': 'repo_with_submodules',
        'archives': ['repo_with_submodules.tgz'],
        'visit_date': ['Jan 1 2019, 01:00 UTC']
    }
]

# Maps a content sha1 (hex) to its full metadata dict, filled lazily by
# _init_tests_data() and read back through get_content().
_contents = {}


# Tests data initialization
def _init_tests_data():
    """Load the lightweight test repositories into an in-memory archive
    and collect every loaded object.

    Returns:
        dict: storage and indexer-storage instances plus the hex ids of
        all loaded contents, directories, revisions, releases and
        snapshots, and an empty ``generated_checksums`` set.
    """
    # Load git repositories from archives
    loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG)
    # Get reference to the memory storage
    storage = loader.storage
    for origin in _TEST_ORIGINS:
        for i, archive in enumerate(origin['archives']):
            origin_repo_archive = \
                os.path.join(os.path.dirname(__file__),
                             'resources/repos/%s' % archive)
            loader.load(origin['url'], origin_repo_archive,
                        origin['visit_date'][i])
        origin.update(storage.origin_get(origin))  # add an 'id' key if enabled
    contents = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()
    content_path = {}
    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
        snp = storage.snapshot_get_latest(origin['url'])
        snapshots.add(hash_to_hex(snp['id']))
        for branch_name, branch_data in snp['branches'].items():
            if branch_data['target_type'] == 'revision':
                revisions.add(branch_data['target'])
            elif branch_data['target_type'] == 'release':
                # A release branch points to a release object whose own
                # target is the revision we want to walk from.
                release = next(storage.release_get([branch_data['target']]))
                revisions.add(release['target'])
                releases.add(hash_to_hex(branch_data['target']))
    # Expand the revision set with the full history of the heads found above
    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)
    # Walk each revision's root directory to collect contents and subdirs
    for rev in storage.revision_get(revisions):
        dir_id = rev['directory']
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            content_path[entry['sha1']] = '/'.join(
                [hash_to_hex(dir_id), entry['path'].decode('utf-8')])
            if entry['type'] == 'file':
                contents.add(entry['sha1'])
            elif entry['type'] == 'dir':
                directories.add(hash_to_hex(entry['target']))
    # Get all checksums for each content
    contents_metadata = storage.content_get_metadata(contents)
    contents = []
    for content_metadata in contents_metadata:
        contents.append({
            algo: hash_to_hex(content_metadata[algo])
            for algo in DEFAULT_ALGORITHMS
        })
        path = content_path[content_metadata['sha1']]
        cnt = next(storage.content_get([content_metadata['sha1']]))
        mimetype, encoding = get_mimetype_and_encoding_for_content(cnt['data'])
        content_display_data = prepare_content_for_display(
            cnt['data'], mimetype, path)
        contents[-1]['path'] = path
        contents[-1]['mimetype'] = mimetype
        contents[-1]['encoding'] = encoding
        contents[-1]['hljs_language'] = content_display_data['language']
        contents[-1]['data'] = content_display_data['content_data']
        _contents[contents[-1]['sha1']] = contents[-1]
    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage('memory', {})
    # Add the empty directory to the test archive
    empty_dir_id = directory_identifier({'entries': []})
    empty_dir_id_bin = hash_to_bytes(empty_dir_id)
    storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}])
    # Return tests data
    return {
        'storage': storage,
        'idx_storage': idx_storage,
        'origins': _TEST_ORIGINS,
        'contents': contents,
        'directories': list(directories),
        'releases': list(releases),
        'revisions': list(map(hash_to_hex, revisions)),
        'snapshots': list(snapshots),
        'generated_checksums': set(),
    }


def _init_indexers(tests_data):
    """Instantiate the content indexers used in tests, wired to the
    in-memory storages from ``tests_data``.

    Returns:
        dict: indexer name -> configured indexer instance
    """
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
                                ('license_indexer', _FossologyLicenseIndexer),
                                ('ctags_indexer', _CtagsIndexer)):
        idx = idx_class()
        idx.storage = tests_data['storage']
        idx.objstorage = tests_data['storage'].objstorage
        idx.idx_storage = tests_data['idx_storage']
        idx.register_tools(idx.config['tools'])
        indexers[idx_name] = idx
    return indexers


def get_content(content_sha1):
    """Return the metadata dict recorded for a loaded content
    (hex sha1 key), or None if unknown.
    """
    return _contents.get(content_sha1)


# Cached master copy of the tests data, initialized once per process
_tests_data = None
# Deep copy handed out to tests, refreshed on demand (reset=True)
_current_tests_data = None
# Loggers detached from the indexers before deepcopy (see get_tests_data)
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.

    Args:
        reset (bool): when True, discard the current working copy and
            return a fresh deep copy of the master tests data.
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage):
    """
    Helper function to replace the storages from which archive data
    are fetched, both in the global configuration and in the service
    module.
    """
    swh_config = config.get_config()
    swh_config.update({'storage': storage})
    service.storage = storage
    swh_config.update({'indexer_storage': idx_storage})
    service.idx_storage = idx_storage


# Implement some special endpoints used to provide input tests data
# when executing end to end tests with cypress
_content_code_data_exts = {}
_content_code_data_filenames = {}
_content_other_data_exts = {}


def _init_content_tests_data(data_path, data_dict, ext_key):
    """
    Helper function to read the content of a directory, store it
    into a test archive and add some files metadata (sha1 and/or
    expected programming language) in a dict.

    Args:
        data_path (str): path to a directory relative to the tests
            folder of swh-web
        data_dict (dict): the dict that will store files metadata
        ext_key (bool): whether to use file extensions or filenames
            as dict keys
    """
    test_contents_dir = os.path.join(
        os.path.dirname(__file__), data_path).encode('utf-8')
    directory = Directory.from_disk(path=test_contents_dir, data=True,
                                    save_path=True)
    objects = directory.collect()
    for c in objects['content'].values():
        c['status'] = 'visible'
        sha1 = hash_to_hex(c['sha1'])
        if ext_key:
            key = c['path'].decode('utf-8').split('.')[-1]
            filename = 'test.' + key
        else:
            filename = c['path'].decode('utf-8').split('/')[-1]
            key = filename
        language = get_hljs_language_from_filename(filename)
        data_dict[key] = {'sha1': sha1, 'language': language}
        # Drop the from-disk bookkeeping keys before handing the content
        # dicts to the storage. (Fix: these two statements carried stray
        # unified-diff '+' markers in the source, which is invalid Python.)
        del c['path']
        del c['perms']
    storage = get_tests_data()['storage']
    storage.content_add(objects['content'].values())


def _init_content_code_data_exts():
    """
    Fill a global dictionary which maps source file extension to
    a code content example.
    """
    global _content_code_data_exts
    _init_content_tests_data('resources/contents/code/extensions',
                             _content_code_data_exts, True)


def _init_content_other_data_exts():
    """
    Fill a global dictionary which maps a file extension to a
    content example.
    """
    global _content_other_data_exts
    _init_content_tests_data('resources/contents/other/extensions',
                             _content_other_data_exts, True)


def _init_content_code_data_filenames():
    """
    Fill a global dictionary which maps a filename to a content example.
    """
    global _content_code_data_filenames
    _init_content_tests_data('resources/contents/code/filenames',
                             _content_code_data_filenames, False)


# Only pay the data-loading cost when running the cypress e2e test suite
if config.get_config()['e2e_tests_mode']:
    _init_content_code_data_exts()
    _init_content_other_data_exts()
    _init_content_code_data_filenames()


@api_view(['GET'])
def get_content_code_data_all_exts(request):
    """
    Endpoint implementation returning a list of all source file
    extensions to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_exts.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source file extension. Responds 404 for an unknown
    extension.
    """
    data = None
    status = 404
    if ext in _content_code_data_exts:
        data = _content_code_data_exts[ext]
        status = 200
    return Response(data, status=status, content_type='application/json')


@api_view(['GET'])
def get_content_other_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a content example
    based on the file extension. Responds 404 for an unknown extension.
    """
    _init_content_other_data_exts()
    data = None
    status = 404
    if ext in _content_other_data_exts:
        data = _content_other_data_exts[ext]
        status = 200
    return Response(data, status=status, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_all_filenames(request):
    """
    Endpoint implementation returning a list of all source filenames
    to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_filenames.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_filename(request, filename):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source filename. Responds 404 for an unknown filename.
    """
    data = None
    status = 404
    if filename in _content_code_data_filenames:
        data = _content_code_data_filenames[filename]
        status = 200
    return Response(data, status=status, content_type='application/json')