diff --git a/docs/cli.rst b/docs/cli.rst new file mode 100644 index 0000000..9931b86 --- /dev/null +++ b/docs/cli.rst @@ -0,0 +1,8 @@ +.. _swh-auth-cli: + +Command-line interface +====================== + +.. click:: swh.auth.cli:auth + :prog: swh auth + :nested: full diff --git a/docs/index.rst b/docs/index.rst index c8f7297..28c50c6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,12 +1,13 @@ .. _swh-auth: .. include:: README.rst Reference Documentation ----------------------- .. toctree:: :maxdepth: 2 + cli django /apidoc/swh.auth diff --git a/setup.py b/setup.py index e9ca471..0fcb566 100755 --- a/setup.py +++ b/setup.py @@ -1,75 +1,75 @@ #!/usr/bin/env python3 # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.rst"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(*names): requirements = [] for name in names: if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.auth", description="Software Heritage Authentication Utilities", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/source/swh-auth/", packages=find_packages(), # packages's modules install_requires=parse_requirements(None, "swh"), tests_require=parse_requirements("test"), setup_requires=["setuptools-scm"], use_scm_version=True, extras_require={ "django": parse_requirements("django"), "testing": parse_requirements("test"), }, include_package_data=True, - # entry_points=""" - # [swh.cli.subcommands] - # =swh..cli - # """, + entry_points=""" + [swh.cli.subcommands] + auth=swh.auth.cli + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-", "Documentation": "https://docs.softwareheritage.org/devel/swh-/", }, ) diff --git a/swh/auth/cli.py b/swh/auth/cli.py index 07c279b..10068ac 100644 --- a/swh/auth/cli.py +++ b/swh/auth/cli.py @@ -1,19 +1,111 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control + +import sys + import click +from click.core import Context -from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group +CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -@swh_cli_group.group(name="foo", context_settings=CONTEXT_SETTINGS) + +@swh_cli_group.group(name="auth", context_settings=CONTEXT_SETTINGS) +@click.option( + "--oidc-server-url", + "oidc_server_url", + default="https://auth.softwareheritage.org/auth/", + help=( + "URL of OpenID Connect server (default to " + '"https://auth.softwareheritage.org/auth/")' + ), +) +@click.option( + "--realm-name", + "realm_name", + default="SoftwareHeritage", + help=( + "Name of the OpenID Connect authentication realm " + '(default to "SoftwareHeritage")' + ), +) +@click.option( + "--client-id", + "client_id", + default="swh-web", + help=("OpenID Connect client identifier in the realm " '(default to "swh-web")'), +) @click.pass_context -def foo_cli_group(ctx): - """Foo main command. +def auth(ctx: Context, oidc_server_url: str, realm_name: str, client_id: str): + """ + Software Heritage Authentication tools. + + This CLI eases the retrieval of a bearer token to authenticate + a user querying Software Heritage Web APIs. """ + from swh.auth.keycloak import KeycloakOpenIDConnect + ctx.ensure_object(dict) + ctx.obj["oidc_client"] = KeycloakOpenIDConnect( + oidc_server_url, realm_name, client_id + ) -@foo_cli_group.command() -@click.option("--bar", help="Something") + +@auth.command("generate-token") +@click.argument("username") @click.pass_context -def bar(ctx, bar): - """Do something.""" - click.echo("bar") +def generate_token(ctx: Context, username: str): + """ + Generate a new bearer token for a Web API authentication. + + Login with USERNAME, create a new OpenID Connect session and get + bearer token. + + Users will be prompted for their password, then the token will be printed + to standard output. + + The created OpenID Connect session is an offline one so the provided + token has a much longer expiration time than classical OIDC + sessions (usually several dozens of days). + """ + from getpass import getpass + + from swh.auth.keycloak import KeycloakError, keycloak_error_message + + password = getpass() + + try: + oidc_info = ctx.obj["oidc_client"].login( + username, password, scope="openid offline_access" + ) + print(oidc_info["refresh_token"]) + except KeycloakError as ke: + print(keycloak_error_message(ke)) + sys.exit(1) + + +@auth.command("revoke-token") +@click.argument("token") +@click.pass_context +def revoke_token(ctx: Context, token: str): + """ + Revoke a bearer token used for a Web API authentication. + + Use TOKEN to logout from an offline OpenID Connect session. + + The token is definitely revoked after that operation. + """ + from swh.auth.keycloak import KeycloakError, keycloak_error_message + + try: + ctx.obj["oidc_client"].logout(token) + print("Token successfully revoked.") + except KeycloakError as ke: + print(keycloak_error_message(ke)) + sys.exit(1) diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index 8bf627d..cb45d37 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,241 +1,246 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Optional from urllib.parse import urlencode # add ExpiredSignatureError alias to avoid leaking jose import # in swh-auth client code from jose.jwt import ExpiredSignatureError # noqa from keycloak import KeycloakOpenID # add KeycloakError alias to avoid leaking keycloak import # in swh-auth client code from keycloak.exceptions import KeycloakError # noqa from swh.core.config import load_from_envvar class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) self.server_url = server_url self.realm_public_key = realm_public_key @property def realm_name(self): return self._keycloak.realm_name @realm_name.setter def realm_name(self, value): self._keycloak.realm_name = value @property def client_id(self): return self._keycloak.client_id @client_id.setter def client_id(self, value): self._keycloak.client_id = value def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Raises: KeycloakError in case of authentication failures Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) def login( - self, username: str, password: str, **extra_params: str + self, username: str, password: str, scope: str = "openid", **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Direct Access Grant flow. Raises: KeycloakError in case of authentication failures Args: username: an existing username in the realm password: password associated to username extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="password", - scope="openid", + scope=scope, username=username, password=password, **extra_params, ) def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) @classmethod def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from a configuration dict. Args: kwargs: configuration dict for the instance, with one keycloak key, whose value is a Dict with the following keys: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier Returns: the KeycloakOpenIDConnect instance """ cfg = kwargs["keycloak"] return cls( server_url=cfg["server_url"], realm_name=cfg["realm_name"], client_id=cfg["client_id"], ) @classmethod def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to instantiation call Returns: the KeycloakOpenIDConnect instance """ config = dict(load_from_envvar()).get("keycloak", {}) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(keycloak=config) def keycloak_error_message(keycloak_error: KeycloakError) -> str: """Transform a keycloak exception into an error message. """ - msg_dict = json.loads(keycloak_error.error_message.decode()) - error_msg = msg_dict["error"] - error_desc = msg_dict.get("error_description") - if error_desc: - error_msg = f"{error_msg}: {error_desc}" - return error_msg + try: + # keycloak error wrapped in a JSON document + msg_dict = json.loads(keycloak_error.error_message.decode()) + error_msg = msg_dict["error"] + error_desc = msg_dict.get("error_description") + if error_desc: + error_msg = f"{error_msg}: {error_desc}" + return error_msg + except Exception: + # fallback: return error message string + return keycloak_error.error_message diff --git a/swh/auth/tests/test_cli.py b/swh/auth/tests/test_cli.py new file mode 100644 index 0000000..e22848e --- /dev/null +++ b/swh/auth/tests/test_cli.py @@ -0,0 +1,93 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from click.testing import CliRunner +import pytest + +from swh.auth.cli import auth +from swh.auth.tests.sample_data import OIDC_PROFILE + +runner = CliRunner() + + +@pytest.fixture() +def keycloak_oidc(keycloak_oidc, mocker): + def _keycloak_oidc(server_url, realm_name, client_id): + keycloak_oidc.server_url = server_url + keycloak_oidc.realm_name = realm_name + keycloak_oidc.client_id = client_id + return keycloak_oidc + + keycloak_oidc_client = mocker.patch("swh.auth.keycloak.KeycloakOpenIDConnect") + keycloak_oidc_client.side_effect = _keycloak_oidc + return keycloak_oidc + + +def _run_auth_command(command, keycloak_oidc, input=None): + server_url = "http://localhost:5080/auth" + realm_name = "realm-test" + client_id = "client-test" + result = runner.invoke( + auth, + [ + "--oidc-server-url", + server_url, + "--realm-name", + realm_name, + "--client-id", + client_id, + *command, + ], + input=input, + ) + assert keycloak_oidc.server_url == server_url + assert keycloak_oidc.realm_name == realm_name + assert keycloak_oidc.client_id == client_id + return result + + +@pytest.fixture +def user_credentials(): + return {"username": "foo", "password": "bar"} + + +def test_auth_generate_token_ok(keycloak_oidc, mocker, user_credentials): + mock_getpass = mocker.patch("getpass.getpass") + mock_getpass.return_value = user_credentials["password"] + + command = ["generate-token", user_credentials["username"]] + result = _run_auth_command( + command, keycloak_oidc, input=f"{user_credentials['password']}\n" + ) + assert result.exit_code == 0 + assert result.output[:-1] == OIDC_PROFILE["refresh_token"] + + +def test_auth_generate_token_error(keycloak_oidc, mocker, user_credentials): + keycloak_oidc.set_auth_success(False) + mock_getpass = mocker.patch("getpass.getpass") + mock_getpass.return_value = user_credentials["password"] + + command = ["generate-token", user_credentials["username"]] + result = _run_auth_command( + command, keycloak_oidc, input=f"{user_credentials['password']}\n" + ) + assert result.exit_code == 1 + assert result.output[:-1] == "invalid_grant: Invalid user credentials" + + +def test_auth_remove_token_ok(keycloak_oidc): + command = ["revoke-token", OIDC_PROFILE["refresh_token"]] + result = _run_auth_command(command, keycloak_oidc) + assert result.exit_code == 0 + assert result.output[:-1] == "Token successfully revoked." + + +def test_auth_remove_token_error(keycloak_oidc): + keycloak_oidc.set_auth_success(False) + command = ["revoke-token", OIDC_PROFILE["refresh_token"]] + result = _run_auth_command(command, keycloak_oidc) + assert result.exit_code == 1 + assert result.output[:-1] == "invalid_grant: Invalid user credentials" diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index 33b7abe..b8f23a1 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,175 +1,182 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy import json import os from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest import yaml from swh.auth.keycloak import KeycloakOpenIDConnect, keycloak_error_message from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO from swh.core.config import read def test_keycloak_oidc_well_known(keycloak_oidc): well_known_result = keycloak_oidc.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } def test_keycloak_oidc_authorization_url(keycloak_oidc): actual_auth_uri = keycloak_oidc.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_oidc.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } def test_keycloak_oidc_authorization_code_fail(keycloak_oidc): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_oidc.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_oidc.authorization_code("auth-code", "redirect-uri") with pytest.raises(KeycloakError): keycloak_oidc.login("username", "password") def test_keycloak_oidc_authorization_code(keycloak_oidc): actual_response = keycloak_oidc.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE def test_keycloak_oidc_refresh_token(keycloak_oidc): actual_result = keycloak_oidc.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE def test_keycloak_oidc_userinfo(keycloak_oidc): actual_user_info = keycloak_oidc.userinfo("refresh-token") assert actual_user_info == USER_INFO def test_keycloak_oidc_logout(keycloak_oidc): """Login out does not raise""" keycloak_oidc.logout("refresh-token") def test_keycloak_oidc_decode_token(keycloak_oidc): actual_decoded_data = keycloak_oidc.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "iat", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key, None) expected_decoded_token.pop(dynamic_valued_key, None) assert actual_decoded_data2 == expected_decoded_token def test_keycloak_oidc_login(keycloak_oidc): actual_response = keycloak_oidc.login("username", "password") assert actual_response == OIDC_PROFILE @pytest.fixture def auth_config(): return { "keycloak": { "server_url": "https://auth.swh.org/SWHTest", "realm_name": "SWHTest", "client_id": "client_id", } } @pytest.fixture def auth_config_path(tmp_path, monkeypatch, auth_config): conf_path = os.path.join(tmp_path, "auth.yml") with open(conf_path, "w") as f: f.write(yaml.dump(auth_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path def test_auth_KeycloakOpenIDConnect_from_config(auth_config): """Instantiating keycloak client out of configuration dict is possible """ client = KeycloakOpenIDConnect.from_config(**auth_config) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile(auth_config_path, monkeypatch): """Instantiating keycloak client out of environment variable is possible """ client = KeycloakOpenIDConnect.from_configfile() auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile_override( auth_config_path, monkeypatch ): """Instantiating keycloak client out of environment variable is possible And caller can override the configuration at calling """ client = KeycloakOpenIDConnect.from_configfile(client_id="foobar") auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == "foobar" @pytest.mark.parametrize( "error_dict, expected_result", [ ({"error": "unknown_error"}, "unknown_error"), ( {"error": "invalid_grant", "error_description": "Invalid credentials"}, "invalid_grant: Invalid credentials", ), ], ) def test_auth_keycloak_error_message(error_dict, expected_result): """Conversion from KeycloakError to error message should work with detail or not""" error_message = json.dumps(error_dict).encode() exception = KeycloakError(error_message=error_message, response_code=401) actual_result = keycloak_error_message(exception) assert actual_result == expected_result + + +def test_auth_keycloak_error_message_string(): + """Conversion from KeycloakError to error message should work with detail or not""" + error_message = "Can't connect to server " + exception = KeycloakError(error_message=error_message) + assert keycloak_error_message(exception) == error_message