diff --git a/swh/web/tests/admin/test_origin_save.py b/swh/web/tests/admin/test_origin_save.py index e4e701cf..63babc3c 100644 --- a/swh/web/tests/admin/test_origin_save.py +++ b/swh/web/tests/admin/test_origin_save.py @@ -1,192 +1,182 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import unquote import pytest -from django.contrib.auth import get_user_model - from swh.web.common.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_YET_SCHEDULED, SaveAuthorizedOrigin, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.common.origin_save import can_save_origin from swh.web.common.utils import reverse from swh.web.tests.utils import check_http_get_response, check_http_post_response -_user_name = "swh-web-admin" -_user_mail = "admin@swh-web.org" -_user_password = "..34~pounds~BEAUTY~march~63.." - _authorized_origin_url = "https://scm.ourproject.org/anonscm/" _unauthorized_origin_url = "https://www.softwareheritage.org/" pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): - User = get_user_model() - user = User.objects.create_user(_user_name, _user_mail, _user_password) - user.is_staff = True - user.save() SaveAuthorizedOrigin.objects.create(url=_authorized_origin_url) SaveUnauthorizedOrigin.objects.create(url=_unauthorized_origin_url) def check_not_login(client, url): login_url = reverse("login", query_params={"next": url}) resp = check_http_post_response(client, url, status_code=302) assert unquote(resp.url) == login_url -def test_add_authorized_origin_url(client): +def test_add_authorized_origin_url(client, staff_user): authorized_url = "https://scm.adullact.net/anonscm/" assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-authorized-url", url_args={"origin_url": authorized_url} ) check_not_login(client, url) assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(authorized_url) == SAVE_REQUEST_ACCEPTED -def test_remove_authorized_origin_url(client): +def test_remove_authorized_origin_url(client, staff_user): assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED url = reverse( "admin-origin-save-remove-authorized-url", url_args={"origin_url": _authorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_PENDING -def test_add_unauthorized_origin_url(client): +def test_add_unauthorized_origin_url(client, staff_user): unauthorized_url = "https://www.yahoo./" assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-unauthorized-url", url_args={"origin_url": unauthorized_url}, ) check_not_login(client, url) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_REJECTED -def test_remove_unauthorized_origin_url(client): +def test_remove_unauthorized_origin_url(client, staff_user): assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED url = reverse( "admin-origin-save-remove-unauthorized-url", url_args={"origin_url": _unauthorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) check_http_post_response(client, url, status_code=200) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_PENDING -def test_accept_pending_save_request(client, swh_scheduler): +def test_accept_pending_save_request(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://v2.pikacode.com/bthate/botlib.git" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING accept_request_url = reverse( "admin-origin-save-request-accept", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, accept_request_url) - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) response = check_http_post_response(client, accept_request_url, status_code=200) response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_ACCEPTED assert response.data[0]["save_task_status"] == SAVE_TASK_NOT_YET_SCHEDULED -def test_reject_pending_save_request(client, swh_scheduler): +def test_reject_pending_save_request(client, staff_user, swh_scheduler): visit_type = "git" origin_url = "https://wikipedia.com" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING reject_request_url = reverse( "admin-origin-save-request-reject", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, reject_request_url) - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) response = check_http_post_response(client, reject_request_url, status_code=200) response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_REJECTED -def test_remove_save_request(client): +def test_remove_save_request(client, staff_user): sor = SaveOriginRequest.objects.create( visit_type="git", origin_url="https://wikipedia.com", status=SAVE_REQUEST_PENDING, ) assert SaveOriginRequest.objects.count() == 1 remove_request_url = reverse( "admin-origin-save-request-remove", url_args={"sor_id": sor.id} ) check_not_login(client, remove_request_url) - client.login(username=_user_name, password=_user_password) + client.force_login(staff_user) check_http_post_response(client, remove_request_url, status_code=200) assert SaveOriginRequest.objects.count() == 0 diff --git a/swh/web/tests/api/test_throttling.py b/swh/web/tests/api/test_throttling.py index 3e0c66f4..82ffa6a4 100644 --- a/swh/web/tests/api/test_throttling.py +++ b/swh/web/tests/api/test_throttling.py @@ -1,232 +1,230 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.conf.urls import url -from django.contrib.auth.models import User from django.test.utils import override_settings from rest_framework.decorators import api_view from rest_framework.response import Response from rest_framework.views import APIView from swh.web.api.throttling import ( API_THROTTLING_EXEMPTED_PERM, SwhWebRateThrottle, SwhWebUserRateThrottle, throttle_scope, ) from swh.web.settings.tests import ( scope1_limiter_rate, scope1_limiter_rate_post, scope2_limiter_rate, scope2_limiter_rate_post, scope3_limiter_rate, scope3_limiter_rate_post, ) from swh.web.tests.utils import create_django_permission from swh.web.urls import urlpatterns class MockViewScope1(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope1" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope2") def mock_view_scope2(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") class MockViewScope3(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope3" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope3") def mock_view_scope3(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") urlpatterns += [ url(r"^scope1_class$", MockViewScope1.as_view()), url(r"^scope2_func$", mock_view_scope2), url(r"^scope3_class$", MockViewScope3.as_view()), url(r"^scope3_func$", mock_view_scope3), ] def check_response(response, status_code, limit=None, remaining=None): assert response.status_code == status_code if limit is not None: assert response["X-RateLimit-Limit"] == str(limit) else: assert "X-RateLimit-Limit" not in response if remaining is not None: assert response["X-RateLimit-Remaining"] == str(remaining) else: assert "X-RateLimit-Remaining" not in response @override_settings(ROOT_URLCONF=__name__) def test_scope1_requests_are_throttled(api_client): """ Ensure request rate is limited in scope1 """ for i in range(scope1_limiter_rate): response = api_client.get("/scope1_class") check_response(response, 200, scope1_limiter_rate, scope1_limiter_rate - i - 1) response = api_client.get("/scope1_class") check_response(response, 429, scope1_limiter_rate, 0) for i in range(scope1_limiter_rate_post): response = api_client.post("/scope1_class") check_response( response, 200, scope1_limiter_rate_post, scope1_limiter_rate_post - i - 1 ) response = api_client.post("/scope1_class") check_response(response, 429, scope1_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope2_requests_are_throttled(api_client): """ Ensure request rate is limited in scope2 """ for i in range(scope2_limiter_rate): response = api_client.get("/scope2_func") check_response(response, 200, scope2_limiter_rate, scope2_limiter_rate - i - 1) response = api_client.get("/scope2_func") check_response(response, 429, scope2_limiter_rate, 0) for i in range(scope2_limiter_rate_post): response = api_client.post("/scope2_func") check_response( response, 200, scope2_limiter_rate_post, scope2_limiter_rate_post - i - 1 ) response = api_client.post("/scope2_func") check_response(response, 429, scope2_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope3_requests_are_throttled_exempted(api_client): """ Ensure request rate is not limited in scope3 as requests coming from localhost are exempted from rate limit. """ for _ in range(scope3_limiter_rate + 1): response = api_client.get("/scope3_class") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): response = api_client.post("/scope3_class") check_response(response, 200) for _ in range(scope3_limiter_rate + 1): response = api_client.get("/scope3_func") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): response = api_client.post("/scope3_func") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db -def test_staff_users_are_not_rate_limited(api_client): - staff_user = User.objects.create_user( - username="johndoe", password="", is_staff=True - ) +def test_staff_users_are_not_rate_limited(api_client, staff_user): api_client.force_login(staff_user) for _ in range(scope2_limiter_rate + 1): response = api_client.get("/scope2_func") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): response = api_client.post("/scope2_func") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db -def test_non_staff_users_are_rate_limited(api_client): - - user = User.objects.create_user(username="johndoe", password="", is_staff=False) +def test_non_staff_users_are_rate_limited(api_client, regular_user): - api_client.force_login(user) + api_client.force_login(regular_user) scope2_limiter_rate_user = ( scope2_limiter_rate * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_user): response = api_client.get("/scope2_func") check_response( response, 200, scope2_limiter_rate_user, scope2_limiter_rate_user - i - 1 ) response = api_client.get("/scope2_func") check_response(response, 429, scope2_limiter_rate_user, 0) scope2_limiter_rate_post_user = ( scope2_limiter_rate_post * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_post_user): response = api_client.post("/scope2_func") check_response( response, 200, scope2_limiter_rate_post_user, scope2_limiter_rate_post_user - i - 1, ) response = api_client.post("/scope2_func") check_response(response, 429, scope2_limiter_rate_post_user, 0) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db -def test_users_with_throttling_exempted_perm_are_not_rate_limited(api_client): - user = User.objects.create_user(username="johndoe", password="") - user.user_permissions.add(create_django_permission(API_THROTTLING_EXEMPTED_PERM)) +def test_users_with_throttling_exempted_perm_are_not_rate_limited( + api_client, regular_user +): + + regular_user.user_permissions.add( + create_django_permission(API_THROTTLING_EXEMPTED_PERM) + ) - assert user.has_perm(API_THROTTLING_EXEMPTED_PERM) + assert regular_user.has_perm(API_THROTTLING_EXEMPTED_PERM) - api_client.force_login(user) + api_client.force_login(regular_user) for _ in range(scope2_limiter_rate + 1): response = api_client.get("/scope2_func") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): response = api_client.post("/scope2_func") check_response(response, 200) diff --git a/swh/web/tests/api/views/test_origin_save.py b/swh/web/tests/api/views/test_origin_save.py index 8e627f35..6dcc8f7d 100644 --- a/swh/web/tests/api/views/test_origin_save.py +++ b/swh/web/tests/api/views/test_origin_save.py @@ -1,543 +1,541 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import uuid import pytest -from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from django.utils import timezone from swh.web.auth.utils import SWH_AMBASSADOR_PERMISSION from swh.web.common.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_FAILED, SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEEDED, VISIT_STATUS_FAILED, VISIT_STATUS_FULL, SaveAuthorizedOrigin, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.common.typing import OriginExistenceCheckInfo from swh.web.common.utils import reverse from swh.web.settings.tests import save_origin_rate_post from swh.web.tests.utils import ( check_api_get_responses, check_api_post_response, check_api_post_responses, ) pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): SaveAuthorizedOrigin.objects.create(url="https://github.com/"), SaveAuthorizedOrigin.objects.create(url="https://gitlab.com/"), SaveUnauthorizedOrigin.objects.create(url="https://github.com/user/illegal_repo") SaveUnauthorizedOrigin.objects.create(url="https://gitlab.com/user_to_exclude") def test_invalid_visit_type(api_client, swh_scheduler): url = reverse( "api-1-save-origin", url_args={ "visit_type": "foo", "origin_url": "https://github.com/torvalds/linux", }, ) check_api_get_responses(api_client, url, status_code=400) def test_invalid_origin_url(api_client, swh_scheduler): url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": "bar"} ) check_api_get_responses(api_client, url, status_code=400) def check_created_save_request_status( api_client, mocker, origin_url, expected_request_status, expected_task_status=None, visit_date=None, ): mock_origin_exists = mocker.patch("swh.web.common.origin_save.origin_exists") mock_origin_exists.return_value = OriginExistenceCheckInfo( origin_url=origin_url, exists=True, last_modified=None, content_length=None ) url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save._get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, None) if expected_request_status != SAVE_REQUEST_REJECTED: response = check_api_post_responses(api_client, url, data=None, status_code=200) assert response.data["save_request_status"] == expected_request_status assert response.data["save_task_status"] == expected_task_status else: check_api_post_responses(api_client, url, data=None, status_code=403) def check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status, expected_task_status, scheduler_task_status="next_run_not_scheduled", scheduler_task_run_status=None, visit_date=None, visit_status=None, ): if expected_task_status != SAVE_TASK_NOT_CREATED: task = dict(swh_scheduler.search_tasks()[0].items()) backend_id = str(uuid.uuid4()) if scheduler_task_status != "next_run_not_scheduled": swh_scheduler.schedule_task_run(task["id"], backend_id) if scheduler_task_run_status is not None: swh_scheduler.start_task_run(backend_id) task_run = dict( swh_scheduler.end_task_run(backend_id, scheduler_task_run_status).items() ) url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save._get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, visit_status) response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_request_status"] == expected_request_status assert save_request_data["save_task_status"] == expected_task_status assert save_request_data["visit_status"] == visit_status if scheduler_task_run_status is not None: # Check that save task status is still available when # the scheduler task has been archived swh_scheduler.delete_archived_tasks( [{"task_id": task["id"], "task_run_id": task_run["id"]}] ) response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_task_status"] == expected_task_status assert save_request_data["visit_status"] == visit_status def test_save_request_rejected(api_client, mocker, swh_scheduler): origin_url = "https://github.com/user/illegal_repo" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_REJECTED, ) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_REJECTED, expected_task_status=SAVE_TASK_NOT_CREATED, ) def test_save_request_pending(api_client, mocker, swh_scheduler): origin_url = "https://unkwownforge.com/user/repo" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_PENDING, expected_task_status=SAVE_TASK_NOT_CREATED, ) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_PENDING, expected_task_status=SAVE_TASK_NOT_CREATED, ) def test_save_request_scheduled(api_client, mocker, swh_scheduler): origin_url = "https://github.com/Kitware/CMake" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", scheduler_task_run_status="scheduled", ) def test_save_request_completed(api_client, mocker, swh_scheduler): origin_url = "https://github.com/Kitware/CMake" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_SUCCEEDED, scheduler_task_status="completed", scheduler_task_run_status="eventful", visit_date=None, ) def test_save_request_completed_visit_status(api_client, mocker, swh_scheduler): origin_url = "https://github.com/Kitware/CMake" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_SUCCEEDED, scheduler_task_status="completed", scheduler_task_run_status="eventful", visit_date=visit_date, visit_status=VISIT_STATUS_FULL, ) def test_save_request_failed(api_client, mocker, swh_scheduler): origin_url = "https://gitlab.com/inkscape/inkscape" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_FAILED, scheduler_task_status="disabled", scheduler_task_run_status="failed", visit_status=VISIT_STATUS_FAILED, ) def test_create_save_request_no_duplicate(api_client, mocker, swh_scheduler): origin_url = "https://github.com/webpack/webpack" check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 check_save_request_status( api_client, mocker, swh_scheduler, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", scheduler_task_run_status="scheduled", ) check_created_save_request_status( api_client, mocker, origin_url, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 def test_get_save_requests_unknown_origin(api_client, swh_scheduler): unknown_origin_url = "https://gitlab.com/foo/bar" url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": unknown_origin_url}, ) response = check_api_get_responses(api_client, url, status_code=404) assert response.data == { "exception": "NotFoundExc", "reason": ( "No save requests found for visit of type git on origin with url %s." ) % unknown_origin_url, } _visit_type = "git" _origin_url = "https://github.com/python/cpython" def test_save_requests_rate_limit(api_client, mocker): create_save_origin_request = mocker.patch( "swh.web.api.views.origin_save.create_save_origin_request" ) def _save_request_dict(*args, **kwargs): return { "id": 1, "visit_type": _visit_type, "origin_url": _origin_url, "save_request_date": datetime.now().isoformat(), "save_request_status": SAVE_REQUEST_ACCEPTED, "save_task_status": SAVE_TASK_NOT_YET_SCHEDULED, "visit_date": None, "visit_status": None, } create_save_origin_request.side_effect = _save_request_dict url = reverse( "api-1-save-origin", url_args={"visit_type": _visit_type, "origin_url": _origin_url}, ) for _ in range(save_origin_rate_post): check_api_post_response(api_client, url, status_code=200) check_api_post_response(api_client, url, status_code=429) def test_save_request_form_server_error(api_client, mocker): create_save_origin_request = mocker.patch( "swh.web.api.views.origin_save.create_save_origin_request" ) create_save_origin_request.side_effect = Exception("Server error") url = reverse( "api-1-save-origin", url_args={"visit_type": _visit_type, "origin_url": _origin_url}, ) check_api_post_responses(api_client, url, status_code=500) @pytest.fixture def origin_to_review(): return "https://git.example.org/user/project" def test_create_save_request_pending_review_anonymous_user( api_client, origin_to_review, swh_scheduler ): url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_to_review}, ) response = check_api_post_responses(api_client, url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING with pytest.raises(ObjectDoesNotExist): SaveAuthorizedOrigin.objects.get(url=origin_to_review) def test_create_save_request_archives_with_ambassador_user( api_client, keycloak_oidc, requests_mock, swh_scheduler, ): swh_scheduler.add_load_archive_task_type() keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") originUrl = "https://somewhere.org/simple" artifact_version = "1.2.3" artifact_filename = f"tarball-{artifact_version}.tar.gz" artifact_url = f"{originUrl}/{artifact_filename}" content_length = "100" last_modified = "Sun, 21 Aug 2011 16:26:32 GMT" requests_mock.head( artifact_url, status_code=200, headers={"content-length": content_length, "last-modified": last_modified,}, ) url = reverse( "api-1-save-origin", url_args={"visit_type": "archives", "origin_url": originUrl,}, ) response = check_api_post_response( api_client, url, status_code=200, data={ "archives_data": [ {"artifact_url": artifact_url, "artifact_version": artifact_version,} ] }, ) assert response.data["save_request_status"] == SAVE_REQUEST_ACCEPTED assert SaveAuthorizedOrigin.objects.get(url=originUrl) def test_create_save_request_archives_missing_artifacts_data( api_client, keycloak_oidc, swh_scheduler ): swh_scheduler.add_load_archive_task_type() keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") originUrl = "https://somewhere.org/simple" url = reverse( "api-1-save-origin", url_args={"visit_type": "archives", "origin_url": originUrl,}, ) response = check_api_post_response(api_client, url, status_code=400, data={},) assert "Artifacts data are missing" in response.data["reason"] response = check_api_post_response( api_client, url, status_code=400, data={"archives_data": [{"artifact_url": "", "arttifact_version": "1.0"}]}, ) assert "Missing url or version for an artifact to load" in response.data["reason"] def test_create_save_request_archives_accepted_ambassador_user( api_client, origin_to_review, keycloak_oidc, mocker, swh_scheduler ): keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") check_created_save_request_status( api_client, mocker, origin_to_review, expected_request_status=SAVE_REQUEST_ACCEPTED, expected_task_status=SAVE_TASK_NOT_YET_SCHEDULED, ) assert SaveAuthorizedOrigin.objects.get(url=origin_to_review) def test_create_save_request_anonymous_user_no_user_id(api_client, swh_scheduler): origin_url = "https://some.git.hosters/user/repo" url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url}, ) check_api_post_responses(api_client, url, status_code=200) sor = SaveOriginRequest.objects.get(origin_url=origin_url) assert sor.user_ids is None def test_create_save_request_authenticated_user_id( api_client, keycloak_oidc, swh_scheduler ): oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") origin_url = "https://some.git.hosters/user/repo2" url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url}, ) response = check_api_post_response(api_client, url, status_code=200) assert response.wsgi_request.user.id is not None user_id = str(response.wsgi_request.user.id) sor = SaveOriginRequest.objects.get(user_ids=f'"{user_id}"') assert sor.user_ids == f'"{user_id}"' def test_create_pending_save_request_multiple_authenticated_users( - api_client, swh_scheduler + api_client, swh_scheduler, regular_user, regular_user2 ): origin_url = "https://some.git.hosters/user/repo3" - first_user = User.objects.create_user(username="first_user", password="") - second_user = User.objects.create_user(username="second_user", password="") + url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url}, ) - api_client.force_login(first_user) + api_client.force_login(regular_user) check_api_post_response(api_client, url, status_code=200) - api_client.force_login(second_user) + api_client.force_login(regular_user2) check_api_post_response(api_client, url, status_code=200) - assert SaveOriginRequest.objects.get(user_ids__contains=f'"{first_user.id}"') - assert SaveOriginRequest.objects.get(user_ids__contains=f'"{second_user.id}"') + assert SaveOriginRequest.objects.get(user_ids__contains=f'"{regular_user.id}"') + assert SaveOriginRequest.objects.get(user_ids__contains=f'"{regular_user2.id}"') diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index 9babf7b0..1081a751 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,517 +1,533 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import json import os import shutil from subprocess import PIPE, run import sys from typing import Any, Dict, List, Optional from _pytest.python import Function from hypothesis import HealthCheck, settings import pytest +from django.contrib.auth.models import User from django.core.cache import cache from django.test.utils import setup_databases # type: ignore from rest_framework.test import APIClient, APIRequestFactory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.scheduler.tests.common import TASK_TYPES from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.origin_save import get_scheduler_load_task_types from swh.web.common.typing import OriginVisitInfo from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) # we use getattr here to keep mypy happy regardless hypothesis version function_scoped_fixture_check = ( [getattr(HealthCheck, "function_scoped_fixture")] if hasattr(HealthCheck, "function_scoped_fixture") else [] ) suppress_health_check = [ HealthCheck.too_slow, HealthCheck.filter_too_much, ] + function_scoped_fixture_check settings.register_profile( "swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=5, suppress_health_check=suppress_health_check, ), ) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox data_dir = os.path.join(sys.prefix, "share/swh/web") static_dir = os.path.join(data_dir, "static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return bundles_dir = os.path.join(test_dir, "../../../assets/src/bundles") if not os.path.exists(bundles_dir): # location of the bundles folder when running tests with tox bundles_dir = os.path.join(data_dir, "assets/src/bundles") _, bundles, _ = next(os.walk(bundles_dir)) mock_webpack_stats = { "status": "done", "publicPath": "/static", "chunks": {}, "assets": {}, } for bundle in bundles: asset = f"js/{bundle}.js" mock_webpack_stats["chunks"][bundle] = [asset] mock_webpack_stats["assets"][asset] = { "name": asset, "publicPath": f"/static/{asset}", } with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="function", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages( data["storage"], data["idx_storage"], data["search"], data["counters"] ) return data # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="function") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="function") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] self.ctags_indexer = tests_data["ctags_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) def content_add_ctags(self, cnt_id): self.ctags_indexer.run([hash_to_bytes(cnt_id)]) def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) @pytest.fixture def keycloak_oidc(keycloak_oidc, mocker): keycloak_config = get_config()["keycloak"] keycloak_oidc.server_url = keycloak_config["server_url"] keycloak_oidc.realm_name = keycloak_config["realm_name"] keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") keycloak_oidc_client.return_value = keycloak_oidc return keycloak_oidc @pytest.fixture def subtest(request): """A hack to explicitly set up and tear down fixtures. This fixture allows you to set up and tear down fixtures within the test function itself. This is useful (necessary!) for using Hypothesis inside pytest, as hypothesis will call the test function multiple times, without setting up or tearing down fixture state as it is normally the case. Copied from the pytest-subtesthack project, public domain license (https://github.com/untitaker/pytest-subtesthack). """ parent_test = request.node def inner(func): if hasattr(Function, "from_parent"): item = Function.from_parent( parent_test, name=request.function.__name__ + "[]", originalname=request.function.__name__, callobj=func, ) else: item = Function( name=request.function.__name__ + "[]", parent=parent_test, callobj=func ) nextitem = parent_test # prevents pytest from tearing down module fixtures item.ihook.pytest_runtest_setup(item=item) item.ihook.pytest_runtest_call(item=item) item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem) return inner @pytest.fixture def swh_scheduler(swh_scheduler): config = get_config() scheduler = config["scheduler"] config["scheduler"] = swh_scheduler # create load-git and load-hg task types for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) # create load-svn task type swh_scheduler.create_task_type( { "type": "load-svn", "description": "Update a Subversion repository", "backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-cvs task type swh_scheduler.create_task_type( { "type": "load-cvs", "description": "Update a CVS repository", "backend_name": "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # add method to add load-archive-files task type during tests def add_load_archive_task_type(): swh_scheduler.create_task_type( { "type": "load-archive-files", "description": "Load tarballs", "backend_name": "swh.loader.package.archive.tasks.LoadArchive", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) swh_scheduler.add_load_archive_task_type = add_load_archive_task_type yield swh_scheduler config["scheduler"] = scheduler get_scheduler_load_task_types.cache_clear() @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", get_config()["test_db"]["name"]), ("USER", postgresql_proc.user), ("HOST", postgresql_proc.host), ("PORT", postgresql_proc.port), } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) + + +@pytest.fixture +def staff_user(): + return User.objects.create_user(username="admin", password="", is_staff=True) + + +@pytest.fixture +def regular_user(): + return User.objects.create_user(username="johndoe", password="") + + +@pytest.fixture +def regular_user2(): + return User.objects.create_user(username="janedoe", password="")