diff --git a/docs/index.rst b/docs/index.rst index 8c58681..9b7b0f9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,119 +1,90 @@ .. _swh-web-client: .. include:: README.rst .. _swh-web-client-auth: Authentication -------------- If you have a user account registered on `Software Heritage Identity Provider`_, it is possible to authenticate requests made to the Web APIs through the use of -OpenID Connect bearer tokens. Sending authenticated requests can notably +an OpenID Connect bearer token. Sending authenticated requests can notably allow to lift API rate limiting depending on your permissions. -To get these tokens, a dedicated CLI tool is made available when installing +To get this token, a dedicated CLI tool is made available when installing ``swh-web-client``: .. code-block:: text $ swh auth Usage: swh auth [OPTIONS] COMMAND [ARGS]... Authenticate Software Heritage users with OpenID Connect. This CLI tool eases the retrieval of bearer tokens to authenticate a user querying the Software Heritage Web API. Options: --oidc-server-url TEXT URL of OpenID Connect server (default to "https://auth.softwareheritage.org/auth/") --realm-name TEXT Name of the OpenID Connect authentication realm (default to "SoftwareHeritage") --client-id TEXT OpenID Connect client identifier in the realm (default to "swh-web") -h, --help Show this message and exit. Commands: login Login and create new offline OpenID Connect session. logout Logout from an offline OpenID Connect session. - refresh Refresh an offline OpenID Connect session. In order to get your tokens, you need to use the ``login`` subcommand of that CLI tool by passing your username as argument. You will be prompted for your password and if the authentication succeeds a new OpenID Connect -session will be created and tokens will be dumped in JSON format to standard -output. +session will be created and the token will be printed to standard output. .. 
code-block:: text $ swh auth login Password: - { - "access_token": ".......", - "expires_in": 600, - "refresh_expires_in": 0, - "refresh_token": ".......", - "token_type": "bearer", - "id_token": ".......", - "not-before-policy": 1584551170, - "session_state": "c14e1b7b-8263-4852-bd1c-adc7bc12a136", - "scope": "openid email profile offline_access" - } - -To authenticate yourself, you need to send the ``access_token`` value in -request headers when querying the Web APIs. -Considering you have stored the ``access_token`` value in a TOKEN environment + eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjMzMD... + +To authenticate yourself, you need to send that token value in request headers +when querying the Web API. +Considering you have stored that token value in a TOKEN environment variable, you can perform an authenticated call the following way using ``curl``: .. code-block:: text $ curl -H "Authorization: Bearer ${TOKEN}" https://archive.softwareheritage.org/api/1/ -The access token has a short living period (usually ten minutes) and must be -renewed on a regular basis by passing the ``refresh_token`` value as argument -of the ``refresh`` subcommand of the CLI tool. The new access token will be -dumped in JSON format to standard output. Note that the refresh token has a -much longer living period (usually several dozens of days) so you can use -it anytime while it is valid to get an access token without having to login -again. - -.. code-block:: text - - $ swh auth refresh $REFRESH_TOKEN - "......." - Note that if you intend to use the :class:`swh.web.client.client.WebAPIClient` -class, the access token renewal will be automatically handled if you call -method :meth:`swh.web.client.client.WebAPIClient.authenticate` prior to -sending any requests. To activate authentication, use the following code snippet:: +class, you can activate authentication by using the following code snippet:: from swh.web.client import WebAPIClient - REFRESH_TOKEN = '.......' 
# Use "swh auth login" command to get it + TOKEN = '.......' # Use "swh auth login" command to get it - client = WebAPIClient() - client.authenticate(REFRESH_TOKEN) + client = WebAPIClient(bearer_token=TOKEN) # All requests to the Web API will be authenticated resp = client.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') It is also possible to ``logout`` from the authenticated OpenID Connect session -which invalidates all previously emitted tokens. - +which permanently revokes the token. .. code-block:: text $ swh auth logout $REFRESH_TOKEN Successfully logged out from OpenID Connect session API Reference ------------- .. toctree:: :maxdepth: 2 /apidoc/swh.web.client .. _Software Heritage Identity Provider: https://auth.softwareheritage.org/auth/realms/SoftwareHeritage/account/ \ No newline at end of file diff --git a/swh/web/client/auth.py b/swh/web/client/auth.py index ce7091a..a70cd40 100644 --- a/swh/web/client/auth.py +++ b/swh/web/client/auth.py @@ -1,106 +1,84 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict from urllib.parse import urljoin import requests SWH_OIDC_SERVER_URL = "https://auth.softwareheritage.org/auth/" SWH_REALM_NAME = "SoftwareHeritage" SWH_WEB_CLIENT_ID = "swh-web" class AuthenticationError(Exception): """Authentication related error. - Example: A bearer token has expired. + Example: A bearer token has been revoked. """ pass class OpenIDConnectSession: """ Simple class wrapping requests sent to an OpenID Connect server. 
Args: oidc_server_url: URL of OpenID Connect server realm_name: name of the OpenID Connect authentication realm client_id: OpenID Connect client identifier in the realm """ def __init__( self, oidc_server_url: str = SWH_OIDC_SERVER_URL, realm_name: str = SWH_REALM_NAME, client_id: str = SWH_WEB_CLIENT_ID, ): realm_url = urljoin(oidc_server_url, f"realms/{realm_name}/") self.client_id = client_id self.token_url = urljoin(realm_url, "protocol/openid-connect/token/") self.logout_url = urljoin(realm_url, "protocol/openid-connect/logout/") def login(self, username: str, password: str) -> Dict[str, Any]: """ Login and create new offline OpenID Connect session. Args: username: an existing username in the realm password: password associated to username Returns: - a dict filled with OpenID Connect profile info, notably access - and refresh tokens for API authentication. + The OpenID Connect session info """ return requests.post( url=self.token_url, data={ "grant_type": "password", "client_id": self.client_id, "scope": "openid offline_access", "username": username, "password": password, }, ).json() - def refresh(self, refresh_token: str) -> Dict[str, Any]: - """ - Refresh an offline OpenID Connect session to get new access token. - - Args: - refresh_token: a refresh token retrieved after login - - Returns: - a dict filled with OpenID Connect profile info, notably access - and refresh tokens for API authentication. - """ - return requests.post( - url=self.token_url, - data={ - "grant_type": "refresh_token", - "client_id": self.client_id, - "scope": "openid", - "refresh_token": refresh_token, - }, - ).json() - - def logout(self, refresh_token: str): + def logout(self, token: str): """ Logout from an offline OpenID Connect session and invalidate previously emitted tokens. 
Args: - refresh_token: a refresh token retrieved after login + token: a bearer token retrieved after login """ requests.post( url=self.logout_url, data={ "client_id": self.client_id, "scope": "openid", - "refresh_token": refresh_token, + "refresh_token": token, }, ) diff --git a/swh/web/client/cli.py b/swh/web/client/cli.py index e2f88a8..868324c 100644 --- a/swh/web/client/cli.py +++ b/swh/web/client/cli.py @@ -1,118 +1,93 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from getpass import getpass -import json import click from click.core import Context from swh.web.client.auth import OpenIDConnectSession CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -def _output_json(obj): - print(json.dumps(obj, indent=4, sort_keys=True)) - - @click.group(name="auth", context_settings=CONTEXT_SETTINGS) @click.option( "--oidc-server-url", "oidc_server_url", default="https://auth.softwareheritage.org/auth/", help=( "URL of OpenID Connect server (default to " '"https://auth.softwareheritage.org/auth/")' ), ) @click.option( "--realm-name", "realm_name", default="SoftwareHeritage", help=( "Name of the OpenID Connect authentication realm " '(default to "SoftwareHeritage")' ), ) @click.option( "--client-id", "client_id", default="swh-web", help=("OpenID Connect client identifier in the realm " '(default to "swh-web")'), ) @click.pass_context def auth(ctx: Context, oidc_server_url: str, realm_name: str, client_id: str): """ Authenticate Software Heritage users with OpenID Connect. - This CLI tool eases the retrieval of bearer tokens to authenticate + This CLI tool eases the retrieval of a bearer token to authenticate a user querying the Software Heritage Web API. 
""" ctx.ensure_object(dict) ctx.obj["oidc_session"] = OpenIDConnectSession( oidc_server_url, realm_name, client_id ) @auth.command("login") @click.argument("username") @click.pass_context def login(ctx: Context, username: str): """ Login and create new offline OpenID Connect session. Login with USERNAME, create a new OpenID Connect session and get - access and refresh tokens. - - User will be prompted for his password and tokens will be printed in - JSON format to standard output. + bearer token. - When its access token has expired, user can request a new one using the - session-refresh command of that CLI tool without having to authenticate - using a password again. + User will be prompted for his password and tokens will be printed + to standard output. The created OpenID Connect session is an offline one so the provided - refresh token has a much longer expiration time than classical OIDC + token has a much longer expiration time than classical OIDC sessions (usually several dozens of days). """ password = getpass() - oidc_profile = ctx.obj["oidc_session"].login(username, password) - _output_json(oidc_profile) - - -@auth.command("refresh") -@click.argument("refresh_token") -@click.pass_context -def refresh(ctx: Context, refresh_token: str): - """ - Refresh an offline OpenID Connect session. - - Get a new access token from REFRESH_TOKEN when previous one expired. - - New access token will be printed in JSON format to standard output. 
- """ - oidc_profile = ctx.obj["oidc_session"].refresh(refresh_token) - if "access_token" in oidc_profile: - _output_json(oidc_profile["access_token"]) + oidc_info = ctx.obj["oidc_session"].login(username, password) + if "refresh_token" in oidc_info: + print(oidc_info["refresh_token"]) else: - # print oidc error - _output_json(oidc_profile) + print(oidc_info) @auth.command("logout") -@click.argument("refresh_token") +@click.argument("token") @click.pass_context -def logout(ctx: Context, refresh_token: str): +def logout(ctx: Context, token: str): """ Logout from an offline OpenID Connect session. - Use REFRESH_TOKEN to logout from an offline OpenID Connect session. + Use TOKEN to logout from an offline OpenID Connect session. - Access and refresh tokens are no more usable after that operation. + The token is definitely revoked after that operation. """ - ctx.obj["oidc_session"].logout(refresh_token) + ctx.obj["oidc_session"].logout(token) print("Successfully logged out from OpenID Connect session") diff --git a/swh/web/client/client.py b/swh/web/client/client.py index a1a4b40..42fef15 100644 --- a/swh/web/client/client.py +++ b/swh/web/client/client.py @@ -1,508 +1,475 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Python client for the Software Heritage Web API Light wrapper around requests for the archive API, taking care of data conversions and pagination. .. 
code-block:: python from swh.web.client import WebAPIClient cli = WebAPIClient() # retrieve any archived object via its PID cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # same, but for specific object types cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # get() always retrieve entire objects, following pagination # WARNING: this might *not* be what you want for large objects cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') # type-specific methods support explicit iteration through pages next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) """ -from datetime import datetime, timedelta from typing import Any, Callable, Dict, Generator, List, Optional, Union from urllib.parse import urlparse import dateutil.parser import requests from swh.model.identifiers import SNAPSHOT, REVISION, RELEASE, DIRECTORY, CONTENT from swh.model.identifiers import PersistentId as PID from swh.model.identifiers import parse_persistent_identifier as parse_pid -from .auth import AuthenticationError, OpenIDConnectSession, SWH_OIDC_SERVER_URL - PIDish = Union[PID, str] ORIGIN_VISIT = "origin_visit" def _get_pid(pidish: PIDish) -> PID: """Parse string to PID if needed""" if isinstance(pidish, str): return parse_pid(pidish) else: return pidish def typify(data: Any, obj_type: str) -> Any: """Type API responses using pythonic types where appropriate The following conversions are performed: - identifiers are converted from strings to PersistentId instances - timestamps are converted from strings to datetime.datetime objects """ def to_pid(object_type, s): return PID(object_type=object_type, object_id=s) def to_date(s): return dateutil.parser.parse(s) def obj_type_of_entry_type(s): if s == "file": return CONTENT elif s == "dir": return DIRECTORY elif s == "rev": return REVISION else: raise ValueError(f"invalid directory entry type: {s}") if obj_type == SNAPSHOT: for name, target in data.items(): if target["target_type"] != 
"alias": # alias targets do not point to objects via PIDs; others do target["target"] = to_pid(target["target_type"], target["target"]) elif obj_type == REVISION: data["id"] = to_pid(obj_type, data["id"]) data["directory"] = to_pid(DIRECTORY, data["directory"]) for key in ("date", "committer_date"): data[key] = to_date(data[key]) for parent in data["parents"]: parent["id"] = to_pid(REVISION, parent["id"]) elif obj_type == RELEASE: data["id"] = to_pid(obj_type, data["id"]) data["date"] = to_date(data["date"]) data["target"] = to_pid(data["target_type"], data["target"]) elif obj_type == DIRECTORY: dir_pid = None for entry in data: dir_pid = dir_pid or to_pid(obj_type, entry["dir_id"]) entry["dir_id"] = dir_pid entry["target"] = to_pid( obj_type_of_entry_type(entry["type"]), entry["target"] ) elif obj_type == CONTENT: pass # nothing to do for contents elif obj_type == ORIGIN_VISIT: - data['date'] = to_date(data['date']) - if data['snapshot'] is not None: - data['snapshot'] = to_pid(SNAPSHOT, data['snapshot']) + data["date"] = to_date(data["date"]) + if data["snapshot"] is not None: + data["snapshot"] = to_pid(SNAPSHOT, data["snapshot"]) else: raise ValueError(f"invalid object type: {obj_type}") return data class WebAPIClient: """Client for the Software Heritage archive Web API, see https://archive.softwareheritage.org/api/ """ def __init__( self, - api_url="https://archive.softwareheritage.org/api/1", - auth_url=SWH_OIDC_SERVER_URL, + api_url: str = "https://archive.softwareheritage.org/api/1", + bearer_token: Optional[str] = None, ): """Create a client for the Software Heritage Web API See: https://archive.softwareheritage.org/api/ Args: api_url: base URL for API calls (default: "https://archive.softwareheritage.org/api/1") - + bearer_token: optional bearer token to do authenticated API calls """ api_url = api_url.rstrip("/") u = urlparse(api_url) self.api_url = api_url self.api_path = u.path - self.oidc_session = OpenIDConnectSession(oidc_server_url=auth_url) - 
self.oidc_profile: Optional[Dict[str, Any]] = None + self.bearer_token = bearer_token self._getters: Dict[str, Callable[[PIDish], Any]] = { CONTENT: self.content, DIRECTORY: self.directory, RELEASE: self.release, REVISION: self.revision, SNAPSHOT: self._get_snapshot, } def _call( self, query: str, http_method: str = "get", **req_args ) -> requests.models.Response: """Dispatcher for archive API invocation Args: query: API method to be invoked, rooted at api_url http_method: HTTP method to be invoked, one of: 'get', 'head' req_args: extra keyword arguments for requests.get()/.head() Raises: requests.HTTPError: if HTTP request fails and http_method is 'get' """ url = None if urlparse(query).scheme: # absolute URL url = query else: # relative URL; prepend base API URL url = "/".join([self.api_url, query]) r = None headers = {} - if self.oidc_profile is not None: - # use bearer token authentication - if datetime.now() > self.oidc_profile["expires_at"]: - # refresh access token if it has expired - self.authenticate(self.oidc_profile["refresh_token"]) - access_token = self.oidc_profile["access_token"] - headers = {"Authorization": f"Bearer {access_token}"} + if self.bearer_token is not None: + headers = {"Authorization": f"Bearer {self.bearer_token}"} if http_method == "get": r = requests.get(url, **req_args, headers=headers) r.raise_for_status() elif http_method == "head": r = requests.head(url, **req_args, headers=headers) else: raise ValueError(f"unsupported HTTP method: {http_method}") return r def _get_snapshot(self, pid: PIDish) -> Dict[str, Any]: """Analogous to self.snapshot(), but zipping through partial snapshots, merging them together before returning """ snapshot = {} for snp in self.snapshot(pid): snapshot.update(snp) return snapshot def get(self, pid: PIDish, **req_args) -> Any: """Retrieve information about an object of any kind Dispatcher method over the more specific methods content(), directory(), etc. 
Note that this method will buffer the entire output in case of long, iterable output (e.g., for snapshot()), see the iter() method for streaming. """ pid_ = _get_pid(pid) return self._getters[pid_.object_type](pid_) def iter(self, pid: PIDish, **req_args) -> Generator[Dict[str, Any], None, None]: """Stream over the information about an object of any kind Streaming variant of get() """ pid_ = _get_pid(pid) obj_type = pid_.object_type if obj_type == SNAPSHOT: yield from self.snapshot(pid_) elif obj_type == REVISION: yield from [self.revision(pid_)] elif obj_type == RELEASE: yield from [self.release(pid_)] elif obj_type == DIRECTORY: yield from self.directory(pid_) elif obj_type == CONTENT: yield from [self.content(pid_)] else: raise ValueError(f"invalid object type: {obj_type}") def content(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a content object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call( f"content/sha1_git:{_get_pid(pid).object_id}/", **req_args ).json(), CONTENT, ) def directory(self, pid: PIDish, **req_args) -> List[Dict[str, Any]]: """Retrieve information about a directory object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f"directory/{_get_pid(pid).object_id}/", **req_args).json(), DIRECTORY, ) def revision(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a revision object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f"revision/{_get_pid(pid).object_id}/", **req_args).json(), REVISION, ) def release(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a release object Args: pid: object identifier req_args: extra 
keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f"release/{_get_pid(pid).object_id}/", **req_args).json(), RELEASE, ) def snapshot( self, pid: PIDish, **req_args ) -> Generator[Dict[str, Any], None, None]: """Retrieve information about a snapshot object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Returns: an iterator over partial snapshots (dictionaries mapping branch names to information about where they point to), each containing a subset of available branches Raises: requests.HTTPError: if HTTP request fails """ done = False r = None query = f"snapshot/{_get_pid(pid).object_id}/" while not done: r = self._call(query, http_method="get", **req_args) yield typify(r.json()["branches"], SNAPSHOT) if "next" in r.links and "url" in r.links["next"]: query = r.links["next"]["url"] else: done = True - def visits(self, - origin: str, - per_page: Optional[int] = None, - last_visit: Optional[int] = None, - **req_args) -> Generator[Dict[str, Any], None, None]: + def visits( + self, + origin: str, + per_page: Optional[int] = None, + last_visit: Optional[int] = None, + **req_args, + ) -> Generator[Dict[str, Any], None, None]: """List visits of an origin Args: origin: the URL of a software origin per_page: the number of visits to list last_visit: visit to start listing from req_args: extra keyword arguments for requests.get() Returns: an iterator over visits of the origin Raises: requests.HTTPError: if HTTP request fails """ done = False r = None params = [] if last_visit is not None: params.append(("last_visit", last_visit)) if per_page is not None: params.append(("per_page", per_page)) - query = f'origin/{origin}/visits/' + query = f"origin/{origin}/visits/" while not done: - r = self._call(query, http_method='get', params=params, **req_args) + r = self._call(query, http_method="get", params=params, **req_args) yield from [typify(v, ORIGIN_VISIT) for v in r.json()] - if 
'next' in r.links and 'url' in r.links['next']: + if "next" in r.links and "url" in r.links["next"]: params = [] - query = r.links['next']['url'] + query = r.links["next"]["url"] else: done = True def content_exists(self, pid: PIDish, **req_args) -> bool: """Check if a content object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"content/sha1_git:{_get_pid(pid).object_id}/", http_method="head", **req_args, ) ) def directory_exists(self, pid: PIDish, **req_args) -> bool: """Check if a directory object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"directory/{_get_pid(pid).object_id}/", http_method="head", **req_args ) ) def revision_exists(self, pid: PIDish, **req_args) -> bool: """Check if a revision object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"revision/{_get_pid(pid).object_id}/", http_method="head", **req_args ) ) def release_exists(self, pid: PIDish, **req_args) -> bool: """Check if a release object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"release/{_get_pid(pid).object_id}/", http_method="head", **req_args ) ) def snapshot_exists(self, pid: PIDish, **req_args) -> bool: """Check if a snapshot object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"snapshot/{_get_pid(pid).object_id}/", http_method="head", **req_args ) ) def content_raw(self, pid: PIDish, **req_args) -> 
Generator[bytes, None, None]: """Iterate over the raw content of a content object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ r = self._call( f"content/sha1_git:{_get_pid(pid).object_id}/raw/", stream=True, **req_args ) r.raise_for_status() yield from r.iter_content(chunk_size=None, decode_unicode=False) - - def authenticate(self, refresh_token: str): - """Authenticate API requests using OpenID Connect bearer token - - Args: - refresh_token: A refresh token retrieved using the - ``swh auth login`` command (see :ref:`swh-web-client-auth` - section in main documentation) - - Raises: - swh.web.client.auth.AuthenticationError: if authentication fails - - """ - now = datetime.now() - try: - self.oidc_profile = self.oidc_session.refresh(refresh_token) - assert self.oidc_profile - if "expires_in" in self.oidc_profile: - expires_in = self.oidc_profile["expires_in"] - expires_at = now + timedelta(seconds=expires_in) - self.oidc_profile["expires_at"] = expires_at - except Exception as e: - raise AuthenticationError(str(e)) - if "access_token" not in self.oidc_profile: - # JSON error response - raise AuthenticationError(self.oidc_profile) diff --git a/swh/web/client/tests/test_cli.py b/swh/web/client/tests/test_cli.py index 04ff5e9..605a12d 100644 --- a/swh/web/client/tests/test_cli.py +++ b/swh/web/client/tests/test_cli.py @@ -1,67 +1,50 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import json - from click.testing import CliRunner from swh.web.client.cli import auth runner = CliRunner() oidc_profile = { "access_token": "some-access-token", "expires_in": 600, "refresh_expires_in": 0, "refresh_token": "some-refresh-token", "token_type": "bearer", "session_state": "some-state", "scope": 
"openid email profile offline_access", } def test_auth_login(mocker): mock_getpass = mocker.patch("swh.web.client.cli.getpass") mock_getpass.return_value = "password" mock_oidc_session = mocker.patch("swh.web.client.cli.OpenIDConnectSession") mock_login = mock_oidc_session.return_value.login mock_login.return_value = oidc_profile result = runner.invoke(auth, ["login", "username"], input="password\n") assert result.exit_code == 0 - assert json.loads(result.output) == oidc_profile + assert result.output[:-1] == oidc_profile["refresh_token"] mock_login.side_effect = Exception("Auth error") result = runner.invoke(auth, ["login", "username"], input="password\n") assert result.exit_code == 1 -def test_auth_refresh(mocker): - - mock_oidc_session = mocker.patch("swh.web.client.cli.OpenIDConnectSession") - mock_refresh = mock_oidc_session.return_value.refresh - mock_refresh.return_value = oidc_profile - - result = runner.invoke(auth, ["refresh", oidc_profile["refresh_token"]]) - assert result.exit_code == 0 - assert json.loads(result.stdout) == oidc_profile["access_token"] - - mock_refresh.side_effect = Exception("Auth error") - result = runner.invoke(auth, ["refresh", oidc_profile["refresh_token"]]) - assert result.exit_code == 1 - - def test_auth_logout(mocker): mock_oidc_session = mocker.patch("swh.web.client.cli.OpenIDConnectSession") mock_logout = mock_oidc_session.return_value.logout result = runner.invoke(auth, ["logout", oidc_profile["refresh_token"]]) assert result.exit_code == 0 mock_logout.side_effect = Exception("Auth error") result = runner.invoke(auth, ["logout", oidc_profile["refresh_token"]]) assert result.exit_code == 1 diff --git a/swh/web/client/tests/test_web_api_client.py b/swh/web/client/tests/test_web_api_client.py index acc3935..9f06782 100644 --- a/swh/web/client/tests/test_web_api_client.py +++ b/swh/web/client/tests/test_web_api_client.py @@ -1,222 +1,145 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the 
top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from copy import copy -from datetime import datetime from dateutil.parser import parse as parse_date -from unittest.mock import call, Mock -import pytest - -from swh.web.client.auth import AuthenticationError from swh.model.identifiers import parse_persistent_identifier as parse_pid -from .test_cli import oidc_profile - def test_get_content(web_api_client, web_api_mock): pid = parse_pid("swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1") obj = web_api_client.get(pid) assert obj["length"] == 151810 for key in ("length", "status", "checksums", "data_url"): assert key in obj assert obj["checksums"]["sha1_git"] == str(pid).split(":")[3] assert obj["checksums"]["sha1"] == "dc2830a9e72f23c1dfebef4413003221baa5fb62" assert obj == web_api_client.content(pid) def test_get_directory(web_api_client, web_api_mock): pid = parse_pid("swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6") obj = web_api_client.get(pid) assert len(obj) == 35 # number of directory entries assert all(map(lambda entry: entry["dir_id"] == pid, obj)) dir_entry = obj[0] assert dir_entry["type"] == "file" assert dir_entry["target"] == parse_pid( "swh:1:cnt:58471109208922c9ee8c4b06135725f03ed16814" ) assert dir_entry["name"] == ".bzrignore" assert dir_entry["length"] == 582 assert obj == web_api_client.directory(pid) def test_get_release(web_api_client, web_api_mock): pid = parse_pid("swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342") obj = web_api_client.get(pid) assert obj["id"] == pid assert obj["author"]["fullname"] == "Paul Tagliamonte " assert obj["author"]["name"] == "Paul Tagliamonte" assert obj["date"] == parse_date("2013-07-06T19:34:11-04:00") assert obj["name"] == "0.9.9" assert obj["target_type"] == "revision" assert obj["target"] == parse_pid( "swh:1:rev:e005cb773c769436709ca6a1d625dc784dbc1636" ) assert not obj["synthetic"] assert 
obj == web_api_client.release(pid) def test_get_revision(web_api_client, web_api_mock): pid = parse_pid("swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6") obj = web_api_client.get(pid) assert obj["id"] == pid for role in ("author", "committer"): assert ( obj[role]["fullname"] == "Nicolas Dandrimont " ) assert obj[role]["name"] == "Nicolas Dandrimont" timestamp = parse_date("2014-08-18T18:18:25+02:00") assert obj["date"] == timestamp assert obj["committer_date"] == timestamp assert obj["message"].startswith("Merge branch") assert obj["merge"] assert len(obj["parents"]) == 2 assert obj["parents"][0]["id"] == parse_pid( "swh:1:rev:26307d261279861c2d9c9eca3bb38519f951bea4" ) assert obj["parents"][1]["id"] == parse_pid( "swh:1:rev:37fc9e08d0c4b71807a4f1ecb06112e78d91c283" ) assert obj == web_api_client.revision(pid) def test_get_snapshot(web_api_client, web_api_mock): # small snapshot, the one from Web API doc pid = parse_pid("swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a") obj = web_api_client.get(pid) assert len(obj) == 4 assert obj["refs/heads/master"]["target_type"] == "revision" assert obj["refs/heads/master"]["target"] == parse_pid( "swh:1:rev:83c20a6a63a7ebc1a549d367bc07a61b926cecf3" ) assert obj["refs/tags/dpkt-1.7"]["target_type"] == "revision" assert obj["refs/tags/dpkt-1.7"]["target"] == parse_pid( "swh:1:rev:0c9dbfbc0974ec8ac1d8253aa1092366a03633a8" ) def test_iter_snapshot(web_api_client, web_api_mock): # large snapshot from the Linux kernel, usually spanning two pages pid = parse_pid("swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764") obj = web_api_client.snapshot(pid) snp = {} for partial in obj: snp.update(partial) assert len(snp) == 1391 -def test_authenticate_success(web_api_client, web_api_mock): - - rel_id = "b9db10d00835e9a43e2eebef2db1d04d4ae82342" - url = f"{web_api_client.api_url}/release/{rel_id}/" - - web_api_client.oidc_session = Mock() - web_api_client.oidc_session.refresh.return_value = copy(oidc_profile) - - access_token = 
oidc_profile["access_token"] - refresh_token = "user-refresh-token" - - web_api_client.authenticate(refresh_token) - - assert "expires_at" in web_api_client.oidc_profile - - pid = parse_pid(f"swh:1:rel:{rel_id}") - web_api_client.get(pid) - - web_api_client.oidc_session.refresh.assert_called_once_with(refresh_token) - - sent_request = web_api_mock._adapter.last_request - - assert sent_request.url == url - assert "Authorization" in sent_request.headers - - assert sent_request.headers["Authorization"] == f"Bearer {access_token}" - - -def test_authenticate_refresh_token(web_api_client, web_api_mock): +def test_authentication(web_api_client, web_api_mock): rel_id = "b9db10d00835e9a43e2eebef2db1d04d4ae82342" url = f"{web_api_client.api_url}/release/{rel_id}/" - oidc_profile_cp = copy(oidc_profile) - - web_api_client.oidc_session = Mock() - web_api_client.oidc_session.refresh.return_value = oidc_profile_cp - refresh_token = "user-refresh-token" - web_api_client.authenticate(refresh_token) - assert "expires_at" in web_api_client.oidc_profile - - # simulate access token expiration - web_api_client.oidc_profile["expires_at"] = datetime.now() - - access_token = "new-access-token" - oidc_profile_cp["access_token"] = access_token + web_api_client.bearer_token = refresh_token pid = parse_pid(f"swh:1:rel:{rel_id}") web_api_client.get(pid) - calls = [call(refresh_token), call(oidc_profile["refresh_token"])] - web_api_client.oidc_session.refresh.assert_has_calls(calls) - sent_request = web_api_mock._adapter.last_request assert sent_request.url == url assert "Authorization" in sent_request.headers - assert sent_request.headers["Authorization"] == f"Bearer {access_token}" - - -def test_authenticate_failure(web_api_client, web_api_mock): - msg = "Authentication error" - web_api_client.oidc_session = Mock() - web_api_client.oidc_session.refresh.side_effect = Exception(msg) - - refresh_token = "user-refresh-token" - - with pytest.raises(AuthenticationError) as e: - 
web_api_client.authenticate(refresh_token) - - assert e.match(msg) - - oidc_error_response = { - "error": "invalid_grant", - "error_description": "Invalid refresh token", - } - - web_api_client.oidc_session.refresh.side_effect = None - web_api_client.oidc_session.refresh.return_value = oidc_error_response - - with pytest.raises(AuthenticationError) as e: - web_api_client.authenticate(refresh_token) - - assert e.match(repr(oidc_error_response)) + assert sent_request.headers["Authorization"] == f"Bearer {refresh_token}" def test_get_visits(web_api_client, web_api_mock): - obj = web_api_client.visits('https://github.com/NixOS/nixpkgs', - last_visit=50, - per_page=10) + obj = web_api_client.visits( + "https://github.com/NixOS/nixpkgs", last_visit=50, per_page=10 + ) visits = [v for v in obj] assert len(visits) == 20 - timestamp = parse_date('2018-07-31 04:34:23.298931+00:00') - assert visits[0]['date'] == timestamp + timestamp = parse_date("2018-07-31 04:34:23.298931+00:00") + assert visits[0]["date"] == timestamp assert visits[0]["snapshot"] is None - snapshot_pid = 'swh:1:snp:456550ea74af4e2eecaa406629efaaf0b9b5f976' + snapshot_pid = "swh:1:snp:456550ea74af4e2eecaa406629efaaf0b9b5f976" assert visits[7]["snapshot"] == parse_pid(snapshot_pid)