diff --git a/swh/web/tests/api/views/test_metadata.py b/swh/web/tests/api/views/test_metadata.py
index 2a1053bb..53babe4e 100644
--- a/swh/web/tests/api/views/test_metadata.py
+++ b/swh/web/tests/api/views/test_metadata.py
@@ -1,170 +1,188 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import attr
 from hypothesis import given, strategies
 import pytest
 
 from swh.model.hypothesis_strategies import raw_extrinsic_metadata
 from swh.web.common.utils import reverse
 from swh.web.tests.api.views.utils import scroll_results
 from swh.web.tests.utils import check_api_get_responses, check_http_get_response
 
 
 @given(raw_extrinsic_metadata())
-def test_api_raw_extrinsic_metadata(api_client, archive_data, metadata):
-    archive_data.metadata_authority_add([metadata.authority])
-    archive_data.metadata_fetcher_add([metadata.fetcher])
-    archive_data.raw_extrinsic_metadata_add([metadata])
+def test_api_raw_extrinsic_metadata(api_client, subtest, metadata):
+    # ensure archive_data fixture will be reset between each hypothesis
+    # example test run
+    @subtest
+    def test_inner(archive_data):
+        archive_data.metadata_authority_add([metadata.authority])
+        archive_data.metadata_fetcher_add([metadata.fetcher])
+        archive_data.raw_extrinsic_metadata_add([metadata])
+
+        authority = metadata.authority
+        url = reverse(
+            "api-1-raw-extrinsic-metadata-swhid",
+            url_args={"target": str(metadata.target)},
+            query_params={"authority": f"{authority.type.value} {authority.url}"},
+        )
+        rv = check_api_get_responses(api_client, url, status_code=200)
+
+        assert len(rv.data) == 1
+
+        expected_result = metadata.to_dict()
+        del expected_result["id"]
+        del expected_result["metadata"]
+        metadata_url = rv.data[0]["metadata_url"]
+        expected_result["metadata_url"] = metadata_url
+        expected_result["discovery_date"] = expected_result[
+            "discovery_date"
+        ].isoformat()
+        assert rv.data == [expected_result]
 
-    authority = metadata.authority
-    url = reverse(
-        "api-1-raw-extrinsic-metadata-swhid",
-        url_args={"target": str(metadata.target)},
-        query_params={"authority": f"{authority.type.value} {authority.url}"},
-    )
-    rv = check_api_get_responses(api_client, url, status_code=200)
-
-    assert len(rv.data) == 1
-
-    expected_result = metadata.to_dict()
-    del expected_result["id"]
-    del expected_result["metadata"]
-    metadata_url = rv.data[0]["metadata_url"]
-    expected_result["metadata_url"] = metadata_url
-    expected_result["discovery_date"] = expected_result["discovery_date"].isoformat()
-    assert rv.data == [expected_result]
-
-    rv = check_http_get_response(api_client, metadata_url, status_code=200)
-    assert rv["Content-Type"] == "application/octet-stream"
-    assert (
-        rv["Content-Disposition"]
-        == f'attachment; filename="{metadata.target}_metadata"'
-    )
-    assert rv.content == metadata.metadata
+        rv = check_http_get_response(api_client, metadata_url, status_code=200)
+        assert rv["Content-Type"] == "application/octet-stream"
+        assert (
+            rv["Content-Disposition"]
+            == f'attachment; filename="{metadata.target}_metadata"'
+        )
+        assert rv.content == metadata.metadata
 
 
 @pytest.mark.parametrize("limit", [1, 2, 10, 100])
 @given(strategies.sets(raw_extrinsic_metadata(), min_size=1))
-def test_api_raw_extrinsic_metadata_scroll(api_client, archive_data, limit, metadata):
-    # Make all metadata objects use the same authority and target
-    metadata0 = 
next(iter(metadata)) - metadata = { - attr.evolve(m, authority=metadata0.authority, target=metadata0.target) - for m in metadata - } - authority = metadata0.authority - - archive_data.metadata_authority_add([authority]) - archive_data.metadata_fetcher_add(list({m.fetcher for m in metadata})) - archive_data.raw_extrinsic_metadata_add(metadata) +def test_api_raw_extrinsic_metadata_scroll(api_client, subtest, limit, meta): + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + # Make all metadata objects use the same authority and target + metadata0 = next(iter(meta)) + metadata = { + attr.evolve(m, authority=metadata0.authority, target=metadata0.target) + for m in meta + } + # Metadata ids must also be updated as they depend on authority and target + metadata = {attr.evolve(m, id=m.compute_hash()) for m in metadata} + authority = metadata0.authority + + archive_data.metadata_authority_add([authority]) + archive_data.metadata_fetcher_add(list({m.fetcher for m in metadata})) + archive_data.raw_extrinsic_metadata_add(metadata) + + url = reverse( + "api-1-raw-extrinsic-metadata-swhid", + url_args={"target": str(metadata0.target)}, + query_params={ + "authority": f"{authority.type.value} {authority.url}", + "limit": limit, + }, + ) - url = reverse( - "api-1-raw-extrinsic-metadata-swhid", - url_args={"target": str(metadata0.target)}, - query_params={ - "authority": f"{authority.type.value} {authority.url}", - "limit": limit, - }, - ) + results = scroll_results(api_client, url) - results = scroll_results(api_client, url) + expected_results = [m.to_dict() for m in metadata] - expected_results = [m.to_dict() for m in metadata] - for expected_result in expected_results: - del expected_result["id"] - del expected_result["metadata"] - expected_result["discovery_date"] = expected_result[ - "discovery_date" - ].isoformat() + for expected_result in expected_results: + del expected_result["id"] + del expected_result["metadata"] + expected_result["discovery_date"] = expected_result[ + "discovery_date" + ].isoformat() - for result in results: - del result["metadata_url"] + assert len(results) == len(expected_results) - assert results == expected_results + for result in results: + del result["metadata_url"] + assert result in expected_results _swhid = "swh:1:dir:a2faa28028657859c16ff506924212b33f0e1307" @pytest.mark.parametrize( "status_code,url_args,query_params", [ pytest.param( 200, {"target": _swhid}, {"authority": "forge http://example.org"}, id="minimal working", ), pytest.param( 200, {"target": _swhid}, { "authority": "forge http://example.org", "after": "2021-06-18T09:31:09", "limit": 100, }, id="maximal working", ), pytest.param( 400, {"target": _swhid}, {"authority": "foo http://example.org"}, id="invalid authority type", ), pytest.param( 400, {"target": _swhid}, {"authority": "forge http://example.org", "after": "yesterday",}, id="invalid 'after' format", ), pytest.param( 400, {"target": _swhid}, {"authority": "forge http://example.org", "limit": "abc",}, id="invalid 'limit'", ), ], ) def test_api_raw_extrinsic_metadata_check_params( api_client, archive_data, status_code, url_args, query_params ): url = reverse( "api-1-raw-extrinsic-metadata-swhid", url_args=url_args, query_params=query_params, ) check_api_get_responses(api_client, url, status_code=status_code) @given(raw_extrinsic_metadata()) -def test_api_raw_extrinsic_metadata_list_authorities( - api_client, archive_data, metadata -): - 
archive_data.metadata_authority_add([metadata.authority]) - archive_data.metadata_fetcher_add([metadata.fetcher]) - archive_data.raw_extrinsic_metadata_add([metadata]) - - authority = metadata.authority - url = reverse( - "api-1-raw-extrinsic-metadata-swhid-authorities", - url_args={"target": str(metadata.target)}, - ) - rv = check_api_get_responses(api_client, url, status_code=200) - - expected_results = [ - { - "type": authority.type.value, - "url": authority.url, - "metadata_list_url": "http://testserver" - + reverse( - "api-1-raw-extrinsic-metadata-swhid", - url_args={"target": str(metadata.target)}, - query_params={"authority": f"{authority.type.value} {authority.url}"}, - ), - } - ] - - assert rv.data == expected_results +def test_api_raw_extrinsic_metadata_list_authorities(api_client, subtest, metadata): + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.metadata_authority_add([metadata.authority]) + archive_data.metadata_fetcher_add([metadata.fetcher]) + archive_data.raw_extrinsic_metadata_add([metadata]) + + authority = metadata.authority + url = reverse( + "api-1-raw-extrinsic-metadata-swhid-authorities", + url_args={"target": str(metadata.target)}, + ) + rv = check_api_get_responses(api_client, url, status_code=200) + + expected_results = [ + { + "type": authority.type.value, + "url": authority.url, + "metadata_list_url": "http://testserver" + + reverse( + "api-1-raw-extrinsic-metadata-swhid", + url_args={"target": str(metadata.target)}, + query_params={ + "authority": f"{authority.type.value} {authority.url}" + }, + ), + } + ] + + assert rv.data == expected_results diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 43c6c1af..be9428f1 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,714 +1,737 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import json from hypothesis import given import pytest from swh.indexer.storage.model import OriginIntrinsicMetadataRow from swh.model.hashutil import hash_to_bytes from swh.model.model import Origin, OriginVisit, OriginVisitStatus from swh.search.interface import PagedResult from swh.storage.exc import StorageAPIError, StorageDBError from swh.storage.utils import now from swh.web.api.utils import enrich_origin, enrich_origin_visit from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse from swh.web.tests.api.views.utils import scroll_results from swh.web.tests.data import ( INDEXER_TOOL, ORIGIN_MASTER_REVISION, ORIGIN_METADATA_KEY, ORIGIN_METADATA_VALUE, ) from swh.web.tests.strategies import new_origin, new_snapshots, origin, visit_dates from swh.web.tests.utils import check_api_get_responses def test_api_lookup_origin_visits_raise_error(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "voluntary error to check the bad request middleware." 
mock_get_origin_visits.side_effect = BadInputExc(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == {"exception": "BadInputExc", "reason": err_msg} def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage exploded! Will be back online shortly!" mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageDBError", "reason": "An unexpected error occurred in the backend: %s" % err_msg, } def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage API dropped dead! Will resurrect asap!" mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageAPIError", "reason": "An unexpected error occurred in the api backend: %s" % err_msg, } @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits( - api_client, archive_data, new_origin, visit_dates, new_snapshots + api_client, subtest, new_origin, visit_dates, new_snapshots ): + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.origin_add([new_origin]) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] + )[0] + archive_data.snapshot_add([new_snapshots[i]]) + visit_status = OriginVisitStatus( + origin=new_origin.url, + visit=origin_visit.visit, + date=now(), + status="full", + snapshot=new_snapshots[i].id, + ) + archive_data.origin_visit_status_add([visit_status]) - archive_data.origin_add([new_origin]) - for i, visit_date in enumerate(visit_dates): - origin_visit = archive_data.origin_visit_add( - [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] - )[0] - archive_data.snapshot_add([new_snapshots[i]]) - visit_status = OriginVisitStatus( - origin=new_origin.url, - visit=origin_visit.visit, - date=now(), - status="full", - snapshot=new_snapshots[i].id, - ) - archive_data.origin_visit_status_add([visit_status]) + all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) - all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) + for last_visit, expected_visits in ( + (None, all_visits[:2]), + (all_visits[1]["visit"], all_visits[2:]), + ): - for last_visit, expected_visits in ( - (None, all_visits[:2]), - (all_visits[1]["visit"], all_visits[2:]), - ): + url = reverse( + "api-1-origin-visits", + url_args={"origin_url": new_origin.url}, + query_params={"per_page": 2, "last_visit": last_visit}, + ) - url = reverse( - "api-1-origin-visits", - url_args={"origin_url": new_origin.url}, - query_params={"per_page": 2, "last_visit": last_visit}, - ) + rv = check_api_get_responses(api_client, url, status_code=200) - rv = check_api_get_responses(api_client, url, status_code=200) + for i in range(len(expected_visits)): + expected_visits[i] = enrich_origin_visit( + 
expected_visits[i], + with_origin_link=False, + with_origin_visit_link=True, + request=rv.wsgi_request, + ) - for i in range(len(expected_visits)): - expected_visits[i] = enrich_origin_visit( - expected_visits[i], - with_origin_link=False, - with_origin_visit_link=True, - request=rv.wsgi_request, - ) - - assert rv.data == expected_visits + assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id( - api_client, archive_data, new_origin, visit_dates, new_snapshots + api_client, subtest, new_origin, visit_dates, new_snapshots ): - archive_data.origin_add([new_origin]) - for i, visit_date in enumerate(visit_dates): - origin_visit = archive_data.origin_visit_add( - [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] - )[0] - archive_data.snapshot_add([new_snapshots[i]]) - visit_status = OriginVisitStatus( - origin=new_origin.url, - visit=origin_visit.visit, - date=now(), - status="full", - snapshot=new_snapshots[i].id, - ) - archive_data.origin_visit_status_add([visit_status]) + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.origin_add([new_origin]) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] + )[0] + archive_data.snapshot_add([new_snapshots[i]]) + visit_status = OriginVisitStatus( + origin=new_origin.url, + visit=origin_visit.visit, + date=now(), + status="full", + snapshot=new_snapshots[i].id, + ) + archive_data.origin_visit_status_add([visit_status]) - all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) + all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) - for last_visit, expected_visits in ( - (None, all_visits[:2]), - (all_visits[1]["visit"], all_visits[2:4]), - ): + for last_visit, expected_visits in ( + (None, all_visits[:2]), + (all_visits[1]["visit"], all_visits[2:4]), + ): - url = reverse( - "api-1-origin-visits", - url_args={"origin_url": new_origin.url}, - query_params={"per_page": 2, "last_visit": last_visit}, - ) + url = reverse( + "api-1-origin-visits", + url_args={"origin_url": new_origin.url}, + query_params={"per_page": 2, "last_visit": last_visit}, + ) - rv = check_api_get_responses(api_client, url, status_code=200) + rv = check_api_get_responses(api_client, url, status_code=200) - for i in range(len(expected_visits)): - expected_visits[i] = enrich_origin_visit( - expected_visits[i], - with_origin_link=False, - with_origin_visit_link=True, - request=rv.wsgi_request, - ) + for i in range(len(expected_visits)): + expected_visits[i] = enrich_origin_visit( + expected_visits[i], + with_origin_link=False, + with_origin_visit_link=True, + request=rv.wsgi_request, + ) - assert rv.data == expected_visits + assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit( - api_client, archive_data, new_origin, visit_dates, new_snapshots + api_client, subtest, new_origin, visit_dates, new_snapshots ): - archive_data.origin_add([new_origin]) - for i, visit_date in enumerate(visit_dates): - origin_visit = archive_data.origin_visit_add( - [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] - )[0] - visit_id = origin_visit.visit - archive_data.snapshot_add([new_snapshots[i]]) - visit_status = OriginVisitStatus( - origin=new_origin.url, - visit=origin_visit.visit, - date=visit_date + 
timedelta(minutes=5), - status="full", - snapshot=new_snapshots[i].id, - ) - archive_data.origin_visit_status_add([visit_status]) - url = reverse( - "api-1-origin-visit", - url_args={"origin_url": new_origin.url, "visit_id": visit_id}, - ) + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.origin_add([new_origin]) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] + )[0] + visit_id = origin_visit.visit + archive_data.snapshot_add([new_snapshots[i]]) + visit_status = OriginVisitStatus( + origin=new_origin.url, + visit=origin_visit.visit, + date=visit_date + timedelta(minutes=5), + status="full", + snapshot=new_snapshots[i].id, + ) + archive_data.origin_visit_status_add([visit_status]) + url = reverse( + "api-1-origin-visit", + url_args={"origin_url": new_origin.url, "visit_id": visit_id}, + ) - rv = check_api_get_responses(api_client, url, status_code=200) + rv = check_api_get_responses(api_client, url, status_code=200) - expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id) + expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id) - expected_visit = enrich_origin_visit( - expected_visit, - with_origin_link=True, - with_origin_visit_link=False, - request=rv.wsgi_request, - ) + expected_visit = enrich_origin_visit( + expected_visit, + with_origin_link=True, + with_origin_visit_link=False, + request=rv.wsgi_request, + ) - assert rv.data == expected_visit + assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add([new_origin]) url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No visit for origin %s found" % new_origin.url, } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest( - api_client, archive_data, new_origin, visit_dates, new_snapshots + api_client, subtest, new_origin, visit_dates, new_snapshots ): - archive_data.origin_add([new_origin]) - visit_dates.sort() - visit_ids = [] - for i, visit_date in enumerate(visit_dates): - origin_visit = archive_data.origin_visit_add( - [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] - )[0] - visit_ids.append(origin_visit.visit) - - archive_data.snapshot_add([new_snapshots[0]]) - - visit_status = OriginVisitStatus( - origin=new_origin.url, - visit=visit_ids[0], - date=now(), - status="full", - snapshot=new_snapshots[0].id, - ) - archive_data.origin_visit_status_add([visit_status]) + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.origin_add([new_origin]) + visit_dates.sort() + visit_ids = [] + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] + )[0] + visit_ids.append(origin_visit.visit) + + archive_data.snapshot_add([new_snapshots[0]]) - url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) + visit_status = OriginVisitStatus( + origin=new_origin.url, + visit=visit_ids[0], + date=now(), + status="full", + snapshot=new_snapshots[0].id, + ) + 
archive_data.origin_visit_status_add([visit_status]) - rv = check_api_get_responses(api_client, url, status_code=200) + url = reverse( + "api-1-origin-visit-latest", url_args={"origin_url": new_origin.url} + ) - expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_ids[1]) + rv = check_api_get_responses(api_client, url, status_code=200) - expected_visit = enrich_origin_visit( - expected_visit, - with_origin_link=True, - with_origin_visit_link=False, - request=rv.wsgi_request, - ) + expected_visit = archive_data.origin_visit_status_get_latest( + new_origin.url, type="git" + ) + + expected_visit = enrich_origin_visit( + expected_visit, + with_origin_link=True, + with_origin_visit_link=False, + request=rv.wsgi_request, + ) - assert rv.data == expected_visit + assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot( - api_client, archive_data, new_origin, visit_dates, new_snapshots + api_client, subtest, new_origin, visit_dates, new_snapshots ): - archive_data.origin_add([new_origin]) - visit_dates.sort() - visit_ids = [] - for i, visit_date in enumerate(visit_dates): - origin_visit = archive_data.origin_visit_add( - [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] - )[0] - visit_ids.append(origin_visit.visit) - - archive_data.snapshot_add([new_snapshots[0]]) - - # Add snapshot to the latest visit - visit_id = visit_ids[-1] - visit_status = OriginVisitStatus( - origin=new_origin.url, - visit=visit_id, - date=now(), - status="full", - snapshot=new_snapshots[0].id, - ) - archive_data.origin_visit_status_add([visit_status]) + # ensure archive_data fixture will be reset between each hypothesis + # example test run + @subtest + def test_inner(archive_data): + archive_data.origin_add([new_origin]) + visit_dates.sort() + visit_ids = [] + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] + )[0] + visit_ids.append(origin_visit.visit) + + archive_data.snapshot_add([new_snapshots[0]]) + + # Add snapshot to the latest visit + visit_id = visit_ids[-1] + visit_status = OriginVisitStatus( + origin=new_origin.url, + visit=visit_id, + date=now(), + status="full", + snapshot=new_snapshots[0].id, + ) + archive_data.origin_visit_status_add([visit_status]) - url = reverse( - "api-1-origin-visit-latest", - url_args={"origin_url": new_origin.url}, - query_params={"require_snapshot": True}, - ) + url = reverse( + "api-1-origin-visit-latest", + url_args={"origin_url": new_origin.url}, + query_params={"require_snapshot": True}, + ) - rv = check_api_get_responses(api_client, url, status_code=200) + rv = check_api_get_responses(api_client, url, status_code=200) - expected_visit = archive_data.origin_visit_status_get_latest( - new_origin.url, type="git", require_snapshot=True - ) + expected_visit = archive_data.origin_visit_status_get_latest( + new_origin.url, type="git", require_snapshot=True + ) - expected_visit = enrich_origin_visit( - expected_visit, - with_origin_link=True, - with_origin_visit_link=False, - request=rv.wsgi_request, - ) + expected_visit = enrich_origin_visit( + expected_visit, + with_origin_link=True, + with_origin_visit_link=False, + request=rv.wsgi_request, + ) - assert rv.data == expected_visit + assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) 
max_visit_id = max([v["visit"] for v in all_visits]) url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": max_visit_id + 1}, ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin %s or its visit with id %s not found!" % (origin["url"], max_visit_id + 1), } def test_api_origins_wrong_input(api_client, archive_data): """Should fail with 400 if the input is deprecated. """ # fail if wrong input url = reverse("api-1-origins", query_params={"origin_from": 1}) rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": "Please use the Link header to browse through result", } def test_api_origins(api_client, archive_data): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} # Get only one url = reverse("api-1-origins", query_params={"origin_count": 1}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= origin_urls # Get all url = reverse("api-1-origins", query_params={"origin_count": len(origins)}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse("api-1-origins", query_params={"origin_count": len(origins) + 10}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls @pytest.mark.parametrize("origin_count", [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} url = reverse("api-1-origins", query_params={"origin_count": origin_count}) results = scroll_results(api_client, url) assert len(results) == len(origins) assert {origin["url"] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): origin_url = origin["url"] url = reverse("api-1-origin", url_args={"origin_url": origin_url}) rv = check_api_get_responses(api_client, url, status_code=200) expected_origin = archive_data.origin_get([origin_url])[0] expected_origin = enrich_origin(expected_origin, rv.wsgi_request) assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse("api-1-origin", url_args={"origin_url": new_origin.url}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin with url %s not found!" 
% new_origin.url, } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } # Search for 'github.com', get only one url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 1}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= expected_origins assert rv.data == [ enrich_origin({"url": origin["url"]}, request=rv.wsgi_request) for origin in rv.data ] # Search for 'github.com', get all url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins assert rv.data == [ enrich_origin({"url": origin["url"]}, request=rv.wsgi_request) for origin in rv.data ] # Search for 'github.com', get more than available url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 10}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins assert rv.data == [ enrich_origin({"url": origin["url"]}, request=rv.wsgi_request) for origin in rv.data ] @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_words(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github com"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "com github"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "memononen libtess2"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } url = reverse( "api-1-origin-search", url_args={"url_pattern": "libtess2 memononen"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_visit_type(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github com",}, query_params={"visit_type": "git"}, ) rv = 
check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "github com",}, query_params={"visit_type": "foo"}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == [] def test_api_origin_search_use_ql(api_client, mocker): mock_config = mocker.patch("swh.web.common.archive.config") mock_config.get_config.return_value = { "search_config": {"backend": "swh-search", "enable_ql": True} } expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } ORIGINS = [{"url": origin} for origin in expected_origins] mock_archive_search = mocker.patch("swh.web.common.archive.search") mock_archive_search.origin_search.return_value = PagedResult( results=ORIGINS, next_page_token=None, ) url = reverse( "api-1-origin-search", url_args={"url_pattern": "origin = 'github.com'",}, query_params={"visit_type": "git", "use_ql": "true"}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) @pytest.mark.parametrize("limit", [1, 2, 3, 10]) def test_api_origin_search_scroll(api_client, archive_data, mocker, limit, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": limit}, ) results = scroll_results(api_client, url) assert {origin["url"] for origin in results} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_limit(api_client, archive_data, tests_data, mocker, backend): if backend == "swh-search": tests_data["search"].origin_update( [{"url": "http://foobar/{}".format(i)} for i in range(2000)] ) else: # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) archive_data.origin_add( [Origin(url="http://foobar/{}".format(i)) for i in range(2000)] ) url = reverse( "api-1-origin-search", url_args={"url_pattern": "foobar"}, query_params={"limit": 1050}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1000 @pytest.mark.parametrize("backend", ["swh-search", "swh-indexer-storage"]) def test_api_origin_metadata_search(api_client, mocker, backend): mock_config = mocker.patch("swh.web.common.archive.config") mock_config.get_config.return_value = {"search_config": {"backend": backend}} url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE} ) rv = check_api_get_responses(api_client, url, status_code=200) rv.data = sorted(rv.data, key=lambda d: d["url"]) expected_data = sorted( [ { "url": origin_url, "metadata": { "from_revision": ORIGIN_MASTER_REVISION[origin_url], "tool": { "name": INDEXER_TOOL["tool_name"], "version": INDEXER_TOOL["tool_version"], "configuration": INDEXER_TOOL["tool_configuration"], "id": INDEXER_TOOL["id"], }, "mappings": [], }, } for origin_url in sorted(ORIGIN_MASTER_REVISION.keys()) ], key=lambda d: d["url"], ) for i in range(len(expected_data)): expected = expected_data[i] response = rv.data[i] metadata = response["metadata"].pop("metadata") assert any( 
[ORIGIN_METADATA_VALUE in json.dumps(val) for val in metadata.values()] ) assert response == expected def test_api_origin_metadata_search_limit(api_client, mocker): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [ OriginIntrinsicMetadataRow( id=origin_url, from_revision=hash_to_bytes(master_rev), indexer_configuration_id=INDEXER_TOOL["id"], metadata={ORIGIN_METADATA_KEY: ORIGIN_METADATA_VALUE}, mappings=[], ) for origin_url, master_rev in ORIGIN_MASTER_REVISION.items() ] url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE} ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(ORIGIN_MASTER_REVISION) oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=70) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 10}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(ORIGIN_MASTER_REVISION) oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=10) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 987}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(ORIGIN_MASTER_REVISION) oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(api_client, origin): url = reverse( "api-origin-intrinsic-metadata", url_args={"origin_url": origin["url"]} ) rv = check_api_get_responses(api_client, url, status_code=200) assert ORIGIN_METADATA_KEY in rv.data assert rv.data[ORIGIN_METADATA_KEY] == ORIGIN_METADATA_VALUE def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") url = reverse("api-1-origin-metadata-search") check_api_get_responses(api_client, url, status_code=400) mock_idx_storage.assert_not_called() @pytest.mark.parametrize("backend", ["swh-counters", "swh-storage"]) def test_api_stat_counters(api_client, mocker, backend): mock_config = mocker.patch("swh.web.common.archive.config") mock_config.get_config.return_value = {"counters_backend": backend} url = reverse("api-1-stat-counters") rv = check_api_get_responses(api_client, url, status_code=200) counts = json.loads(rv.content) for obj in ["content", "origin", "release", "directory", "revision"]: assert counts.get(obj, 0) > 0 diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index b59e3678..054e9423 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,1301 +1,1299 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import string from hypothesis import given from django.utils.html import escape from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.model.model import ( OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.utils import now from swh.web.browse.snapshot_context import process_snapshot_branches from 
swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import ( format_utc_iso_date, gen_path_info, parse_iso8601_date_to_utc, reverse, ) from swh.web.tests.data import get_content, random_sha1 from swh.web.tests.django_asserts import assert_contains, assert_not_contains from swh.web.tests.strategies import ( new_origin, new_snapshot, origin, origin_with_multiple_visits, origin_with_pull_request_branches, origin_with_releases, ) from swh.web.tests.strategies import release as existing_release from swh.web.tests.strategies import revisions, unknown_revision, visit_dates from swh.web.tests.utils import check_html_get_response @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/origin-visits.html" ) visits = archive_data.origin_visit_get(origin["url"]) for v in visits: vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ") browse_dir_url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "timestamp": vdate}, ) assert_contains(resp, browse_dir_url) _check_origin_link(resp, origin["url"]) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev["directory"]) dir_files = [e for e in dir_content if e["type"] == "file"] dir_file = random.choice(dir_files) branches, releases, _ = process_snapshot_branches(snapshot) return { "branches": branches, "releases": releases, "root_dir_sha1": head_rev["directory"], "content": get_content(dir_file["checksums"]["sha1"]), "visit": origin_visits[visit_idx], "snapshot_sizes": archive_data.snapshot_count_branches(snapshot["id"]), } tdata = _get_archive_data(-1) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["snapshot_sizes"], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], ) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["snapshot_sizes"], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], timestamp=tdata["visit"]["date"], ) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["snapshot_sizes"], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], snapshot_id=tdata["visit"]["snapshot"], ) tdata = _get_archive_data(0) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[0], tdata["snapshot_sizes"], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], visit_id=tdata["visit"]["visit"], ) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[0], tdata["snapshot_sizes"], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], snapshot_id=tdata["visit"]["snapshot"], ) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_sizes = 
archive_data.snapshot_count_branches(snapshot["id"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases, _ = process_snapshot_branches(snapshot) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, snapshot_id=visit["snapshot"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, dir_content, snapshot_id=visit["snapshot"], ) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] subdirs = [ e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir" ] branches, releases, _ = process_snapshot_branches(snapshot) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir["target"]) subdir_path = subdir["name"] _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, snapshot_id=visit["snapshot"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, 
timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, snapshot_sizes, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, snapshot_id=visit["snapshot"], ) @given(origin()) def test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content, snapshot_sizes) _origin_branches_test_helper( client, origin, snapshot_content, snapshot_sizes, snapshot_id=visit["snapshot"] ) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content, snapshot_sizes) _origin_releases_test_helper( client, origin, snapshot_content, snapshot_sizes, snapshot_id=visit["snapshot"] ) @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3), ) def test_origin_snapshot_null_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() archive_data.origin_add([new_origin]) for i, branch in enumerate(snp_dict["branches"].keys()): if i == 0: snp_dict["branches"][branch] = None else: snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i - 1]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snp_dict["id"], ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url} ) check_html_get_response( client, url, status_code=200, template_used="browse/directory.html" ) @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4), ) def test_origin_snapshot_invalid_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() archive_data.origin_add([new_origin]) for i, branch in enumerate(snp_dict["branches"].keys()): snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="full", snapshot=snp_dict["id"], ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url, "branch": "invalid_branch"}, ) check_html_get_response(client, url, status_code=404, template_used="error.html") @given(new_origin()) def test_browse_visits_origin_not_found(client, new_origin): url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url}) resp = check_html_get_response( client, 
url, status_code=404, template_used="error.html" ) assert_contains( resp, f"Origin with url {new_origin.url} not found", status_code=404 ) @given(origin()) def test_browse_origin_directory_no_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [] mock_archive = mocker.patch("swh.web.common.origin_visits.archive") mock_archive.lookup_origin_visit_latest.return_value = None url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert_contains(resp, "No valid visit", status_code=404) assert not mock_get_origin_visits.called @given(origin()) def test_browse_origin_directory_unknown_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [{"visit": 1}] url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "visit_id": 2}, ) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert re.search("Visit.*not found", resp.content.decode("utf-8")) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_directory_not_found(client, origin): url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"}, ) resp = check_html_get_response( client, url, status_code=404, template_used="browse/directory.html" ) assert re.search("Directory.*not found", resp.content.decode("utf-8")) @given(origin()) def test_browse_origin_content_no_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [] mock_archive = mocker.patch("swh.web.common.origin_visits.archive") mock_archive.lookup_origin_visit_latest.return_value = None url = reverse( "browse-origin-content", query_params={"origin_url": origin["url"], "path": "foo"}, ) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert_contains(resp, "No valid visit", status_code=404) assert not mock_get_origin_visits.called @given(origin()) def test_browse_origin_content_unknown_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [{"visit": 1}] url = reverse( "browse-origin-content", query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2}, ) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert re.search("Visit.*not found", resp.content.decode("utf-8")) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin): mock_snapshot_archive = mocker.patch("swh.web.browse.snapshot_context.archive") mock_get_origin_visit_snapshot = mocker.patch( "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visit_snapshot.return_value = ([], [], {}) mock_snapshot_archive.lookup_origin.return_value = origin mock_snapshot_archive.lookup_snapshot_sizes.return_value = { "alias": 0, "revision": 0, "release": 0, } for browse_context in ("content", "directory"): url = reverse( f"browse-origin-{browse_context}", query_params={"origin_url": origin["url"], "path": "baz"}, ) resp = check_html_get_response( client, url, status_code=200, 
template_used=f"browse/{browse_context}.html" ) assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) assert mock_get_origin_visit_snapshot.called assert mock_snapshot_archive.lookup_origin.called - assert mock_snapshot_archive.lookup_snapshot_sizes.called @given(origin()) def test_browse_origin_content_not_found(client, origin): url = reverse( "browse-origin-content", query_params={"origin_url": origin["url"], "path": "/invalid/file/path"}, ) resp = check_html_get_response( client, url, status_code=404, template_used="browse/content.html" ) assert re.search("Directory entry.*not found", resp.content.decode("utf-8")) @given(origin()) def test_browse_directory_snapshot_not_found(client, mocker, origin): mock_get_snapshot_context = mocker.patch( "swh.web.browse.snapshot_context.get_snapshot_context" ) mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert_contains(resp, "Snapshot not found", status_code=404) assert mock_get_snapshot_context.called @given(origin()) def test_origin_empty_snapshot(client, mocker, origin): mock_archive = mocker.patch("swh.web.browse.snapshot_context.archive") mock_get_origin_visit_snapshot = mocker.patch( "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visit_snapshot.return_value = ([], [], {}) mock_archive.lookup_snapshot_sizes.return_value = { "alias": 0, "revision": 0, "release": 0, } mock_archive.lookup_origin.return_value = origin url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/directory.html" ) resp_content = resp.content.decode("utf-8") assert re.search("snapshot.*is empty", resp_content) assert not re.search("swh-tr-link", resp_content) assert mock_get_origin_visit_snapshot.called - assert mock_archive.lookup_snapshot_sizes.called @given(new_origin()) def test_origin_empty_snapshot_null_revision(client, archive_data, new_origin): snapshot = Snapshot( branches={ b"HEAD": SnapshotBranch( target="refs/head/master".encode(), target_type=TargetType.ALIAS, ), b"refs/head/master": None, } ) archive_data.origin_add([new_origin]) archive_data.snapshot_add([snapshot]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=now(), type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snapshot.id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url}, ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/directory.html" ) resp_content = resp.content.decode("utf-8") assert re.search("snapshot.*is empty", resp_content) assert not re.search("swh-tr-link", resp_content) @given(origin_with_releases()) def test_origin_release_browse(client, archive_data, origin): snapshot = archive_data.snapshot_get_latest(origin["url"]) release = [ b for b in snapshot["branches"].values() if b["target_type"] == "release" ][-1] release_data = archive_data.release_get(release["target"]) revision_data = archive_data.revision_get(release_data["target"]) url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "release": release_data["name"]}, ) resp = check_html_get_response( client, 
url, status_code=200, template_used="browse/directory.html" ) assert_contains(resp, release_data["name"]) assert_contains(resp, release["target"]) swhid_context = { "origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(RELEASE, release_data["id"]), } swh_dir_id = gen_swhid( DIRECTORY, revision_data["directory"], metadata=swhid_context ) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) @given(origin_with_releases()) def test_origin_release_browse_not_found(client, origin): invalid_release_name = "swh-foo-bar" url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "release": invalid_release_name}, ) resp = check_html_get_response( client, url, status_code=404, template_used="error.html" ) assert re.search( f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8") ) @given(new_origin(), unknown_revision()) def test_origin_browse_directory_branch_with_non_resolvable_revision( client, archive_data, new_origin, unknown_revision ): branch_name = "master" snapshot = Snapshot( branches={ branch_name.encode(): SnapshotBranch( target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION, ) } ) archive_data.origin_add([new_origin]) archive_data.snapshot_add([snapshot]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=now(), type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snapshot.id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url, "branch": branch_name}, ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/directory.html" ) assert_contains( resp, f"Revision {unknown_revision } could not be found in the archive." 
) @given(origin()) def test_origin_content_no_path(client, origin): url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]}) resp = check_html_get_response( client, url, status_code=400, template_used="error.html" ) assert_contains( resp, "The path of a content must be given as query parameter.", status_code=400 ) def test_origin_views_no_url_query_parameter(client): for browse_context in ( "content", "directory", "log", "branches", "releases", "visits", ): url = reverse(f"browse-origin-{browse_context}") resp = check_html_get_response( client, url, status_code=400, template_used="error.html" ) assert_contains( resp, "An origin URL must be provided as query parameter.", status_code=400 ) def _origin_content_view_test_helper( client, archive_data, origin_info, origin_visit, snapshot_sizes, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None, snapshot_id=None, ): content_path = "/".join(content["path"].split("/")[1:]) if not visit_id and not snapshot_id: visit_id = origin_visit["visit"] query_params = {"origin_url": origin_info["url"], "path": content_path} if timestamp: query_params["timestamp"] = timestamp if visit_id: query_params["visit_id"] = visit_id elif snapshot_id: query_params["snapshot"] = snapshot_id url = reverse("browse-origin-content", query_params=query_params) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) assert type(content["data"]) == str assert_contains(resp, '' % content["hljs_language"]) assert_contains(resp, escape(content["data"])) split_path = content_path.split("/") filename = split_path[-1] path = content_path.replace(filename, "")[:-1] path_info = gen_path_info(path) del query_params["path"] if timestamp: query_params["timestamp"] = format_utc_iso_date( parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" ) root_dir_url = reverse("browse-origin-directory", query_params=query_params) assert_contains(resp, '
  • ', count=len(path_info) + 1) assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: query_params["path"] = p["path"] dir_url = reverse("browse-origin-directory", query_params=query_params) assert_contains(resp, '%s' % (dir_url, p["name"])) assert_contains(resp, "
  • %s
  • " % filename) query_string = "sha1_git:" + content["sha1_git"] url_raw = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params={"filename": filename}, ) assert_contains(resp, url_raw) if "path" in query_params: del query_params["path"] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({snapshot_sizes['revision']})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) assert_contains(resp, f'href="{escape(origin_releases_url)}">') assert_contains(resp, f"Releases ({snapshot_sizes['release']})") assert_contains(resp, '
  • ', count=len(origin_branches)) query_params["path"] = content_path for branch in origin_branches: root_dir_branch_url = reverse( "browse-origin-content", query_params={"branch": branch["name"], **query_params}, ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: root_dir_release_url = reverse( "browse-origin-content", query_params={"release": release["name"], **query_params}, ) assert_contains(resp, '' % root_dir_release_url) url = reverse("browse-origin-content", query_params=query_params) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) swhid_context = { "origin": origin_info["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(REVISION, head_rev_id), "path": f"/{content_path}", } swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, "swh-take-new-snapshot") _check_origin_link(resp, origin_info["url"]) assert_not_contains(resp, "swh-metadata-popover") def _origin_directory_view_test_helper( client, archive_data, origin_info, origin_visit, snapshot_sizes, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, snapshot_id=None, path=None, ): dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")] files = [e for e in directory_entries if e["type"] == "file"] if not visit_id and not snapshot_id: visit_id = origin_visit["visit"] query_params = {"origin_url": origin_info["url"]} if timestamp: query_params["timestamp"] = timestamp elif visit_id: query_params["visit_id"] = visit_id else: query_params["snapshot"] = snapshot_id if path: query_params["path"] = path url = reverse("browse-origin-directory", query_params=query_params) resp = check_html_get_response( client, url, status_code=200, template_used="browse/directory.html" ) assert_contains(resp, '', count=len(dirs)) assert_contains(resp, '', count=len(files)) if timestamp: query_params["timestamp"] = format_utc_iso_date( parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" ) for d in dirs: if d["type"] == "rev": dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: dir_path = d["name"] if path: dir_path = "%s/%s" % (path, d["name"]) query_params["path"] = dir_path dir_url = reverse("browse-origin-directory", query_params=query_params,) assert_contains(resp, dir_url) for f in files: file_path = f["name"] if path: file_path = "%s/%s" % (path, f["name"]) query_params["path"] = file_path file_url = reverse("browse-origin-content", query_params=query_params) assert_contains(resp, file_url) if "path" in query_params: del query_params["path"] root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split("/")) + 1 assert_contains(resp, '
  • ', count=nb_bc_paths) assert_contains( resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7]) ) origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({snapshot_sizes['revision']})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}"') assert_contains(resp, f"Releases ({snapshot_sizes['release']})") if path: query_params["path"] = path assert_contains(resp, '
  • ', count=len(origin_branches)) for branch in origin_branches: query_params["branch"] = branch["name"] root_dir_branch_url = reverse( "browse-origin-directory", query_params=query_params ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: query_params["release"] = release["name"] root_dir_release_url = reverse( "browse-origin-directory", query_params=query_params ) assert_contains(resp, 'href="%s"' % root_dir_release_url) assert_contains(resp, "vault-cook-directory") assert_contains(resp, "vault-cook-revision") snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) swhid_context = { "origin": origin_info["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(REVISION, head_rev_id), "path": f"/{path}" if path else None, } swh_dir_id = gen_swhid( DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context ) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, "swh-take-new-snapshot") _check_origin_link(resp, origin_info["url"]) assert_not_contains(resp, "swh-metadata-popover") def _origin_branches_test_helper( client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None ): query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id} url = reverse("browse-origin-branches", query_params=query_params) resp = check_html_get_response( client, url, status_code=200, template_used="browse/branches.html" ) origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({snapshot_sizes['revision']})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}">') assert_contains(resp, f"Releases ({snapshot_sizes['release']})") assert_contains(resp, '' % escape(browse_branch_url)) browse_revision_url = reverse( "browse-revision", url_args={"sha1_git": branch["revision"]}, query_params=query_params, ) assert_contains(resp, '' % escape(browse_revision_url)) _check_origin_link(resp, origin_info["url"]) def _origin_releases_test_helper( client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None ): query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id} url = reverse("browse-origin-releases", query_params=query_params) resp = check_html_get_response( client, url, status_code=200, template_used="browse/releases.html" ) origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({snapshot_sizes['revision']})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}"') assert_contains(resp, f"Releases ({snapshot_sizes['release']}") assert_contains(resp, '' % escape(browse_release_url)) assert_contains(resp, '' % escape(browse_revision_url)) _check_origin_link(resp, origin_info["url"]) @given( new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release() ) def test_origin_branches_pagination_with_alias( client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release ): """ When a snapshot contains a branch or a release alias, pagination 
    links in the branches / releases view should be displayed.
    """
    mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)

    snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
    for i in range(len(revisions)):
        branch = "".join(random.choices(string.ascii_lowercase, k=8))
        snp_dict["branches"][branch.encode()] = {
            "target_type": "revision",
            "target": hash_to_bytes(revisions[i]),
        }

    release = "".join(random.choices(string.ascii_lowercase, k=8))
    snp_dict["branches"][b"RELEASE_ALIAS"] = {
        "target_type": "alias",
        "target": release.encode(),
    }
    snp_dict["branches"][release.encode()] = {
        "target_type": "release",
        "target": hash_to_bytes(existing_release),
    }

    archive_data.origin_add([new_origin])
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
    )[0]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="full",
        snapshot=snp_dict["id"],
    )
    archive_data.origin_visit_status_add([visit_status])

    url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url})
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/branches.html"
    )
    assert_contains(resp, '
      1: expected_url += f"-L{lines_number[1]}" assert obj_swhid_resolved["browse_url"] == expected_url @given(directory()) -def test_resolve_swhid_with_escaped_chars(directory): - origin = "http://example.org/?project=abc;" - origin_swhid_escaped = quote(origin, safe="/?:@&") - origin_swhid_url_escaped = quote(origin, safe="/:@;") +def test_resolve_swhid_with_escaped_chars(archive_data, directory): + origin_url = "http://example.org/?project=abc;" + archive_data.origin_add([Origin(url=origin_url)]) + origin_swhid_escaped = quote(origin_url, safe="/?:@&") + origin_swhid_url_escaped = quote(origin_url, safe="/:@;") swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped assert origin_swhid_url_escaped in resolved_swhid["browse_url"] @given(directory_with_subdirs()) def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_subdirs = [e for e in dir_content if e["type"] == "dir"] dir_subdir = random.choice(dir_subdirs) dir_subdir_path = dir_subdir["name"] anchor = gen_swhid(DIRECTORY, directory) swhid = gen_swhid( DIRECTORY, dir_subdir["target"], metadata={"anchor": anchor, "path": "/" + dir_subdir_path}, ) resolved_swhid = resolve_swhid(swhid) browse_url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={"path": dir_subdir_path}, ) assert resolved_swhid["browse_url"] == browse_url @given(directory()) def test_resolve_swhid_with_malformed_origin_url(archive_data, directory): origin_url = "http://example.org/project/abc" malformed_origin_url = "http:/example.org/project/abc" archive_data.origin_add([Origin(url=origin_url)]) swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": malformed_origin_url}) resolved_swhid = resolve_swhid(swhid) assert origin_url in resolved_swhid["browse_url"] @given(revision()) def test_resolve_dir_entry_swhid_with_anchor_revision(archive_data, revision): revision_data = archive_data.revision_get(revision) directory = revision_data["directory"] dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) rev_swhid = gen_swhid(REVISION, revision) if dir_entry["type"] == "rev": return if dir_entry["type"] == "file": swhid = gen_swhid( CONTENT, dir_entry["checksums"]["sha1_git"], metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}"}, ) else: swhid = gen_swhid( DIRECTORY, dir_entry["target"], metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}/"}, ) browse_url = reverse( "browse-revision", url_args={"sha1_git": revision}, query_params={"path": dir_entry["name"]}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url @given(directory_with_subdirs()) def test_resolve_dir_entry_swhid_with_anchor_directory(archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_entry = random.choice( [entry for entry in dir_content if entry["type"] == "dir"] ) dir_swhid = gen_swhid(DIRECTORY, directory) swhid = gen_swhid( DIRECTORY, dir_entry["target"], metadata={"anchor": dir_swhid, "path": f"/{dir_entry['name']}/"}, ) browse_url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={"path": f"{dir_entry['name']}"}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url @given(directory_with_files()) def test_resolve_file_entry_swhid_with_anchor_directory(archive_data, 
directory): dir_content = archive_data.directory_ls(directory) file_entry = random.choice( [entry for entry in dir_content if entry["type"] == "file"] ) dir_swhid = gen_swhid(DIRECTORY, directory) sha1_git = file_entry["checksums"]["sha1_git"] swhid = gen_swhid( CONTENT, sha1_git, metadata={"anchor": dir_swhid, "path": f"/{file_entry['name']}"}, ) browse_url = reverse( "browse-content", url_args={"query_string": f"sha1_git:{sha1_git}"}, query_params={"path": f"{directory}/{file_entry['name']}"}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index 2eb385a9..f2a9da24 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,395 +1,431 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil from subprocess import PIPE, run import sys from typing import Any, Dict, List, Optional +from _pytest.python import Function from hypothesis import HealthCheck, settings import pytest from django.core.cache import cache from rest_framework.test import APIClient, APIRequestFactory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.typing import OriginVisitInfo from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) # we use getattr here to keep mypy happy regardless hypothesis version function_scoped_fixture_check = ( [getattr(HealthCheck, "function_scoped_fixture")] if hasattr(HealthCheck, "function_scoped_fixture") else [] ) suppress_health_check = [ HealthCheck.too_slow, HealthCheck.filter_too_much, ] + function_scoped_fixture_check settings.register_profile( "swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=1, suppress_health_check=suppress_health_check, ), ) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. 
test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox data_dir = os.path.join(sys.prefix, "share/swh/web") static_dir = os.path.join(data_dir, "static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return bundles_dir = os.path.join(test_dir, "../../../assets/src/bundles") if not os.path.exists(bundles_dir): # location of the bundles folder when running tests with tox bundles_dir = os.path.join(data_dir, "assets/src/bundles") _, bundles, _ = next(os.walk(bundles_dir)) mock_webpack_stats = { "status": "done", "publicPath": "/static", "chunks": {}, "assets": {}, } for bundle in bundles: asset = f"js/{bundle}.js" mock_webpack_stats["chunks"][bundle] = [asset] mock_webpack_stats["assets"][asset] = { "name": asset, "publicPath": f"/static/{asset}", } with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture def api_request_factory(): return APIRequestFactory() # Initialize tests data -@pytest.fixture(scope="session", autouse=True) +@pytest.fixture(scope="function", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages( data["storage"], data["idx_storage"], data["search"], data["counters"] ) return data # Fixture to manipulate data from a sample archive used in the tests -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. 
""" def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( 
converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] self.ctags_indexer = tests_data["ctags_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) def content_add_ctags(self, cnt_id): self.ctags_indexer.run([hash_to_bytes(cnt_id)]) def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) @pytest.fixture def keycloak_oidc(keycloak_oidc, mocker): keycloak_config = get_config()["keycloak"] keycloak_oidc.server_url = keycloak_config["server_url"] keycloak_oidc.realm_name = keycloak_config["realm_name"] keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") keycloak_oidc_client.return_value = keycloak_oidc return keycloak_oidc + + +@pytest.fixture +def subtest(request): + """A hack to explicitly set up and tear down fixtures. + + This fixture allows you to set up and tear down fixtures within the test + function itself. This is useful (necessary!) for using Hypothesis inside + pytest, as hypothesis will call the test function multiple times, without + setting up or tearing down fixture state as it is normally the case. + + Copied from the pytest-subtesthack project, public domain license + (https://github.com/untitaker/pytest-subtesthack). 
+ """ + parent_test = request.node + + def inner(func): + if hasattr(Function, "from_parent"): + item = Function.from_parent( + parent_test, + name=request.function.__name__ + "[]", + originalname=request.function.__name__, + callobj=func, + ) + else: + item = Function( + name=request.function.__name__ + "[]", parent=parent_test, callobj=func + ) + nextitem = parent_test # prevents pytest from tearing down module fixtures + + item.ihook.pytest_runtest_setup(item=item) + item.ihook.pytest_runtest_call(item=item) + item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem) + + return inner diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py index c33932f3..8a5cc103 100644 --- a/swh/web/tests/strategies.py +++ b/swh/web/tests/strategies.py @@ -1,658 +1,648 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from datetime import datetime import random from hypothesis import assume, settings from hypothesis.extra.dateutil import timezones from hypothesis.strategies import ( binary, characters, composite, datetimes, just, lists, sampled_from, text, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.model.hypothesis_strategies import origins as new_origin_strategy from swh.model.hypothesis_strategies import snapshots as new_snapshot from swh.model.model import ( Content, Directory, Person, Revision, RevisionType, TimestampWithTimezone, ) from swh.storage.algos.revisions_walker import get_revisions_walker from swh.storage.algos.snapshot import snapshot_get_latest from swh.web.common.utils import browsers_supported_image_mimes from swh.web.tests.data import get_tests_data # Module dedicated to the generation of input data for tests through # the use of hypothesis. # Some of these data are sampled from a test archive created and populated # in the swh.web.tests.data module. # Set the swh-web hypothesis profile if none has been explicitly set hypothesis_default_settings = settings.get_profile("default") if repr(settings()) == repr(hypothesis_default_settings): settings.load_profile("swh-web") # The following strategies exploit the hypothesis capabilities -def _filter_checksum(cs): - generated_checksums = get_tests_data()["generated_checksums"] - if not int.from_bytes(cs, byteorder="little") or cs in generated_checksums: - return False - generated_checksums.add(cs) - return True - - def _known_swh_object(object_type): return sampled_from(get_tests_data()[object_type]) def sha1(): """ Hypothesis strategy returning a valid hexadecimal sha1 value. """ - return binary(min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex) + return binary(min_size=20, max_size=20).map(hash_to_hex) def invalid_sha1(): """ Hypothesis strategy returning an invalid sha1 representation. """ - return binary(min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex) + return binary(min_size=50, max_size=50).map(hash_to_hex) def sha256(): """ Hypothesis strategy returning a valid hexadecimal sha256 value. """ - return binary(min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex) + return binary(min_size=32, max_size=32).map(hash_to_hex) def content(): """ Hypothesis strategy returning a random content ingested into the test archive. 
""" return _known_swh_object("contents") def contents(): """ Hypothesis strategy returning random contents ingested into the test archive. """ return lists(content(), min_size=2, max_size=8) def empty_content(): """ Hypothesis strategy returning the empty content ingested into the test archive. """ empty_content = Content.from_data(data=b"").to_dict() for algo in DEFAULT_ALGORITHMS: empty_content[algo] = hash_to_hex(empty_content[algo]) return just(empty_content) def content_text(): """ Hypothesis strategy returning random textual contents ingested into the test archive. """ return content().filter(lambda c: c["mimetype"].startswith("text/")) def content_text_non_utf8(): """ Hypothesis strategy returning random textual contents not encoded to UTF-8 ingested into the test archive. """ return content().filter( lambda c: c["mimetype"].startswith("text/") and c["encoding"] not in ("utf-8", "us-ascii") ) def content_application_no_highlight(): """ Hypothesis strategy returning random textual contents with mimetype starting with application/ and no detected programming language to highlight ingested into the test archive. """ return content().filter( lambda c: c["mimetype"].startswith("application/") and c["encoding"] != "binary" and c["hljs_language"] == "nohighlight" ) def content_text_no_highlight(): """ Hypothesis strategy returning random textual contents with no detected programming language to highlight ingested into the test archive. """ return content().filter( lambda c: c["mimetype"].startswith("text/") and c["hljs_language"] == "nohighlight" ) def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c["mimetype"] in browsers_supported_image_mimes) def content_unsupported_image_type_rendering(): """ Hypothesis strategy returning random image contents ingested into the test archive that can not be rendered by browsers. """ return content().filter( lambda c: c["mimetype"].startswith("image/") and c["mimetype"] not in browsers_supported_image_mimes ) def content_utf8_detected_as_binary(): """ Hypothesis strategy returning random textual contents detected as binary by libmagic while they are valid UTF-8 encoded files. """ def utf8_binary_detected(content): if content["encoding"] != "binary": return False try: content["raw_data"].decode("utf-8") except Exception: return False else: return True return content().filter(utf8_binary_detected) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { "blake2S256": blake2s256_hex, "sha1": sha1_hex, "sha1_git": sha1_git_hex, "sha256": sha256_hex, } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: get_tests_data()["storage"].content_get_data(hash_to_bytes(c["sha1"])) is None ) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. 
""" return _known_swh_object("directories") def _directory_with_entry_type(type_): return directory().filter( lambda d: any( [ e["type"] == type_ for e in list( get_tests_data()["storage"].directory_ls(hash_to_bytes(d)) ) ] ) ) def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ return _directory_with_entry_type("dir") def directory_with_files(): """ Hypothesis strategy returning a random directory containing at least one regular file """ return _directory_with_entry_type("file") def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(Directory(entries=()).id.hex()) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ return sha1().filter( lambda s: len( list(get_tests_data()["storage"].directory_missing([hash_to_bytes(s)])) ) > 0 ) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object("origins") def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() storage = tests_data["storage"] for origin in tests_data["origins"]: visit_page = storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: ret.append(origin) return sampled_from(ret) def origin_with_releases(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): ret.append(origin) return sampled_from(ret) def origin_with_pull_request_branches(): """ Hypothesis strategy returning a random origin with pull request branches ingested into the test archive. """ ret = [] tests_data = get_tests_data() storage = tests_data["storage"] origins = storage.origin_list(limit=1000) for origin in origins.results: snapshot = snapshot_get_latest(storage, origin.url) if any([b"refs/pull/" in b for b in snapshot.branches]): ret.append(origin) return sampled_from(ret) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ - return new_origin_strategy().filter( - lambda origin: get_tests_data()["storage"].origin_get([origin.url])[0] is None - ) + return new_origin_strategy() def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists( new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items())), ) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0), timezones=timezones(), ), min_size=min_size, max_size=max_size, unique=True, ).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. 
""" return _known_swh_object("releases") def releases(min_size=2, max_size=8): """ Hypothesis strategy returning random releases ingested into the test archive. """ return lists(release(), min_size=min_size, max_size=max_size) def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()["storage"].release_get([s])[0] is None ) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object("revisions") def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()["storage"].revision_get([hash_to_bytes(s)])[0] is None ) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw( text( min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255), ) ) email = "%s@company.org" % name return Person( name=name.encode(), email=email.encode(), fullname=("%s <%s>" % (name, email)).encode(), ) @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0) ).map(lambda d: int(d.timestamp())) ) return { "timestamp": timestamp, "offset": 0, "negative_utc": False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ return Revision( directory=draw(sha1().map(hash_to_bytes)), author=draw(new_person()), committer=draw(new_person()), message=draw(text(min_size=20, max_size=100).map(lambda t: t.encode())), date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), committer_date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), synthetic=False, type=RevisionType.GIT, ) def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object("snapshots") def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists( new_snapshot(min_size=2, max_size=10, only_objects=True), min_size=min_size, max_size=max_size, ) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. 
""" return sha1().filter( lambda s: get_tests_data()["storage"].snapshot_get_branches(hash_to_bytes(s)) is None ) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return just( { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } ) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return just( { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } ) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. 
""" return just( { "sha1s": [ "0ab37c02043ebff946c1937523f60aadd0844351", "15554cf7608dde6bfefac7e3d525596343a85b6f", "2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd", "30acd0b47fc25e159e27a980102ddb1c4bea0b95", "4f81f05aaea3efb981f9d90144f746d6b682285b", "5153aa4b6e4455a62525bc4de38ed0ff6e7dd682", "59d08bafa6a749110dfb65ba43a61963d5a5bf9f", "7568285b2d7f31ae483ae71617bd3db873deaa2c", "7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4", "8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03", "9b3557f1ab4111c8607a4f2ea3c1e53c6992916c", "9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd", "c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b", "e89e55a12def4cd54d5bff58378a3b5119878eb7", "e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e", "eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5", ], "symbol_name": "ABS", } ) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just( { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } )