diff --git a/requirements-test.txt b/requirements-test.txt
index 6822beb7..3270ac1c 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,10 +1,11 @@
hypothesis
pytest
pytest-django
pytest-mock
django-stubs
djangorestframework-stubs
requests-mock
swh.core[http] >= 0.0.81
swh.loader.git >= 0.0.55
+swh.storage >= 0.0.178
decorator # dependency of swh.core[http]
diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index 5c3f386a..3eac0194 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,672 +1,672 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from hypothesis import given
import pytest

from requests.utils import parse_header_links

from swh.model.model import Origin
from swh.storage.exc import StorageDBError, StorageAPIError

from swh.web.api.utils import enrich_origin_visit, enrich_origin
from swh.web.common.exc import BadInputExc
from swh.web.common.utils import reverse
from swh.web.common.origin_visits import get_origin_visits
from swh.web.tests.strategies import (
    origin, new_origin, visit_dates, new_snapshots
)


def _scroll_results(api_client, url):
    """Iterates through pages of results, and returns them all."""
    results = []

    while True:
        rv = api_client.get(url)
        assert rv.status_code == 200, rv.data
        assert rv['Content-Type'] == 'application/json'

        results.extend(rv.data)

        if 'Link' in rv:
            for link in parse_header_links(rv['Link']):
                if link['rel'] == 'next':
                    # Found link to next page of results
                    url = link['url']
                    break
            else:
                # No link with 'rel=next'
                break
        else:
            # No Link header
            break

    return results


def test_api_lookup_origin_visits_raise_error(api_client, mocker):
    mock_get_origin_visits = mocker.patch(
        'swh.web.api.views.origin.get_origin_visits')
    err_msg = 'voluntary error to check the bad request middleware.'
    mock_get_origin_visits.side_effect = BadInputExc(err_msg)

    url = reverse('api-1-origin-visits',
                  url_args={'origin_url': 'http://foo'})
    rv = api_client.get(url)

    assert rv.status_code == 400, rv.data
    assert rv['Content-Type'] == 'application/json'
    assert rv.data == {
        'exception': 'BadInputExc',
        'reason': err_msg
    }


def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client,
                                                             mocker):
    mock_get_origin_visits = mocker.patch(
        'swh.web.api.views.origin.get_origin_visits')
    err_msg = 'Storage exploded! Will be back online shortly!'
    mock_get_origin_visits.side_effect = StorageDBError(err_msg)

    url = reverse('api-1-origin-visits',
                  url_args={'origin_url': 'http://foo'})
    rv = api_client.get(url)

    assert rv.status_code == 503, rv.data
    assert rv['Content-Type'] == 'application/json'
    assert rv.data == {
        'exception': 'StorageDBError',
        'reason':
        'An unexpected error occurred in the backend: %s' % err_msg
    }


def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client,
                                                              mocker):
    mock_get_origin_visits = mocker.patch(
        'swh.web.api.views.origin.get_origin_visits')
    err_msg = 'Storage API dropped dead! Will resurrect asap!'
    mock_get_origin_visits.side_effect = StorageAPIError(err_msg)

    url = reverse(
        'api-1-origin-visits', url_args={'origin_url': 'http://foo'})
    rv = api_client.get(url)

    assert rv.status_code == 503, rv.data
    assert rv['Content-Type'] == 'application/json'
    assert rv.data == {
        'exception': 'StorageAPIError',
        'reason':
        'An unexpected error occurred in the api backend: %s' % err_msg
    }


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits(api_client, archive_data, new_origin,
                                  visit_dates, new_snapshots):
    archive_data.origin_add_one(new_origin)
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            new_origin.url, visit_date, type='git')
        archive_data.snapshot_add([new_snapshots[i]])
        archive_data.origin_visit_update(
-            new_origin.url, origin_visit['visit'],
+            new_origin.url, origin_visit.visit,
            snapshot=new_snapshots[i].id)

    all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

    for last_visit, expected_visits in (
            (None, all_visits[:2]),
            (all_visits[1]['visit'], all_visits[2:])):

        url = reverse('api-1-origin-visits',
                      url_args={'origin_url': new_origin.url},
                      query_params={'per_page': 2,
                                    'last_visit': last_visit})

        rv = api_client.get(url)

        assert rv.status_code == 200, rv.data
        assert rv['Content-Type'] == 'application/json'

        for i in range(len(expected_visits)):
            expected_visits[i] = enrich_origin_visit(
                expected_visits[i], with_origin_link=False,
                with_origin_visit_link=True, request=rv.wsgi_request)

        assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits_by_id(api_client, archive_data, new_origin,
                                        visit_dates, new_snapshots):
    archive_data.origin_add_one(new_origin)
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            new_origin.url, visit_date, type='git')
        archive_data.snapshot_add([new_snapshots[i]])
        archive_data.origin_visit_update(
-            new_origin.url, origin_visit['visit'],
+            new_origin.url, origin_visit.visit,
            snapshot=new_snapshots[i].id)

    all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

    for last_visit, expected_visits in (
            (None, all_visits[:2]),
            (all_visits[1]['visit'], all_visits[2:4])):

        url = reverse('api-1-origin-visits',
                      url_args={'origin_url': new_origin.url},
                      query_params={'per_page': 2,
                                    'last_visit': last_visit})

        rv = api_client.get(url)

        assert rv.status_code == 200, rv.data
        assert rv['Content-Type'] == 'application/json'

        for i in range(len(expected_visits)):
            expected_visits[i] = enrich_origin_visit(
                expected_visits[i], with_origin_link=False,
                with_origin_visit_link=True, request=rv.wsgi_request)

        assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visit(api_client, archive_data, new_origin,
                                 visit_dates, new_snapshots):
    archive_data.origin_add_one(new_origin)
    for i, visit_date in enumerate(visit_dates):
        origin_visit = archive_data.origin_visit_add(
            new_origin.url, visit_date, type='git')
-        visit_id = origin_visit['visit']
+        visit_id = origin_visit.visit
        archive_data.snapshot_add([new_snapshots[i]])
        archive_data.origin_visit_update(
-            new_origin.url, origin_visit['visit'],
+            new_origin.url, visit_id,
            snapshot=new_snapshots[i].id)

        url = reverse('api-1-origin-visit',
                      url_args={'origin_url': new_origin.url,
                                'visit_id': visit_id})

        rv = api_client.get(url)
        assert rv.status_code == 200, rv.data
        assert rv['Content-Type'] == 'application/json'

        expected_visit = archive_data.origin_visit_get_by(
            new_origin.url, visit_id)

        expected_visit =
enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add_one(new_origin) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No visit for origin %s found' % new_origin.url } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') - visit_ids.append(origin_visit['visit']) + visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( new_origin.url, visit_ids[0], snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( new_origin.url, visit_ids[1]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot(api_client, archive_data, new_origin, visit_dates, new_snapshots): archive_data.origin_add_one(new_origin) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( new_origin.url, visit_date, type='git') - visit_ids.append(origin_visit['visit']) + visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) archive_data.origin_visit_update( new_origin.url, visit_ids[0], snapshot=new_snapshots[0].id) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin.url}, query_params={'require_snapshot': True}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_visit = archive_data.origin_visit_get_by( new_origin.url, visit_ids[0]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request) assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_url': origin['url'], 'visit_id': max_visit_id + 1}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' 
% (origin['url'], max_visit_id+1) } def test_api_origins(api_client, archive_data): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} # Get only one url = reverse('api-1-origins', query_params={'origin_count': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= origin_urls # Get all url = reverse('api-1-origins', query_params={'origin_count': len(origins)}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse('api-1-origins', query_params={'origin_count': len(origins)+10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == len(origins) assert {origin['url'] for origin in rv.data} == origin_urls @pytest.mark.parametrize('origin_count', [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): origins = list(archive_data.origin_get_range(0, 10000)) origin_urls = {origin['url'] for origin in origins} url = reverse('api-1-origins', query_params={'origin_count': origin_count}) results = _scroll_results(api_client, url) assert len(results) == len(origins) assert {origin['url'] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): url = reverse('api-1-origin', url_args={'origin_url': origin['url']}) rv = api_client.get(url) expected_origin = archive_data.origin_get(origin) expected_origin = enrich_origin(expected_origin, rv.wsgi_request) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse('api-1-origin', url_args={'origin_url': new_origin.url}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Origin with url %s not found!' 
% new_origin.url } @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } # Search for 'github.com', get only one url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 1}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} <= expected_origins # Search for 'github.com', get all url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins # Search for 'github.com', get more than available url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, query_params={'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_words(api_client, mocker, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github com'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'com github'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert {origin['url'] for origin in rv.data} == expected_origins url = reverse('api-1-origin-search', url_args={'url_pattern': 'memononen libtess2'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} url = reverse('api-1-origin-search', url_args={'url_pattern': 'libtess2 memononen'}, query_params={'limit': 2}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 assert {origin['url'] for origin in rv.data} \ == {'https://github.com/memononen/libtess2'} @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) @pytest.mark.parametrize('limit', [1, 2, 3, 10]) def test_api_origin_search_scroll( api_client, archive_data, mocker, limit, backend): if backend != 'swh-search': # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) expected_origins = { 'https://github.com/wcoder/highlightjs-line-numbers.js', 'https://github.com/memononen/libtess2', } url = reverse('api-1-origin-search', url_args={'url_pattern': 'github.com'}, 
query_params={'limit': limit}) results = _scroll_results(api_client, url) assert {origin['url'] for origin in results} == expected_origins @pytest.mark.parametrize('backend', ['swh-search', 'swh-storage']) def test_api_origin_search_limit( api_client, archive_data, tests_data, mocker, backend): if backend == 'swh-search': tests_data['search'].origin_update([ {'url': 'http://foobar/{}'.format(i)} for i in range(2000) ]) else: # equivalent to not configuring search in the config mocker.patch('swh.web.common.service.search', None) archive_data.origin_add([ Origin(url='http://foobar/{}'.format(i)) for i in range(2000) ]) url = reverse('api-1-origin-search', url_args={'url_pattern': 'foobar'}, query_params={'limit': 1050}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1000 @given(origin()) def test_api_origin_metadata_search(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = [{ 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] assert rv.data == expected_data oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = api_client.get(url) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=['Jane Doe'], limit=100) @given(origin()) def 
test_api_origin_intrinsic_metadata(api_client, mocker, origin): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') oimg = mock_idx_storage.origin_intrinsic_metadata_get oimg.side_effect = lambda origin_urls: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_url': origin['url']}) rv = api_client.get(url) oimg.assert_called_once_with([origin['url']]) assert rv.status_code == 200, rv.content assert rv['Content-Type'] == 'application/json' expected_data = {'author': 'Jane Doe'} assert rv.data == expected_data def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') url = reverse('api-1-origin-metadata-search') rv = api_client.get(url) assert rv.status_code == 400, rv.content mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index 28a78a3e..99443168 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,905 +1,905 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import swh.web.browse.utils from django.utils.html import escape from hypothesis import given from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot from swh.web.browse.utils import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, parse_timestamp, get_swh_persistent_id ) from swh.web.tests.data import get_content from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, new_snapshot, visit_dates, revisions, origin_with_releases ) @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/origin-visits.html') url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/origin-visits.html') visits = archive_data.origin_visit_get(origin['url']) for v in visits: vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ') browse_dir_url = reverse('browse-origin-directory', url_args={'origin_url': origin['url'], 'timestamp': vdate}) assert_contains(resp, browse_dir_url) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get( origin_visits[visit_idx]['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev['directory']) dir_files = [e for e in 
dir_content if e['type'] == 'file'] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { 'branches': branches, 'releases': releases, 'root_dir_sha1': head_rev['directory'], 'content': get_content(dir_file['checksums']['sha1']), 'visit': origin_visits[visit_idx] } tdata = _get_archive_data(-1) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content']) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=tdata['visit']['date']) visit_unix_ts = parse_timestamp(tdata['visit']['date']).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=visit_unix_ts) tdata = _get_archive_data(0) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], visit_id=tdata['visit']['visit']) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] subdirs = [e for e in archive_data.directory_ls(root_dir_sha1) if e['type'] == 'dir'] branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir['target']) 
subdir_path = subdir['name'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) @given(origin()) def test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_branches_test_helper(client, origin, snapshot_content) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_releases_test_helper(client, origin, snapshot_content) @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3)) def test_origin_snapshot_null_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): if i == 0: snp_dict['branches'][branch] = None else: snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i-1]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') - archive_data.origin_visit_update(new_origin['url'], visit['visit'], + archive_data.origin_visit_update(new_origin['url'], visit.visit, status='partial', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}) rv = client.get(url) assert rv.status_code == 200 @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4)) def test_origin_snapshot_invalid_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): 
snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') - archive_data.origin_visit_update(new_origin['url'], visit['visit'], + archive_data.origin_visit_update(new_origin['url'], visit.visit, status='full', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}, query_params={'branch': 'invalid_branch'}) rv = client.get(url) assert rv.status_code == 404 def test_origin_request_errors(client, archive_data, mocker): mock_snapshot_service = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.service') mock_origin_service = mocker.patch('swh.web.browse.views.origin.service') mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_request_content = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.request_content') mock_origin_service.lookup_origin.side_effect = NotFoundExc( 'origin not found') url = reverse('browse-origin-visits', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'origin not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 1, 'release': 0 } mock_lookup_directory = mock_utils_service.lookup_directory mock_lookup_directory.side_effect = NotFoundExc('Directory not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Directory not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}) resp = client.get(url) assert resp.status_code == 404 
assert_template_used(resp, 'error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'baz'}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') assert re.search('snapshot.*is empty', resp.content.decode('utf-8')) mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 1, 'release': 0 } mock_snapshot_service.lookup_directory_with_path.return_value = { 'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1' } mock_request_content.side_effect = NotFoundExc('Content not found') url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'baz'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Content not found', status_code=404) mock_get_snapshot_context = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.get_snapshot_context') mock_get_snapshot_context.side_effect = NotFoundExc('Snapshot not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, 'error.html') assert_contains(resp, 'Snapshot not found', status_code=404) def test_origin_empty_snapshot(client, mocker): mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_sizes.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = { 'id': 457, 'url': 'https://github.com/foo/bar' } url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') resp_content = resp.content.decode('utf-8') assert re.search('snapshot.*is empty', resp_content) assert not re.search('swh-tr-link', resp_content) @given(origin_with_releases()) def 
test_origin_release_browse(client, archive_data, origin): # for swh.web.browse.utils.get_snapshot_content to only return one branch snapshot_max_size = swh.web.browse.utils.snapshot_content_max_size swh.web.browse.utils.snapshot_content_max_size = 1 try: snapshot = archive_data.snapshot_get_latest(origin['url']) release = [b for b in snapshot['branches'].values() if b['target_type'] == 'release'][-1] release_data = archive_data.release_get(release['target']) url = reverse('browse-origin-directory', url_args={'origin_url': origin['url']}, query_params={'release': release_data['name']}) resp = client.get(url) assert resp.status_code == 200 assert_contains(resp, release_data['name']) assert_contains(resp, release['target']) finally: swh.web.browse.utils.snapshot_content_max_size = snapshot_max_size @given(origin_with_releases()) def test_origin_release_browse_not_found(client, archive_data, origin): invalid_release_name = 'swh-foo-bar' url = reverse('browse-origin-directory', url_args={'origin_url': origin['url']}, query_params={'release': invalid_release_name}) resp = client.get(url) assert resp.status_code == 404 assert re.search(f'Release {invalid_release_name}.*not found', resp.content.decode('utf-8')) def _origin_content_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None): content_path = '/'.join(content['path'].split('/')[1:]) url_args = {'origin_url': origin_info['url'], 'path': content_path} if not visit_id: visit_id = origin_visits[-1]['visit'] query_params = {} if timestamp: url_args['timestamp'] = timestamp if visit_id: query_params['visit_id'] = visit_id url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') assert type(content['data']) == str assert_contains(resp, '' % content['hljs_language']) assert_contains(resp, escape(content['data'])) split_path = content_path.split('/') filename = split_path[-1] path = content_path.replace(filename, '')[:-1] path_info = gen_path_info(path) del url_args['path'] if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') root_dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '
  • ', count=len(path_info)+1) assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: url_args['path'] = p['path'] dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '%s' % (dir_url, p['name'])) assert_contains(resp, '
  • %s
  • ' % filename) query_string = 'sha1_git:' + content['sha1_git'] url_raw = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params={'filename': filename}) assert_contains(resp, url_raw) if 'args' in url_args: del url_args['path'] origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) assert_contains(resp, 'Releases (%s)' % (origin_releases_url, len(origin_releases))) assert_contains(resp, '
  • ', count=len(origin_branches)) url_args['path'] = content_path for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '' % root_dir_release_url) url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/content.html') swh_cnt_id = get_swh_persistent_id('content', content['sha1_git']) swh_cnt_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_directory_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, path=None): dirs = [e for e in directory_entries if e['type'] in ('dir', 'rev')] files = [e for e in directory_entries if e['type'] == 'file'] if not visit_id: visit_id = origin_visits[-1]['visit'] url_args = {'origin_url': origin_info['url']} query_params = {} if timestamp: url_args['timestamp'] = timestamp else: query_params['visit_id'] = visit_id if path: url_args['path'] = path url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') assert resp.status_code == 200 assert_template_used(resp, 'browse/directory.html') assert_contains(resp, '', count=len(dirs)) assert_contains(resp, '', count=len(files)) if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') for d in dirs: if d['type'] == 'rev': dir_url = reverse('browse-revision', url_args={'sha1_git': d['target']}) else: dir_path = d['name'] if path: dir_path = "%s/%s" % (path, d['name']) dir_url_args = dict(url_args) dir_url_args['path'] = dir_path dir_url = reverse('browse-origin-directory', url_args=dir_url_args, query_params=query_params) assert_contains(resp, dir_url) for f in files: file_path = f['name'] if path: file_path = "%s/%s" % (path, f['name']) file_url_args = dict(url_args) file_url_args['path'] = file_path file_url = reverse('browse-origin-content', url_args=file_url_args, query_params=query_params) assert_contains(resp, file_url) if 'path' in url_args: del url_args['path'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split('/')) + 1 assert_contains(resp, '
  • ', count=nb_bc_paths) assert_contains(resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7])) origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) if path: url_args['path'] = path assert_contains(resp, '
  • ', count=len(origin_branches)) for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '' % root_dir_release_url) assert_contains(resp, 'vault-cook-directory') assert_contains(resp, 'vault-cook-revision') swh_dir_id = get_swh_persistent_id('directory', directory_entries[0]['dir_id']) # noqa swh_dir_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_branches_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-branches', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/branches.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) assert_contains(resp, '' % escape(browse_branch_url)) browse_revision_url = reverse( 'browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'origin': origin_info['url']}) assert_contains(resp, '' % escape(browse_revision_url)) def _origin_releases_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-releases', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, 'browse/releases.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) assert_contains(resp, '' % escape(browse_release_url)) assert_contains(resp, '' % escape(browse_revision_url)) diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index cc7c86a0..4547716b 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,911 +1,911 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.model.model import Directory, DirectoryEntry, Origin, Revision from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, unknown_content, contents, 
unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, unknown_directory, release, unknown_release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, snapshot, unknown_snapshot ) from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(contents()) def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) assert service.lookup_multiple_hashes(input_data) == expected_output @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert actual_lookup == {'found': None, 'algo': 'sha1_git'} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = archive_data.content_get_metadata(content['sha1']) assert {'found': content_metadata, 'algo': 'sha1'} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert {'found': False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) assert {'found': True} == actual_lookup @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list(service.lookup_content_ctags('sha1:%s' % unknown_content_['sha1'])) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = indexer_data.content_get_mimetype(content['sha1']) assert actual_filetype == expected_filetype @pytest.mark.skip # Language indexer is disabled. 
@given(content()) def test_lookup_content_language(indexer_data, content): indexer_data.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = indexer_data.content_get_language(content['sha1']) assert actual_language == expected_language @given(contents_with_ctags()) def test_lookup_expression(indexer_data, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = list( service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) assert actual_ctags == expected_ctags def test_lookup_expression_no_result(): expected_ctags = [] actual_ctags = list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) assert actual_ctags == expected_ctags @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(indexer_data, content): indexer_data.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = indexer_data.content_get_license(content['sha1']) assert actual_license == expected_license def test_stat_counters(archive_data): actual_stats = service.stat_counters() assert actual_stats == archive_data.stat_counters() @given(new_origin(), visit_dates()) def test_lookup_origin_visits(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) for ts in visit_dates: archive_data.origin_visit_add( new_origin.url, ts, type='git') actual_origin_visits = list( service.lookup_origin_visits(new_origin.url, per_page=100)) expected_visits = archive_data.origin_visit_get(new_origin.url) for expected_visit in expected_visits: expected_visit['origin'] = new_origin.url assert actual_origin_visits == expected_visits @given(new_origin(), visit_dates()) def test_lookup_origin_visit(archive_data, new_origin, visit_dates): archive_data.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(archive_data.origin_visit_add( new_origin.url, ts, type='git')) - visit = random.choice(visits)['visit'] + visit = random.choice(visits).visit actual_origin_visit = service.lookup_origin_visit( new_origin.url, visit) expected_visit = dict(archive_data.origin_visit_get_by( new_origin.url, visit)) assert actual_origin_visit == expected_visit @given(new_origin()) def test_lookup_origin(archive_data, new_origin): archive_data.origin_add_one(new_origin) actual_origin = service.lookup_origin({'url': new_origin.url}) expected_origin = archive_data.origin_get( {'url': new_origin.url}) assert actual_origin == expected_origin @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): with pytest.raises(BadInputExc) as e: service.lookup_release(invalid_sha1) assert e.match('Invalid checksum') @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(sha256): with pytest.raises(BadInputExc) as e: service.lookup_release(sha256) assert e.match('Only sha1_git is supported.') @given(directory()) def test_lookup_directory_with_path_not_found(directory): path = 'some/invalid/path/here' with pytest.raises(NotFoundExc) as e: 
service.lookup_directory_with_path(directory, path) assert e.match('Directory entry with path %s from %s not found' % (path, directory)) @given(directory()) def test_lookup_directory_with_path_found(archive_data, directory): directory_content = archive_data.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) assert actual_result == directory_entry @given(release()) def test_lookup_release(archive_data, release): actual_release = service.lookup_release(release) assert actual_release == archive_data.release_get(release) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Invalid checksum query string') sha1_git = sha256 with pytest.raises(BadInputExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Only sha1_git is supported') @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Revision %s not found' % sha1_git) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match('Revision root %s not found' % sha1_git_root) @given(ancestor_revisions()) def test_lookup_revision_with_context(archive_data, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in archive_data.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = archive_data.revision_get(sha1_git) expected_revision['children'] = children assert actual_revision == expected_revision @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with pytest.raises(NotFoundExc) as e: service.lookup_revision_with_context(root_sha1_git, sha1_git) assert e.match('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git)) def test_lookup_directory_with_revision_not_found(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: service.lookup_directory_with_revision(unknown_revision_) assert e.match('Revision %s not found' % unknown_revision_) @given(new_revision()) def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision): unknown_content_ = random_content() dir_path = 'README.md' # A directory that points to unknown content dir = Directory(entries=[ DirectoryEntry( name=bytes(dir_path.encode('utf-8')), type='file', target=hash_to_bytes(unknown_content_['sha1_git']), perms=DentryPerms.content ) ]) # Create a revision that points to a 
    # Create a revision that points to a directory
    # Which points to unknown content
    new_revision = new_revision.to_dict()
    new_revision['directory'] = dir.id
    del new_revision['id']
    new_revision = Revision.from_dict(new_revision)

    # Add the directory and revision in mem
    archive_data.directory_add([dir])
    archive_data.revision_add([new_revision])
    new_revision_id = hash_to_hex(new_revision.id)
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(new_revision_id, dir_path)
    assert e.match('Content not found for revision %s' % new_revision_id)


@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    invalid_path = 'path/to/something/unknown'
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(revision, invalid_path)
    assert e.match('Directory or File')
    assert e.match(invalid_path)
    assert e.match('revision %s' % revision)
    assert e.match('not found')


@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(archive_data,
                                                   revision_with_submodules):
    rev_sha1_git = revision_with_submodules['rev_sha1_git']
    rev_dir_path = revision_with_submodules['rev_dir_rev_path']

    actual_data = service.lookup_directory_with_revision(
        rev_sha1_git, rev_dir_path)

    revision = archive_data.revision_get(
        revision_with_submodules['rev_sha1_git'])
    directory = archive_data.directory_ls(revision['directory'])
    rev_entry = next(e for e in directory if e['name'] == rev_dir_path)

    expected_data = {
        'content': archive_data.revision_get(rev_entry['target']),
        'path': rev_dir_path,
        'revision': rev_sha1_git,
        'type': 'rev'
    }

    assert actual_data == expected_data


@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    actual_directory_entries = service.lookup_directory_with_revision(revision)

    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(
        revision_data['directory'])

    assert actual_directory_entries['type'] == 'dir'
    assert actual_directory_entries['content'] == expected_directory_entries


@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] in ('file', 'dir')]
    expected_dir_entry = random.choice(dir_entries)

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry['name'])

    assert actual_dir_entry['type'] == expected_dir_entry['type']
    assert actual_dir_entry['revision'] == revision
    assert actual_dir_entry['path'] == expected_dir_entry['name']
    if actual_dir_entry['type'] == 'file':
        del actual_dir_entry['content']['checksums']['blake2s256']
        for key in ('checksums', 'status', 'length'):
            assert actual_dir_entry['content'][key] == expected_dir_entry[key]
    else:
        sub_dir_entries = archive_data.directory_ls(
            expected_dir_entry['target'])
        assert actual_dir_entry['content'] == sub_dir_entries


@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
        archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    expected_dir_entry = random.choice(dir_entries)
    expected_data = archive_data.content_get(
        expected_dir_entry['checksums']['sha1'])

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry['name'], with_data=True)

    assert actual_dir_entry['type'] == expected_dir_entry['type']
    assert actual_dir_entry['revision'] == revision
    assert actual_dir_entry['path'] == expected_dir_entry['name']
    del actual_dir_entry['content']['checksums']['blake2s256']
    for key in ('checksums', 'status', 'length'):
        assert actual_dir_entry['content'][key] == expected_dir_entry[key]
    assert actual_dir_entry['content']['data'] == expected_data['data']


@given(revision())
def test_lookup_revision(archive_data, revision):
    actual_revision = service.lookup_revision(revision)
    assert actual_revision == archive_data.revision_get(revision)


@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    new_revision = new_revision.to_dict()
    new_revision['message'] = b'elegant fix for bug \xff'
    archive_data.revision_add([Revision.from_dict(new_revision)])

    revision = service.lookup_revision(hash_to_hex(new_revision['id']))
    assert revision['message'] is None
    assert revision['message_decoding_failed'] is True


@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    archive_data.revision_add([new_revision])

    revision_message = service.lookup_revision_message(
        hash_to_hex(new_revision.id))

    assert revision_message == {'message': new_revision.message}


def test_lookup_revision_msg_no_rev():
    unknown_revision_ = random_sha1()
    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_message(unknown_revision_)
    assert e.match('Revision with sha1_git %s not found.' % unknown_revision_)


@given(revisions())
def test_lookup_revision_multiple(archive_data, revisions):
    actual_revisions = list(service.lookup_revision_multiple(revisions))

    expected_revisions = []
    for rev in revisions:
        expected_revisions.append(archive_data.revision_get(rev))

    assert actual_revisions == expected_revisions


def test_lookup_revision_multiple_none_found():
    unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_revisions = list(
        service.lookup_revision_multiple(unknown_revisions_))

    assert actual_revisions == [None] * len(unknown_revisions_)


@given(revision())
def test_lookup_revision_log(archive_data, revision):
    actual_revision_log = list(service.lookup_revision_log(revision,
                                                           limit=25))
    expected_revision_log = archive_data.revision_log(revision, limit=25)

    assert actual_revision_log == expected_revision_log


def _get_origin_branches(archive_data, origin):
    origin_visit = archive_data.origin_visit_get(origin['url'])[-1]
    snapshot = archive_data.snapshot_get(origin_visit['snapshot'])
    branches = {k: v for (k, v) in snapshot['branches'].items()
                if v['target_type'] == 'revision'}
    return branches


@given(origin())
def test_lookup_revision_log_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_log = list(
        service.lookup_revision_log_by(origin['url'], branch_name, None,
                                       limit=25))

    expected_log = archive_data.revision_log(
        branches[branch_name]['target'], limit=25)

    assert actual_log == expected_log


@given(origin())
def test_lookup_revision_log_by_notfound(origin):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_log_by(
            origin['url'], 'unknown_branch_name', None, limit=100)


def test_lookup_content_raw_not_found():
    unknown_content_ = random_content()
    with pytest.raises(NotFoundExc) as e:
        service.lookup_content_raw('sha1:' + unknown_content_['sha1'])
    assert e.match('Content with %s checksum equals to %s not found!'
                   % ('sha1', unknown_content_['sha1']))


@given(content())
def test_lookup_content_raw(archive_data, content):
    actual_content = service.lookup_content_raw(
        'sha256:%s' % content['sha256'])

    expected_content = archive_data.content_get(content['sha1'])

    assert actual_content == expected_content


def test_lookup_content_not_found():
    unknown_content_ = random_content()
    with pytest.raises(NotFoundExc) as e:
        service.lookup_content('sha1:%s' % unknown_content_['sha1'])
    assert e.match('Content with %s checksum equals to %s not found!'
                   % ('sha1', unknown_content_['sha1']))


@given(content())
def test_lookup_content_with_sha1(archive_data, content):
    actual_content = service.lookup_content('sha1:%s' % content['sha1'])

    expected_content = archive_data.content_get_metadata(content['sha1'])

    assert actual_content == expected_content


@given(content())
def test_lookup_content_with_sha256(archive_data, content):
    actual_content = service.lookup_content('sha256:%s' % content['sha256'])

    expected_content = archive_data.content_get_metadata(content['sha1'])

    assert actual_content == expected_content


def test_lookup_directory_bad_checksum():
    with pytest.raises(BadInputExc):
        service.lookup_directory('directory_id')


def test_lookup_directory_not_found():
    unknown_directory_ = random_sha1()
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory(unknown_directory_)
    assert e.match('Directory with sha1_git %s not found' %
                   unknown_directory_)


@given(directory())
def test_lookup_directory(archive_data, directory):
    actual_directory_ls = list(service.lookup_directory(directory))

    expected_directory_ls = archive_data.directory_ls(directory)

    assert actual_directory_ls == expected_directory_ls


@given(empty_directory())
def test_lookup_directory_empty(empty_directory):
    actual_directory_ls = list(service.lookup_directory(empty_directory))

    assert actual_directory_ls == []


@given(origin())
def test_lookup_revision_by_nothing_found(origin):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_by(origin['url'], 'invalid-branch-name')


@given(origin())
def test_lookup_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_revision = service.lookup_revision_by(origin['url'], branch_name)

    expected_revision = archive_data.revision_get(
        branches[branch_name]['target'])

    assert actual_revision == expected_revision


@given(origin(), revision())
def test_lookup_revision_with_context_by_ko(origin, revision):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_with_context_by(origin['url'],
                                                'invalid-branch-name',
                                                None, revision)


@given(origin())
def test_lookup_revision_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]['target']
    root_rev_log = archive_data.revision_log(root_rev)

    children = defaultdict(list)
    for rev in root_rev_log:
        for rev_p in rev['parents']:
            children[rev_p].append(rev['id'])

    rev = root_rev_log[-1]['id']

    actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
        origin['url'], branch_name, None, rev)

    expected_root_rev = archive_data.revision_get(root_rev)
    expected_rev = archive_data.revision_get(rev)
    expected_rev['children'] = children[rev]

    assert actual_root_rev == expected_root_rev
    assert actual_rev == expected_rev


def test_lookup_revision_through_ko_not_implemented():
    with pytest.raises(NotImplementedError):
        service.lookup_revision_through({'something-unknown': 10})


@given(origin())
def test_lookup_revision_through_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]['target']
    root_rev_log = archive_data.revision_log(root_rev)
    rev = root_rev_log[-1]['id']

    assert service.lookup_revision_through({
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
        'sha1_git': rev
    }) == service.lookup_revision_with_context_by(origin['url'], branch_name,
                                                  None, rev)


@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    assert service.lookup_revision_through({
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
    }) == service.lookup_revision_by(origin['url'], branch_name, None)


@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    sha1_git = ancestor_revisions['sha1_git']
    sha1_git_root = ancestor_revisions['sha1_git_root']

    assert service.lookup_revision_through({
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
    }) == service.lookup_revision_with_context(sha1_git_root, sha1_git)


@given(revision())
def test_lookup_revision_through_with_revision(revision):
    assert service.lookup_revision_through({
        'sha1_git': revision
    }) == service.lookup_revision(revision)


@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    with pytest.raises(NotFoundExc):
        service.lookup_directory_through_revision(
            {'sha1_git': revision}, 'some/invalid/path')


@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, dir_entry['name']
    ) == (revision,
          service.lookup_directory_with_revision(revision, dir_entry['name']))


@given(revision())
def test_lookup_directory_through_revision_ok_with_data(
        archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, dir_entry['name'], with_data=True
    ) == (revision,
          service.lookup_directory_with_revision(revision, dir_entry['name'],
                                                 with_data=True))


@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(archive_data, content, directory, release,
                              revision, snapshot):
    expected = archive_data.content_find(content)
    assert service.lookup_object(CONTENT, content['sha1_git']) == expected

    expected = archive_data.directory_get(directory)
    assert service.lookup_object(DIRECTORY, directory) == expected

    expected = archive_data.release_get(release)
    assert service.lookup_object(RELEASE, release) == expected

    expected = archive_data.revision_get(revision)
    assert service.lookup_object(REVISION, revision) == expected

    expected = archive_data.snapshot_get(snapshot)
    assert service.lookup_object(SNAPSHOT, snapshot) == expected


@given(unknown_content(), unknown_directory(), unknown_release(),
       unknown_revision(), unknown_snapshot())
def test_lookup_unknown_objects(unknown_content, unknown_directory,
                                unknown_release, unknown_revision,
                                unknown_snapshot):
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(CONTENT, unknown_content['sha1_git'])
    assert e.match(r'Content.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(DIRECTORY, unknown_directory)
    assert e.match(r'Directory.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(RELEASE, unknown_release)
    assert e.match(r'Release.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(REVISION, unknown_revision)
    assert e.match(r'Revision.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(SNAPSHOT, unknown_snapshot)
    assert e.match(r'Snapshot.*not found')


@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_object('foo', invalid_sha1)
    assert e.match('Invalid swh object type')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(CONTENT, invalid_sha1)
    assert e.match('Invalid hash')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(DIRECTORY, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(RELEASE, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(REVISION, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(SNAPSHOT, invalid_sha1)
    assert e.match('Invalid checksum')


def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_pids = {
        CONTENT: [hash_to_bytes(missing_cnt)],
        DIRECTORY: [hash_to_bytes(missing_dir)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_pids)

    assert actual_result == {missing_cnt, missing_dir, missing_rev,
                             missing_rel, missing_snp}


@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_pids = {
        CONTENT: [hash_to_bytes(content['sha1_git'])],
        DIRECTORY: [hash_to_bytes(directory)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_pids)

    assert actual_result == {missing_rev, missing_rel, missing_snp}


@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    origin_info = service.lookup_origin({'url': f"{origin['url']}/"})
    assert origin_info['url'] == origin['url']


def test_lookup_origin_missing_trailing_slash(archive_data):
    deb_origin = Origin(url='http://snapshot.debian.org/package/r-base/')
    archive_data.origin_add_one(deb_origin)
    origin_info = service.lookup_origin({'url': deb_origin.url[:-1]})
    assert origin_info['url'] == deb_origin.url
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index 556ed935..7b39535f 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,484 +1,484 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import random

from copy import deepcopy
from typing import Dict

from rest_framework.decorators import api_view
from rest_framework.response import Response
from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model import from_disk
from swh.model.hashutil import hash_to_hex, DEFAULT_ALGORITHMS
from swh.model.model import Content, Directory, Origin
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.search import get_search
from swh.storage.algos.dir_iterators import dir_iterator
from swh.web import config
from swh.web.browse.utils import (
    get_mimetype_and_encoding_for_content, prepare_content_for_display,
    _re_encode_content
)
from swh.web.common import service
from swh.web.common.highlightjs import get_hljs_language_from_filename

# Module used to initialize data that will be provided as tests input

# Configuration for git loader
_TEST_LOADER_CONFIG = {
    'storage': {
        'cls': 'memory',
    },
    'save_data': False,
    'max_content_size': 100 * 1024 * 1024,
}

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    'storage': {
        'cls': 'memory'
    },
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
    'indexer_storage': {
        'cls': 'memory',
        'args': {},
    }
}


def random_sha1():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20)))


def random_sha256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_blake2s256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_content():
    return {
        'sha1': random_sha1(),
        'sha1_git': random_sha1(),
        'sha256': random_sha256(),
        'blake2s256': random_blake2s256(),
    }


# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'tools': {
                'name': 'file',
                'version': '1:5.30-1+deb9u1',
                'configuration': {
                    "type": "library",
                    "debian-package": "python3-magic"
                }
            }
        }


# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.fossology.license',
            'tools': {
                'name': 'nomos',
                'version': '3.1.0rc2-31-ga2cbb8c',
                'configuration': {
                    'command_line': 'nomossa ',
                },
            }
        }


# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.ctags',
            'languages': {'c': 'c'},
            'tools': {
                'name': 'universal-ctags',
                'version': '~git7859817b',
                'configuration': {
                    'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa
                                    '''--output-format=json '''
                },
            }
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        'type': 'git',
        'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
        'archives': ['highlightjs-line-numbers.js.zip',
                     'highlightjs-line-numbers.js_visit2.zip'],
        'visit_date': ['Dec 1 2018, 01:00 UTC',
                       'Jan 20 2019, 15:00 UTC']
    },
    {
        'type': 'git',
        'url': 'https://github.com/memononen/libtess2',
        'archives': ['libtess2.zip'],
        'visit_date': ['May 25 2018, 01:00 UTC']
    },
    {
        'type': 'git',
        'url': 'repo_with_submodules',
        'archives': ['repo_with_submodules.tgz'],
        'visit_date': ['Jan 1 2019, 01:00 UTC']
    }
]

_contents = {}


# Tests data initialization
def _init_tests_data():
    # To hold reference to the memory storage
    storage = None

    # Create search instance
    search = get_search('memory', {})
    search.initialize()
    search.origin_update({'url': origin['url']}
                         for origin in _TEST_ORIGINS)

    # Load git repositories from archives
    for origin in _TEST_ORIGINS:
        for i, archive in enumerate(origin['archives']):
            origin_repo_archive = \
                os.path.join(os.path.dirname(__file__),
                             'resources/repos/%s' % archive)
            loader = GitLoaderFromArchive(origin['url'],
                                          archive_path=origin_repo_archive,
                                          config=_TEST_LOADER_CONFIG,
                                          visit_date=origin['visit_date'][i])
            if storage is None:
                storage = loader.storage
            else:
                loader.storage = storage
            loader.load()
        origin.update(storage.origin_get(origin))  # add an 'id' key if enabled
        search.origin_update([{'url': origin['url'], 'has_visits': True}])

    for i in range(250):
        url = 'https://many.origins/%d' % (i+1)
        # storage.origin_add([{'url': url}])
        storage.origin_add([Origin(url=url)])
        search.origin_update([{'url': url, 'has_visits': True}])
        visit = storage.origin_visit_add(url, '2019-12-03 13:55:05', 'tar')
        storage.origin_visit_update(
-            url, visit['visit'],
+            url, visit.visit,
            snapshot='1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')

    contents = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()

    content_path = {}

    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
        snp = storage.snapshot_get_latest(origin['url'])
        snapshots.add(hash_to_hex(snp['id']))
        for branch_name, branch_data in snp['branches'].items():
            if branch_data['target_type'] == 'revision':
                revisions.add(branch_data['target'])
            elif branch_data['target_type'] == 'release':
                release = next(storage.release_get([branch_data['target']]))
                revisions.add(release['target'])
                releases.add(hash_to_hex(branch_data['target']))

    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)

    for rev in storage.revision_get(revisions):
        dir_id = rev['directory']
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            content_path[entry['sha1']] = '/'.join(
                [hash_to_hex(dir_id), entry['path'].decode('utf-8')])
            if entry['type'] == 'file':
                contents.add(entry['sha1'])
            elif entry['type'] == 'dir':
                directories.add(hash_to_hex(entry['target']))

    # Get all checksums for each content
    result = storage.content_get_metadata(contents)
    contents = []
    for sha1, contents_metadata in result.items():
        for content_metadata in contents_metadata:
            contents.append({
                algo: hash_to_hex(content_metadata[algo])
                for algo in DEFAULT_ALGORITHMS
            })
            path = content_path[sha1]
            cnt = next(storage.content_get([sha1]))
            mimetype, encoding = get_mimetype_and_encoding_for_content(
                cnt['data'])
            _, _, cnt['data'] = _re_encode_content(
                mimetype, encoding, cnt['data'])
            content_display_data = prepare_content_for_display(
                cnt['data'], mimetype, path)
            contents[-1]['path'] = path
            contents[-1]['mimetype'] = mimetype
            contents[-1]['encoding'] = encoding
            contents[-1]['hljs_language'] = content_display_data['language']
            contents[-1]['data'] = content_display_data['content_data']
            _contents[contents[-1]['sha1']] = contents[-1]

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage('memory', {})

    # Add the empty directory to the test archive
    storage.directory_add([Directory(entries=[])])

    # Return tests data
    return {
        'search': search,
        'storage': storage,
        'idx_storage': idx_storage,
        'origins': _TEST_ORIGINS,
        'contents': contents,
        'directories': list(directories),
        'releases': list(releases),
        'revisions': list(map(hash_to_hex, revisions)),
        'snapshots': list(snapshots),
        'generated_checksums': set(),
    }


def _init_indexers(tests_data):
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class in (
            ('mimetype_indexer', _MimetypeIndexer),
            ('license_indexer', _FossologyLicenseIndexer),
            ('ctags_indexer', _CtagsIndexer)):
        idx = idx_class()
        idx.storage = tests_data['storage']
        idx.objstorage = tests_data['storage'].objstorage
        idx.idx_storage = tests_data['idx_storage']
        idx.register_tools(idx.config['tools'])
        indexers[idx_name] = idx
    return indexers


def get_content(content_sha1):
    return _contents.get(content_sha1)


_tests_data = None
_current_tests_data = None
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage, search):
    """
    Helper function to replace the storages from which archive data
    are fetched.
    """
    swh_config = config.get_config()
    swh_config.update({
        'storage': storage,
        'indexer_storage': idx_storage,
        'search': search,
    })
    service.storage = storage
    service.idx_storage = idx_storage
    service.search = search


# Implement some special endpoints used to provide input tests data
# when executing end to end tests with cypress
_content_code_data_exts = {}  # type: Dict[str, Dict[str, str]]
_content_code_data_filenames = {}  # type: Dict[str, Dict[str, str]]
_content_other_data_exts = {}  # type: Dict[str, Dict[str, str]]


def _init_content_tests_data(data_path, data_dict, ext_key):
    """
    Helper function to read the content of a directory, store it
    into a test archive and add some files metadata (sha1 and/or
    expected programming language) in a dict.

    Args:
        data_path (str): path to a directory relative to the tests
            folder of swh-web
        data_dict (dict): the dict that will store files metadata
        ext_key (bool): whether to use file extensions or filenames
            as dict keys
    """
    test_contents_dir = os.path.join(
        os.path.dirname(__file__), data_path).encode('utf-8')
    directory = from_disk.Directory.from_disk(path=test_contents_dir)

    contents = []
    for name, obj in directory.items():
        if isinstance(obj, from_disk.Content):
            c = obj.to_model().with_data().to_dict()
            c['status'] = 'visible'
            sha1 = hash_to_hex(c['sha1'])
            if ext_key:
                key = name.decode('utf-8').split('.')[-1]
                filename = 'test.' + key
            else:
                filename = name.decode('utf-8').split('/')[-1]
                key = filename
            language = get_hljs_language_from_filename(filename)
            data_dict[key] = {'sha1': sha1,
                              'language': language}
            contents.append(Content.from_dict(c))
    storage = get_tests_data()['storage']
    storage.content_add(contents)


def _init_content_code_data_exts():
    """
    Fill a global dictionary which maps source file extension to
    a code content example.
    """
    global _content_code_data_exts
    _init_content_tests_data('resources/contents/code/extensions',
                             _content_code_data_exts, True)


def _init_content_other_data_exts():
    """
    Fill a global dictionary which maps a file extension to a
    content example.
""" global _content_other_data_exts _init_content_tests_data('resources/contents/other/extensions', _content_other_data_exts, True) def _init_content_code_data_filenames(): """ Fill a global dictionary which maps a filename to a content example. """ global _content_code_data_filenames _init_content_tests_data('resources/contents/code/filenames', _content_code_data_filenames, False) if config.get_config()['e2e_tests_mode']: _init_content_code_data_exts() _init_content_other_data_exts() _init_content_code_data_filenames() @api_view(['GET']) def get_content_code_data_all_exts(request): """ Endpoint implementation returning a list of all source file extensions to test for highlighting using cypress. """ return Response(sorted(_content_code_data_exts.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a code content example based on the source file extension. """ data = None status = 404 if ext in _content_code_data_exts: data = _content_code_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_other_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a content example based on the file extension. """ _init_content_other_data_exts() data = None status = 404 if ext in _content_other_data_exts: data = _content_other_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_code_data_all_filenames(request): """ Endpoint implementation returning a list of all source filenames to test for highlighting using cypress. """ return Response(sorted(_content_code_data_filenames.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_filename(request, filename): """ Endpoint implementation returning metadata of a code content example based on the source filename. """ data = None status = 404 if filename in _content_code_data_filenames: data = _content_code_data_filenames[filename] status = 200 return Response(data, status=status, content_type='application/json')