diff --git a/MANIFEST.in b/MANIFEST.in
index 51d20eaf..33315b78 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,15 +1,15 @@
 include pytest.ini
 include README.md
 include requirements*.txt
 include tox.ini
 include version.txt
 recursive-include swh py.typed
 recursive-include assets *
 recursive-include swh/web/*/templates *
-recursive-include swh/web/*/assets *
-recursive-include swh/web/tests/resources *
-recursive-include swh/web/tests/inbound_email/resources *.eml
+recursive-include swh/web/*/tests/assets *
+recursive-include swh/web/*/tests/data *
+recursive-include swh/web/*/tests/resources *
 include package.json
 include yarn.lock
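A note on the three new MANIFEST.in patterns: `*` matches a single path segment, so they pick up every application's relocated `tests/assets`, `tests/data` and `tests/resources` directories. A quick way to check what they match from a source checkout, approximating MANIFEST.in's pattern matching with `pathlib` globbing (a sketch only; MANIFEST.in uses its own glob rules, but `*` similarly stays within one segment):

```python
from pathlib import Path

# Approximate the new MANIFEST.in include patterns with pathlib globbing.
patterns = [
    "swh/web/*/tests/assets",
    "swh/web/*/tests/data",
    "swh/web/*/tests/resources",
]

for pattern in patterns:
    # e.g. swh/web/api/tests/data, swh/web/browse/tests/resources, ...
    for path in sorted(Path(".").glob(pattern)):
        print(f"{pattern} -> {path}")
```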
diff --git a/Makefile.local b/Makefile.local
index a523a65d..caa363f3 100644
--- a/Makefile.local
+++ b/Makefile.local
@@ -1,127 +1,127 @@
-TEST_DIRS := ./swh/web/tests
+TEST_DIRS := ./swh/web/
 TESTFLAGS = --hypothesis-profile=swh-web-fast
 TESTFULL_FLAGS = --hypothesis-profile=swh-web
 YARN ?= yarn
 SETTINGS_TEST ?= swh.web.settings.tests
 SETTINGS_DEV ?= swh.web.settings.development
 SETTINGS_PROD = swh.web.settings.production

 yarn-install: package.json
 	$(YARN) install --frozen-lockfile

 .PHONY: build-webpack-dev
 build-webpack-dev: yarn-install
 	$(YARN) build-dev

 .PHONY: build-webpack-test
 build-webpack-test: yarn-install
 	$(YARN) build-test

 .PHONY: build-webpack-dev-no-verbose
 build-webpack-dev-no-verbose: yarn-install
 	$(YARN) build-dev >/dev/null

 .PHONY: build-webpack-prod
 build-webpack-prod: yarn-install
 	$(YARN) build

 .PHONY: run-migrations-dev
 run-migrations-dev:
 	python3 swh/web/manage.py rename_app --settings=$(SETTINGS_DEV) swh_web_common swh_web_save_code_now
 	python3 swh/web/manage.py migrate --settings=$(SETTINGS_DEV) -v0

 .PHONY: run-migrations-prod
 run-migrations-prod:
 	django-admin rename_app --settings=$(SETTINGS_PROD) swh_web_common swh_web_save_code_now
 	django-admin migrate --settings=$(SETTINGS_PROD) -v0

 .PHONY: run-migrations-test
 run-migrations-test:
 	rm -f swh-web-test*.sqlite3*
 	django-admin migrate --settings=$(SETTINGS_TEST) -v0

 add-users-test: run-migrations-test
 	cat swh/web/tests/create_test_admin.py | django-admin shell --settings=$(SETTINGS_TEST)
 	cat swh/web/tests/create_test_users.py | django-admin shell --settings=$(SETTINGS_TEST)

 add-users-dev: run-migrations-dev
 	cat swh/web/tests/create_test_admin.py | django-admin shell --settings=$(SETTINGS_DEV)
 	cat swh/web/tests/create_test_users.py | django-admin shell --settings=$(SETTINGS_DEV)

 add-users-prod: run-migrations-prod
 	cat swh/web/tests/create_test_admin.py | django-admin shell --settings=$(SETTINGS_PROD)
 	cat swh/web/tests/create_test_users.py | django-admin shell --settings=$(SETTINGS_PROD)

 .PHONY: clear-memcached
 clear-memcached:
 	echo "flush_all" | nc -q 2 localhost 11211 2>/dev/null

 run-django-webpack-devserver: add-users-dev yarn-install
 	bash -c "trap 'trap - SIGINT SIGTERM ERR EXIT && \
 	# ensure all child processes will be killed by PGID when exiting \
 	ps -o pgid= $$$$ | grep -o [0-9]* | xargs pkill -g' SIGINT SIGTERM ERR EXIT; \
 	$(YARN) start-dev & sleep 10 && cd swh/web && \
 	python3 manage.py runserver --nostatic --settings=$(SETTINGS_DEV) || exit 1"

 run-django-webpack-dev: build-webpack-dev add-users-dev
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_DEV)

 run-django-webpack-prod: build-webpack-prod add-users-prod clear-memcached
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_PROD)

 run-django-server-dev: add-users-dev
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_DEV)

 run-django-server-prod: add-users-prod clear-memcached
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_PROD)

 run-gunicorn-server: add-users-prod clear-memcached
 	DJANGO_SETTINGS_MODULE=$(SETTINGS_PROD) \
 	gunicorn --bind 127.0.0.1:5004 \
 	--threads 2 \
 	--workers 2 'django.core.wsgi:get_wsgi_application()'

 run-django-webpack-memory-storages: build-webpack-dev add-users-test
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_TEST)

 test-full:
 	$(TEST) $(TESTFULL_FLAGS) $(TEST_DIRS)

 .PHONY: test-frontend-cmd
 test-frontend-cmd: build-webpack-test add-users-test
 	bash -c "trap 'trap - SIGINT SIGTERM ERR EXIT && \
 	jobs -p | xargs -r kill' SIGINT SIGTERM ERR EXIT; \
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_TEST) & \
 	sleep 10 && $(YARN) run cypress run --config numTestsKeptInMemory=0 && \
 	$(YARN) mochawesome && $(YARN) nyc-report"

 test-frontend: export CYPRESS_SKIP_SLOW_TESTS=1
 test-frontend: test-frontend-cmd

 test-frontend-full: export CYPRESS_SKIP_SLOW_TESTS=0
 test-frontend-full: test-frontend-cmd

 .PHONY: test-frontend-ui-cmd
 test-frontend-ui-cmd: add-users-test yarn-install
 	# ensure all child processes will be killed when hitting Ctrl-C in terminal
 	# or manually closing the Cypress UI window, killing by PGID seems the only
 	# reliable way to do it in that case
 	bash -c "trap 'trap - SIGINT SIGTERM ERR EXIT && \
 	ps -o pgid= $$$$ | grep -o [0-9]* | xargs pkill -g' SIGINT SIGTERM ERR EXIT; \
 	$(YARN) start-dev & \
 	python3 swh/web/manage.py runserver --nostatic --settings=$(SETTINGS_TEST) & \
 	sleep 10 && $(YARN) run cypress open"

 test-frontend-ui: export CYPRESS_SKIP_SLOW_TESTS=1
 test-frontend-ui: test-frontend-ui-cmd

 test-frontend-full-ui: export CYPRESS_SKIP_SLOW_TESTS=0
 test-frontend-full-ui: test-frontend-ui-cmd

 # Override default rule to make sure DJANGO env var is properly set. It
 # *should* work without any override thanks to the mypy django-stubs plugin,
 # but it currently doesn't; see
 # https://github.com/typeddjango/django-stubs/issues/166
 check-mypy:
 	DJANGO_SETTINGS_MODULE=$(SETTINGS_DEV) $(MYPY) $(MYPYFLAGS) swh
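The PGID cleanup used by `run-django-webpack-devserver` and `test-frontend-ui-cmd` is easy to miss inside the escaped `bash -c` strings: the trap looks up the current process group (`ps -o pgid= $$`) and `pkill -g`s it, so the webpack and runserver children die together with the recipe. A rough, POSIX-only Python equivalent of the same idea — a sketch, not part of the diff:

```python
import atexit
import os
import signal
import subprocess

# Stand-ins for `yarn start-dev` and `manage.py runserver`; both children
# end up in this script's process group.
children = [subprocess.Popen(["sleep", "600"]) for _ in range(2)]


def kill_own_process_group() -> None:
    # Same idea as `ps -o pgid= $$ | xargs pkill -g`: signal the whole
    # group so no child outlives the parent. Ignore the signal first so
    # the cleanup handler is not interrupted by its own SIGTERM.
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    os.killpg(os.getpgid(0), signal.SIGTERM)


atexit.register(kill_own_process_group)
```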
diff --git a/swh/web/tests/add_forge_now/__init__.py b/swh/web/add_forge_now/tests/__init__.py
similarity index 100%
rename from swh/web/tests/add_forge_now/__init__.py
rename to swh/web/add_forge_now/tests/__init__.py
diff --git a/swh/web/tests/add_forge_now/test_api_views.py b/swh/web/add_forge_now/tests/test_api_views.py
similarity index 100%
rename from swh/web/tests/add_forge_now/test_api_views.py
rename to swh/web/add_forge_now/tests/test_api_views.py
diff --git a/swh/web/tests/add_forge_now/test_app.py b/swh/web/add_forge_now/tests/test_app.py
similarity index 100%
rename from swh/web/tests/add_forge_now/test_app.py
rename to swh/web/add_forge_now/tests/test_app.py
diff --git a/swh/web/tests/add_forge_now/test_migration.py b/swh/web/add_forge_now/tests/test_migration.py
similarity index 100%
rename from swh/web/tests/add_forge_now/test_migration.py
rename to swh/web/add_forge_now/tests/test_migration.py
diff --git a/swh/web/tests/add_forge_now/test_models.py b/swh/web/add_forge_now/tests/test_models.py
similarity index 100%
rename from swh/web/tests/add_forge_now/test_models.py
rename to swh/web/add_forge_now/tests/test_models.py
diff --git a/swh/web/tests/add_forge_now/test_views.py b/swh/web/add_forge_now/tests/test_views.py
similarity index 100%
rename from swh/web/tests/add_forge_now/test_views.py
rename to swh/web/add_forge_now/tests/test_views.py
diff --git a/swh/web/tests/api/__init__.py b/swh/web/api/tests/__init__.py
similarity index 100%
rename from swh/web/tests/api/__init__.py
rename to swh/web/api/tests/__init__.py
diff --git a/swh/web/tests/api/test_api_lookup.py b/swh/web/api/tests/test_api_lookup.py
similarity index 100%
rename from swh/web/tests/api/test_api_lookup.py
rename to swh/web/api/tests/test_api_lookup.py
diff --git a/swh/web/tests/api/test_apidoc.py b/swh/web/api/tests/test_apidoc.py
similarity index 100%
rename from swh/web/tests/api/test_apidoc.py
rename to swh/web/api/tests/test_apidoc.py
diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/api/tests/test_apiresponse.py
similarity index 100%
rename from swh/web/tests/api/test_apiresponse.py
rename to swh/web/api/tests/test_apiresponse.py
diff --git a/swh/web/tests/api/test_apiurls.py b/swh/web/api/tests/test_apiurls.py
similarity index 100%
rename from swh/web/tests/api/test_apiurls.py
rename to swh/web/api/tests/test_apiurls.py
diff --git a/swh/web/tests/api/test_throttling.py b/swh/web/api/tests/test_throttling.py
similarity index 100%
rename from swh/web/tests/api/test_throttling.py
rename to swh/web/api/tests/test_throttling.py
diff --git a/swh/web/tests/api/test_utils.py b/swh/web/api/tests/test_utils.py
similarity index 100%
rename from swh/web/tests/api/test_utils.py
rename to swh/web/api/tests/test_utils.py
diff --git a/swh/web/tests/api/views/__init__.py b/swh/web/api/tests/views/__init__.py
similarity index 100%
rename from swh/web/tests/api/views/__init__.py
rename to swh/web/api/tests/views/__init__.py
diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/api/tests/views/test_content.py
similarity index 99%
rename from swh/web/tests/api/views/test_content.py
rename to swh/web/api/tests/views/test_content.py
index 63deb7da..a2afbbd8 100644
--- a/swh/web/tests/api/views/test_content.py
+++ b/swh/web/api/tests/views/test_content.py
@@ -1,251 +1,251 @@
 # Copyright (C) 2015-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import pytest

-from swh.web.tests.conftest import fossology_missing
 from swh.web.tests.data import random_content
 from swh.web.tests.helpers import (
     check_api_get_responses,
     check_api_post_responses,
     check_http_get_response,
+    fossology_missing,
 )
 from swh.web.utils import reverse
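`test_content.py` is 99% similar: the only change is where `fossology_missing` comes from, since its old `conftest.py` definition (removed at the end of this diff) moved into `swh.web.tests.helpers`. For reference, a minimal sketch of that skip guard, reusing the exact expression the old conftest defined (the test name below is hypothetical):

```python
import shutil

import pytest

# The expression removed from the old conftest.py: skip license tests
# when fossology's nomossa binary is not installed.
fossology_missing = shutil.which("nomossa") is None


@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed")
def test_example_needing_nomossa():
    ...  # hypothetical test exercising license detection
```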
diff --git a/swh/web/tests/api/views/test_directory.py b/swh/web/api/tests/views/test_directory.py
similarity index 100%
rename from swh/web/tests/api/views/test_directory.py
rename to swh/web/api/tests/views/test_directory.py
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/api/tests/views/test_graph.py
similarity index 100%
rename from swh/web/tests/api/views/test_graph.py
rename to swh/web/api/tests/views/test_graph.py
diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/api/tests/views/test_identifiers.py
similarity index 100%
rename from swh/web/tests/api/views/test_identifiers.py
rename to swh/web/api/tests/views/test_identifiers.py
diff --git a/swh/web/tests/api/views/test_metadata.py b/swh/web/api/tests/views/test_metadata.py
similarity index 99%
rename from swh/web/tests/api/views/test_metadata.py
rename to swh/web/api/tests/views/test_metadata.py
index 19a84181..fa536dc0 100644
--- a/swh/web/tests/api/views/test_metadata.py
+++ b/swh/web/api/tests/views/test_metadata.py
@@ -1,255 +1,255 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import attr
 from hypothesis import given, settings
 from hypothesis.strategies import sets
 import pytest

 from swh.model.hypothesis_strategies import raw_extrinsic_metadata
 from swh.model.model import Origin
-from swh.web.tests.api.views.utils import scroll_results
+from swh.web.api.tests.views.utils import scroll_results
 from swh.web.tests.helpers import check_api_get_responses, check_http_get_response
 from swh.web.utils import reverse


 @given(raw_extrinsic_metadata())
 def test_api_raw_extrinsic_metadata(api_client, subtest, metadata):
     # ensure archive_data fixture will be reset between each hypothesis
     # example test run
     @subtest
     def test_inner(archive_data):
         archive_data.metadata_authority_add([metadata.authority])
         archive_data.metadata_fetcher_add([metadata.fetcher])
         archive_data.raw_extrinsic_metadata_add([metadata])

         authority = metadata.authority
         url = reverse(
             "api-1-raw-extrinsic-metadata-swhid",
             url_args={"target": str(metadata.target)},
             query_params={"authority": f"{authority.type.value} {authority.url}"},
         )

         rv = check_api_get_responses(api_client, url, status_code=200)

         assert len(rv.data) == 1

         expected_result = metadata.to_dict()
         del expected_result["id"]
         del expected_result["metadata"]
         metadata_url = rv.data[0]["metadata_url"]
         expected_result["metadata_url"] = metadata_url
         expected_result["discovery_date"] = expected_result[
             "discovery_date"
         ].isoformat()
         if expected_result["target"].startswith(("swh:1:ori:", "swh:1:emd:")):
             # non-core SWHID are hidden from the API
             del expected_result["target"]
         assert rv.data == [expected_result]

         rv = check_http_get_response(api_client, metadata_url, status_code=200)
         assert rv["Content-Type"] == "application/octet-stream"
         assert (
             rv["Content-Disposition"]
             == f'attachment; filename="{metadata.target}_metadata"'
         )
         assert rv.content == metadata.metadata
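The `@subtest` indirection above deserves a note: `@given` runs the outer test once per generated example, while pytest resolves function-scoped fixtures such as `archive_data` only once per test, hence the comment about resetting it between examples. A very loose sketch of the calling convention (the real fixture lives in the relocated `conftest.py` and also re-creates per-example storage state; the hardcoded `archive_data` lookup below is a simplification):

```python
import pytest


@pytest.fixture
def subtest(request):
    # Very loose sketch: run the decorated inner test immediately, handing
    # it the archive_data fixture. The real swh-web fixture additionally
    # resets per-example state so hypothesis examples stay independent.
    def run(inner_test):
        inner_test(request.getfixturevalue("archive_data"))

    return run
```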
diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/api/tests/views/test_origin.py
similarity index 99%
rename from swh/web/tests/api/views/test_origin.py
rename to swh/web/api/tests/views/test_origin.py
index 5802b5dd..c468af32 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/api/tests/views/test_origin.py
@@ -1,867 +1,867 @@
 # Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import timedelta
 import json

 from hypothesis import given
 import pytest

 from swh.indexer.storage.model import OriginIntrinsicMetadataRow
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Origin, OriginVisit, OriginVisitStatus
 from swh.search.exc import SearchQuerySyntaxError
 from swh.search.interface import PagedResult
 from swh.storage.exc import StorageAPIError, StorageDBError
 from swh.storage.utils import now
+from swh.web.api.tests.views.utils import scroll_results
 from swh.web.api.utils import enrich_origin, enrich_origin_visit
-from swh.web.tests.api.views.utils import scroll_results
 from swh.web.tests.data import (
     INDEXER_TOOL,
     ORIGIN_MASTER_DIRECTORY,
     ORIGIN_MASTER_REVISION,
     ORIGIN_METADATA_KEY,
     ORIGIN_METADATA_VALUE,
 )
 from swh.web.tests.helpers import check_api_get_responses
 from swh.web.tests.strategies import new_origin, new_snapshots, visit_dates
 from swh.web.utils import reverse
 from swh.web.utils.exc import BadInputExc
 from swh.web.utils.origin_visits import get_origin_visits
diff --git a/swh/web/tests/api/views/test_ping.py b/swh/web/api/tests/views/test_ping.py
similarity index 100%
rename from swh/web/tests/api/views/test_ping.py
rename to swh/web/api/tests/views/test_ping.py
diff --git a/swh/web/tests/api/views/test_raw.py b/swh/web/api/tests/views/test_raw.py
similarity index 100%
rename from swh/web/tests/api/views/test_raw.py
rename to swh/web/api/tests/views/test_raw.py
diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/api/tests/views/test_release.py
similarity index 100%
rename from swh/web/tests/api/views/test_release.py
rename to swh/web/api/tests/views/test_release.py
diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/api/tests/views/test_revision.py
similarity index 100%
rename from swh/web/tests/api/views/test_revision.py
rename to swh/web/api/tests/views/test_revision.py
diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/api/tests/views/test_snapshot.py
similarity index 100%
rename from swh/web/tests/api/views/test_snapshot.py
rename to swh/web/api/tests/views/test_snapshot.py
diff --git a/swh/web/tests/api/views/test_stat.py b/swh/web/api/tests/views/test_stat.py
similarity index 100%
rename from swh/web/tests/api/views/test_stat.py
rename to swh/web/api/tests/views/test_stat.py
diff --git a/swh/web/tests/api/views/utils.py b/swh/web/api/tests/views/utils.py
similarity index 100%
rename from swh/web/tests/api/views/utils.py
rename to swh/web/api/tests/views/utils.py
diff --git a/swh/web/tests/archive_coverage/__init__.py b/swh/web/archive_coverage/tests/__init__.py
similarity index 100%
rename from swh/web/tests/archive_coverage/__init__.py
rename to swh/web/archive_coverage/tests/__init__.py
diff --git a/swh/web/tests/archive_coverage/test_app.py b/swh/web/archive_coverage/tests/test_app.py
similarity index 100%
rename from swh/web/tests/archive_coverage/test_app.py
rename to swh/web/archive_coverage/tests/test_app.py
diff --git a/swh/web/tests/archive_coverage/test_coverage.py b/swh/web/archive_coverage/tests/test_coverage.py
similarity index 100%
rename from swh/web/tests/archive_coverage/test_coverage.py
rename to swh/web/archive_coverage/tests/test_coverage.py
diff --git a/swh/web/tests/auth/__init__.py b/swh/web/auth/tests/__init__.py
similarity index 100%
rename from swh/web/tests/auth/__init__.py
rename to swh/web/auth/tests/__init__.py
diff --git a/swh/web/tests/auth/test_migrations.py b/swh/web/auth/tests/test_migrations.py
similarity index 100%
rename from swh/web/tests/auth/test_migrations.py
rename to swh/web/auth/tests/test_migrations.py
diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/auth/tests/test_utils.py
similarity index 100%
rename from swh/web/tests/auth/test_utils.py
rename to swh/web/auth/tests/test_utils.py
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/auth/tests/test_views.py
similarity index 100%
rename from swh/web/tests/auth/test_views.py
rename to swh/web/auth/tests/test_views.py
diff --git a/swh/web/tests/badges/__init__.py b/swh/web/badges/tests/__init__.py
similarity index 100%
rename from swh/web/tests/badges/__init__.py
rename to swh/web/badges/tests/__init__.py
diff --git a/swh/web/tests/badges/test_app.py b/swh/web/badges/tests/test_app.py
similarity index 100%
rename from swh/web/tests/badges/test_app.py
rename to swh/web/badges/tests/test_app.py
diff --git a/swh/web/tests/badges/test_badges.py b/swh/web/badges/tests/test_badges.py
similarity index 100%
rename from swh/web/tests/badges/test_badges.py
rename to swh/web/badges/tests/test_badges.py
diff --git a/swh/web/tests/banners/__init__.py b/swh/web/banners/tests/__init__.py
similarity index 100%
rename from swh/web/tests/banners/__init__.py
rename to swh/web/banners/tests/__init__.py
diff --git a/swh/web/tests/banners/test_app.py b/swh/web/banners/tests/test_app.py
similarity index 100%
rename from swh/web/tests/banners/test_app.py
rename to swh/web/banners/tests/test_app.py
diff --git a/swh/web/tests/banners/test_fundraising.py b/swh/web/banners/tests/test_fundraising.py
similarity index 100%
rename from swh/web/tests/banners/test_fundraising.py
rename to swh/web/banners/tests/test_fundraising.py
diff --git a/swh/web/tests/browse/__init__.py b/swh/web/browse/tests/__init__.py
similarity index 100%
rename from swh/web/tests/browse/__init__.py
rename to swh/web/browse/tests/__init__.py
diff --git a/swh/web/tests/browse/test_snapshot_context.py b/swh/web/browse/tests/test_snapshot_context.py
similarity index 100%
rename from swh/web/tests/browse/test_snapshot_context.py
rename to swh/web/browse/tests/test_snapshot_context.py
diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/browse/tests/test_utils.py
similarity index 100%
rename from swh/web/tests/browse/test_utils.py
rename to swh/web/browse/tests/test_utils.py
diff --git a/swh/web/tests/browse/views/__init__.py b/swh/web/browse/tests/views/__init__.py
similarity index 100%
rename from swh/web/tests/browse/views/__init__.py
rename to swh/web/browse/tests/views/__init__.py
diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/browse/tests/views/test_content.py
similarity index 100%
rename from swh/web/tests/browse/views/test_content.py
rename to swh/web/browse/tests/views/test_content.py
diff --git a/swh/web/tests/browse/views/test_directory.py b/swh/web/browse/tests/views/test_directory.py
similarity index 100%
rename from swh/web/tests/browse/views/test_directory.py
rename to swh/web/browse/tests/views/test_directory.py
diff --git a/swh/web/tests/browse/views/test_identifiers.py b/swh/web/browse/tests/views/test_identifiers.py
similarity index 100%
rename from swh/web/tests/browse/views/test_identifiers.py
rename to swh/web/browse/tests/views/test_identifiers.py
diff --git a/swh/web/tests/browse/views/test_iframe.py b/swh/web/browse/tests/views/test_iframe.py
similarity index 100%
rename from swh/web/tests/browse/views/test_iframe.py
rename to swh/web/browse/tests/views/test_iframe.py
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/browse/tests/views/test_origin.py
similarity index 100%
rename from swh/web/tests/browse/views/test_origin.py
rename to swh/web/browse/tests/views/test_origin.py
diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/browse/tests/views/test_release.py
similarity index 100%
rename from swh/web/tests/browse/views/test_release.py
rename to swh/web/browse/tests/views/test_release.py
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/browse/tests/views/test_revision.py
similarity index 100%
rename from swh/web/tests/browse/views/test_revision.py
rename to swh/web/browse/tests/views/test_revision.py
diff --git a/swh/web/tests/browse/views/test_snapshot.py b/swh/web/browse/tests/views/test_snapshot.py
similarity index 100%
rename from swh/web/tests/browse/views/test_snapshot.py
rename to swh/web/browse/tests/views/test_snapshot.py
diff --git a/swh/web/tests/conftest.py b/swh/web/conftest.py
similarity index 99%
rename from swh/web/tests/conftest.py
rename to swh/web/conftest.py
index 2a2f2975..83956744 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/conftest.py
@@ -1,1259 +1,1254 @@
 # Copyright (C) 2018-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 from datetime import timedelta
 import functools
 from importlib import import_module, reload
 import json
 import os
 import random
-import shutil
 import sys
 import time
 from typing import Any, Dict, List, Optional

 from _pytest.python import Function
 from hypothesis import HealthCheck
 from hypothesis import settings as hypothesis_settings
 import pytest
 from pytest_django.fixtures import SettingsWrapper

 from django.conf import settings
 from django.contrib.auth.models import User
 from django.core.cache import cache
 from django.test.utils import setup_databases
 from django.urls import clear_url_caches
 from rest_framework.test import APIClient, APIRequestFactory

 from swh.model.hashutil import (
     ALGORITHMS,
     DEFAULT_ALGORITHMS,
     hash_to_bytes,
     hash_to_hex,
 )
 from swh.model.model import Content, Directory
 from swh.model.swhids import CoreSWHID, ObjectType
 from swh.scheduler.tests.common import TASK_TYPES
 from swh.storage.algos.origin import origin_get_latest_visit_status
 from swh.storage.algos.revisions_walker import get_revisions_walker
 from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
 from swh.web.auth.utils import (
     ADD_FORGE_MODERATOR_PERMISSION,
     MAILMAP_ADMIN_PERMISSION,
     MAILMAP_PERMISSION,
 )
 from swh.web.config import get_config
 from swh.web.save_code_now.origin_save import get_scheduler_load_task_types
 from swh.web.tests.data import (
     get_tests_data,
     override_storages,
     random_content,
     random_sha1,
     random_sha1_bytes,
     random_sha256,
 )
 from swh.web.tests.helpers import create_django_permission
 from swh.web.utils import browsers_supported_image_mimes, converters
 from swh.web.utils.typing import OriginVisitInfo

 os.environ["LC_ALL"] = "C.UTF-8"

-fossology_missing = shutil.which("nomossa") is None
-
 # Register some hypothesis profiles
shutil.which("nomossa") is None - # Register some hypothesis profiles hypothesis_settings.register_profile("default", hypothesis_settings()) # we use getattr here to keep mypy happy regardless hypothesis version function_scoped_fixture_check = ( [getattr(HealthCheck, "function_scoped_fixture")] if hasattr(HealthCheck, "function_scoped_fixture") else [] ) suppress_health_check = [ HealthCheck.too_slow, HealthCheck.filter_too_much, ] + function_scoped_fixture_check hypothesis_settings.register_profile( "swh-web", hypothesis_settings( deadline=None, suppress_health_check=suppress_health_check, ), ) hypothesis_settings.register_profile( "swh-web-fast", hypothesis_settings( deadline=None, max_examples=5, suppress_health_check=suppress_health_check, ), ) def pytest_addoption(parser): parser.addoption("--swh-web-random-seed", action="store", default=None) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: hypothesis_settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox data_dir = os.path.join(sys.prefix, "share/swh/web") static_dir = os.path.join(data_dir, "static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest - static_dir = os.path.join(test_dir, "../../../static") + static_dir = os.path.join(test_dir, "../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return - django_apps_dir = os.path.join(test_dir, "../../../swh/web") + django_apps_dir = os.path.join(test_dir, "../../swh/web") if not os.path.exists(django_apps_dir): # location of the applications folder when running tests with tox django_apps_dir = os.path.join(data_dir, "swh/web") bundles = [] _, apps, _ = next(os.walk(django_apps_dir)) for app in apps: app_assets_dir = os.path.join(django_apps_dir, app, "assets") if os.path.exists(app_assets_dir): if os.path.exists(os.path.join(app_assets_dir, "index.js")): bundles.append(app) else: _, app_bundles, _ = next(os.walk(app_assets_dir)) for app_bundle in app_bundles: if os.path.exists( os.path.join(app_assets_dir, app_bundle, "index.js") ): bundles.append(app_bundle) - print(bundles) - mock_webpack_stats = { "status": "done", "publicPath": "/static", "chunks": {}, "assets": {}, } for bundle in bundles: asset = f"js/{bundle}.js" mock_webpack_stats["chunks"][bundle] = [asset] mock_webpack_stats["assets"][asset] = { "name": asset, "publicPath": f"/static/{asset}", } with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) _swh_web_custom_section = "swh-web custom section" _random_seed_cache_key = "swh-web/random-seed" @pytest.fixture(scope="function", autouse=True) def random_seed(pytestconfig): state = random.getstate() seed = pytestconfig.getoption("--swh-web-random-seed") if seed is None: seed = time.time() seed = int(seed) cache.set(_random_seed_cache_key, seed) random.seed(seed) yield seed random.setstate(state) def pytest_report_teststatus(report, *args): if report.when == 
"call" and report.outcome == "failed": seed = cache.get(_random_seed_cache_key, None) line = ( f'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} ' f'{report.nodeid}" to reproduce that test failure with same inputs' ) report.sections.append((_swh_web_custom_section, line)) def pytest_terminal_summary(terminalreporter, *args): reports = terminalreporter.getreports("failed") content = os.linesep.join( text for report in reports for secname, text in report.sections if secname == _swh_web_custom_section ) if content: terminalreporter.ensure_newline() terminalreporter.section(_swh_web_custom_section, sep="-", blue=True, bold=True) terminalreporter.line(content) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="function", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages( data["storage"], data["idx_storage"], data["search"], data["counters"] ) return data @pytest.fixture(scope="function") def sha1(): """Fixture returning a valid hexadecimal sha1 value.""" return random_sha1() @pytest.fixture(scope="function") def invalid_sha1(): """Fixture returning an invalid sha1 representation.""" return hash_to_hex(bytes(random.randint(0, 255) for _ in range(50))) @pytest.fixture(scope="function") def sha256(): """Fixture returning a valid hexadecimal sha256 value.""" return random_sha256() def _known_swh_objects(tests_data, object_type): return tests_data[object_type] @pytest.fixture(scope="function") def content(tests_data): """Fixture returning a random content ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "contents")) @pytest.fixture(scope="function") def contents(tests_data): """Fixture returning random contents ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "contents"), k=random.randint(2, 8) ) def _new_content(tests_data): while True: new_content = random_content() sha1_bytes = hash_to_bytes(new_content["sha1"]) if tests_data["storage"].content_get_data(sha1_bytes) is None: return new_content @pytest.fixture(scope="function") def unknown_content(tests_data): """Fixture returning a random content not ingested into the test archive.""" return _new_content(tests_data) @pytest.fixture(scope="function") def unknown_contents(tests_data): """Fixture returning random contents not ingested into the test archive.""" new_contents = [] new_content_ids = set() nb_contents = random.randint(2, 8) while len(new_contents) != nb_contents: new_content = _new_content(tests_data) if new_content["sha1"] not in new_content_ids: new_contents.append(new_content) new_content_ids.add(new_content["sha1"]) return list(new_contents) @pytest.fixture(scope="function") def empty_content(): """Fixture returning the empty content ingested into the test archive.""" empty_content = Content.from_data(data=b"").to_dict() for algo in DEFAULT_ALGORITHMS: empty_content[algo] = hash_to_hex(empty_content[algo]) return empty_content @functools.lru_cache(maxsize=None) def 
_content_text():
    return list(
        filter(
            lambda c: c["mimetype"].startswith("text/"),
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_text():
    """
    Fixture returning a random textual content ingested
    into the test archive.
    """
    return random.choice(_content_text())


@functools.lru_cache(maxsize=None)
def _content_text_non_utf8():
    return list(
        filter(
            lambda c: c["mimetype"].startswith("text/")
            and c["encoding"] not in ("utf-8", "us-ascii"),
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_text_non_utf8():
    """Fixture returning a random textual content not encoded to UTF-8
    ingested into the test archive.
    """
    return random.choice(_content_text_non_utf8())


@functools.lru_cache(maxsize=None)
def _content_application_no_highlight():
    return list(
        filter(
            lambda c: c["mimetype"].startswith("application/")
            and c["hljs_language"] == "plaintext",
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_application_no_highlight():
    """Fixture returning a random textual content with mimetype
    starting with application/ and no detected programming language to
    highlight, ingested into the test archive.
    """
    return random.choice(_content_application_no_highlight())


@functools.lru_cache(maxsize=None)
def _content_text_no_highlight():
    return list(
        filter(
            lambda c: c["mimetype"].startswith("text/")
            and c["hljs_language"] == "plaintext",
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_text_no_highlight():
    """Fixture returning a random textual content with no detected
    programming language to highlight, ingested into the test archive.
    """
    return random.choice(_content_text_no_highlight())


@functools.lru_cache(maxsize=None)
def _content_image_type():
    return list(
        filter(
            lambda c: c["mimetype"] in browsers_supported_image_mimes,
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_image_type():
    """Fixture returning a random image content ingested into the test archive."""
    return random.choice(_content_image_type())


@functools.lru_cache(maxsize=None)
def _content_unsupported_image_type_rendering():
    return list(
        filter(
            lambda c: c["mimetype"].startswith("image/")
            and c["mimetype"] not in browsers_supported_image_mimes,
            _known_swh_objects(get_tests_data(), "contents"),
        )
    )


@pytest.fixture(scope="function")
def content_unsupported_image_type_rendering():
    """Fixture returning a random image content ingested into the test
    archive that cannot be rendered by browsers.
    """
    return random.choice(_content_unsupported_image_type_rendering())


@functools.lru_cache(maxsize=None)
def _content_utf8_detected_as_binary():
    def utf8_binary_detected(content):
        if content["encoding"] != "binary":
            return False
        try:
            content["raw_data"].decode("utf-8")
        except Exception:
            return False
        else:
            return True

    return list(
        filter(utf8_binary_detected, _known_swh_objects(get_tests_data(), "contents"))
    )


@pytest.fixture(scope="function")
def content_utf8_detected_as_binary():
    """Fixture returning a random textual content detected as binary
    by libmagic while it is actually a valid UTF-8 encoded file.
""" return random.choice(_content_utf8_detected_as_binary()) @pytest.fixture(scope="function") def directory(tests_data): """Fixture returning a random directory ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "directories")) @functools.lru_cache(maxsize=None) def _directory_with_entry_type(type_): tests_data = get_tests_data() return list( filter( lambda d: any( [ e["type"] == type_ for e in list(tests_data["storage"].directory_ls(hash_to_bytes(d))) ] ), _known_swh_objects(tests_data, "directories"), ) ) @pytest.fixture(scope="function") def directory_with_subdirs(): """Fixture returning a random directory containing sub directories ingested into the test archive. """ return random.choice(_directory_with_entry_type("dir")) @pytest.fixture(scope="function") def directory_with_files(): """Fixture returning a random directory containing at least one regular file.""" return random.choice(_directory_with_entry_type("file")) @pytest.fixture(scope="function") def unknown_directory(tests_data): """Fixture returning a random directory not ingested into the test archive.""" while True: new_directory = random_sha1() sha1_bytes = hash_to_bytes(new_directory) if list(tests_data["storage"].directory_missing([sha1_bytes])): return new_directory @pytest.fixture(scope="function") def empty_directory(): """Fixture returning the empty directory ingested into the test archive.""" return Directory(entries=()).id.hex() @pytest.fixture(scope="function") def revision(tests_data): """Fixturereturning a random revision ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "revisions")) @pytest.fixture(scope="function") def revisions(tests_data): """Fixture returning random revisions ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "revisions"), k=random.randint(2, 8), ) @pytest.fixture(scope="function") def revisions_list(tests_data): """Fixture returning random revisions ingested into the test archive.""" def gen_revisions_list(size): return random.choices( _known_swh_objects(tests_data, "revisions"), k=size, ) return gen_revisions_list @pytest.fixture(scope="function") def unknown_revision(tests_data): """Fixture returning a random revision not ingested into the test archive.""" while True: new_revision = random_sha1() sha1_bytes = hash_to_bytes(new_revision) if tests_data["storage"].revision_get([sha1_bytes])[0] is None: return new_revision def _get_origin_dfs_revisions_walker(tests_data): storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) @functools.lru_cache(maxsize=None) def _ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True return master_revisions, children @pytest.fixture(scope="function") def ancestor_revisions(): """Fixture returning a 
pair of revisions ingested into the test archive with an ancestor relation. """ master_revisions, children = _ancestor_revisions_data() # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } @functools.lru_cache(maxsize=None) def _non_ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) return merge_revs, children @pytest.fixture(scope="function") def non_ancestor_revisions(): """Fixture returning a pair of revisions ingested into the test archive with no ancestor relation. """ merge_revs, children = _non_ancestor_revisions_data() # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } @pytest.fixture(scope="function") def revision_with_submodules(): """Fixture returning a revision that is known to point to a directory with revision entries (aka git submodules) """ return { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } @pytest.fixture(scope="function") def release(tests_data): """Fixture returning a random release ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "releases")) @pytest.fixture(scope="function") def releases(tests_data): """Fixture returning random releases ingested into the test archive.""" return random.choices( _known_swh_objects(tests_data, "releases"), k=random.randint(2, 8) ) @pytest.fixture(scope="function") def unknown_release(tests_data): """Fixture returning a random release not ingested into the test archive.""" while True: new_release = random_sha1() sha1_bytes = hash_to_bytes(new_release) if tests_data["storage"].release_get([sha1_bytes])[0] is None: return new_release @pytest.fixture(scope="function") def snapshot(tests_data): """Fixture returning a random snapshot ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "snapshots")) @pytest.fixture(scope="function") def unknown_snapshot(tests_data): """Fixture returning a random snapshot not ingested into the test archive.""" while True: new_snapshot = random_sha1() sha1_bytes = hash_to_bytes(new_snapshot) if tests_data["storage"].snapshot_get_branches(sha1_bytes) is None: return new_snapshot @pytest.fixture(scope="function") def origin(tests_data): """Fixture returning a random origin ingested into the test archive.""" return random.choice(_known_swh_objects(tests_data, "origins")) @functools.lru_cache(maxsize=None) def _origin_with_multiple_visits(): tests_data = get_tests_data() origins = [] storage 
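# (Editorial aside, not part of this diff: a hedged sketch of a test
# consuming the ancestor_revisions fixture above; the test name and
# assertion are illustrative, assuming the converted revision log exposes
# hex "id" values as elsewhere in this conftest:
#
#   def test_ancestor_is_in_log(archive_data, ancestor_revisions):
#       root = ancestor_revisions["sha1_git_root"]
#       ancestor = ancestor_revisions["sha1_git"]
#       log_ids = [rev["id"] for rev in archive_data.revision_log(root)]
#       assert ancestor in log_ids
# )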
= tests_data["storage"] for origin in tests_data["origins"]: visit_page = storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_multiple_visits(): """Fixture returning a random origin with multiple visits ingested into the test archive. """ return random.choice(_origin_with_multiple_visits()) @functools.lru_cache(maxsize=None) def _origin_with_releases(): tests_data = get_tests_data() origins = [] for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_releases(): """Fixture returning a random origin with releases ingested into the test archive.""" return random.choice(_origin_with_releases()) @functools.lru_cache(maxsize=None) def _origin_with_pull_request_branches(): tests_data = get_tests_data() origins = [] storage = tests_data["storage"] for origin in storage.origin_list(limit=1000).results: snapshot = snapshot_get_latest(storage, origin.url) if any([b"refs/pull/" in b for b in snapshot.branches]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_pull_request_branches(): """Fixture returning a random origin with pull request branches ingested into the test archive. """ return random.choice(_origin_with_pull_request_branches()) @functools.lru_cache(maxsize=None) def _object_type_swhid(object_type): return list( filter( lambda swhid: swhid.object_type == object_type, _known_swh_objects(get_tests_data(), "swhids"), ) ) @pytest.fixture(scope="function") def content_swhid(): """Fixture returning a qualified SWHID for a random content object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.CONTENT)) @pytest.fixture(scope="function") def directory_swhid(): """Fixture returning a qualified SWHID for a random directory object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.DIRECTORY)) @pytest.fixture(scope="function") def release_swhid(): """Fixture returning a qualified SWHID for a random release object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.RELEASE)) @pytest.fixture(scope="function") def revision_swhid(): """Fixture returning a qualified SWHID for a random revision object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.REVISION)) @pytest.fixture(scope="function") def snapshot_swhid(): """Fixture returning a qualified SWHID for a snapshot object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.SNAPSHOT)) @pytest.fixture(scope="function", params=list(ObjectType)) def unknown_core_swhid(request) -> CoreSWHID: """Fixture returning an unknown core SWHID. Tests using this will be called once per object type. """ return CoreSWHID( object_type=request.param, object_id=random_sha1_bytes(), ) # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="function") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="function") def indexer_data(tests_data): return _IndexerData(tests_data) class _ArchiveData: """ Helper class to manage data from a sample test archive. 
    It is initialized with a reference to an in-memory storage containing
    the raw test data. It is basically a proxy to the Storage interface: it
    overrides some methods to return those test data in a JSON-serializable
    format, in order to ease test implementation.
    """

    def __init__(self, tests_data):
        self.storage = tests_data["storage"]

    def __getattr__(self, key):
        if key == "storage":
            raise AttributeError(key)
        # Forward calls to non-overridden Storage methods to the wrapped
        # storage instance
        return getattr(self.storage, key)

    def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]:
        cnt_ids_bytes = {
            algo_hash: hash_to_bytes(content[algo_hash])
            for algo_hash in ALGORITHMS
            if content.get(algo_hash)
        }
        cnt = self.storage.content_find(cnt_ids_bytes)
        return converters.from_content(cnt[0].to_dict()) if cnt else cnt

    def content_get(self, cnt_id: str) -> Dict[str, Any]:
        cnt_id_bytes = hash_to_bytes(cnt_id)
        content = self.storage.content_get([cnt_id_bytes])[0]
        if content:
            content_d = content.to_dict()
            content_d.pop("ctime", None)
        else:
            content_d = None
        return converters.from_swh(
            content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"}
        )

    def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]:
        cnt_id_bytes = hash_to_bytes(cnt_id)
        cnt_data = self.storage.content_get_data(cnt_id_bytes)
        if cnt_data is None:
            return None
        return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes})

    def directory_get(self, dir_id):
        return {"id": dir_id, "content": self.directory_ls(dir_id)}

    def directory_ls(self, dir_id):
        cnt_id_bytes = hash_to_bytes(dir_id)
        dir_content = map(
            converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes)
        )
        return list(dir_content)

    def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]:
        rel_id_bytes = hash_to_bytes(rel_id)
        rel_data = self.storage.release_get([rel_id_bytes])[0]
        return converters.from_release(rel_data) if rel_data else None

    def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]:
        rev_id_bytes = hash_to_bytes(rev_id)
        rev_data = self.storage.revision_get([rev_id_bytes])[0]
        return converters.from_revision(rev_data) if rev_data else None

    def revision_log(self, rev_id, limit=None):
        rev_id_bytes = hash_to_bytes(rev_id)
        return list(
            map(
                converters.from_revision,
                self.storage.revision_log([rev_id_bytes], limit=limit),
            )
        )

    def snapshot_get_latest(self, origin_url):
        snp = snapshot_get_latest(self.storage, origin_url)
        return converters.from_snapshot(snp.to_dict())

    def origin_get(self, origin_urls):
        origins = self.storage.origin_get(origin_urls)
        return [converters.from_origin(o.to_dict()) for o in origins]

    def origin_visit_get(self, origin_url):
        next_page_token = None
        visits = []
        while True:
            visit_page = self.storage.origin_visit_get(
                origin_url, page_token=next_page_token
            )
            next_page_token = visit_page.next_page_token
            for visit in visit_page.results:
                visit_status = self.storage.origin_visit_status_get_latest(
                    origin_url, visit.visit
                )
                visits.append(
                    converters.from_origin_visit(
                        {**visit_status.to_dict(), "type": visit.type}
                    )
                )
            if not next_page_token:
                break
        return visits

    def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo:
        visit = self.storage.origin_visit_get_by(origin_url, visit_id)
        assert visit is not None
        visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id)
        assert visit_status is not None
        return converters.from_origin_visit(
            {**visit_status.to_dict(), "type": visit.type}
        )

    def origin_visit_status_get_latest(
        self,
        origin_url,
        type: Optional[str] = None,
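        # (Editorial aside, not part of this diff: the point of the proxying
        # above is JSON-serializability. A hedged illustration, hash values
        # elided: where storage.release_get([hash_to_bytes(rel_id)])[0]
        # returns a model object with bytes identifiers,
        # archive_data.release_get(rel_id) returns a plain dict with
        # hex-encoded strings, i.e. what the web API under test serves.)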
        allowed_statuses: Optional[List[str]] = None,
        require_snapshot: bool = False,
    ):
        visit_status = origin_get_latest_visit_status(
            self.storage,
            origin_url,
            type=type,
            allowed_statuses=allowed_statuses,
            require_snapshot=require_snapshot,
        )
        return (
            converters.from_origin_visit(visit_status.to_dict())
            if visit_status
            else None
        )

    def snapshot_get(self, snapshot_id):
        snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id))
        return converters.from_snapshot(snp.to_dict())

    def snapshot_get_branches(
        self, snapshot_id, branches_from="", branches_count=1000, target_types=None
    ):
        partial_branches = self.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id),
            branches_from.encode(),
            branches_count,
            target_types,
        )
        return converters.from_partial_branches(partial_branches)

    def snapshot_get_head(self, snapshot):
        if snapshot["branches"]["HEAD"]["target_type"] == "alias":
            target = snapshot["branches"]["HEAD"]["target"]
            head = snapshot["branches"][target]["target"]
        else:
            head = snapshot["branches"]["HEAD"]["target"]
        return head

    def snapshot_count_branches(self, snapshot_id):
        counts = dict.fromkeys(("alias", "release", "revision"), 0)
        counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id)))
        counts.pop(None, None)
        counts["branch"] = sum(
            counts.get(target_type, 0)
            for target_type in ("content", "directory", "revision")
        )
        return counts


class _IndexerData:
    """
    Helper class to manage indexer test data.

    It is initialized with a reference to an in-memory indexer storage
    containing the raw test data. It also defines class methods to retrieve
    those test data in a JSON-serializable format in order to ease test
    implementation.
    """

    def __init__(self, tests_data):
        self.idx_storage = tests_data["idx_storage"]
        self.mimetype_indexer = tests_data["mimetype_indexer"]
        self.license_indexer = tests_data["license_indexer"]

    def content_add_mimetype(self, cnt_id):
        self.mimetype_indexer.run([hash_to_bytes(cnt_id)])

    def content_get_mimetype(self, cnt_id):
        mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[
            0
        ].to_dict()
        return converters.from_filetype(mimetype)

    def content_add_license(self, cnt_id):
        self.license_indexer.run([hash_to_bytes(cnt_id)])

    def content_get_license(self, cnt_id):
        cnt_id_bytes = hash_to_bytes(cnt_id)
        licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes])
        for license in licenses:
            yield converters.from_swh(license.to_dict(), hashess={"id"})


@pytest.fixture
def keycloak_oidc(keycloak_oidc, mocker):
    keycloak_config = get_config()["keycloak"]
    keycloak_oidc.server_url = keycloak_config["server_url"]
    keycloak_oidc.realm_name = keycloak_config["realm_name"]
    keycloak_oidc.client_id = settings.OIDC_SWH_WEB_CLIENT_ID
    keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
    keycloak_oidc_client.return_value = keycloak_oidc
    return keycloak_oidc


@pytest.fixture
def subtest(request):
    """A hack to explicitly set up and tear down fixtures.

    This fixture allows you to set up and tear down fixtures within the
    test function itself. This is useful (necessary!) for using Hypothesis
    inside pytest, as hypothesis will call the test function multiple
    times, without setting up or tearing down fixture state as is normally
    the case.

    Copied from the pytest-subtesthack project, public domain license
    (https://github.com/untitaker/pytest-subtesthack).
""" parent_test = request.node def inner(func): if hasattr(Function, "from_parent"): item = Function.from_parent( parent_test, name=request.function.__name__ + "[]", originalname=request.function.__name__, callobj=func, ) else: item = Function( name=request.function.__name__ + "[]", parent=parent_test, callobj=func ) nextitem = parent_test # prevents pytest from tearing down module fixtures item.ihook.pytest_runtest_setup(item=item) try: item.ihook.pytest_runtest_call(item=item) finally: item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem) return inner @pytest.fixture def swh_scheduler(swh_scheduler): config = get_config() scheduler = config["scheduler"] config["scheduler"] = swh_scheduler # create load-git and load-hg task types for task_type in TASK_TYPES.values(): # see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa task_type["type"] = task_type["type"].replace("load-test-", "load-", 1) swh_scheduler.create_task_type(task_type) # create load-svn task type swh_scheduler.create_task_type( { "type": "load-svn", "description": "Update a Subversion repository", "backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-cvs task type swh_scheduler.create_task_type( { "type": "load-cvs", "description": "Update a CVS repository", "backend_name": "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-bzr task type swh_scheduler.create_task_type( { "type": "load-bzr", "description": "Update a Bazaar repository", "backend_name": "swh.loader.bzr.tasks.LoadBazaar", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # add method to add load-archive-files task type during tests def add_load_archive_task_type(): swh_scheduler.create_task_type( { "type": "load-archive-files", "description": "Load tarballs", "backend_name": "swh.loader.package.archive.tasks.LoadArchive", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) swh_scheduler.add_load_archive_task_type = add_load_archive_task_type yield swh_scheduler config["scheduler"] = scheduler get_scheduler_load_task_types.cache_clear() @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", get_config()["test_db"]["name"]), ("USER", postgresql_proc.user), ("HOST", postgresql_proc.host), ("PORT", postgresql_proc.port), } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) @pytest.fixture def staff_user(): return User.objects.create_user(username="admin", password="", is_staff=True) @pytest.fixture def regular_user(): return User.objects.create_user(username="johndoe", 
password="") @pytest.fixture def regular_user2(): return User.objects.create_user(username="janedoe", password="") @pytest.fixture def add_forge_moderator(): moderator = User.objects.create_user(username="add-forge moderator", password="") moderator.user_permissions.add( create_django_permission(ADD_FORGE_MODERATOR_PERMISSION) ) return moderator @pytest.fixture def mailmap_admin(): mailmap_admin = User.objects.create_user(username="mailmap-admin", password="") mailmap_admin.user_permissions.add( create_django_permission(MAILMAP_ADMIN_PERMISSION) ) return mailmap_admin @pytest.fixture def mailmap_user(): mailmap_user = User.objects.create_user(username="mailmap-user", password="") mailmap_user.user_permissions.add(create_django_permission(MAILMAP_PERMISSION)) return mailmap_user def reload_urlconf(): from django.conf import settings clear_url_caches() # force reloading of all URLs as they depend on django settings # and swh-web configuration urlconfs = [settings.ROOT_URLCONF] urlconfs += [f"{app}.urls" for app in settings.SWH_DJANGO_APPS] for urlconf in urlconfs: try: if urlconf in sys.modules: reload(sys.modules[urlconf]) else: import_module(urlconf) except ModuleNotFoundError: pass class SwhSettingsWrapper(SettingsWrapper): def __setattr__(self, attr: str, value) -> None: super().__setattr__(attr, value) reload_urlconf() def finalize(self) -> None: super().finalize() reload_urlconf() @pytest.fixture def django_settings(): """Override pytest-django settings fixture in order to reload URLs when modifying settings in test and after test execution as most of them depend on installed django apps in swh-web. """ settings = SwhSettingsWrapper() yield settings settings.finalize() diff --git a/swh/web/tests/deposit/__init__.py b/swh/web/deposit/tests/__init__.py similarity index 100% rename from swh/web/tests/deposit/__init__.py rename to swh/web/deposit/tests/__init__.py diff --git a/swh/web/tests/deposit/test_app.py b/swh/web/deposit/tests/test_app.py similarity index 100% rename from swh/web/tests/deposit/test_app.py rename to swh/web/deposit/tests/test_app.py diff --git a/swh/web/tests/deposit/test_views.py b/swh/web/deposit/tests/test_views.py similarity index 100% rename from swh/web/tests/deposit/test_views.py rename to swh/web/deposit/tests/test_views.py diff --git a/swh/web/tests/inbound_email/__init__.py b/swh/web/inbound_email/tests/__init__.py similarity index 100% rename from swh/web/tests/inbound_email/__init__.py rename to swh/web/inbound_email/tests/__init__.py diff --git a/swh/web/tests/inbound_email/resources/__init__.py b/swh/web/inbound_email/tests/resources/__init__.py similarity index 100% rename from swh/web/tests/inbound_email/resources/__init__.py rename to swh/web/inbound_email/tests/resources/__init__.py diff --git a/swh/web/tests/inbound_email/resources/multipart_alternative.eml b/swh/web/inbound_email/tests/resources/multipart_alternative.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_alternative.eml rename to swh/web/inbound_email/tests/resources/multipart_alternative.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_alternative_html_only.eml b/swh/web/inbound_email/tests/resources/multipart_alternative_html_only.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_alternative_html_only.eml rename to swh/web/inbound_email/tests/resources/multipart_alternative_html_only.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_alternative_recursive.eml 
b/swh/web/inbound_email/tests/resources/multipart_alternative_recursive.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_alternative_recursive.eml rename to swh/web/inbound_email/tests/resources/multipart_alternative_recursive.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_alternative_text_only.eml b/swh/web/inbound_email/tests/resources/multipart_alternative_text_only.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_alternative_text_only.eml rename to swh/web/inbound_email/tests/resources/multipart_alternative_text_only.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_mixed.eml b/swh/web/inbound_email/tests/resources/multipart_mixed.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_mixed.eml rename to swh/web/inbound_email/tests/resources/multipart_mixed.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_mixed2.eml b/swh/web/inbound_email/tests/resources/multipart_mixed2.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_mixed2.eml rename to swh/web/inbound_email/tests/resources/multipart_mixed2.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_mixed_text_only.eml b/swh/web/inbound_email/tests/resources/multipart_mixed_text_only.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_mixed_text_only.eml rename to swh/web/inbound_email/tests/resources/multipart_mixed_text_only.eml diff --git a/swh/web/tests/inbound_email/resources/multipart_related.eml b/swh/web/inbound_email/tests/resources/multipart_related.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/multipart_related.eml rename to swh/web/inbound_email/tests/resources/multipart_related.eml diff --git a/swh/web/tests/inbound_email/resources/plaintext.eml b/swh/web/inbound_email/tests/resources/plaintext.eml similarity index 100% rename from swh/web/tests/inbound_email/resources/plaintext.eml rename to swh/web/inbound_email/tests/resources/plaintext.eml diff --git a/swh/web/tests/inbound_email/test_management_command.py b/swh/web/inbound_email/tests/test_management_command.py similarity index 100% rename from swh/web/tests/inbound_email/test_management_command.py rename to swh/web/inbound_email/tests/test_management_command.py diff --git a/swh/web/tests/inbound_email/test_utils.py b/swh/web/inbound_email/tests/test_utils.py similarity index 99% rename from swh/web/tests/inbound_email/test_utils.py rename to swh/web/inbound_email/tests/test_utils.py index 3b14cfc9..49ddc561 100644 --- a/swh/web/tests/inbound_email/test_utils.py +++ b/swh/web/inbound_email/tests/test_utils.py @@ -1,336 +1,336 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import email from email.headerregistry import Address from email.message import EmailMessage import email.policy from importlib.resources import open_binary from typing import List import pytest from swh.web.inbound_email import utils def test_extract_recipients(): message = EmailMessage() assert utils.extract_recipients(message) == [] message["To"] = "Test Recipient " assert utils.extract_recipients(message) == [ Address(display_name="Test Recipient", addr_spec="test-recipient@example.com") ] message["Cc"] = ( 
"test-recipient-2@example.com, " "Another Test Recipient " ) assert utils.extract_recipients(message) == [ Address(display_name="Test Recipient", addr_spec="test-recipient@example.com"), Address(addr_spec="test-recipient-2@example.com"), Address( display_name="Another Test Recipient", addr_spec="test-recipient-3@example.com", ), ] del message["To"] assert utils.extract_recipients(message) == [ Address(addr_spec="test-recipient-2@example.com"), Address( display_name="Another Test Recipient", addr_spec="test-recipient-3@example.com", ), ] def test_single_recipient_matches(): assert ( utils.single_recipient_matches( Address(addr_spec="test@example.com"), "match@example.com" ) is None ) assert utils.single_recipient_matches( Address(addr_spec="match@example.com"), "match@example.com" ) == utils.AddressMatch( recipient=Address(addr_spec="match@example.com"), extension=None ) assert utils.single_recipient_matches( Address(addr_spec="MaTch+12345AbC@exaMple.Com"), "match@example.com" ) == utils.AddressMatch( recipient=Address(addr_spec="MaTch+12345AbC@exaMple.Com"), extension="12345AbC" ) def test_recipient_matches(): message = EmailMessage() assert utils.recipient_matches(message, "match@example.com") == [] message = EmailMessage() message["to"] = "nomatch@example.com" assert utils.recipient_matches(message, "match@example.com") == [] message = EmailMessage() message["to"] = "match@example.com" assert utils.recipient_matches(message, "match@example.com") == [ utils.AddressMatch( recipient=Address(addr_spec="match@example.com"), extension=None ) ] message = EmailMessage() message["to"] = "match+extension@example.com" assert utils.recipient_matches(message, "match@example.com") == [ utils.AddressMatch( recipient=Address(addr_spec="match+extension@example.com"), extension="extension", ) ] message = EmailMessage() message["to"] = "match+weird+plussed+extension@example.com" assert utils.recipient_matches(message, "match@example.com") == [ utils.AddressMatch( recipient=Address(addr_spec="match+weird+plussed+extension@example.com"), extension="weird+plussed+extension", ) ] message = EmailMessage() message["to"] = "nomatch@example.com" message["cc"] = ", ".join( ( "match@example.com", "match@notamatch.example.com", "Another Match ", ) ) assert utils.recipient_matches(message, "match@example.com") == [ utils.AddressMatch( recipient=Address(addr_spec="match@example.com"), extension=None, ), utils.AddressMatch( recipient=Address( display_name="Another Match", addr_spec="match+extension@example.com" ), extension="extension", ), ] def test_recipient_matches_casemapping(): message = EmailMessage() message["to"] = "match@example.com" assert utils.recipient_matches(message, "Match@Example.Com") assert utils.recipient_matches(message, "match@example.com") message = EmailMessage() message["to"] = "Match+weirdCaseMapping@Example.Com" matches = utils.recipient_matches(message, "match@example.com") assert matches assert matches[0].extension == "weirdCaseMapping" def test_get_address_for_pk(): salt = "test_salt" pks = [1, 10, 1000] base_address = "base@example.com" addresses = { pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk) for pk in pks } assert len(set(addresses.values())) == len(addresses) for pk, address in addresses.items(): localpart, _, domain = address.partition("@") base_localpart, _, extension = localpart.partition("+") assert domain == "example.com" assert base_localpart == "base" assert extension.startswith(f"{pk}.") def test_get_address_for_pk_salt(): pk = 1000 base_address = 
"base@example.com" addresses = [ utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk) for salt in ["salt1", "salt2"] ] assert len(addresses) == len(set(addresses)) def test_get_pks_from_message(): salt = "test_salt" pks = [1, 10, 1000] base_address = "base@example.com" addresses = { pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk) for pk in pks } message = EmailMessage() message["To"] = "test@example.com" assert utils.get_pks_from_message(salt, base_address, message) == set() message = EmailMessage() message["To"] = f"Test Address <{addresses[1]}>" assert utils.get_pks_from_message(salt, base_address, message) == {1} message = EmailMessage() message["To"] = f"Test Address <{addresses[1]}>" message["Cc"] = ", ".join( [ f"Test Address <{addresses[1]}>", f"Another Test Address <{addresses[10].lower()}>", "A Third Address ", ] ) assert utils.get_pks_from_message(salt, base_address, message) == {1, 10} def test_get_pks_from_message_logging(caplog): salt = "test_salt" pks = [1, 10, 1000] base_address = "base@example.com" addresses = { pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk) for pk in pks } message = EmailMessage() message["To"] = f"Test Address <{base_address}>" assert utils.get_pks_from_message(salt, base_address, message) == set() relevant_records = [ record for record in caplog.records if record.name == "swh.web.inbound_email.utils" ] assert len(relevant_records) == 1 assert relevant_records[0].levelname == "DEBUG" assert ( f"{base_address} cannot be matched to a request" in relevant_records[0].getMessage() ) # Replace the signature with "mangle{signature}" mangled_address = addresses[1].replace(".", ".mangle", 1) message = EmailMessage() message["To"] = f"Test Address <{mangled_address}>" assert utils.get_pks_from_message(salt, base_address, message) == set() relevant_records = [ record for record in caplog.records if record.name == "swh.web.inbound_email.utils" ] assert len(relevant_records) == 2 assert relevant_records[0].levelname == "DEBUG" assert relevant_records[1].levelname == "DEBUG" assert f"{mangled_address} failed" in relevant_records[1].getMessage() @pytest.mark.parametrize( "filename,expected_parts,expected_absent", ( pytest.param( "plaintext.eml", [b"Plain text email.\n\n-- \nTest User"], [], id="plaintext", ), pytest.param( "multipart_alternative.eml", [b"*Multipart email.*\n\n-- \nTest User"], [], id="multipart_alternative", ), pytest.param( "multipart_alternative_html_only.eml", [b"", b"Multipart email (a much longer html part)."], [b"Multipart email (short html part)"], id="multipart_alternative_html_only", ), pytest.param( "multipart_alternative_text_only.eml", [b"*Multipart email, but a longer text part.*\n\n--\nTest User"], [], id="multipart_alternative_text_only", ), pytest.param( "multipart_mixed.eml", [b"This is plain text", b"and this is HTML"], [b"This is a multi-part message in MIME format."], id="multipart_mixed", ), pytest.param( "multipart_mixed2.eml", [b"This is plain text", b"and this is more text"], [b"This is a multi-part message in MIME format."], id="multipart_mixed2", ), pytest.param( "multipart_mixed_text_only.eml", [b"My test email"], [ b"HTML attachment", b"text attachment", b"This is a multi-part message in MIME format.", ], id="multipart_mixed_text_only", ), pytest.param( "multipart_alternative_recursive.eml", [b"This is plain text", b"and more plain text"], [b"this is HTML", b"This is a multi-part message in MIME format."], id="multipart_alternative_recursive", ), 
        pytest.param(
            "multipart_related.eml",
            [
                b"See the message below\n\n---------- Forwarded message ---------",
                b"Hello everyone,\n\nSee my attachment",
            ],
            [b"this is HTML", b"This is a multi-part message in MIME format."],
            id="multipart_related",
        ),
    ),
)
def test_get_message_plaintext(
    filename: str, expected_parts: List[bytes], expected_absent: List[bytes]
):
-    with open_binary("swh.web.tests.inbound_email.resources", filename) as f:
+    with open_binary("swh.web.inbound_email.tests.resources", filename) as f:
        message = email.message_from_binary_file(f, policy=email.policy.default)
    assert isinstance(message, EmailMessage)

    plaintext = utils.get_message_plaintext(message)
    assert plaintext is not None

    if len(expected_parts) == 1:
        assert plaintext == expected_parts[0]
    else:
        for part in expected_parts:
            assert part in plaintext
    for part in expected_absent:
        assert part not in plaintext
diff --git a/swh/web/tests/jslicenses/__init__.py b/swh/web/jslicenses/tests/__init__.py
similarity index 100%
rename from swh/web/tests/jslicenses/__init__.py
rename to swh/web/jslicenses/tests/__init__.py
diff --git a/swh/web/tests/jslicenses/test_app.py b/swh/web/jslicenses/tests/test_app.py
similarity index 100%
rename from swh/web/tests/jslicenses/test_app.py
rename to swh/web/jslicenses/tests/test_app.py
diff --git a/swh/web/tests/jslicenses/test_jslicenses.py b/swh/web/jslicenses/tests/test_jslicenses.py
similarity index 100%
rename from swh/web/tests/jslicenses/test_jslicenses.py
rename to swh/web/jslicenses/tests/test_jslicenses.py
diff --git a/swh/web/tests/mailmap/__init__.py b/swh/web/mailmap/tests/__init__.py
similarity index 100%
rename from swh/web/tests/mailmap/__init__.py
rename to swh/web/mailmap/tests/__init__.py
diff --git a/swh/web/tests/mailmap/test_app.py b/swh/web/mailmap/tests/test_app.py
similarity index 100%
rename from swh/web/tests/mailmap/test_app.py
rename to swh/web/mailmap/tests/test_app.py
diff --git a/swh/web/tests/mailmap/test_mailmap.py b/swh/web/mailmap/tests/test_mailmap.py
similarity index 100%
rename from swh/web/tests/mailmap/test_mailmap.py
rename to swh/web/mailmap/tests/test_mailmap.py
diff --git a/swh/web/tests/mailmap/test_migrations.py b/swh/web/mailmap/tests/test_migrations.py
similarity index 100%
rename from swh/web/tests/mailmap/test_migrations.py
rename to swh/web/mailmap/tests/test_migrations.py
diff --git a/swh/web/tests/metrics/__init__.py b/swh/web/metrics/tests/__init__.py
similarity index 100%
rename from swh/web/tests/metrics/__init__.py
rename to swh/web/metrics/tests/__init__.py
diff --git a/swh/web/tests/metrics/test_app.py b/swh/web/metrics/tests/test_app.py
similarity index 100%
rename from swh/web/tests/metrics/test_app.py
rename to swh/web/metrics/tests/test_app.py
diff --git a/swh/web/tests/metrics/test_metrics.py b/swh/web/metrics/tests/test_metrics.py
similarity index 100%
rename from swh/web/tests/metrics/test_metrics.py
rename to swh/web/metrics/tests/test_metrics.py
diff --git a/swh/web/tests/save_code_now/__init__.py b/swh/web/save_code_now/tests/__init__.py
similarity index 100%
rename from swh/web/tests/save_code_now/__init__.py
rename to swh/web/save_code_now/tests/__init__.py
diff --git a/swh/web/tests/save_code_now/data/http_esnode1.internal.softwareheritage.org/swh_workers-*__search b/swh/web/save_code_now/tests/data/http_esnode1.internal.softwareheritage.org/swh_workers-*__search
similarity index 100%
rename from swh/web/tests/save_code_now/data/http_esnode1.internal.softwareheritage.org/swh_workers-*__search
rename to
swh/web/save_code_now/tests/data/http_esnode1.internal.softwareheritage.org/swh_workers-*__search diff --git a/swh/web/tests/save_code_now/test_app.py b/swh/web/save_code_now/tests/test_app.py similarity index 100% rename from swh/web/tests/save_code_now/test_app.py rename to swh/web/save_code_now/tests/test_app.py diff --git a/swh/web/tests/save_code_now/test_django_command.py b/swh/web/save_code_now/tests/test_django_command.py similarity index 100% rename from swh/web/tests/save_code_now/test_django_command.py rename to swh/web/save_code_now/tests/test_django_command.py diff --git a/swh/web/tests/save_code_now/test_migrations.py b/swh/web/save_code_now/tests/test_migrations.py similarity index 100% rename from swh/web/tests/save_code_now/test_migrations.py rename to swh/web/save_code_now/tests/test_migrations.py diff --git a/swh/web/tests/save_code_now/test_origin_save.py b/swh/web/save_code_now/tests/test_origin_save.py similarity index 100% rename from swh/web/tests/save_code_now/test_origin_save.py rename to swh/web/save_code_now/tests/test_origin_save.py diff --git a/swh/web/tests/save_code_now/test_origin_save_admin.py b/swh/web/save_code_now/tests/test_origin_save_admin.py similarity index 100% rename from swh/web/tests/save_code_now/test_origin_save_admin.py rename to swh/web/save_code_now/tests/test_origin_save_admin.py diff --git a/swh/web/tests/save_code_now/test_origin_save_api.py b/swh/web/save_code_now/tests/test_origin_save_api.py similarity index 100% rename from swh/web/tests/save_code_now/test_origin_save_api.py rename to swh/web/save_code_now/tests/test_origin_save_api.py diff --git a/swh/web/tests/save_code_now/test_origin_save_views.py b/swh/web/save_code_now/tests/test_origin_save_views.py similarity index 100% rename from swh/web/tests/save_code_now/test_origin_save_views.py rename to swh/web/save_code_now/tests/test_origin_save_views.py diff --git a/swh/web/tests/save_origin_webhooks/__init__.py b/swh/web/save_origin_webhooks/tests/__init__.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/__init__.py rename to swh/web/save_origin_webhooks/tests/__init__.py diff --git a/swh/web/tests/save_origin_webhooks/data/bitbucket_webhook_payload.json b/swh/web/save_origin_webhooks/tests/data/bitbucket_webhook_payload.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/bitbucket_webhook_payload.json rename to swh/web/save_origin_webhooks/tests/data/bitbucket_webhook_payload.json diff --git a/swh/web/tests/save_origin_webhooks/data/gitea_webhook_payload.json b/swh/web/save_origin_webhooks/tests/data/gitea_webhook_payload.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/gitea_webhook_payload.json rename to swh/web/save_origin_webhooks/tests/data/gitea_webhook_payload.json diff --git a/swh/web/tests/save_origin_webhooks/data/github_webhook_payload.json b/swh/web/save_origin_webhooks/tests/data/github_webhook_payload.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/github_webhook_payload.json rename to swh/web/save_origin_webhooks/tests/data/github_webhook_payload.json diff --git a/swh/web/tests/save_origin_webhooks/data/gitlab_webhook_payload.json b/swh/web/save_origin_webhooks/tests/data/gitlab_webhook_payload.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/gitlab_webhook_payload.json rename to swh/web/save_origin_webhooks/tests/data/gitlab_webhook_payload.json diff --git 
a/swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-git b/swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-git similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-git rename to swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-git diff --git a/swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-hg b/swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-hg similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-hg rename to swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-hg diff --git a/swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-svn b/swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-svn similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/https_sourceforge.net/rest_p_webhook-test-svn rename to swh/web/save_origin_webhooks/tests/data/https_sourceforge.net/rest_p_webhook-test-svn diff --git a/swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_git.json b/swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_git.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_git.json rename to swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_git.json diff --git a/swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_hg.json b/swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_hg.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_hg.json rename to swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_hg.json diff --git a/swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_svn.json b/swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_svn.json similarity index 100% rename from swh/web/tests/save_origin_webhooks/data/sourceforge_webhook_payload_svn.json rename to swh/web/save_origin_webhooks/tests/data/sourceforge_webhook_payload_svn.json diff --git a/swh/web/tests/save_origin_webhooks/test_app.py b/swh/web/save_origin_webhooks/tests/test_app.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_app.py rename to swh/web/save_origin_webhooks/tests/test_app.py diff --git a/swh/web/tests/save_origin_webhooks/test_bitbucket.py b/swh/web/save_origin_webhooks/tests/test_bitbucket.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_bitbucket.py rename to swh/web/save_origin_webhooks/tests/test_bitbucket.py diff --git a/swh/web/tests/save_origin_webhooks/test_gitea.py b/swh/web/save_origin_webhooks/tests/test_gitea.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_gitea.py rename to swh/web/save_origin_webhooks/tests/test_gitea.py diff --git a/swh/web/tests/save_origin_webhooks/test_github.py b/swh/web/save_origin_webhooks/tests/test_github.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_github.py rename to swh/web/save_origin_webhooks/tests/test_github.py diff --git a/swh/web/tests/save_origin_webhooks/test_gitlab.py b/swh/web/save_origin_webhooks/tests/test_gitlab.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_gitlab.py rename to 
swh/web/save_origin_webhooks/tests/test_gitlab.py diff --git a/swh/web/tests/save_origin_webhooks/test_sourceforge.py b/swh/web/save_origin_webhooks/tests/test_sourceforge.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/test_sourceforge.py rename to swh/web/save_origin_webhooks/tests/test_sourceforge.py diff --git a/swh/web/tests/save_origin_webhooks/utils.py b/swh/web/save_origin_webhooks/tests/utils.py similarity index 100% rename from swh/web/tests/save_origin_webhooks/utils.py rename to swh/web/save_origin_webhooks/tests/utils.py diff --git a/swh/web/tests/helpers.py b/swh/web/tests/helpers.py index 5651a946..3c6fc0b5 100644 --- a/swh/web/tests/helpers.py +++ b/swh/web/tests/helpers.py @@ -1,267 +1,271 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import shutil from typing import Any, Dict, Optional, cast from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType from django.http.response import HttpResponse, HttpResponseBase, StreamingHttpResponse from django.test.client import Client from rest_framework.response import Response from rest_framework.test import APIClient from swh.web.tests.django_asserts import assert_template_used def _assert_http_response( response: HttpResponseBase, status_code: int, content_type: str ) -> HttpResponseBase: if isinstance(response, Response): drf_response = cast(Response, response) error_context = ( drf_response.data.pop("traceback") if isinstance(drf_response.data, dict) and "traceback" in drf_response.data else drf_response.data ) elif isinstance(response, StreamingHttpResponse): error_context = getattr(response, "traceback", response.streaming_content) elif isinstance(response, HttpResponse): error_context = getattr(response, "traceback", response.content) assert response.status_code == status_code, error_context if content_type != "*/*": assert response["Content-Type"].startswith(content_type) return response def check_http_get_response( client: Client, url: str, status_code: int, content_type: str = "*/*", http_origin: Optional[str] = None, server_name: Optional[str] = None, ) -> HttpResponseBase: """Helper function to check HTTP response for a GET request. Args: client: Django test client url: URL to check response status_code: expected HTTP status code content_type: expected response content type http_origin: optional HTTP_ORIGIN header value Returns: The HTTP response """ return _assert_http_response( response=client.get( url, HTTP_ACCEPT=content_type, HTTP_ORIGIN=http_origin, SERVER_NAME=server_name if server_name else "testserver", ), status_code=status_code, content_type=content_type, ) def check_http_post_response( client: Client, url: str, status_code: int, content_type: str = "*/*", request_content_type="application/json", data: Optional[Dict[str, Any]] = None, http_origin: Optional[str] = None, ) -> HttpResponseBase: """Helper function to check HTTP response for a POST request. 
def check_http_post_response(
    client: Client,
    url: str,
    status_code: int,
    content_type: str = "*/*",
    request_content_type="application/json",
    data: Optional[Dict[str, Any]] = None,
    http_origin: Optional[str] = None,
) -> HttpResponseBase:
    """Helper function to check HTTP response for a POST request.

    Args:
        client: Django test client
        url: URL to check response
        status_code: expected HTTP status code
        content_type: expected response content type
        request_content_type: content type of request body
        data: optional POST data
        http_origin: optional HTTP_ORIGIN header value

    Returns:
        The HTTP response
    """
    return _assert_http_response(
        response=client.post(
            url,
            data=data,
            content_type=request_content_type,
            HTTP_ACCEPT=content_type,
            HTTP_ORIGIN=http_origin,
        ),
        status_code=status_code,
        content_type=content_type,
    )


def check_api_get_responses(
    api_client: APIClient, url: str, status_code: int
) -> Response:
    """Helper function to check Web API responses for GET requests
    for all accepted content types (JSON, YAML, HTML).

    Args:
        api_client: DRF test client
        url: Web API URL to check responses
        status_code: expected HTTP status code

    Returns:
        The Web API JSON response
    """
    # check JSON response
    response_json = check_http_get_response(
        api_client, url, status_code, content_type="application/json"
    )

    # check HTML response (API Web UI)
    check_http_get_response(api_client, url, status_code, content_type="text/html")

    # check YAML response
    check_http_get_response(
        api_client, url, status_code, content_type="application/yaml"
    )

    return cast(Response, response_json)


def check_api_post_response(
    api_client: APIClient,
    url: str,
    status_code: int,
    content_type: str = "*/*",
    data: Optional[Dict[str, Any]] = None,
    **headers,
) -> HttpResponseBase:
    """Helper function to check Web API response for a POST request
    for all accepted content types.

    Args:
        api_client: DRF test client
        url: Web API URL to check response
        status_code: expected HTTP status code
        content_type: expected response content type
        data: optional POST data
        headers: optional HTTP headers to send with the request

    Returns:
        The HTTP response
    """
    return _assert_http_response(
        response=api_client.post(
            url,
            data=data,
            format="json",
            HTTP_ACCEPT=content_type,
            **headers,
        ),
        status_code=status_code,
        content_type=content_type,
    )


def check_api_post_responses(
    api_client: APIClient,
    url: str,
    status_code: int,
    data: Optional[Dict[str, Any]] = None,
    **headers,
) -> Response:
    """Helper function to check Web API responses for POST requests
    for all accepted content types (JSON, YAML).

    Args:
        api_client: DRF test client
        url: Web API URL to check responses
        status_code: expected HTTP status code
        data: optional POST data
        headers: optional HTTP headers to send with the requests

    Returns:
        The Web API JSON response
    """
    # check JSON response
    response_json = check_api_post_response(
        api_client,
        url,
        status_code,
        content_type="application/json",
        data=data,
        **headers,
    )

    # check YAML response
    check_api_post_response(
        api_client,
        url,
        status_code,
        content_type="application/yaml",
        data=data,
        **headers,
    )

    return cast(Response, response_json)


def check_html_get_response(
    client: Client,
    url: str,
    status_code: int,
    template_used: Optional[str] = None,
    http_origin: Optional[str] = None,
    server_name: Optional[str] = None,
) -> HttpResponseBase:
    """Helper function to check HTML responses for a GET request.

    Args:
        client: Django test client
        url: URL to check responses
        status_code: expected HTTP status code
        template_used: optional Django template expected to be used
        http_origin: optional HTTP_ORIGIN header value
        server_name: optional SERVER_NAME value for the request

    Returns:
        The HTML response
    """
    response = check_http_get_response(
        client,
        url,
        status_code,
        content_type="text/html",
        http_origin=http_origin,
        server_name=server_name,
    )
    if template_used is not None:
        assert_template_used(response, template_used)
    return response
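
# Usage sketch for the API helpers (hypothetical, not part of this patch),
# assuming a DRF ``api_client`` fixture; the endpoint URL and payload are
# illustrative only:
#
#     def test_endpoint_rejects_empty_payload(api_client):
#         # exercises both the JSON and YAML renderings in one call and
#         # returns the JSON response for further assertions
#         resp = check_api_post_responses(
#             api_client, "/api/1/some/endpoint/", status_code=400, data={}
#         )
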
def create_django_permission(perm_name: str) -> Permission:
    """Create a Django permission out of a permission name string.

    Args:
        perm_name: permission name (e.g. swh.web.api.throttling_exempted,
            swh.ambassador, ...)

    Returns:
        The persisted permission
    """
    perm_splitted = perm_name.split(".")
    app_label = ".".join(perm_splitted[:-1])
    perm_name = perm_splitted[-1]
    content_type = ContentType.objects.create(
        id=1000 + ContentType.objects.count(),
        app_label=app_label,
        model=perm_splitted[-1],
    )

    return Permission.objects.create(
        codename=perm_name,
        name=perm_name,
        content_type=content_type,
        id=1000 + Permission.objects.count(),
    )
+
+
+fossology_missing = shutil.which("nomossa") is None
diff --git a/swh/web/tests/resources/deposit/raw-metadata-add-to-origin.xml b/swh/web/tests/resources/deposit/raw-metadata-add-to-origin.xml
deleted file mode 100644
index 5174af34..00000000
--- a/swh/web/tests/resources/deposit/raw-metadata-add-to-origin.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
- Awesome Compiler
- urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
- dudess
-
-
-
-
-
-
diff --git a/swh/web/tests/resources/deposit/raw-metadata-create-origin.xml b/swh/web/tests/resources/deposit/raw-metadata-create-origin.xml
deleted file mode 100644
index fca04f42..00000000
--- a/swh/web/tests/resources/deposit/raw-metadata-create-origin.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
- Awesome Compiler
- urn:uuid:1225c695-cfb8-4ebb-daaaa-80da344efa6a
- dudess
-
-
-
-
-
-
diff --git a/swh/web/tests/resources/deposit/raw-metadata-no-swh.xml b/swh/web/tests/resources/deposit/raw-metadata-no-swh.xml
deleted file mode 100644
index 14675e8a..00000000
--- a/swh/web/tests/resources/deposit/raw-metadata-no-swh.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-
- Awesome Compiler
- urn:uuid:1225c695-cfb8-4ebb-daaaa-80da344efa6a
- dudess
-
diff --git a/swh/web/tests/resources/deposit/raw-metadata-provenance.xml b/swh/web/tests/resources/deposit/raw-metadata-provenance.xml
deleted file mode 100644
index 12d958a5..00000000
--- a/swh/web/tests/resources/deposit/raw-metadata-provenance.xml
+++ /dev/null
@@ -1,14 +0,0 @@
-
- Awesome Compiler
- urn:uuid:1225c695-cfb8-4ebb-daaaa-80da344efa6a
- dudess
-
- https://example.org/metadata/provenance
-
-
-
diff --git a/swh/web/tests/utils/__init__.py b/swh/web/utils/tests/__init__.py
similarity index 100%
rename from swh/web/tests/utils/__init__.py
rename to swh/web/utils/tests/__init__.py
diff --git a/swh/web/tests/utils/test_archive.py b/swh/web/utils/tests/test_archive.py
similarity index 99%
rename from swh/web/tests/utils/test_archive.py
rename to swh/web/utils/tests/test_archive.py
index 8a948508..24395220 100644
--- a/swh/web/tests/utils/test_archive.py
+++ b/swh/web/utils/tests/test_archive.py
@@ -1,1197 +1,1197 @@
# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import defaultdict
import datetime
import hashlib
import itertools
import random

from hypothesis import given, settings
import pytest

from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import (
    Directory,
    DirectoryEntry,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Revision,
    Snapshot,
    SnapshotBranch,
    TargetType,
)
from swh.model.swhids import ObjectType
from swh.storage.utils import now
-from swh.web.tests.conftest import fossology_missing
from swh.web.tests.data import random_content, random_sha1
+from swh.web.tests.helpers import fossology_missing
from swh.web.tests.strategies import new_origin, new_revision, visit_dates
from swh.web.utils import archive
from
swh.web.utils.exc import BadInputExc, NotFoundExc from swh.web.utils.typing import OriginInfo, PagedResult def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": True}) assert archive.lookup_multiple_hashes(input_data) == expected_output def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({"sha1": cnt["sha1"]}) expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents}) assert archive.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = archive.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert actual_lookup == {"found": None, "algo": "sha1_git"} def test_lookup_hash_exist(archive_data, content): actual_lookup = archive.lookup_hash("sha1:%s" % content["sha1"]) content_metadata = archive_data.content_get(content["sha1"]) assert {"found": content_metadata, "algo": "sha1"} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = archive.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) assert {"found": False} == actual_lookup def test_search_hash_exist(content): actual_lookup = archive.search_hash("sha1:%s" % content["sha1"]) assert {"found": True} == actual_lookup def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) actual_filetype = archive.lookup_content_filetype(content["sha1"]) expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) assert actual_filetype == expected_filetype @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") def test_lookup_content_license(indexer_data, content): indexer_data.content_add_license(content["sha1"]) actual_license = archive.lookup_content_license(content["sha1"]) expected_license = indexer_data.content_get_license(content["sha1"]) assert actual_license == expected_license def test_stat_counters(archive_data): actual_stats = archive.stat_counters() assert actual_stats == archive_data.stat_counters() @given(new_origin(), visit_dates()) def test_lookup_origin_visits(subtest, new_origin, visit_dates): # ensure archive_data fixture will be reset between each hypothesis # example test run @subtest def test_inner(archive_data): archive_data.origin_add([new_origin]) archive_data.origin_visit_add( [ OriginVisit( origin=new_origin.url, date=ts, type="git", ) for ts in visit_dates ] ) actual_origin_visits = list( archive.lookup_origin_visits(new_origin.url, per_page=100) ) expected_visits = archive_data.origin_visit_get(new_origin.url) for expected_visit in expected_visits: expected_visit["origin"] = new_origin.url assert actual_origin_visits == expected_visits @given(new_origin(), visit_dates()) def test_lookup_origin_visit(archive_data, new_origin, visit_dates): archive_data.origin_add([new_origin]) visits = archive_data.origin_visit_add( [ OriginVisit( origin=new_origin.url, date=ts, type="git", ) for ts in visit_dates ] ) visit = random.choice(visits).visit actual_origin_visit = archive.lookup_origin_visit(new_origin.url, visit) expected_visit = dict(archive_data.origin_visit_get_by(new_origin.url, visit)) assert actual_origin_visit == 
expected_visit @given(new_origin(), visit_dates()) @settings(max_examples=1) def test_origin_visit_find_by_date_no_result(archive_data, new_origin, visit_dates): """No visit registered in storage for an origin should return no visit""" archive_data.origin_add([new_origin]) for visit_date in visit_dates: # No visit yet, so nothing will get returned actual_origin_visit_status = archive.origin_visit_find_by_date( new_origin.url, visit_date ) assert actual_origin_visit_status is None @settings(max_examples=1) @given(new_origin()) def test_origin_visit_find_by_date(archive_data, new_origin): # Add origin and two visits archive_data.origin_add([new_origin]) pivot_date = now() # First visit one hour before pivot date first_visit_date = pivot_date - datetime.timedelta(hours=1) # Second visit two hours after pivot date second_visit_date = pivot_date + datetime.timedelta(hours=2) visits = archive_data.origin_visit_add( [ OriginVisit( origin=new_origin.url, date=visit_date, type="git", ) for visit_date in [first_visit_date, second_visit_date] ] ) # Finalize visits visit_statuses = [] for visit in visits: visit_statuses.append( OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=visit.date + datetime.timedelta(hours=1), type=visit.type, status="full", snapshot=None, ) ) archive_data.origin_visit_status_add(visit_statuses) # Check correct visit is returned when searching by date for search_date, greater_or_equal, expected_visit in [ (first_visit_date, True, 1), (pivot_date, True, 2), (pivot_date, False, 1), (second_visit_date, True, 2), ]: origin_visit = archive.origin_visit_find_by_date( new_origin.url, search_date, greater_or_equal ) assert origin_visit["visit"] == expected_visit @given(new_origin()) def test_lookup_origin(archive_data, new_origin): archive_data.origin_add([new_origin]) actual_origin = archive.lookup_origin({"url": new_origin.url}) expected_origin = archive_data.origin_get([new_origin.url])[0] assert actual_origin == expected_origin def test_lookup_origin_snapshots(archive_data, origin_with_multiple_visits): origin_url = origin_with_multiple_visits["url"] visits = archive_data.origin_visit_get(origin_url) origin_snapshots = archive.lookup_origin_snapshots(origin_with_multiple_visits) assert set(origin_snapshots) == {v["snapshot"] for v in visits} def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): with pytest.raises(BadInputExc) as e: archive.lookup_release(invalid_sha1) assert e.match("Invalid checksum") def test_lookup_release_ko_id_checksum_too_long(sha256): with pytest.raises(BadInputExc) as e: archive.lookup_release(sha256) assert e.match("Only sha1_git is supported.") def test_lookup_release_multiple(archive_data, releases): actual_releases = list(archive.lookup_release_multiple(releases)) expected_releases = [] for release_id in releases: release_info = archive_data.release_get(release_id) expected_releases.append(release_info) assert actual_releases == expected_releases def test_lookup_release_multiple_none_found(): unknown_releases_ = [random_sha1(), random_sha1(), random_sha1()] actual_releases = list(archive.lookup_release_multiple(unknown_releases_)) assert actual_releases == [None] * len(unknown_releases_) def test_lookup_directory_with_path_not_found(directory): path = "some/invalid/path/here" with pytest.raises(NotFoundExc) as e: archive.lookup_directory_with_path(directory, path) assert e.match( f"Directory entry with path {path} from root directory {directory} not found" ) def test_lookup_directory_with_path_found(archive_data, directory): 
directory_content = archive_data.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry["name"] actual_result = archive.lookup_directory_with_path(directory, path) assert actual_result == directory_entry def test_lookup_release(archive_data, release): actual_release = archive.lookup_release(release) assert actual_release == archive_data.release_get(release) def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with pytest.raises(BadInputExc) as e: archive.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Invalid checksum query string") sha1_git = sha256 with pytest.raises(BadInputExc) as e: archive.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Only sha1_git is supported") def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( revision, unknown_revision ): sha1_git_root = revision sha1_git = unknown_revision with pytest.raises(NotFoundExc) as e: archive.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Revision %s not found" % sha1_git) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( revision, unknown_revision ): sha1_git_root = unknown_revision sha1_git = revision with pytest.raises(NotFoundExc) as e: archive.lookup_revision_with_context(sha1_git_root, sha1_git) assert e.match("Revision root %s not found" % sha1_git_root) def test_lookup_revision_with_context(archive_data, ancestor_revisions): sha1_git = ancestor_revisions["sha1_git"] root_sha1_git = ancestor_revisions["sha1_git_root"] for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}): actual_revision = archive.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in archive_data.revision_log(root_sha1_git): for p_rev in rev["parents"]: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev["id"]) expected_revision = archive_data.revision_get(sha1_git) expected_revision["children"] = children assert actual_revision == expected_revision def test_lookup_revision_with_context_ko(non_ancestor_revisions): sha1_git = non_ancestor_revisions["sha1_git"] root_sha1_git = non_ancestor_revisions["sha1_git_root"] with pytest.raises(NotFoundExc) as e: archive.lookup_revision_with_context(root_sha1_git, sha1_git) assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git)) def test_lookup_directory_with_revision_not_found(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: archive.lookup_directory_with_revision(unknown_revision_) assert e.match("Revision %s not found" % unknown_revision_) @given(new_revision()) def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision): unknown_content_ = random_content() dir_path = "README.md" # A directory that points to unknown content dir = Directory( entries=( DirectoryEntry( name=bytes(dir_path.encode("utf-8")), type="file", target=hash_to_bytes(unknown_content_["sha1_git"]), perms=DentryPerms.content, ), ) ) # Create a revision that points to a directory # Which points to unknown content new_revision = new_revision.to_dict() new_revision["directory"] = dir.id del new_revision["id"] new_revision = Revision.from_dict(new_revision) # Add the directory and revision in mem archive_data.directory_add([dir]) archive_data.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision.id) with pytest.raises(NotFoundExc) as e: 
archive.lookup_directory_with_revision(new_revision_id, dir_path) assert e.match("Content not found for revision %s" % new_revision_id) def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): invalid_path = "path/to/something/unknown" with pytest.raises(NotFoundExc) as e: archive.lookup_directory_with_revision(revision, invalid_path) assert e.match("Directory or File") assert e.match(invalid_path) assert e.match("revision %s" % revision) assert e.match("not found") def test_lookup_directory_with_revision_submodules( archive_data, revision_with_submodules ): rev_sha1_git = revision_with_submodules["rev_sha1_git"] rev_dir_path = revision_with_submodules["rev_dir_rev_path"] actual_data = archive.lookup_directory_with_revision(rev_sha1_git, rev_dir_path) revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"]) directory = archive_data.directory_ls(revision["directory"]) rev_entry = next(e for e in directory if e["name"] == rev_dir_path) expected_data = { "content": archive_data.revision_get(rev_entry["target"]), "path": rev_dir_path, "revision": rev_sha1_git, "type": "rev", } assert actual_data == expected_data def test_lookup_directory_with_revision_without_path(archive_data, revision): actual_directory_entries = archive.lookup_directory_with_revision(revision) revision_data = archive_data.revision_get(revision) expected_directory_entries = archive_data.directory_ls(revision_data["directory"]) assert actual_directory_entries["type"] == "dir" assert actual_directory_entries["content"] == expected_directory_entries def test_lookup_directory_with_revision_with_path(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] in ("file", "dir") ] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = archive.lookup_directory_with_revision( revision, expected_dir_entry["name"] ) assert actual_dir_entry["type"] == expected_dir_entry["type"] assert actual_dir_entry["revision"] == revision assert actual_dir_entry["path"] == expected_dir_entry["name"] if actual_dir_entry["type"] == "file": del actual_dir_entry["content"]["checksums"]["blake2s256"] for key in ("checksums", "status", "length"): assert actual_dir_entry["content"][key] == expected_dir_entry[key] else: sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"]) assert actual_dir_entry["content"] == sub_dir_entries def test_lookup_directory_with_revision_with_path_to_file_and_data( archive_data, revision ): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] expected_dir_entry = random.choice(dir_entries) expected_data = archive_data.content_get_data( expected_dir_entry["checksums"]["sha1"] ) actual_dir_entry = archive.lookup_directory_with_revision( revision, expected_dir_entry["name"], with_data=True ) assert actual_dir_entry["type"] == expected_dir_entry["type"] assert actual_dir_entry["revision"] == revision assert actual_dir_entry["path"] == expected_dir_entry["name"] del actual_dir_entry["content"]["checksums"]["blake2s256"] for key in ("checksums", "status", "length"): assert actual_dir_entry["content"][key] == expected_dir_entry[key] assert actual_dir_entry["content"]["data"] == expected_data["data"] def test_lookup_revision(archive_data, revision): actual_revision = archive.lookup_revision(revision) assert actual_revision == archive_data.revision_get(revision) 
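
# A note on the hypothesis-based tests below (sketch only, not part of this
# patch): ``new_revision()`` is a strategy drawing a fresh Revision, so a
# test that needs a tweaked revision converts it to a dict, edits it, and
# re-adds it to the test storage:
#
#     @given(new_revision())
#     def test_some_property(archive_data, new_revision):
#         rev = new_revision.to_dict()
#         # ... tweak one attribute here ...
#         archive_data.revision_add([Revision.from_dict(rev)])
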
@given(new_revision()) def test_lookup_revision_invalid_msg(archive_data, new_revision): new_revision = new_revision.to_dict() new_revision["message"] = b"elegant fix for bug \xff" archive_data.revision_add([Revision.from_dict(new_revision)]) revision = archive.lookup_revision(hash_to_hex(new_revision["id"])) assert revision["message"] == "elegant fix for bug \\xff" assert "message" in revision["decoding_failures"] @given(new_revision()) def test_lookup_revision_msg_ok(archive_data, new_revision): archive_data.revision_add([new_revision]) revision_message = archive.lookup_revision_message(hash_to_hex(new_revision.id)) assert revision_message == {"message": new_revision.message} def test_lookup_revision_msg_no_rev(): unknown_revision_ = random_sha1() with pytest.raises(NotFoundExc) as e: archive.lookup_revision_message(unknown_revision_) assert e.match("Revision with sha1_git %s not found." % unknown_revision_) def test_lookup_revision_multiple(archive_data, revisions): actual_revisions = list(archive.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(archive_data.revision_get(rev)) assert actual_revisions == expected_revisions def test_lookup_revision_multiple_none_found(): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = list(archive.lookup_revision_multiple(unknown_revisions_)) assert actual_revisions == [None] * len(unknown_revisions_) def test_lookup_revision_log(archive_data, revision): actual_revision_log = list(archive.lookup_revision_log(revision, limit=25)) expected_revision_log = archive_data.revision_log(revision, limit=25) assert actual_revision_log == expected_revision_log def _get_origin_branches(archive_data, origin): origin_visit = archive_data.origin_visit_get(origin["url"])[-1] snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) branches = { k: v for (k, v) in snapshot["branches"].items() if v["target_type"] == "revision" } return branches def test_lookup_revision_log_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_log = list( archive.lookup_revision_log_by(origin["url"], branch_name, None, limit=25) ) expected_log = archive_data.revision_log(branches[branch_name]["target"], limit=25) assert actual_log == expected_log def test_lookup_revision_log_by_notfound(origin): with pytest.raises(NotFoundExc): archive.lookup_revision_log_by( origin["url"], "unknown_branch_name", None, limit=100 ) def test_lookup_content_raw_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: archive.lookup_content_raw("sha1:" + unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" % ("sha1", unknown_content_["sha1"]) ) def test_lookup_content_raw(archive_data, content): actual_content = archive.lookup_content_raw("sha256:%s" % content["sha256"]) expected_content = archive_data.content_get_data(content["sha1"]) assert actual_content == expected_content def test_lookup_empty_content_raw(empty_content): content_raw = archive.lookup_content_raw(f"sha1_git:{empty_content['sha1_git']}") assert content_raw["data"] == b"" def test_lookup_content_not_found(): unknown_content_ = random_content() with pytest.raises(NotFoundExc) as e: archive.lookup_content("sha1:%s" % unknown_content_["sha1"]) assert e.match( "Content with %s checksum equals to %s not found!" 
% ("sha1", unknown_content_["sha1"]) ) def test_lookup_content_with_sha1(archive_data, content): actual_content = archive.lookup_content(f"sha1:{content['sha1']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content def test_lookup_content_with_sha256(archive_data, content): actual_content = archive.lookup_content(f"sha256:{content['sha256']}") expected_content = archive_data.content_get(content["sha1"]) assert actual_content == expected_content def test_lookup_directory_bad_checksum(): with pytest.raises(BadInputExc): archive.lookup_directory("directory_id") def test_lookup_directory_not_found(): unknown_directory_ = random_sha1() with pytest.raises(NotFoundExc) as e: archive.lookup_directory(unknown_directory_) assert e.match("Directory with sha1_git %s not found" % unknown_directory_) def test_lookup_directory(archive_data, directory): actual_directory_ls = list(archive.lookup_directory(directory)) expected_directory_ls = archive_data.directory_ls(directory) assert actual_directory_ls == expected_directory_ls def test_lookup_directory_empty(empty_directory): actual_directory_ls = list(archive.lookup_directory(empty_directory)) assert actual_directory_ls == [] def test_lookup_revision_by_nothing_found(origin): with pytest.raises(NotFoundExc): archive.lookup_revision_by(origin["url"], "invalid-branch-name") def test_lookup_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) actual_revision = archive.lookup_revision_by(origin["url"], branch_name) expected_revision = archive_data.revision_get(branches[branch_name]["target"]) assert actual_revision == expected_revision def test_lookup_revision_with_context_by_ko(origin, revision): with pytest.raises(NotFoundExc): archive.lookup_revision_with_context_by( origin["url"], "invalid-branch-name", None, revision ) def test_lookup_revision_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) rev = root_rev_log[-1]["id"] actual_root_rev, actual_rev = archive.lookup_revision_with_context_by( origin["url"], branch_name, None, rev ) expected_root_rev = archive_data.revision_get(root_rev) expected_rev = archive_data.revision_get(rev) expected_rev["children"] = children[rev] assert actual_root_rev == expected_root_rev assert actual_rev == expected_rev def test_lookup_revision_through_ko_not_implemented(): with pytest.raises(NotImplementedError): archive.lookup_revision_through({"something-unknown": 10}) def test_lookup_revision_through_with_context_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]["target"] root_rev_log = archive_data.revision_log(root_rev) rev = root_rev_log[-1]["id"] assert archive.lookup_revision_through( { "origin_url": origin["url"], "branch_name": branch_name, "ts": None, "sha1_git": rev, } ) == archive.lookup_revision_with_context_by(origin["url"], branch_name, None, rev) def test_lookup_revision_through_with_revision_by(archive_data, origin): branches = _get_origin_branches(archive_data, origin) branch_name = random.choice(list(branches.keys())) assert archive.lookup_revision_through( { 
"origin_url": origin["url"], "branch_name": branch_name, "ts": None, } ) == archive.lookup_revision_by(origin["url"], branch_name, None) def test_lookup_revision_through_with_context(ancestor_revisions): sha1_git = ancestor_revisions["sha1_git"] sha1_git_root = ancestor_revisions["sha1_git_root"] assert archive.lookup_revision_through( { "sha1_git_root": sha1_git_root, "sha1_git": sha1_git, } ) == archive.lookup_revision_with_context(sha1_git_root, sha1_git) def test_lookup_revision_through_with_revision(revision): assert archive.lookup_revision_through( {"sha1_git": revision} ) == archive.lookup_revision(revision) def test_lookup_directory_through_revision_ko_not_found(revision): with pytest.raises(NotFoundExc): archive.lookup_directory_through_revision( {"sha1_git": revision}, "some/invalid/path" ) def test_lookup_directory_through_revision_ok(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] dir_entry = random.choice(dir_entries) assert archive.lookup_directory_through_revision( {"sha1_git": revision}, dir_entry["name"] ) == (revision, archive.lookup_directory_with_revision(revision, dir_entry["name"])) def test_lookup_directory_through_revision_ok_with_data(archive_data, revision): rev_data = archive_data.revision_get(revision) dir_entries = [ e for e in archive_data.directory_ls(rev_data["directory"]) if e["type"] == "file" ] dir_entry = random.choice(dir_entries) assert archive.lookup_directory_through_revision( {"sha1_git": revision}, dir_entry["name"], with_data=True ) == ( revision, archive.lookup_directory_with_revision( revision, dir_entry["name"], with_data=True ), ) def test_lookup_known_objects( archive_data, content, directory, release, revision, snapshot ): expected = archive_data.content_find(content) assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected expected = archive_data.directory_get(directory) assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected expected = archive_data.release_get(release) assert archive.lookup_object(ObjectType.RELEASE, release) == expected expected = archive_data.revision_get(revision) assert archive.lookup_object(ObjectType.REVISION, revision) == expected expected = {**archive_data.snapshot_get(snapshot), "next_branch": None} assert archive.lookup_object(ObjectType.SNAPSHOT, snapshot) == expected def test_lookup_unknown_objects( unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): with pytest.raises(NotFoundExc) as e: archive.lookup_object(ObjectType.CONTENT, unknown_content["sha1_git"]) assert e.match(r"Content.*not found") with pytest.raises(NotFoundExc) as e: archive.lookup_object(ObjectType.DIRECTORY, unknown_directory) assert e.match(r"Directory.*not found") with pytest.raises(NotFoundExc) as e: archive.lookup_object(ObjectType.RELEASE, unknown_release) assert e.match(r"Release.*not found") with pytest.raises(NotFoundExc) as e: archive.lookup_object(ObjectType.REVISION, unknown_revision) assert e.match(r"Revision.*not found") with pytest.raises(NotFoundExc) as e: archive.lookup_object(ObjectType.SNAPSHOT, unknown_snapshot) assert e.match(r"Snapshot.*not found") def test_lookup_invalid_objects(invalid_sha1): with pytest.raises(BadInputExc) as e: archive.lookup_object(ObjectType.CONTENT, invalid_sha1) assert e.match("Invalid hash") with pytest.raises(BadInputExc) as e: archive.lookup_object(ObjectType.DIRECTORY, invalid_sha1) assert 
e.match("Invalid checksum") with pytest.raises(BadInputExc) as e: archive.lookup_object(ObjectType.RELEASE, invalid_sha1) assert e.match("Invalid checksum") with pytest.raises(BadInputExc) as e: archive.lookup_object(ObjectType.REVISION, invalid_sha1) assert e.match("Invalid checksum") with pytest.raises(BadInputExc) as e: archive.lookup_object(ObjectType.SNAPSHOT, invalid_sha1) assert e.match("Invalid checksum") def test_lookup_missing_hashes_non_present(): missing_cnt = random_sha1() missing_dir = random_sha1() missing_rev = random_sha1() missing_rel = random_sha1() missing_snp = random_sha1() grouped_swhids = { ObjectType.CONTENT: [hash_to_bytes(missing_cnt)], ObjectType.DIRECTORY: [hash_to_bytes(missing_dir)], ObjectType.REVISION: [hash_to_bytes(missing_rev)], ObjectType.RELEASE: [hash_to_bytes(missing_rel)], ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)], } actual_result = archive.lookup_missing_hashes(grouped_swhids) assert actual_result == { missing_cnt, missing_dir, missing_rev, missing_rel, missing_snp, } def test_lookup_missing_hashes_some_present(content, directory): missing_rev = random_sha1() missing_rel = random_sha1() missing_snp = random_sha1() grouped_swhids = { ObjectType.CONTENT: [hash_to_bytes(content["sha1_git"])], ObjectType.DIRECTORY: [hash_to_bytes(directory)], ObjectType.REVISION: [hash_to_bytes(missing_rev)], ObjectType.RELEASE: [hash_to_bytes(missing_rel)], ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)], } actual_result = archive.lookup_missing_hashes(grouped_swhids) assert actual_result == {missing_rev, missing_rel, missing_snp} def test_lookup_origin_extra_trailing_slash(origin): origin_info = archive.lookup_origin({"url": f"{origin['url']}/"}) assert origin_info["url"] == origin["url"] def test_lookup_origin_missing_trailing_slash(archive_data): deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/") archive_data.origin_add([deb_origin]) origin_info = archive.lookup_origin({"url": deb_origin.url[:-1]}) assert origin_info["url"] == deb_origin.url def test_lookup_origin_single_slash_after_protocol(archive_data): origin_url = "http://snapshot.debian.org/package/r-base/" malformed_origin_url = "http:/snapshot.debian.org/package/r-base/" archive_data.origin_add([Origin(url=origin_url)]) origin_info = archive.lookup_origin({"url": malformed_origin_url}) assert origin_info["url"] == origin_url @given(new_origin()) def test_lookup_origins_get_by_sha1s(origin, unknown_origin): hasher = hashlib.sha1() hasher.update(origin["url"].encode("utf-8")) origin_info = OriginInfo(url=origin["url"]) origin_sha1 = hasher.hexdigest() hasher = hashlib.sha1() hasher.update(unknown_origin.url.encode("utf-8")) unknown_origin_sha1 = hasher.hexdigest() origins = list(archive.lookup_origins_by_sha1s([origin_sha1])) assert origins == [origin_info] origins = list(archive.lookup_origins_by_sha1s([origin_sha1, origin_sha1])) assert origins == [origin_info, origin_info] origins = list(archive.lookup_origins_by_sha1s([origin_sha1, unknown_origin_sha1])) assert origins == [origin_info, None] def test_search_origin(origin): results = archive.search_origin(url_pattern=origin["url"])[0] assert results == [{"url": origin["url"]}] def test_search_origin_use_ql(mocker, origin): ORIGIN = [{"url": origin["url"]}] mock_archive_search = mocker.patch("swh.web.utils.archive.search") mock_archive_search.origin_search.return_value = PagedResult( results=ORIGIN, next_page_token=None, ) query = f"origin = '{origin['url']}'" results = archive.search_origin(url_pattern=query, use_ql=True)[0] 
assert results == ORIGIN mock_archive_search.origin_search.assert_called_with( query=query, page_token=None, with_visit=False, visit_types=None, limit=50 ) def test_lookup_snapshot_sizes(archive_data, snapshot): branches = archive_data.snapshot_get(snapshot)["branches"] expected_sizes = { "alias": 0, "branch": 0, "release": 0, "revision": 0, } for _, branch_info in branches.items(): if branch_info is not None: expected_sizes[branch_info["target_type"]] += 1 if branch_info["target_type"] in ("content", "directory", "revision"): expected_sizes["branch"] += 1 assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes def test_lookup_snapshot_sizes_with_filtering(archive_data, revision): rev_id = hash_to_bytes(revision) snapshot = Snapshot( branches={ b"refs/heads/master": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/heads/incoming": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/pull/1": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/pull/2": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), }, ) archive_data.snapshot_add([snapshot]) expected_sizes = {"alias": 0, "branch": 2, "release": 0, "revision": 2} assert ( archive.lookup_snapshot_sizes( snapshot.id.hex(), branch_name_exclude_prefix="refs/pull/" ) == expected_sizes ) def test_lookup_snapshot_alias(snapshot): resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD") assert resolved_alias is not None assert resolved_alias["target_type"] == "revision" assert resolved_alias["target"] is not None def test_lookup_snapshot_missing(revision): with pytest.raises(NotFoundExc): archive.lookup_snapshot(revision) def test_lookup_snapshot_empty_branch_list(archive_data, revision): rev_id = hash_to_bytes(revision) snapshot = Snapshot( branches={ b"refs/heads/master": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), }, ) archive_data.snapshot_add([snapshot]) # FIXME; This test will change once the inconsistency in storage is fixed # postgres backend returns None in case of a missing branch whereas the # in-memory implementation (used in tests) returns a data structure; # hence the inconsistency branches = archive.lookup_snapshot( hash_to_hex(snapshot.id), branch_name_include_substring="non-existing", )["branches"] assert not branches def test_lookup_snapshot_branch_names_filtering(archive_data, revision): rev_id = hash_to_bytes(revision) snapshot = Snapshot( branches={ b"refs/heads/master": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/heads/incoming": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/pull/1": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), b"refs/pull/2": SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), "non_ascii_name_é".encode(): SnapshotBranch( target=rev_id, target_type=TargetType.REVISION, ), }, ) archive_data.snapshot_add([snapshot]) for include_pattern, exclude_prefix, nb_results in ( ("pull", None, 2), ("incoming", None, 1), ("é", None, 1), (None, "refs/heads/", 3), ("refs", "refs/heads/master", 3), ): branches = archive.lookup_snapshot( hash_to_hex(snapshot.id), branch_name_include_substring=include_pattern, branch_name_exclude_prefix=exclude_prefix, )["branches"] assert len(branches) == nb_results for branch_name in branches: if include_pattern: assert include_pattern in branch_name if exclude_prefix: assert not branch_name.startswith(exclude_prefix) def test_lookup_snapshot_branch_names_filtering_paginated( 
    archive_data, directory, revision
):
    pattern = "foo"
    nb_branches_by_target_type = 10
    branches = {}
    for i in range(nb_branches_by_target_type):
        branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(directory),
            target_type=TargetType.DIRECTORY,
        )
        branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(revision),
            target_type=TargetType.REVISION,
        )
        branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(directory),
            target_type=TargetType.DIRECTORY,
        )
        branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(revision),
            target_type=TargetType.REVISION,
        )

    snapshot = Snapshot(branches=branches)
    archive_data.snapshot_add([snapshot])

    branches_count = nb_branches_by_target_type // 2

    for target_type in (
        ObjectType.DIRECTORY.name.lower(),
        ObjectType.REVISION.name.lower(),
    ):
        partial_branches = archive.lookup_snapshot(
            hash_to_hex(snapshot.id),
            target_types=[target_type],
            branches_count=branches_count,
            branch_name_include_substring=pattern,
        )
        branches = partial_branches["branches"]

        assert len(branches) == branches_count
        for branch_name, branch_data in branches.items():
            assert pattern in branch_name
            assert branch_data["target_type"] == target_type
        for i in range(branches_count):
            assert f"branch/{target_type}/{pattern}{i}" in branches
        assert (
            partial_branches["next_branch"]
            == f"branch/{target_type}/{pattern}{branches_count}"
        )

        partial_branches = archive.lookup_snapshot(
            hash_to_hex(snapshot.id),
            target_types=[target_type],
            branches_from=partial_branches["next_branch"],
            branch_name_include_substring=pattern,
        )
        branches = partial_branches["branches"]

        assert len(branches) == branches_count
        for branch_name, branch_data in branches.items():
            assert pattern in branch_name
            assert branch_data["target_type"] == target_type
        for i in range(branches_count, 2 * branches_count):
            assert f"branch/{target_type}/{pattern}{i}" in branches
        assert partial_branches["next_branch"] is None
diff --git a/swh/web/tests/utils/test_converters.py b/swh/web/utils/tests/test_converters.py
similarity index 100%
rename from swh/web/tests/utils/test_converters.py
rename to swh/web/utils/tests/test_converters.py
diff --git a/swh/web/tests/utils/test_exc.py b/swh/web/utils/tests/test_exc.py
similarity index 100%
rename from swh/web/tests/utils/test_exc.py
rename to swh/web/utils/tests/test_exc.py
diff --git a/swh/web/tests/utils/test_highlightjs.py b/swh/web/utils/tests/test_highlightjs.py
similarity index 100%
rename from swh/web/tests/utils/test_highlightjs.py
rename to swh/web/utils/tests/test_highlightjs.py
diff --git a/swh/web/tests/utils/test_identifiers.py b/swh/web/utils/tests/test_identifiers.py
similarity index 100%
rename from swh/web/tests/utils/test_identifiers.py
rename to swh/web/utils/tests/test_identifiers.py
diff --git a/swh/web/tests/utils/test_middlewares.py b/swh/web/utils/tests/test_middlewares.py
similarity index 100%
rename from swh/web/tests/utils/test_middlewares.py
rename to swh/web/utils/tests/test_middlewares.py
diff --git a/swh/web/tests/utils/test_origin_visits.py b/swh/web/utils/tests/test_origin_visits.py
similarity index 100%
rename from swh/web/tests/utils/test_origin_visits.py
rename to swh/web/utils/tests/test_origin_visits.py
diff --git a/swh/web/tests/utils/test_query.py b/swh/web/utils/tests/test_query.py
similarity index 100%
rename from swh/web/tests/utils/test_query.py
rename to swh/web/utils/tests/test_query.py
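
The paginated filtering test above exercises the cursor protocol of
archive.lookup_snapshot: each call returns a "branches" mapping plus a
"next_branch" token, which is None on the last page. A minimal consumer
sketch (assumptions: snapshot_id is a hex snapshot identifier and
branches_from defaults to the empty string):

    all_branches = {}
    next_branch = ""
    while next_branch is not None:
        partial = archive.lookup_snapshot(
            snapshot_id,
            branches_from=next_branch,
            branches_count=100,
        )
        all_branches.update(partial["branches"])
        next_branch = partial["next_branch"]
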
diff --git a/swh/web/tests/utils/test_templatetags.py b/swh/web/utils/tests/test_templatetags.py
similarity index 100%
rename from swh/web/tests/utils/test_templatetags.py
rename to swh/web/utils/tests/test_templatetags.py
diff --git a/swh/web/tests/utils/test_utils.py b/swh/web/utils/tests/test_utils.py
similarity index 100%
rename from swh/web/tests/utils/test_utils.py
rename to swh/web/utils/tests/test_utils.py
diff --git a/swh/web/tests/vault/__init__.py b/swh/web/vault/tests/__init__.py
similarity index 100%
rename from swh/web/tests/vault/__init__.py
rename to swh/web/vault/tests/__init__.py
diff --git a/swh/web/tests/vault/test_apiviews.py b/swh/web/vault/tests/test_apiviews.py
similarity index 100%
rename from swh/web/tests/vault/test_apiviews.py
rename to swh/web/vault/tests/test_apiviews.py
diff --git a/swh/web/tests/vault/test_app.py b/swh/web/vault/tests/test_app.py
similarity index 100%
rename from swh/web/tests/vault/test_app.py
rename to swh/web/vault/tests/test_app.py
diff --git a/swh/web/tests/vault/test_views.py b/swh/web/vault/tests/test_views.py
similarity index 100%
rename from swh/web/tests/vault/test_views.py
rename to swh/web/vault/tests/test_views.py
diff --git a/swh/web/tests/webapp/__init__.py b/swh/web/webapp/tests/__init__.py
similarity index 100%
rename from swh/web/tests/webapp/__init__.py
rename to swh/web/webapp/tests/__init__.py
diff --git a/swh/web/tests/webapp/test_templates.py b/swh/web/webapp/tests/test_templates.py
similarity index 100%
rename from swh/web/tests/webapp/test_templates.py
rename to swh/web/webapp/tests/test_templates.py
diff --git a/swh/web/tests/webapp/test_views.py b/swh/web/webapp/tests/test_views.py
similarity index 100%
rename from swh/web/tests/webapp/test_views.py
rename to swh/web/webapp/tests/test_views.py
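
With this move, application tests live next to the code they exercise,
while the shared helpers remain importable from swh.web.tests. A minimal
sketch of a test module under the new layout (hypothetical file, e.g.
swh/web/webapp/tests/test_example.py; the URL and template name are
illustrative only):

    from swh.web.tests.helpers import check_html_get_response

    def test_some_page(client):
        # checks the status code, the HTML content type and the template
        # used to render the page
        check_html_get_response(
            client, "/", status_code=200, template_used="layout.html"
        )
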