diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index c57885ca..5c3f386a 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,669 +1,672 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given import pytest from requests.utils import parse_header_links +from swh.model.model import Origin + from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.api.utils import enrich_origin_visit, enrich_origin from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, visit_dates, new_snapshots ) def _scroll_results(api_client, url): """Iterates through pages of results, and returns them all.""" results = [] while True: rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' results.extend(rv.data) if 'Link' in rv: for link in parse_header_links(rv['Link']): if link['rel'] == 'next': # Found link to next page of results url = link['url'] break else: # No link with 'rel=next' break else: # No Link header break return results def test_api_lookup_origin_visits_raise_error(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = BadInputExc(err_msg) url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 400, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'BadInputExc', 'reason': err_msg } def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'Storage exploded! Will be back online shortly!' mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 503, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg } def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker): mock_get_origin_visits = mocker.patch( 'swh.web.api.views.origin.get_origin_visits') err_msg = 'Storage API dropped dead! Will resurrect asap!' 
mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = api_client.get(url) assert rv.status_code == 503, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: %s' % err_msg } @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits(api_client, archive_data, new_origin, visit_dates, new_snapshots): + archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( - new_origin['url'], visit_date, type='git') + new_origin.url, visit_date, type='git') archive_data.snapshot_add([new_snapshots[i]]) archive_data.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) + new_origin.url, origin_visit['visit'], + snapshot=new_snapshots[i].id) - all_visits = list(reversed(get_origin_visits(new_origin))) + all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:])): url = reverse('api-1-origin-visits', - url_args={'origin_url': new_origin['url']}, + url_args={'origin_url': new_origin.url}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( - new_origin['url'], visit_date, type='git') + new_origin.url, visit_date, type='git') archive_data.snapshot_add([new_snapshots[i]]) archive_data.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) + new_origin.url, origin_visit['visit'], + snapshot=new_snapshots[i].id) - all_visits = list(reversed(get_origin_visits(new_origin))) + all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', - url_args={'origin_url': new_origin['url']}, + url_args={'origin_url': new_origin.url}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( - new_origin['url'], visit_date, type='git') + new_origin.url, visit_date, type='git') visit_id = origin_visit['visit'] archive_data.snapshot_add([new_snapshots[i]]) 
archive_data.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) + new_origin.url, origin_visit['visit'], + snapshot=new_snapshots[i].id) url = reverse('api-1-origin-visit', - url_args={'origin_url': new_origin['url'], + url_args={'origin_url': new_origin.url, 'visit_id': visit_id}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( - new_origin['url'], visit_id) + new_origin.url, visit_id) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add_one(new_origin) url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}) + url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv.data == { 'exception': 'NotFoundExc', - 'reason': 'No visit for origin %s found' % new_origin['url'] + 'reason': 'No visit for origin %s found' % new_origin.url } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( - new_origin['url'], visit_date, type='git') + new_origin.url, visit_date, type='git') visit_ids.append(origin_visit['visit']) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( - new_origin['url'], visit_ids[0], - snapshot=new_snapshots[0]['id']) + new_origin.url, visit_ids[0], + snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}) + url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( - new_origin['url'], visit_ids[1]) + new_origin.url, visit_ids[1]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( - new_origin['url'], visit_date, type='git') + new_origin.url, visit_date, type='git') visit_ids.append(origin_visit['visit']) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( - new_origin['url'], visit_ids[0], - snapshot=new_snapshots[0]['id']) + new_origin.url, visit_ids[0], + snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}, + url_args={'origin_url': new_origin.url}, query_params={'require_snapshot': True}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( - new_origin['url'], visit_ids[0]) + new_origin.url, visit_ids[0]) expected_visit = enrich_origin_visit( 
expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_url': origin['url'], 'visit_id': max_visit_id + 1}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' % (origin['url'], max_visit_id+1) } def test_api_origins(api_client, archive_data): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} # Get only one url = reverse('api-1-origins', query_params={'origin_count': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= origin_urls # Get all url = reverse('api-1-origins', query_params={'origin_count': len(origins)}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse('api-1-origins', query_params={'origin_count': len(origins)+10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls @pytest.mark.parametrize('origin_count', [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} url = reverse('api-1-origins', query_params={'origin_count': origin_count}) results = _scroll_results(api_client, url) assert len(results) == len(origins) assert {origin['url'] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): url = reverse('api-1-origin', url_args={'origin_url': origin['url']}) rv = api_client.get(url) expected_origin = archive_data.origin_get(origin) expected_origin = enrich_origin(expected_origin, rv.wsgi_request) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse('api-1-origin', - url_args={'origin_url': new_origin['url']}) + url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', - 'reason': 'Origin with url %s not found!' % new_origin['url'] + 'reason': 'Origin with url %s not found!' 
% new_origin.url } @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } # Search for 'github.com', get only one url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= expected_origins # Search for 'github.com', get all url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins # Search for 'github.com', get more than available url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_words(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'com github'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'memononen libtess2'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} url = reverse('api-1-origin-search', url_args={'url_pattern': 'libtess2 memononen'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) @pytest.mark.parametrize('limit', [1, 2, 3, 10]) def test_api_origin_search_scroll( api_client, archive_data, mocker, limit, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, 
query_params={'limit': limit}) results = _scroll_results(api_client, url) assert {origin['url'] for origin in results} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_limit( api_client, archive_data, tests_data, mocker, backend): if backend == 'swh-search': tests_data['search'].origin_update([ {'url': 'http://foobar/{}'.format(i)} for i in range(2000) ]) else: # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) archive_data.origin_add([ - {'url': 'http://foobar/{}'.format(i)} + Origin(url='http://foobar/{}'.format(i)) for i in range(2000) ]) url = reverse('api-1-origin-search', url_args={'url_pattern': 'foobar'}, query_params={'limit': 1050}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1000 @given(origin()) def test_api_origin_metadata_search(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = [{ 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] assert rv.data == expected_data oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], 
limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimg = mock_idx_storage.origin_intrinsic_metadata_get oimg.side_effect = lambda origin_urls: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_url': origin['url']}) rv = api_client.get(url) oimg.assert_called_once_with([origin['url']]) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = {'author': 'Jane Doe'} assert rv.data == expected_data def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') url = reverse('api-1-origin-metadata-search') rv = api_client.get(url) assert rv.status_code == 400, rv.content mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/tests/api/views/test_release.py index 7a6015d3..4ed36c35 100644 --- a/swh/web/tests/api/views/test_release.py +++ b/swh/web/tests/api/views/test_release.py @@ -1,116 +1,115 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hypothesis import given -from swh.model.hashutil import hash_to_bytes +from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.model.model import Person, Release, TimestampWithTimezone from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( - release, sha1, content, directory + release, content, directory ) @given(release()) def test_api_release(api_client, archive_data, release): url = reverse('api-1-release', url_args={'sha1_git': release}) rv = api_client.get(url) expected_release = archive_data.release_get(release) target_revision = expected_release['target'] target_url = reverse('api-1-revision', url_args={'sha1_git': target_revision}, request=rv.wsgi_request) expected_release['target_url'] = target_url assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_release -@given(sha1(), sha1(), sha1(), content(), directory(), release()) +@given(content(), directory(), release()) def test_api_release_target_type_not_a_revision(api_client, archive_data, - new_rel1, new_rel2, - new_rel3, content, - directory, release): - for new_rel_id, target_type, target in ( - (new_rel1, 'content', content), - (new_rel2, 'directory', directory), - (new_rel3, 'release', release)): + content, directory, release): + for target_type, target in (('content', content), ('directory', directory), + ('release', release)): if target_type == 'content': target = target['sha1_git'] - sample_release = { - 'author': { - 'email': b'author@company.org', - 'fullname': b'author ', - 'name': b'author' - }, - 'date': { - 'timestamp': int(datetime.now().timestamp()), - 'offset': 0, - 'negative_utc': False, - }, - 'id': hash_to_bytes(new_rel_id), - 'message': b'sample release message', - 'name': b'sample release', - 'synthetic': 
False, - 'target': hash_to_bytes(target), - 'target_type': target_type - } + sample_release = Release( + author=Person( + email=b'author@company.org', + fullname=b'author ', + name=b'author' + ), + date=TimestampWithTimezone( + timestamp=int(datetime.now().timestamp()), + offset=0, + negative_utc=False, + ), + message=b'sample release message', + name=b'sample release', + synthetic=False, + target=hash_to_bytes(target), + target_type=target_type + ) archive_data.release_add([sample_release]) - url = reverse('api-1-release', url_args={'sha1_git': new_rel_id}) + new_release_id = hash_to_hex(sample_release.id) + + url = reverse('api-1-release', + url_args={'sha1_git': new_release_id}) rv = api_client.get(url) - expected_release = archive_data.release_get(new_rel_id) + expected_release = archive_data.release_get(new_release_id) if target_type == 'content': url_args = {'q': 'sha1_git:%s' % target} else: url_args = {'sha1_git': target} target_url = reverse('api-1-%s' % target_type, url_args=url_args, request=rv.wsgi_request) expected_release['target_url'] = target_url assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_release def test_api_release_not_found(api_client): unknown_release_ = random_sha1() url = reverse('api-1-release', url_args={'sha1_git': unknown_release_}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Release with sha1_git %s not found.' % unknown_release_ } @given(release()) def test_api_release_uppercase(api_client, release): url = reverse('api-1-release-uppercase-checksum', url_args={'sha1_git': release.upper()}) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse('api-1-release-uppercase-checksum', url_args={'sha1_git': release}) assert resp['location'] == redirect_url diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py index ea2ea81b..fabca5ee 100644 --- a/swh/web/tests/api/views/test_snapshot.py +++ b/swh/web/tests/api/views/test_snapshot.py @@ -1,162 +1,163 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.model.hashutil import hash_to_hex +from swh.model.model import Snapshot from swh.web.api.utils import enrich_snapshot from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( snapshot, new_snapshot ) @given(snapshot()) def test_api_snapshot(api_client, archive_data, snapshot): url = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_data = archive_data.snapshot_get(snapshot) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) assert rv.data == expected_data @given(snapshot()) def test_api_snapshot_paginated(api_client, archive_data, snapshot): branches_offset = 0 branches_count = 2 snapshot_branches = [] for k, v in sorted( archive_data.snapshot_get(snapshot)['branches'].items()): snapshot_branches.append({ 'name': k, 'target_type': v['target_type'], 'target': v['target'] }) whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None} while 
branches_offset < len(snapshot_branches): branches_from = snapshot_branches[branches_offset]['name'] url = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}, query_params={'branches_from': branches_from, 'branches_count': branches_count}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_data = archive_data.snapshot_get_branches( snapshot, branches_from, branches_count) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) branches_offset += branches_count if branches_offset < len(snapshot_branches): next_branch = snapshot_branches[branches_offset]['name'] expected_data['next_branch'] = next_branch else: expected_data['next_branch'] = None assert rv.data == expected_data whole_snapshot['branches'].update(expected_data['branches']) if branches_offset < len(snapshot_branches): next_url = rv.wsgi_request.build_absolute_uri( reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}, query_params={'branches_from': next_branch, 'branches_count': branches_count})) assert rv['Link'] == '<%s>; rel="next"' % next_url else: assert not rv.has_header('Link') url = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == whole_snapshot @given(snapshot()) def test_api_snapshot_filtered(api_client, archive_data, snapshot): snapshot_branches = [] for k, v in sorted( archive_data.snapshot_get(snapshot)['branches'].items()): snapshot_branches.append({ 'name': k, 'target_type': v['target_type'], 'target': v['target'] }) target_type = random.choice(snapshot_branches)['target_type'] url = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}, query_params={'target_types': target_type}) rv = api_client.get(url) expected_data = archive_data.snapshot_get_branches( snapshot, target_types=target_type) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_data def test_api_snapshot_errors(api_client): unknown_snapshot_ = random_sha1() url = reverse('api-1-snapshot', url_args={'snapshot_id': '63ce369'}) rv = api_client.get(url) assert rv.status_code == 400, rv.data url = reverse('api-1-snapshot', url_args={'snapshot_id': unknown_snapshot_}) rv = api_client.get(url) assert rv.status_code == 404, rv.data @given(snapshot()) def test_api_snapshot_uppercase(api_client, snapshot): url = reverse('api-1-snapshot-uppercase-checksum', url_args={'snapshot_id': snapshot.upper()}) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse('api-1-snapshot-uppercase-checksum', url_args={'snapshot_id': snapshot}) assert resp['location'] == redirect_url @given(new_snapshot(min_size=4)) def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot): snp_dict = new_snapshot.to_dict() snp_id = hash_to_hex(snp_dict['id']) for branch in snp_dict['branches'].keys(): snp_dict['branches'][branch] = None break - archive_data.snapshot_add([snp_dict]) + archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) url = reverse('api-1-snapshot', url_args={'snapshot_id': snp_id}) rv = api_client.get(url) assert rv.status_code == 200, rv.data diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index 8518430c..28a78a3e 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ 
-1,904 +1,905 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import swh.web.browse.utils from django.utils.html import escape from hypothesis import given from swh.model.hashutil import hash_to_bytes +from swh.model.model import Snapshot from swh.web.browse.utils import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, parse_timestamp, get_swh_persistent_id ) from swh.web.tests.data import get_content from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, new_snapshot, visit_dates, revisions, origin_with_releases ) @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/origin-visits.html') url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/origin-visits.html') visits = archive_data.origin_visit_get(origin['url']) for v in visits: vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ') browse_dir_url = reverse('browse-origin-directory', url_args={'origin_url': origin['url'], 'timestamp': vdate}) assert_contains(resp, browse_dir_url) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get( origin_visits[visit_idx]['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev['directory']) dir_files = [e for e in dir_content if e['type'] == 'file'] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { 'branches': branches, 'releases': releases, 'root_dir_sha1': head_rev['directory'], 'content': get_content(dir_file['checksums']['sha1']), 'visit': origin_visits[visit_idx] } tdata = _get_archive_data(-1) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content']) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=tdata['visit']['date']) visit_unix_ts = parse_timestamp(tdata['visit']['date']).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=visit_unix_ts) tdata = _get_archive_data(0) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], visit_id=tdata['visit']['visit']) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = 
archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] subdirs = [e for e in archive_data.directory_ls(root_dir_sha1) if e['type'] == 'dir'] branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir['target']) subdir_path = subdir['name'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) @given(origin()) def 
test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_branches_test_helper(client, origin, snapshot_content) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_releases_test_helper(client, origin, snapshot_content) @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3)) def test_origin_snapshot_null_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): if i == 0: snp_dict['branches'][branch] = None else: snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i-1]), } - archive_data.snapshot_add([snp_dict]) + archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') archive_data.origin_visit_update(new_origin['url'], visit['visit'], status='partial', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}) rv = client.get(url) assert rv.status_code == 200 @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4)) def test_origin_snapshot_invalid_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i]), } - archive_data.snapshot_add([snp_dict]) + archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') archive_data.origin_visit_update(new_origin['url'], visit['visit'], status='full', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}, query_params={'branch': 'invalid_branch'}) rv = client.get(url) assert rv.status_code == 404 def test_origin_request_errors(client, archive_data, mocker): mock_snapshot_service = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.service') mock_origin_service = mocker.patch('swh.web.browse.views.origin.service') mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_request_content = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.request_content') mock_origin_service.lookup_origin.side_effect = NotFoundExc( 'origin not found') url = reverse('browse-origin-visits', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 
assert_template_used(resp, 'error.html') assert_contains(resp, 'origin not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 1, 'release': 0 } mock_lookup_directory = mock_utils_service.lookup_directory mock_lookup_directory.side_effect = NotFoundExc('Directory not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Directory not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'baz'}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') assert re.search('snapshot.*is empty', resp.content.decode('utf-8')) mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': 
'7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 1, 'release': 0 } mock_snapshot_service.lookup_directory_with_path.return_value = { 'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1' } mock_request_content.side_effect = NotFoundExc('Content not found') url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'baz'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Content not found', status_code=404) mock_get_snapshot_context = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.get_snapshot_context') mock_get_snapshot_context.side_effect = NotFoundExc('Snapshot not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Snapshot not found', status_code=404) def test_origin_empty_snapshot(client, mocker): mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = { 'id': 457, 'url': 'https://github.com/foo/bar' } url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') resp_content = resp.content.decode('utf-8') assert re.search('snapshot.*is empty', resp_content) assert not re.search('swh-tr-link', resp_content) @given(origin_with_releases()) def test_origin_release_browse(client, archive_data, origin): # for swh.web.browse.utils.get_snapshot_content to only return one branch snapshot_max_size = swh.web.browse.utils.snapshot_content_max_size swh.web.browse.utils.snapshot_content_max_size = 1 try: snapshot = archive_data.snapshot_get_latest(origin['url']) release = [b for b in snapshot['branches'].values() if b['target_type'] == 'release'][-1] release_data = archive_data.release_get(release['target']) url = reverse('browse-origin-directory', url_args={'origin_url': origin['url']}, query_params={'release': release_data['name']}) resp = client.get(url) assert resp.status_code == 200 assert_contains(resp, release_data['name']) assert_contains(resp, release['target']) finally: swh.web.browse.utils.snapshot_content_max_size = snapshot_max_size @given(origin_with_releases()) def test_origin_release_browse_not_found(client, archive_data, origin): invalid_release_name = 'swh-foo-bar' url = reverse('browse-origin-directory', url_args={'origin_url': origin['url']}, query_params={'release': invalid_release_name}) resp = client.get(url) assert resp.status_code == 404 assert re.search(f'Release {invalid_release_name}.*not found', resp.content.decode('utf-8')) def _origin_content_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None): content_path = 
'/'.join(content['path'].split('/')[1:]) url_args = {'origin_url': origin_info['url'], 'path': content_path} if not visit_id: visit_id = origin_visits[-1]['visit'] query_params = {} if timestamp: url_args['timestamp'] = timestamp if visit_id: query_params['visit_id'] = visit_id url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') assert type(content['data']) == str assert_contains(resp, '<code class="%s">' % content['hljs_language']) assert_contains(resp, escape(content['data'])) split_path = content_path.split('/') filename = split_path[-1] path = content_path.replace(filename, '')[:-1] path_info = gen_path_info(path) del url_args['path'] if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') root_dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '
<li class="swh-path">', count=len(path_info)+1) assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: url_args['path'] = p['path'] dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p['name'])) assert_contains(resp, '
<li>%s</li>' % filename) query_string = 'sha1_git:' + content['sha1_git'] url_raw = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params={'filename': filename}) assert_contains(resp, url_raw) if 'args' in url_args: del url_args['path'] origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, len(origin_releases))) assert_contains(resp, '
<li class="swh-branch">', count=len(origin_branches)) url_args['path'] = content_path for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_branch_url) assert_contains(resp, '
<li class="swh-release">', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_release_url) url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') swh_cnt_id = get_swh_persistent_id('content', content['sha1_git']) swh_cnt_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_directory_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, path=None): dirs = [e for e in directory_entries if e['type'] in ('dir', 'rev')] files = [e for e in directory_entries if e['type'] == 'file'] if not visit_id: visit_id = origin_visits[-1]['visit'] url_args = {'origin_url': origin_info['url']} query_params = {} if timestamp: url_args['timestamp'] = timestamp else: query_params['visit_id'] = visit_id if path: url_args['path'] = path url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') assert_contains(resp, '<td class="swh-directory">', count=len(dirs)) assert_contains(resp, '<td class="swh-content">', count=len(files)) if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') for d in dirs: if d['type'] == 'rev': dir_url = reverse('browse-revision', url_args={'sha1_git': d['target']}) else: dir_path = d['name'] if path: dir_path = "%s/%s" % (path, d['name']) dir_url_args = dict(url_args) dir_url_args['path'] = dir_path dir_url = reverse('browse-origin-directory', url_args=dir_url_args, query_params=query_params) assert_contains(resp, dir_url) for f in files: file_path = f['name'] if path: file_path = "%s/%s" % (path, f['name']) file_url_args = dict(url_args) file_url_args['path'] = file_path file_url = reverse('browse-origin-content', url_args=file_url_args, query_params=query_params) assert_contains(resp, file_url) if 'path' in url_args: del url_args['path'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split('/')) + 1 assert_contains(resp, '
<li class="swh-path">', count=nb_bc_paths) assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_branch_url, root_directory_sha1[:7])) origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) if path: url_args['path'] = path assert_contains(resp, '
<li class="swh-branch">', count=len(origin_branches)) for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_branch_url) assert_contains(resp, '
<li class="swh-release">', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_release_url) assert_contains(resp, 'vault-cook-directory') assert_contains(resp, 'vault-cook-revision') swh_dir_id = get_swh_persistent_id('directory', directory_entries[0]['dir_id']) # noqa swh_dir_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_branches_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-branches', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/branches.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) assert_contains(resp, '<tr class="swh-branch-entry', count=len(origin_branches)) for branch in origin_branches: browse_branch_url = reverse( 'browse-origin-directory', url_args={'origin_url': origin_info['url']}, query_params={'branch': branch['name']}) assert_contains(resp, '<a href="%s">' % escape(browse_branch_url)) browse_revision_url = reverse( 'browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'origin': origin_info['url']}) assert_contains(resp, '<a href="%s">' % escape(browse_revision_url)) def _origin_releases_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-releases', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/releases.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) assert_contains(resp, '<tr class="swh-release-entry', count=nb_releases) for release in origin_releases: browse_release_url = reverse( 'browse-release', url_args={'sha1_git': release['id']}, query_params={'origin': origin_info['url']}) browse_revision_url = reverse( 'browse-revision', url_args={'sha1_git': release['target']}, query_params={'origin': origin_info['url']}) assert_contains(resp, release['name']) assert_contains(resp, '<a href="%s">' % escape(browse_release_url)) assert_contains(resp, '<a href="%s">' % escape(browse_revision_url)) diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py index 2084a2b6..541a8329 100644 --- a/swh/web/tests/browse/views/test_revision.py +++ b/swh/web/tests/browse/views/test_revision.py @@ -1,245 +1,245 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.utils.html import escape from hypothesis import given from swh.web.common.utils import ( reverse, format_utc_iso_date, get_swh_persistent_id, parse_timestamp ) from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, revision, unknown_revision, new_origin ) @given(revision()) def test_revision_browse(client, archive_data, revision): url = reverse('browse-revision', url_args={'sha1_git': revision}) revision_data = archive_data.revision_get(revision) author_name =
revision_data['author']['name'] committer_name = revision_data['committer']['name'] dir_id = revision_data['directory'] directory_url = reverse('browse-directory', url_args={'sha1_git': dir_id}) history_url = reverse('browse-revision-log', url_args={'sha1_git': revision}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/revision.html') assert_contains(resp, author_name) assert_contains(resp, committer_name) assert_contains(resp, directory_url) assert_contains(resp, history_url) for parent in revision_data['parents']: parent_url = reverse('browse-revision', url_args={'sha1_git': parent}) assert_contains(resp, '%s' % (parent_url, parent)) author_date = revision_data['date'] committer_date = revision_data['committer_date'] message_lines = revision_data['message'].split('\n') assert_contains(resp, format_utc_iso_date(author_date)) assert_contains(resp, format_utc_iso_date(committer_date)) assert_contains(resp, escape(message_lines[0])) assert_contains(resp, escape('\n'.join(message_lines[1:]))) @given(origin()) def test_revision_origin_browse(client, archive_data, origin): snapshot = archive_data.snapshot_get_latest(origin['url']) revision = archive_data.snapshot_get_head(snapshot) revision_data = archive_data.revision_get(revision) dir_id = revision_data['directory'] origin_revision_log_url = reverse('browse-origin-log', url_args={'origin_url': origin['url']}, # noqa query_params={'revision': revision}) url = reverse('browse-revision', url_args={'sha1_git': revision}, query_params={'origin': origin['url']}) resp = client.get(url) assert_contains(resp, origin_revision_log_url) for parent in revision_data['parents']: parent_url = reverse('browse-revision', url_args={'sha1_git': parent}, query_params={'origin': origin['url']}) assert_contains(resp, '%s' % (parent_url, parent)) assert_contains(resp, 'vault-cook-directory') assert_contains(resp, 'vault-cook-revision') swh_rev_id = get_swh_persistent_id('revision', revision) swh_rev_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_rev_id}) assert_contains(resp, swh_rev_id) assert_contains(resp, swh_rev_id_url) swh_dir_id = get_swh_persistent_id('directory', dir_id) swh_dir_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, 'swh-take-new-snapshot') @given(revision()) def test_revision_log_browse(client, archive_data, revision): per_page = 10 revision_log = archive_data.revision_log(revision) revision_log_sorted = \ sorted(revision_log, key=lambda rev: -parse_timestamp( rev['committer_date']).timestamp()) url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) resp = client.get(url) next_page_url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'offset': per_page, 'per_page': per_page}) nb_log_entries = per_page if len(revision_log_sorted) < per_page: nb_log_entries = len(revision_log_sorted) assert resp.status_code == 200 assert_template_used(resp, 'browse/revision-log.html') assert_contains(resp, 'Newer') if len(revision_log_sorted) > per_page: assert_contains(resp, 'Older' % # noqa escape(next_page_url)) for log in revision_log_sorted[:per_page]: revision_url = reverse('browse-revision', url_args={'sha1_git': log['id']}) assert_contains(resp, log['id'][:7]) assert_contains(resp, log['author']['name']) assert_contains(resp, format_utc_iso_date(log['date'])) assert_contains(resp, escape(log['message'])) 
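        # each paginated log entry should also expose its committer date and a link to its revision detail page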
assert_contains(resp, format_utc_iso_date(log['committer_date'])) # noqa assert_contains(resp, revision_url) if len(revision_log_sorted) <= per_page: return resp = client.get(next_page_url) prev_page_url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) next_page_url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'offset': 2 * per_page, 'per_page': per_page}) nb_log_entries = len(revision_log_sorted) - per_page if nb_log_entries > per_page: nb_log_entries = per_page assert resp.status_code == 200 assert_template_used(resp, 'browse/revision-log.html') assert_contains(resp, 'Newer' % escape(prev_page_url)) if len(revision_log_sorted) > 2 * per_page: assert_contains(resp, 'Older' % # noqa escape(next_page_url)) if len(revision_log_sorted) <= 2 * per_page: return resp = client.get(next_page_url) prev_page_url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'offset': per_page, 'per_page': per_page}) next_page_url = reverse('browse-revision-log', url_args={'sha1_git': revision}, query_params={'offset': 3 * per_page, 'per_page': per_page}) nb_log_entries = len(revision_log_sorted) - 2 * per_page if nb_log_entries > per_page: nb_log_entries = per_page assert resp.status_code == 200 assert_template_used(resp, 'browse/revision-log.html') assert_contains(resp, 'Newer' % escape(prev_page_url)) if len(revision_log_sorted) > 3 * per_page: assert_contains(resp, 'Older' % # noqa escape(next_page_url)) @given(revision(), unknown_revision(), new_origin()) def test_revision_request_errors(client, revision, unknown_revision, new_origin): url = reverse('browse-revision', url_args={'sha1_git': unknown_revision}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Revision with sha1_git %s not found' % unknown_revision, status_code=404) url = reverse('browse-revision', url_args={'sha1_git': revision}, - query_params={'origin': new_origin['url']}) + query_params={'origin': new_origin.url}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'the origin mentioned in your request' ' appears broken', status_code=404) @given(revision()) def test_revision_uppercase(client, revision): url = reverse('browse-revision-uppercase-checksum', url_args={'sha1_git': revision.upper()}) resp = client.get(url) assert resp.status_code == 302 redirect_url = reverse('browse-revision', url_args={'sha1_git': revision}) assert resp['location'] == redirect_url diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index a38ec6e8..cc7c86a0 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,931 +1,911 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) +from swh.model.model import Directory, DirectoryEntry, Origin, Revision from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from 
swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, unknown_content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, unknown_directory, release, unknown_release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, snapshot, unknown_snapshot ) from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(contents()) def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) assert service.lookup_multiple_hashes(input_data) == expected_output @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert actual_lookup == {'found': None, 'algo': 'sha1_git'} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = archive_data.content_get_metadata(content['sha1']) assert {'found': content_metadata, 'algo': 'sha1'} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert {'found': False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) assert {'found': True} == actual_lookup @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list(service.lookup_content_ctags('sha1:%s' % unknown_content_['sha1'])) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = indexer_data.content_get_mimetype(content['sha1']) assert actual_filetype == expected_filetype @pytest.mark.skip # Language indexer is disabled. 
@given(content()) def test_lookup_content_language(indexer_data, content): indexer_data.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = indexer_data.content_get_language(content['sha1']) assert actual_language == expected_language @given(contents_with_ctags()) def test_lookup_expression(indexer_data, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = list( service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) assert actual_ctags == expected_ctags def test_lookup_expression_no_result(): expected_ctags = [] actual_ctags = list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) assert actual_ctags == expected_ctags @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(indexer_data, content): indexer_data.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = indexer_data.content_get_license(content['sha1']) assert actual_license == expected_license def test_stat_counters(archive_data): actual_stats = service.stat_counters() assert actual_stats == archive_data.stat_counters() @given(new_origin(), visit_dates()) def test_lookup_origin_visits(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) for ts in visit_dates: archive_data.origin_visit_add( - new_origin['url'], ts, type='git') + new_origin.url, ts, type='git') actual_origin_visits = list( - service.lookup_origin_visits(new_origin['url'], per_page=100)) + service.lookup_origin_visits(new_origin.url, per_page=100)) - expected_visits = archive_data.origin_visit_get(new_origin['url']) + expected_visits = archive_data.origin_visit_get(new_origin.url) for expected_visit in expected_visits: - expected_visit['origin'] = new_origin['url'] + expected_visit['origin'] = new_origin.url assert actual_origin_visits == expected_visits @given(new_origin(), visit_dates()) def test_lookup_origin_visit(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(archive_data.origin_visit_add( - new_origin['url'], ts, type='git')) + new_origin.url, ts, type='git')) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit( - new_origin['url'], visit) + new_origin.url, visit) expected_visit = dict(archive_data.origin_visit_get_by( - new_origin['url'], visit)) + new_origin.url, visit)) assert actual_origin_visit == expected_visit @given(new_origin()) def test_lookup_origin(archive_data, new_origin): archive_data.origin_add_one(new_origin) - actual_origin = service.lookup_origin({'url': new_origin['url']}) + actual_origin = service.lookup_origin({'url': new_origin.url}) expected_origin = archive_data.origin_get( - {'url': new_origin['url']}) + {'url': new_origin.url}) assert actual_origin == expected_origin @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): with pytest.raises(BadInputExc) as e: service.lookup_release(invalid_sha1) assert e.match('Invalid 
checksum') @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(sha256): with pytest.raises(BadInputExc) as e: service.lookup_release(sha256) assert e.match('Only sha1_git is supported.') @given(directory()) def test_lookup_directory_with_path_not_found(directory): path = 'some/invalid/path/here' with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_path(directory, path) assert e.match('Directory entry with path %s from %s not found' % (path, directory)) @given(directory()) def test_lookup_directory_with_path_found(archive_data, directory): directory_content = archive_data.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) assert actual_result == directory_entry @given(release()) def test_lookup_release(archive_data, release): actual_release = service.lookup_release(release) assert actual_release == archive_data.release_get(release) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Invalid checksum query string') sha1_git = sha256 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Only sha1_git is supported') @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Revision %s not found' % sha1_git) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Revision root %s not found' % sha1_git_root) @given(ancestor_revisions()) def test_lookup_revision_with_context(archive_data, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in archive_data.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = archive_data.revision_get(sha1_git) expected_revision['children'] = children assert actual_revision == expected_revision @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(root_sha1_git, sha1_git) assert e.match('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git)) def test_lookup_directory_with_revision_not_found(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(unknown_revision_) assert e.match('Revision %s not found' % unknown_revision_) -def test_lookup_directory_with_revision_unknown_content(archive_data): 
+@given(new_revision()) +def test_lookup_directory_with_revision_unknown_content(archive_data, + new_revision): unknown_content_ = random_content() - unknown_revision_ = random_sha1() - unknown_directory_ = random_sha1() dir_path = 'README.md' + + # A directory that points to unknown content + dir = Directory(entries=[ + DirectoryEntry( + name=bytes(dir_path.encode('utf-8')), + type='file', + target=hash_to_bytes(unknown_content_['sha1_git']), + perms=DentryPerms.content + ) + ]) + # Create a revision that points to a directory # Which points to unknown content - revision = { - 'author': { - 'name': b'abcd', - 'email': b'abcd@company.org', - 'fullname': b'abcd abcd' - }, - 'committer': { - 'email': b'aaaa@company.org', - 'fullname': b'aaaa aaa', - 'name': b'aaa' - }, - 'committer_date': { - 'negative_utc': False, - 'offset': 0, - 'timestamp': 1437511651 - }, - 'date': { - 'negative_utc': False, - 'offset': 0, - 'timestamp': 1437511651 - }, - 'message': b'bleh', - 'metadata': [], - 'parents': [], - 'synthetic': False, - 'type': 'git', - 'id': hash_to_bytes(unknown_revision_), - 'directory': hash_to_bytes(unknown_directory_) - } - # A directory that points to unknown content - dir = { - 'id': hash_to_bytes(unknown_directory_), - 'entries': [{ - 'name': bytes(dir_path.encode('utf-8')), - 'type': 'file', - 'target': hash_to_bytes(unknown_content_['sha1_git']), - 'perms': DentryPerms.content - }] - } + new_revision = new_revision.to_dict() + new_revision['directory'] = dir.id + del new_revision['id'] + new_revision = Revision.from_dict(new_revision) + # Add the directory and revision in mem archive_data.directory_add([dir]) - archive_data.revision_add([revision]) + archive_data.revision_add([new_revision]) + new_revision_id = hash_to_hex(new_revision.id) with pytest.raises(NotFoundExc) as e: - service.lookup_directory_with_revision(unknown_revision_, dir_path) - assert e.match('Content not found for revision %s' % unknown_revision_) + service.lookup_directory_with_revision(new_revision_id, dir_path) + assert e.match('Content not found for revision %s' % new_revision_id) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): invalid_path = 'path/to/something/unknown' with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(revision, invalid_path) assert e.match('Directory or File') assert e.match(invalid_path) assert e.match('revision %s' % revision) assert e.match('not found') @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules(archive_data, revision_with_submodules): rev_sha1_git = revision_with_submodules['rev_sha1_git'] rev_dir_path = revision_with_submodules['rev_dir_rev_path'] actual_data = service.lookup_directory_with_revision( rev_sha1_git, rev_dir_path) revision = archive_data.revision_get( revision_with_submodules['rev_sha1_git']) directory = archive_data.directory_ls(revision['directory']) rev_entry = next(e for e in directory if e['name'] == rev_dir_path) expected_data = { 'content': archive_data.revision_get(rev_entry['target']), 'path': rev_dir_path, 'revision': rev_sha1_git, 'type': 'rev' } assert actual_data == expected_data @given(revision()) def test_lookup_directory_with_revision_without_path(archive_data, revision): actual_directory_entries = service.lookup_directory_with_revision(revision) revision_data = archive_data.revision_get(revision) expected_directory_entries = archive_data.directory_ls( revision_data['directory']) assert actual_directory_entries['type'] == 'dir' assert 
actual_directory_entries['content'] == expected_directory_entries @given(revision()) def test_lookup_directory_with_revision_with_path(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = service.lookup_directory_with_revision( revision, expected_dir_entry['name']) assert actual_dir_entry['type'] == expected_dir_entry['type'] assert actual_dir_entry['revision'] == revision assert actual_dir_entry['path'] == expected_dir_entry['name'] if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): assert actual_dir_entry['content'][key] == expected_dir_entry[key] else: sub_dir_entries = archive_data.directory_ls( expected_dir_entry['target']) assert actual_dir_entry['content'] == sub_dir_entries @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = archive_data.content_get( expected_dir_entry['checksums']['sha1']) actual_dir_entry = service.lookup_directory_with_revision( revision, expected_dir_entry['name'], with_data=True) assert actual_dir_entry['type'] == expected_dir_entry['type'] assert actual_dir_entry['revision'] == revision assert actual_dir_entry['path'] == expected_dir_entry['name'] del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): assert actual_dir_entry['content'][key] == expected_dir_entry[key] assert actual_dir_entry['content']['data'] == expected_data['data'] @given(revision()) def test_lookup_revision(archive_data, revision): actual_revision = service.lookup_revision(revision) assert actual_revision == archive_data.revision_get(revision) @given(new_revision()) def test_lookup_revision_invalid_msg(archive_data, new_revision): + new_revision = new_revision.to_dict() new_revision['message'] = b'elegant fix for bug \xff' - archive_data.revision_add([new_revision]) + archive_data.revision_add([Revision.from_dict(new_revision)]) revision = service.lookup_revision(hash_to_hex(new_revision['id'])) assert revision['message'] is None assert revision['message_decoding_failed'] is True @given(new_revision()) def test_lookup_revision_msg_ok(archive_data, new_revision): archive_data.revision_add([new_revision]) revision_message = service.lookup_revision_message( - hash_to_hex(new_revision['id'])) + hash_to_hex(new_revision.id)) - assert revision_message == {'message': new_revision['message']} + assert revision_message == {'message': new_revision.message} def test_lookup_revision_msg_no_rev(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_revision_message(unknown_revision_) assert e.match('Revision with sha1_git %s not found.' 
% unknown_revision_) @given(revisions()) def test_lookup_revision_multiple(archive_data, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(archive_data.revision_get(rev)) assert actual_revisions == expected_revisions def test_lookup_revision_multiple_none_found(): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = list( service.lookup_revision_multiple(unknown_revisions_)) assert actual_revisions == [None] * len(unknown_revisions_) @given(revision()) def test_lookup_revision_log(archive_data, revision): actual_revision_log = list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = archive_data.revision_log(revision, limit=25) assert actual_revision_log == expected_revision_log def _get_origin_branches(archive_data, origin): origin_visit = archive_data.origin_visit_get(origin['url'])[-1] snapshot = archive_data.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_log = list( service.lookup_revision_log_by(origin['url'], branch_name, None, limit=25)) expected_log = archive_data.revision_log( branches[branch_name]['target'], limit=25) assert actual_log == expected_log @given(origin()) def test_lookup_revision_log_by_notfound(origin): with pytest.raises(NotFoundExc): service.lookup_revision_log_by( origin['url'], 'unknown_branch_name', None, limit=100) def test_lookup_content_raw_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content_raw('sha1:' + unknown_content_['sha1']) assert e.match('Content with %s checksum equals to %s not found!' % ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_raw(archive_data, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = archive_data.content_get(content['sha1']) assert actual_content == expected_content def test_lookup_content_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: service.lookup_content('sha1:%s' % unknown_content_['sha1']) assert e.match('Content with %s checksum equals to %s not found!' 
% ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_with_sha1(archive_data, content): actual_content = service.lookup_content('sha1:%s' % content['sha1']) expected_content = archive_data.content_get_metadata(content['sha1']) assert actual_content == expected_content @given(content()) def test_lookup_content_with_sha256(archive_data, content): actual_content = service.lookup_content('sha256:%s' % content['sha256']) expected_content = archive_data.content_get_metadata(content['sha1']) assert actual_content == expected_content def test_lookup_directory_bad_checksum(): with pytest.raises(BadInputExc): service.lookup_directory('directory_id') def test_lookup_directory_not_found(): unknown_directory_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory(unknown_directory_) assert e.match('Directory with sha1_git %s not found' % unknown_directory_) @given(directory()) def test_lookup_directory(archive_data, directory): actual_directory_ls = list(service.lookup_directory(directory)) expected_directory_ls = archive_data.directory_ls(directory) assert actual_directory_ls == expected_directory_ls @given(empty_directory()) def test_lookup_directory_empty(empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) assert actual_directory_ls == [] @given(origin()) def test_lookup_revision_by_nothing_found(origin): with pytest.raises(NotFoundExc): service.lookup_revision_by(origin['url'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_revision = service.lookup_revision_by(origin['url'], branch_name) expected_revision = archive_data.revision_get( branches[branch_name]['target']) assert actual_revision == expected_revision @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(origin, revision): with pytest.raises(NotFoundExc): service.lookup_revision_with_context_by(origin['url'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = archive_data.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['url'], branch_name, None, rev) expected_root_rev = archive_data.revision_get(root_rev) expected_rev = archive_data.revision_get(rev) expected_rev['children'] = children[rev] assert actual_root_rev == expected_root_rev assert actual_rev == expected_rev def test_lookup_revision_through_ko_not_implemented(): with pytest.raises(NotImplementedError): service.lookup_revision_through({'something-unknown': 10}) @given(origin()) def test_lookup_revision_through_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = archive_data.revision_log(root_rev) rev = root_rev_log[-1]['id'] assert service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': None, 'sha1_git': rev }) == service.lookup_revision_with_context_by(origin['url'], branch_name, None, rev) @given(origin()) def 
test_lookup_revision_through_with_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) assert service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': None, }) == service.lookup_revision_by(origin['url'], branch_name, None) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] assert service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }) == service.lookup_revision_with_context(sha1_git_root, sha1_git) @given(revision()) def test_lookup_revision_through_with_revision(revision): assert service.lookup_revision_through({ 'sha1_git': revision }) == service.lookup_revision(revision) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(revision): with pytest.raises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) assert service.lookup_directory_through_revision( {'sha1_git': revision}, dir_entry['name'] ) == (revision, service.lookup_directory_with_revision(revision, dir_entry['name'])) @given(revision()) def test_lookup_directory_through_revision_ok_with_data( archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) assert service.lookup_directory_through_revision( {'sha1_git': revision}, dir_entry['name'], with_data=True ) == (revision, service.lookup_directory_with_revision(revision, dir_entry['name'], with_data=True)) @given(content(), directory(), release(), revision(), snapshot()) def test_lookup_known_objects(archive_data, content, directory, release, revision, snapshot): expected = archive_data.content_find(content) assert service.lookup_object(CONTENT, content['sha1_git']) == expected expected = archive_data.directory_get(directory) assert service.lookup_object(DIRECTORY, directory) == expected expected = archive_data.release_get(release) assert service.lookup_object(RELEASE, release) == expected expected = archive_data.revision_get(revision) assert service.lookup_object(REVISION, revision) == expected expected = archive_data.snapshot_get(snapshot) assert service.lookup_object(SNAPSHOT, snapshot) == expected @given(unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot()) def test_lookup_unknown_objects(unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot): with pytest.raises(NotFoundExc) as e: service.lookup_object(CONTENT, unknown_content['sha1_git']) assert e.match(r'Content.*not found') with pytest.raises(NotFoundExc) as e: service.lookup_object(DIRECTORY, unknown_directory) assert e.match(r'Directory.*not found') with pytest.raises(NotFoundExc) as e: service.lookup_object(RELEASE, unknown_release) assert e.match(r'Release.*not found') with pytest.raises(NotFoundExc) as e: service.lookup_object(REVISION, unknown_revision) assert e.match(r'Revision.*not found') with pytest.raises(NotFoundExc) as e: 
service.lookup_object(SNAPSHOT, unknown_snapshot) assert e.match(r'Snapshot.*not found') @given(invalid_sha1()) def test_lookup_invalid_objects(invalid_sha1): with pytest.raises(BadInputExc) as e: service.lookup_object('foo', invalid_sha1) assert e.match('Invalid swh object type') with pytest.raises(BadInputExc) as e: service.lookup_object(CONTENT, invalid_sha1) assert e.match('Invalid hash') with pytest.raises(BadInputExc) as e: service.lookup_object(DIRECTORY, invalid_sha1) assert e.match('Invalid checksum') with pytest.raises(BadInputExc) as e: service.lookup_object(RELEASE, invalid_sha1) assert e.match('Invalid checksum') with pytest.raises(BadInputExc) as e: service.lookup_object(REVISION, invalid_sha1) assert e.match('Invalid checksum') with pytest.raises(BadInputExc) as e: service.lookup_object(SNAPSHOT, invalid_sha1) assert e.match('Invalid checksum') def test_lookup_missing_hashes_non_present(): missing_cnt = random_sha1() missing_dir = random_sha1() missing_rev = random_sha1() missing_rel = random_sha1() missing_snp = random_sha1() grouped_pids = { CONTENT: [hash_to_bytes(missing_cnt)], DIRECTORY: [hash_to_bytes(missing_dir)], REVISION: [hash_to_bytes(missing_rev)], RELEASE: [hash_to_bytes(missing_rel)], SNAPSHOT: [hash_to_bytes(missing_snp)], } actual_result = service.lookup_missing_hashes(grouped_pids) assert actual_result == {missing_cnt, missing_dir, missing_rev, missing_rel, missing_snp} @given(content(), directory()) def test_lookup_missing_hashes_some_present(archive_data, content, directory): missing_rev = random_sha1() missing_rel = random_sha1() missing_snp = random_sha1() grouped_pids = { CONTENT: [hash_to_bytes(content['sha1_git'])], DIRECTORY: [hash_to_bytes(directory)], REVISION: [hash_to_bytes(missing_rev)], RELEASE: [hash_to_bytes(missing_rel)], SNAPSHOT: [hash_to_bytes(missing_snp)], } actual_result = service.lookup_missing_hashes(grouped_pids) assert actual_result == {missing_rev, missing_rel, missing_snp} @given(origin()) def test_lookup_origin_extra_trailing_slash(origin): origin_info = service.lookup_origin({'url': f"{origin['url']}/"}) assert origin_info['url'] == origin['url'] def test_lookup_origin_missing_trailing_slash(archive_data): - deb_origin = {'url': 'http://snapshot.debian.org/package/r-base/'} + deb_origin = Origin(url='http://snapshot.debian.org/package/r-base/') archive_data.origin_add_one(deb_origin) - origin_info = service.lookup_origin({'url': deb_origin['url'][:-1]}) - assert origin_info['url'] == deb_origin['url'] + origin_info = service.lookup_origin({'url': deb_origin.url[:-1]}) + assert origin_info['url'] == deb_origin.url diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py index f1eba86f..556ed935 100644 --- a/swh/web/tests/data.py +++ b/swh/web/tests/data.py @@ -1,488 +1,484 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import random from copy import deepcopy from typing import Dict from rest_framework.decorators import api_view from rest_framework.response import Response from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage -from swh.model.from_disk import Content, Directory -from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS 
-from swh.model.identifiers import directory_identifier +from swh.model import from_disk +from swh.model.hashutil import hash_to_hex, DEFAULT_ALGORITHMS +from swh.model.model import Content, Directory, Origin from swh.loader.git.from_disk import GitLoaderFromArchive from swh.search import get_search from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( get_mimetype_and_encoding_for_content, prepare_content_for_display, _re_encode_content ) from swh.web.common import service from swh.web.common.highlightjs import get_hljs_language_from_filename # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { - 'cls': 'validate', - 'storage': { - 'cls': 'memory' - } + 'cls': 'memory', }, 'save_data': False, 'max_content_size': 100 * 1024 * 1024, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory' }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } def random_sha1(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) def random_sha256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_blake2s256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_content(): return { 'sha1': random_sha1(), 'sha1_git': random_sha1(), 'sha256': random_sha256(), 'blake2s256': random_blake2s256(), } # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } # CtagsIndexer with custom configuration for tests class _CtagsIndexer(CtagsIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.ctags', 'languages': {'c': 'c'}, 'tools': { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa '''--output-format=json ''' }, } } # Lightweight git repositories that will be loaded to generate # input data for tests _TEST_ORIGINS = [ { 'type': 'git', 'url': 'https://github.com/wcoder/highlightjs-line-numbers.js', 'archives': ['highlightjs-line-numbers.js.zip', 'highlightjs-line-numbers.js_visit2.zip'], 'visit_date': ['Dec 1 2018, 01:00 UTC', 'Jan 20 2019, 15:00 UTC'] }, { 'type': 'git', 'url': 'https://github.com/memononen/libtess2', 'archives': ['libtess2.zip'], 'visit_date': ['May 25 2018, 01:00 UTC'] }, { 'type': 'git', 'url': 'repo_with_submodules', 'archives': ['repo_with_submodules.tgz'], 'visit_date': ['Jan 1 2019, 01:00 UTC'] } ] _contents = {} # Tests data initialization def _init_tests_data(): # To hold reference to the memory storage storage = None # Create search instance search = get_search('memory', {}) search.initialize() search.origin_update({'url': origin['url']} for origin in _TEST_ORIGINS) # Load git repositories from archives for 
origin in _TEST_ORIGINS: for i, archive in enumerate(origin['archives']): origin_repo_archive = \ os.path.join(os.path.dirname(__file__), 'resources/repos/%s' % archive) loader = GitLoaderFromArchive(origin['url'], archive_path=origin_repo_archive, config=_TEST_LOADER_CONFIG, visit_date=origin['visit_date'][i]) if storage is None: storage = loader.storage else: loader.storage = storage loader.load() origin.update(storage.origin_get(origin)) # add an 'id' key if enabled search.origin_update([{'url': origin['url'], 'has_visits': True}]) for i in range(250): url = 'https://many.origins/%d' % (i+1) - storage.origin_add([{'url': url}]) + # storage.origin_add([{'url': url}]) + storage.origin_add([Origin(url=url)]) search.origin_update([{'url': url, 'has_visits': True}]) visit = storage.origin_visit_add(url, '2019-12-03 13:55:05', 'tar') storage.origin_visit_update( url, visit['visit'], snapshot='1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') contents = set() directories = set() revisions = set() releases = set() snapshots = set() content_path = {} # Get all objects loaded into the test archive for origin in _TEST_ORIGINS: snp = storage.snapshot_get_latest(origin['url']) snapshots.add(hash_to_hex(snp['id'])) for branch_name, branch_data in snp['branches'].items(): if branch_data['target_type'] == 'revision': revisions.add(branch_data['target']) elif branch_data['target_type'] == 'release': release = next(storage.release_get([branch_data['target']])) revisions.add(release['target']) releases.add(hash_to_hex(branch_data['target'])) for rev_log in storage.revision_shortlog(set(revisions)): rev_id = rev_log[0] revisions.add(rev_id) for rev in storage.revision_get(revisions): dir_id = rev['directory'] directories.add(hash_to_hex(dir_id)) for entry in dir_iterator(storage, dir_id): content_path[entry['sha1']] = '/'.join( [hash_to_hex(dir_id), entry['path'].decode('utf-8')]) if entry['type'] == 'file': contents.add(entry['sha1']) elif entry['type'] == 'dir': directories.add(hash_to_hex(entry['target'])) # Get all checksums for each content result = storage.content_get_metadata(contents) contents = [] for sha1, contents_metadata in result.items(): for content_metadata in contents_metadata: contents.append({ algo: hash_to_hex(content_metadata[algo]) for algo in DEFAULT_ALGORITHMS }) path = content_path[sha1] cnt = next(storage.content_get([sha1])) mimetype, encoding = get_mimetype_and_encoding_for_content( cnt['data']) _, _, cnt['data'] = _re_encode_content( mimetype, encoding, cnt['data']) content_display_data = prepare_content_for_display( cnt['data'], mimetype, path) contents[-1]['path'] = path contents[-1]['mimetype'] = mimetype contents[-1]['encoding'] = encoding contents[-1]['hljs_language'] = content_display_data['language'] contents[-1]['data'] = content_display_data['content_data'] _contents[contents[-1]['sha1']] = contents[-1] # Create indexer storage instance that will be shared by indexers idx_storage = get_indexer_storage('memory', {}) # Add the empty directory to the test archive - empty_dir_id = directory_identifier({'entries': []}) - empty_dir_id_bin = hash_to_bytes(empty_dir_id) - storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}]) + storage.directory_add([Directory(entries=[])]) # Return tests data return { 'search': search, 'storage': storage, 'idx_storage': idx_storage, 'origins': _TEST_ORIGINS, 'contents': contents, 'directories': list(directories), 'releases': list(releases), 'revisions': list(map(hash_to_hex, revisions)), 'snapshots': list(snapshots), 'generated_checksums': 
set(), } def _init_indexers(tests_data): # Instantiate content indexers that will be used in tests # and force them to use the memory storages indexers = {} for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer), ('license_indexer', _FossologyLicenseIndexer), ('ctags_indexer', _CtagsIndexer)): idx = idx_class() idx.storage = tests_data['storage'] idx.objstorage = tests_data['storage'].objstorage idx.idx_storage = tests_data['idx_storage'] idx.register_tools(idx.config['tools']) indexers[idx_name] = idx return indexers def get_content(content_sha1): return _contents.get(content_sha1) _tests_data = None _current_tests_data = None _indexer_loggers = {} def get_tests_data(reset=False): """ Initialize tests data and return them in a dict. """ global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() indexers = _init_indexers(_tests_data) for (name, idx) in indexers.items(): # pytest makes the loggers use a temporary file; and deepcopy # requires serializability. So we remove them, and add them # back after the copy. _indexer_loggers[name] = idx.log del idx.log _tests_data.update(indexers) if reset or _current_tests_data is None: _current_tests_data = deepcopy(_tests_data) for (name, logger) in _indexer_loggers.items(): _current_tests_data[name].log = logger return _current_tests_data def override_storages(storage, idx_storage, search): """ Helper function to replace the storages from which archive data are fetched. """ swh_config = config.get_config() swh_config.update({ 'storage': storage, 'indexer_storage': idx_storage, 'search': search, }) service.storage = storage service.idx_storage = idx_storage service.search = search # Implement some special endpoints used to provide input tests data # when executing end to end tests with cypress _content_code_data_exts = {} # type: Dict[str, Dict[str, str]] _content_code_data_filenames = {} # type: Dict[str, Dict[str, str]] _content_other_data_exts = {} # type: Dict[str, Dict[str, str]] def _init_content_tests_data(data_path, data_dict, ext_key): """ Helper function to read the content of a directory, store it into a test archive and add some files metadata (sha1 and/or expected programming language) in a dict. Args: data_path (str): path to a directory relative to the tests folder of swh-web data_dict (dict): the dict that will store files metadata ext_key (bool): whether to use file extensions or filenames as dict keys """ test_contents_dir = os.path.join( os.path.dirname(__file__), data_path).encode('utf-8') - directory = Directory.from_disk(path=test_contents_dir) + directory = from_disk.Directory.from_disk(path=test_contents_dir) contents = [] for name, obj in directory.items(): - if isinstance(obj, Content): + if isinstance(obj, from_disk.Content): c = obj.to_model().with_data().to_dict() c['status'] = 'visible' sha1 = hash_to_hex(c['sha1']) if ext_key: key = name.decode('utf-8').split('.')[-1] filename = 'test.' + key else: filename = name.decode('utf-8').split('/')[-1] key = filename language = get_hljs_language_from_filename(filename) data_dict[key] = {'sha1': sha1, 'language': language} - contents.append(c) + contents.append(Content.from_dict(c)) storage = get_tests_data()['storage'] storage.content_add(contents) def _init_content_code_data_exts(): """ Fill a global dictionary which maps source file extension to a code content example. 
""" global _content_code_data_exts _init_content_tests_data('resources/contents/code/extensions', _content_code_data_exts, True) def _init_content_other_data_exts(): """ Fill a global dictionary which maps a file extension to a content example. """ global _content_other_data_exts _init_content_tests_data('resources/contents/other/extensions', _content_other_data_exts, True) def _init_content_code_data_filenames(): """ Fill a global dictionary which maps a filename to a content example. """ global _content_code_data_filenames _init_content_tests_data('resources/contents/code/filenames', _content_code_data_filenames, False) if config.get_config()['e2e_tests_mode']: _init_content_code_data_exts() _init_content_other_data_exts() _init_content_code_data_filenames() @api_view(['GET']) def get_content_code_data_all_exts(request): """ Endpoint implementation returning a list of all source file extensions to test for highlighting using cypress. """ return Response(sorted(_content_code_data_exts.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a code content example based on the source file extension. """ data = None status = 404 if ext in _content_code_data_exts: data = _content_code_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_other_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a content example based on the file extension. """ _init_content_other_data_exts() data = None status = 404 if ext in _content_other_data_exts: data = _content_other_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_code_data_all_filenames(request): """ Endpoint implementation returning a list of all source filenames to test for highlighting using cypress. """ return Response(sorted(_content_code_data_filenames.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_filename(request, filename): """ Endpoint implementation returning metadata of a code content example based on the source filename. 
""" data = None status = 404 if filename in _content_code_data_filenames: data = _content_code_data_filenames[filename] status = 200 return Response(data, status=status, content_type='application/json') diff --git a/swh/web/tests/misc/test_badges.py b/swh/web/tests/misc/test_badges.py index bd51eb83..73c6dd31 100644 --- a/swh/web/tests/misc/test_badges.py +++ b/swh/web/tests/misc/test_badges.py @@ -1,164 +1,164 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from corsheaders.middleware import ACCESS_CONTROL_ALLOW_ORIGIN from hypothesis import given from swh.model.identifiers import ( persistent_identifier, CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT ) from swh.web.common import service from swh.web.common.utils import reverse, resolve_swh_persistent_id from swh.web.misc.badges import _badge_config, _get_logo_data from swh.web.tests.django_asserts import assert_contains from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, unknown_content, unknown_directory, new_origin, unknown_release, unknown_revision, unknown_snapshot, invalid_sha1 ) @given(content()) def test_content_badge(client, content): _test_badge_endpoints(client, CONTENT, content['sha1_git']) @given(directory()) def test_directory_badge(client, directory): _test_badge_endpoints(client, DIRECTORY, directory) @given(origin()) def test_origin_badge(client, origin): _test_badge_endpoints(client, ORIGIN, origin['url']) @given(release()) def test_release_badge(client, release): _test_badge_endpoints(client, RELEASE, release) @given(revision()) def test_revision_badge(client, revision): _test_badge_endpoints(client, REVISION, revision) @given(snapshot()) def test_snapshot_badge(client, snapshot): _test_badge_endpoints(client, SNAPSHOT, snapshot) @given(unknown_content(), unknown_directory(), new_origin(), unknown_release(), unknown_revision(), unknown_snapshot(), invalid_sha1()) def test_badge_errors(client, unknown_content, unknown_directory, new_origin, unknown_release, unknown_revision, unknown_snapshot, invalid_sha1): for object_type, object_id in ( (CONTENT, unknown_content['sha1_git']), (DIRECTORY, unknown_directory), - (ORIGIN, new_origin['url']), + (ORIGIN, new_origin.url), (RELEASE, unknown_release), (REVISION, unknown_revision), (SNAPSHOT, unknown_snapshot) ): url_args = { 'object_type': object_type, 'object_id': object_id } url = reverse('swh-badge', url_args=url_args) resp = client.get(url) _check_generated_badge(resp, **url_args, error='not found') if object_type != ORIGIN: object_pid = persistent_identifier(object_type, object_id) url = reverse('swh-badge-pid', url_args={'object_pid': object_pid}) resp = client.get(url) _check_generated_badge(resp, **url_args, error='not found') for object_type, object_id in ( (CONTENT, invalid_sha1), (DIRECTORY, invalid_sha1), (RELEASE, invalid_sha1), (REVISION, invalid_sha1), (SNAPSHOT, invalid_sha1) ): url_args = { 'object_type': object_type, 'object_id': object_id } url = reverse('swh-badge', url_args=url_args) resp = client.get(url) _check_generated_badge(resp, **url_args, error='invalid id') object_pid = f'swh:1:{object_type[:3]}:{object_id}' url = reverse('swh-badge-pid', url_args={'object_pid': object_pid}) resp = client.get(url) _check_generated_badge(resp, '', '', error='invalid id') @given(origin(), release()) def 
test_badge_endpoints_have_cors_header(client, origin, release): url = reverse('swh-badge', url_args={'object_type': ORIGIN, 'object_id': origin['url']}) resp = client.get(url, HTTP_ORIGIN='https://example.org') assert resp.status_code == 200, resp.content assert ACCESS_CONTROL_ALLOW_ORIGIN in resp release_pid = persistent_identifier(RELEASE, release) url = reverse('swh-badge-pid', url_args={'object_pid': release_pid}) resp = client.get(url, HTTP_ORIGIN='https://example.org') assert resp.status_code == 200, resp.content assert ACCESS_CONTROL_ALLOW_ORIGIN in resp def _test_badge_endpoints(client, object_type, object_id): url_args = {'object_type': object_type, 'object_id': object_id} url = reverse('swh-badge', url_args=url_args) resp = client.get(url) _check_generated_badge(resp, **url_args) if object_type != ORIGIN: pid = persistent_identifier(object_type, object_id) url = reverse('swh-badge-pid', url_args={'object_pid': pid}) resp = client.get(url) _check_generated_badge(resp, **url_args) def _check_generated_badge(response, object_type, object_id, error=None): assert response.status_code == 200, response.content assert response['Content-Type'] == 'image/svg+xml' if not object_type: object_type = 'object' if object_type == ORIGIN and error is None: link = reverse('browse-origin', url_args={'origin_url': object_id}) text = 'repository' elif error is None: text = persistent_identifier(object_type, object_id) link = resolve_swh_persistent_id(text)['browse_url'] if object_type == RELEASE: release = service.lookup_release(object_id) text = release['name'] elif error == 'invalid id': text = 'error' link = f'invalid {object_type} id' object_type = 'error' elif error == 'not found': text = 'error' link = f'{object_type} not found' object_type = 'error' assert_contains(response, '') assert_contains(response, _get_logo_data()) assert_contains(response, _badge_config[object_type]['color']) assert_contains(response, _badge_config[object_type]['title']) assert_contains(response, text) assert_contains(response, link) diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py index f374b335..f0fa30bd 100644 --- a/swh/web/tests/strategies.py +++ b/swh/web/tests/strategies.py @@ -1,533 +1,531 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from collections import defaultdict from datetime import datetime from hypothesis import settings, assume from hypothesis.strategies import ( just, sampled_from, lists, composite, datetimes, binary, text, characters ) from swh.model.hashutil import hash_to_hex, hash_to_bytes from swh.model.identifiers import directory_identifier +from swh.model.model import Person, Revision, TimestampWithTimezone from swh.storage.algos.revisions_walker import get_revisions_walker from swh.model.hypothesis_strategies import ( origins as new_origin_strategy, snapshots as new_snapshot ) from swh.web.tests.data import get_tests_data # Module dedicated to the generation of input data for tests through # the use of hypothesis. # Some of these data are sampled from a test archive created and populated # in the swh.web.tests.data module. 
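# Several of the strategies below (new_origin, new_person, new_revision,
# new_snapshots) now build swh.model.model objects rather than plain dicts:
# tests read attributes (origin.url, revision.id) and call .to_dict() only
# where a dict-based API is still expected. A minimal illustrative sketch of
# that usage (a hypothetical test, not part of this module), built on the
# origins() strategy from swh.model.hypothesis_strategies:
#
#     from hypothesis import given
#     from swh.model.hypothesis_strategies import origins
#
#     @given(origins())
#     def test_origin_model_usage(origin):
#         assert isinstance(origin.url, str)            # attribute access
#         assert origin.to_dict()['url'] == origin.url  # dict only at boundaries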
# Set the swh-web hypothesis profile if none has been explicitly set hypothesis_default_settings = settings.get_profile('default') if repr(settings()) == repr(hypothesis_default_settings): settings.load_profile('swh-web') # The following strategies exploit the hypothesis capabilities def _filter_checksum(cs): generated_checksums = get_tests_data()['generated_checksums'] if not int.from_bytes(cs, byteorder='little') or \ cs in generated_checksums: return False generated_checksums.add(cs) return True def _known_swh_object(object_type): return sampled_from(get_tests_data()[object_type]) def sha1(): """ Hypothesis strategy returning a valid hexadecimal sha1 value. """ return binary( min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex) def invalid_sha1(): """ Hypothesis strategy returning an invalid sha1 representation. """ return binary( min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex) def sha256(): """ Hypothesis strategy returning a valid hexadecimal sha256 value. """ return binary( min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex) def content(): """ Hypothesis strategy returning a random content ingested into the test archive. """ return _known_swh_object('contents') def contents(): """ Hypothesis strategy returning random contents ingested into the test archive. """ return lists(content(), min_size=2, max_size=8) def content_text(): """ Hypothesis strategy returning random textual contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/')) def content_text_non_utf8(): """ Hypothesis strategy returning random textual contents not encoded to UTF-8 ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['encoding'] not in ('utf-8', 'us-ascii')) def content_text_no_highlight(): """ Hypothesis strategy returning random textual contents with no detected programming language to highlight ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['hljs_language'] == 'nohighlight') def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('image/')) def content_utf8_detected_as_binary(): """ Hypothesis strategy returning random textual contents detected as binary by libmagic while they are valid UTF-8 encoded files. """ def utf8_binary_detected(content): if content['encoding'] != 'binary': return False try: content['data'].decode('utf-8') except Exception: return False else: return True return content().filter(utf8_binary_detected) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { 'blake2S256': blake2s256_hex, 'sha1': sha1_hex, 'sha1_git': sha1_git_hex, 'sha256': sha256_hex } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: next(get_tests_data()['storage'].content_get( [hash_to_bytes(c['sha1'])])) is None) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. 
""" return _known_swh_object('directories') def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ return directory().filter( lambda d: any([e['type'] == 'dir' for e in list( get_tests_data()['storage'].directory_ls(hash_to_bytes(d)))])) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({'entries': []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ return sha1().filter( lambda s: len(list(get_tests_data()['storage'].directory_missing( [hash_to_bytes(s)]))) > 0) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object('origins') def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: visits = list(tests_data['storage'].origin_visit_get(origin['url'])) if len(visits) > 1: ret.append(origin) return sampled_from(ret) def origin_with_releases(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: snapshot = tests_data['storage'].snapshot_get_latest(origin['url']) if any([b['target_type'] == 'release' for b in snapshot['branches'].values()]): ret.append(origin) return sampled_from(ret) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ - return new_origin_strategy().map(lambda origin: origin.to_dict()).filter( + return new_origin_strategy().filter( lambda origin: get_tests_data()['storage'].origin_get( - [origin])[0] is None) + [origin.to_dict()])[0] is None) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists(new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items()))) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)), min_size=min_size, max_size=max_size, unique=True).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object('releases') def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].release_get([s])) is None) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object('revisions') def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].revision_get( [hash_to_bytes(s)])) is None) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. 
""" name = draw(text(min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255))) email = '%s@company.org' % name - return { - 'name': name.encode(), - 'email': email.encode(), - 'fullname': ('%s <%s>' % (name, email)).encode() - } + return Person( + name=name.encode(), + email=email.encode(), + fullname=('%s <%s>' % (name, email)).encode() + ) @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)).map( lambda d: int(d.timestamp()))) return { 'timestamp': timestamp, 'offset': 0, 'negative_utc': False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ - return { - 'id': draw(unknown_revision().map(hash_to_bytes)), - 'directory': draw(sha1().map(hash_to_bytes)), - 'author': draw(new_person()), - 'committer': draw(new_person()), - 'message': draw( + return Revision( + directory=draw(sha1().map(hash_to_bytes)), + author=draw(new_person()), + committer=draw(new_person()), + message=draw( text(min_size=20, max_size=100).map(lambda t: t.encode())), - 'date': draw(new_swh_date()), - 'committer_date': draw(new_swh_date()), - 'synthetic': False, - 'type': 'git', - 'parents': [], - 'metadata': [], - } + date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), + committer_date=TimestampWithTimezone.from_datetime( + draw(new_swh_date())), + synthetic=False, + type='git', + ) def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object('snapshots') def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 - return lists(new_snapshot(min_size=2, max_size=10, only_objects=True) - .map(lambda snp: snp.to_dict()), + return lists(new_snapshot(min_size=2, max_size=10, only_objects=True), min_size=min_size, max_size=max_size) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()['storage'].snapshot_get( hash_to_bytes(s)) is None) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data['storage'] origin = random.choice(tests_data['origins'][:-1]) snapshot = storage.snapshot_get_latest(origin['url']) if snapshot['branches'][b'HEAD']['target_type'] == 'alias': target = snapshot['branches'][b'HEAD']['target'] head = snapshot['branches'][target]['target'] else: head = snapshot['branches'][b'HEAD']['target'] return get_revisions_walker('dfs', storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. 
""" # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev['parents']: children[rev_p].append(rev['id']) if not init_rev_found: master_revisions.append(rev) if not rev['parents']: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev['id']] return just({ 'sha1_git_root': hash_to_hex(root_rev['id']), 'sha1_git': hash_to_hex(ancestor_rev['id']), 'children': [hash_to_hex(r) for r in ancestor_child_revs] }) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev['parents']) > 1: merge_revs.append(rev) for rev_p in rev['parents']: children[rev_p].append(rev['id']) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev['parents']): selected_revs = merge_rev['parents'] return just({ 'sha1_git_root': hash_to_hex(selected_revs[0]), 'sha1_git': hash_to_hex(selected_revs[1]) }) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. """ return just({ 'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351', '15554cf7608dde6bfefac7e3d525596343a85b6f', '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd', '30acd0b47fc25e159e27a980102ddb1c4bea0b95', '4f81f05aaea3efb981f9d90144f746d6b682285b', '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682', '59d08bafa6a749110dfb65ba43a61963d5a5bf9f', '7568285b2d7f31ae483ae71617bd3db873deaa2c', '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4', '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03', '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c', '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd', 'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b', 'e89e55a12def4cd54d5bff58378a3b5119878eb7', 'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e', 'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'], 'symbol_name': 'ABS' }) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just({ 'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234', 'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2', 'rev_dir_rev_path': 'libtess2' })