diff --git a/requirements-swh.txt b/requirements-swh.txt index ac67abcf..9224ac8a 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,9 +1,9 @@ -swh.auth[django] >= 0.5.0 +swh.auth[django] >= 0.5.3 swh.core >= 0.0.95 swh.counters >= 0.5.1 swh.indexer >= 0.4.1 swh.model >= 0.5.0 swh.scheduler >= 0.7.0 swh.search >= 0.2.0 swh.storage >= 0.11.10 swh.vault >= 0.0.33 diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py index d4d3b52a..b3cb2ba7 100644 --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -1,261 +1,261 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import json import textwrap from hypothesis import given from django.http.response import StreamingHttpResponse from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID from swh.web.api.views.graph import API_GRAPH_PERM from swh.web.common.utils import reverse from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config from swh.web.tests.strategies import origin from swh.web.tests.utils import check_http_get_response def test_graph_endpoint_no_authentication_for_vpn_users(api_client, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response( api_client, url, status_code=200, server_name=SWH_WEB_INTERNAL_SERVER_NAME ) def test_graph_endpoint_needs_authentication(api_client): url = reverse("api-1-graph", url_args={"graph_query": "stats"}) check_http_get_response(api_client, url, status_code=401) def _authenticate_graph_user(api_client, keycloak_oidc): - keycloak_oidc.user_permissions = [API_GRAPH_PERM] + keycloak_oidc.client_permissions = [API_GRAPH_PERM] oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") check_http_get_response(api_client, url, status_code=403) _authenticate_graph_user(api_client, keycloak_oidc) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response(api_client, url, status_code=200) def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock): _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" response_text = textwrap.dedent( """\ swh:1:cnt:1d3dace0a825b0535c37c53ed669ef817e9c1b47 swh:1:cnt:6d5b280f4e33589ae967a7912a587dd5cb8dedaa swh:1:cnt:91bef238bf01356a550d416d14bb464c576ac6f4 swh:1:cnt:58a8b925a463b87d49639fda282b8f836546e396 swh:1:cnt:fd32ee0a87e16ccc853dfbeb7018674f9ce008c0 swh:1:cnt:ab7c39871872589a4fc9e249ebc927fb1042c90d swh:1:cnt:93073c02bf3869845977527de16af4d54765838d swh:1:cnt:4251f795b52c54c447a97c9fe904d8b1f993b1e0 swh:1:cnt:c6e7055424332006d07876ffeba684e7e284b383 swh:1:cnt:8459d8867dc3b15ef7ae9683e21cccc9ab2ec887 swh:1:cnt:5f9981d52202815aa947f85b9dfa191b66f51138 swh:1:cnt:00a685ec51bcdf398c15d588ecdedb611dbbab4b swh:1:cnt:e1cf1ea335106a0197a2f92f7804046425a7d3eb swh:1:cnt:07069b38087f88ec192d2c9aff75a502476fd17d swh:1:cnt:f045ee845c7f14d903a2c035b2691a7c400c01f0 """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": "text/plain", "Transfer-Encoding": "chunked"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response( api_client, url, status_code=200, content_type="text/plain" ) assert isinstance(resp, StreamingHttpResponse) assert b"".join(resp.streaming_content) == response_text.encode() _response_json = { "counts": {"nodes": 17075708289, "edges": 196236587976}, "ratios": { "compression": 0.16, "bits_per_node": 58.828, "bits_per_edge": 5.119, "avg_locality": 2184278529.729, }, "indegree": {"min": 0, "max": 263180117, "avg": 11.4921492364925}, "outdegree": {"min": 0, "max": 1033207, "avg": 11.4921492364925}, } def test_graph_json_response(api_client, keycloak_oidc, requests_mock): _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock): _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" response_ndjson = textwrap.dedent( """\ ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"] """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_ndjson, headers={ "Content-Type": "application/x-ndjson", "Transfer-Encoding": "chunked", }, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == "application/x-ndjson" assert b"".join(resp.streaming_content) == response_ndjson.encode() @given(origin()) def test_graph_response_resolve_origins( archive_data, api_client, keycloak_oidc, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) origin_sha1 = hasher.digest() origin_swhid = str( ExtendedSWHID(object_type=ExtendedObjectType.ORIGIN, object_id=origin_sha1) ) snapshot = archive_data.snapshot_get_latest(origin["url"])["id"] snapshot_swhid = str( ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=hash_to_bytes(snapshot) ) ) _authenticate_graph_user(api_client, keycloak_oidc) for graph_query, response_text, content_type in ( ( f"visit/nodes/{snapshot_swhid}", f"{snapshot_swhid}\n{origin_swhid}\n", "text/plain", ), ( f"visit/edges/{snapshot_swhid}", f"{snapshot_swhid} {origin_swhid}\n", "text/plain", ), ( f"visit/paths/{snapshot_swhid}", f'["{snapshot_swhid}", "{origin_swhid}"]\n', "application/x-ndjson", ), ): # set two lines response to check resolved origins cache response_text = response_text + response_text requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": content_type, "Transfer-Encoding": "chunked"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert b"".join(resp.streaming_content) == response_text.encode() url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward", "resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert ( b"".join(resp.streaming_content) == response_text.replace(origin_swhid, origin["url"]).encode() ) def test_graph_response_resolve_origins_nothing_to_do( api_client, keycloak_oidc, requests_mock ): _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 251975a6..83dec955 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,306 +1,306 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.http import QueryDict from swh.auth.keycloak import KeycloakError from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view def _check_oidc_login_code_flow_data( request, response, keycloak_oidc, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_http_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ _generate_and_test_bearer_token(client, keycloak_oidc) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): token = _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token @pytest.mark.django_db def test_oidc_get_bearer_token_expired_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ _generate_and_test_bearer_token(client, keycloak_oidc) for kc_err_msg in ("Offline session not active", "Offline user session not found"): kc_error_dict = { "error": "invalid_grant", "error_description": kc_err_msg, } keycloak_oidc.refresh_token.side_effect = KeycloakError( error_message=json.dumps(kc_error_dict).encode(), response_code=400 ) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=400, data={"token_id": 1}, content_type="text/plain", ) assert ( response.content == b"Bearer token has expired, please generate a new one." ) def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_http_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db def test_oidc_profile_view(client, keycloak_oidc): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] - user_permissions = ["perm1", "perm2"] - keycloak_oidc.user_permissions = user_permissions + client_permissions = ["perm1", "perm2"] + keycloak_oidc.client_permissions = client_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) - for perm in user_permissions: + for perm in client_permissions: assert_contains(resp, perm)