diff --git a/docs/index.rst b/docs/index.rst index d68f9f8..8192f27 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,102 +1,119 @@ .. _swh-web-client: .. include:: README.rst +.. _swh-web-client-auth: + Authentication -------------- If you have a user account registered on `Software Heritage Identity Provider`_, it is possible to authenticate requests made to the Web APIs through the use of OpenID Connect bearer tokens. Sending authenticated requests can notably allow to lift API rate limiting depending on your permissions. To get these tokens, a dedicated CLI tool is made available when installing ``swh-web-client``: .. code-block:: text $ swh auth Usage: swh auth [OPTIONS] COMMAND [ARGS]... Authenticate Software Heritage users with OpenID Connect. This CLI tool eases the retrieval of bearer tokens to authenticate a user querying the Software Heritage Web API. Options: --oidc-server-url TEXT URL of OpenID Connect server (default to "https://auth.softwareheritage.org/auth/") --realm-name TEXT Name of the OpenID Connect authentication realm (default to "SoftwareHeritage") --client-id TEXT OpenID Connect client identifier in the realm (default to "swh-web") -h, --help Show this message and exit. Commands: login Login and create new offline OpenID Connect session. logout Logout from an offline OpenID Connect session. refresh Refresh an offline OpenID Connect session. In order to get your tokens, you need to use the ``login`` subcommand of that CLI tool by passing your username as argument. You will be prompted for your password and if the authentication succeeds a new OpenID Connect session will be created and tokens will be dumped in JSON format to standard output. .. code-block:: text $ swh auth login Password: { "access_token": ".......", "expires_in": 600, "refresh_expires_in": 0, "refresh_token": ".......", "token_type": "bearer", "id_token": ".......", "not-before-policy": 1584551170, "session_state": "c14e1b7b-8263-4852-bd1c-adc7bc12a136", "scope": "openid email profile offline_access" } To authenticate yourself, you need to send the ``access_token`` value in request headers when querying the Web APIs. Considering you have stored the ``access_token`` value in a TOKEN environment variable, you can perform an authenticated call the following way using ``curl``: .. code-block:: text $ curl -H "Authorization: Bearer ${TOKEN}" http://localhost:5004/api/1/ The access token has a short living period (usually ten minutes) and must be renewed on a regular basis by passing the ``refresh_token`` value as argument of the ``refresh`` subcommand of the CLI tool. The new access token will be dumped in JSON format to standard output. Note that the refresh token has a much longer living period (usually several dozens of days) so you can use it anytime while it is valid to get an access token without having to login again. .. code-block:: text $ swh auth refresh $REFRESH_TOKEN "......." +Note that if you intend to use the :class:`swh.web.client.client.WebAPIClient` +class, the access token renewal will be automatically handled if you call +method :meth:`swh.web.client.client.WebAPIClient.authenticate` prior to +sending any requests. To activate authentication, use the following code snippet:: + + from swh.web.client import WebAPIClient + + REFRESH_TOKEN = '.......' # Use "swh auth login" command to get it + + client = WebAPIClient() + client.authenticate(REFRESH_TOKEN) + + # All requests to the Web API will be authenticated + resp = client.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') + It is also possible to ``logout`` from the authenticated OpenID Connect session which invalidates all previously emitted tokens. .. code-block:: text $ swh auth logout $REFRESH_TOKEN Successfully logged out from OpenID Connect session API Reference ------------- .. toctree:: :maxdepth: 2 /apidoc/swh.web.client .. _Software Heritage Identity Provider: https://auth.softwareheritage.org/auth/realms/SoftwareHeritage/account/ \ No newline at end of file diff --git a/swh/web/client/auth.py b/swh/web/client/auth.py index c340178..ba0f6d3 100644 --- a/swh/web/client/auth.py +++ b/swh/web/client/auth.py @@ -1,95 +1,104 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict from urllib.parse import urljoin import requests SWH_OIDC_SERVER_URL = 'https://auth.softwareheritage.org/auth/' SWH_REALM_NAME = 'SoftwareHeritage' SWH_WEB_CLIENT_ID = 'swh-web' +class AuthenticationError(Exception): + """Authentication related error. + + Example: A bearer token has expired. + + """ + pass + + class OpenIDConnectSession: """ Simple class wrapping requests sent to an OpenID Connect server. Args: oidc_server_url: URL of OpenID Connect server realm_name: name of the OpenID Connect authentication realm client_id: OpenID Connect client identifier in the realm """ def __init__(self, oidc_server_url: str = SWH_OIDC_SERVER_URL, realm_name: str = SWH_REALM_NAME, client_id: str = SWH_WEB_CLIENT_ID): realm_url = urljoin(oidc_server_url, f'realms/{realm_name}/') self.client_id = client_id self.token_url = urljoin(realm_url, 'protocol/openid-connect/token/') self.logout_url = urljoin(realm_url, 'protocol/openid-connect/logout/') def login(self, username: str, password: str) -> Dict[str, Any]: """ Login and create new offline OpenID Connect session. Args: username: an existing username in the realm password: password associated to username Returns: a dict filled with OpenID Connect profile info, notably access and refresh tokens for API authentication. """ return requests.post( url=self.token_url, data={ 'grant_type': 'password', 'client_id': self.client_id, 'scope': 'openid offline_access', 'username': username, 'password': password, }, ).json() def refresh(self, refresh_token: str) -> Dict[str, Any]: """ Refresh an offline OpenID Connect session to get new access token. Args: refresh_token: a refresh token retrieved after login Returns: a dict filled with OpenID Connect profile info, notably access and refresh tokens for API authentication. """ return requests.post( url=self.token_url, data={ 'grant_type': 'refresh_token', 'client_id': self.client_id, 'scope': 'openid', 'refresh_token': refresh_token, }, ).json() def logout(self, refresh_token: str): """ Logout from an offline OpenID Connect session and invalidate previously emitted tokens. Args: refresh_token: a refresh token retrieved after login """ requests.post( url=self.logout_url, data={ 'client_id': self.client_id, 'scope': 'openid', 'refresh_token': refresh_token, }, ) diff --git a/swh/web/client/client.py b/swh/web/client/client.py index 5736985..30b6760 100644 --- a/swh/web/client/client.py +++ b/swh/web/client/client.py @@ -1,399 +1,441 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Python client for the Software Heritage Web API Light wrapper around requests for the archive API, taking care of data conversions and pagination. .. code-block:: python from swh.web.client import WebAPIClient cli = WebAPIClient() # retrieve any archived object via its PID cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # same, but for specific object types cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # get() always retrieve entire objects, following pagination # WARNING: this might *not* be what you want for large objects cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') # type-specific methods support explicit iteration through pages next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) """ -from typing import Any, Callable, Dict, Generator, List, Union +from datetime import datetime, timedelta +from typing import Any, Callable, Dict, Generator, List, Optional, Union from urllib.parse import urlparse import dateutil.parser import requests from swh.model.identifiers import \ SNAPSHOT, REVISION, RELEASE, DIRECTORY, CONTENT from swh.model.identifiers import PersistentId as PID from swh.model.identifiers import parse_persistent_identifier as parse_pid +from .auth import ( + AuthenticationError, OpenIDConnectSession, SWH_OIDC_SERVER_URL +) PIDish = Union[PID, str] def _get_pid(pidish: PIDish) -> PID: """Parse string to PID if needed""" if isinstance(pidish, str): return parse_pid(pidish) else: return pidish def typify(data: Any, obj_type: str) -> Any: """Type API responses using pythonic types where appropriate The following conversions are performed: - identifiers are converted from strings to PersistentId instances - timestamps are converted from strings to datetime.datetime objects """ def to_pid(object_type, s): return PID(object_type=object_type, object_id=s) def to_date(s): return dateutil.parser.parse(s) def obj_type_of_entry_type(s): if s == 'file': return CONTENT elif s == 'dir': return DIRECTORY elif s == 'rev': return REVISION else: raise ValueError(f'invalid directory entry type: {s}') if obj_type == SNAPSHOT: for name, target in data.items(): if target['target_type'] != 'alias': # alias targets do not point to objects via PIDs; others do target['target'] = to_pid(target['target_type'], target['target']) elif obj_type == REVISION: data['id'] = to_pid(obj_type, data['id']) data['directory'] = to_pid(DIRECTORY, data['directory']) for key in ('date', 'committer_date'): data[key] = to_date(data[key]) for parent in data['parents']: parent['id'] = to_pid(REVISION, parent['id']) elif obj_type == RELEASE: data['id'] = to_pid(obj_type, data['id']) data['date'] = to_date(data['date']) data['target'] = to_pid(data['target_type'], data['target']) elif obj_type == DIRECTORY: dir_pid = None for entry in data: dir_pid = dir_pid or to_pid(obj_type, entry['dir_id']) entry['dir_id'] = dir_pid entry['target'] = to_pid(obj_type_of_entry_type(entry['type']), entry['target']) elif obj_type == CONTENT: pass # nothing to do for contents else: raise ValueError(f'invalid object type: {obj_type}') return data class WebAPIClient: """Client for the Software Heritage archive Web API, see https://archive.softwareheritage.org/api/ """ - def __init__(self, api_url='https://archive.softwareheritage.org/api/1'): + def __init__(self, api_url='https://archive.softwareheritage.org/api/1', + auth_url=SWH_OIDC_SERVER_URL): """Create a client for the Software Heritage Web API See: https://archive.softwareheritage.org/api/ Args: api_url: base URL for API calls (default: "https://archive.softwareheritage.org/api/1") """ api_url = api_url.rstrip('/') u = urlparse(api_url) self.api_url = api_url self.api_path = u.path + self.oidc_session = OpenIDConnectSession(oidc_server_url=auth_url) + self.oidc_profile: Optional[Dict[str, Any]] = None self._getters: Dict[str, Callable[[PIDish], Any]] = { CONTENT: self.content, DIRECTORY: self.directory, RELEASE: self.release, REVISION: self.revision, SNAPSHOT: self._get_snapshot, } def _call(self, query: str, http_method: str = 'get', **req_args) -> requests.models.Response: """Dispatcher for archive API invocation Args: query: API method to be invoked, rooted at api_url http_method: HTTP method to be invoked, one of: 'get', 'head' req_args: extra keyword arguments for requests.get()/.head() Raises: requests.HTTPError: if HTTP request fails and http_method is 'get' """ url = None if urlparse(query).scheme: # absolute URL url = query else: # relative URL; prepend base API URL url = '/'.join([self.api_url, query]) r = None + headers = {} + if self.oidc_profile is not None: + # use bearer token authentication + if datetime.now() > self.oidc_profile['expires_at']: + # refresh access token if it has expired + self.authenticate(self.oidc_profile['refresh_token']) + access_token = self.oidc_profile['access_token'] + headers = {'Authorization': f'Bearer {access_token}'} + if http_method == 'get': - r = requests.get(url, **req_args) + r = requests.get(url, **req_args, headers=headers) r.raise_for_status() elif http_method == 'head': - r = requests.head(url, **req_args) + r = requests.head(url, **req_args, headers=headers) else: raise ValueError(f'unsupported HTTP method: {http_method}') return r def _get_snapshot(self, pid: PIDish) -> Dict[str, Any]: """Analogous to self.snapshot(), but zipping through partial snapshots, merging them together before returning """ snapshot = {} for snp in self.snapshot(pid): snapshot.update(snp) return snapshot def get(self, pid: PIDish, **req_args) -> Any: """Retrieve information about an object of any kind Dispatcher method over the more specific methods content(), directory(), etc. Note that this method will buffer the entire output in case of long, iterable output (e.g., for snapshot()), see the iter() method for streaming. """ pid_ = _get_pid(pid) return self._getters[pid_.object_type](pid_) def iter(self, pid: PIDish, **req_args) -> Generator[Dict[str, Any], None, None]: """Stream over the information about an object of any kind Streaming variant of get() """ pid_ = _get_pid(pid) obj_type = pid_.object_type if obj_type == SNAPSHOT: yield from self.snapshot(pid_) elif obj_type == REVISION: yield from [self.revision(pid_)] elif obj_type == RELEASE: yield from [self.release(pid_)] elif obj_type == DIRECTORY: yield from self.directory(pid_) elif obj_type == CONTENT: yield from [self.content(pid_)] else: raise ValueError(f'invalid object type: {obj_type}') def content(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a content object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f'content/sha1_git:{_get_pid(pid).object_id}/', **req_args).json(), CONTENT) def directory(self, pid: PIDish, **req_args) -> List[Dict[str, Any]]: """Retrieve information about a directory object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f'directory/{_get_pid(pid).object_id}/', **req_args).json(), DIRECTORY) def revision(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a revision object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f'revision/{_get_pid(pid).object_id}/', **req_args).json(), REVISION) def release(self, pid: PIDish, **req_args) -> Dict[str, Any]: """Retrieve information about a release object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ return typify( self._call(f'release/{_get_pid(pid).object_id}/', **req_args).json(), RELEASE) def snapshot(self, pid: PIDish, **req_args) -> Generator[Dict[str, Any], None, None]: """Retrieve information about a snapshot object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Returns: an iterator over partial snapshots (dictionaries mapping branch names to information about where they point to), each containing a subset of available branches Raises: requests.HTTPError: if HTTP request fails """ done = False r = None query = f'snapshot/{_get_pid(pid).object_id}/' while not done: r = self._call(query, http_method='get', **req_args) yield typify(r.json()['branches'], SNAPSHOT) if 'next' in r.links and 'url' in r.links['next']: query = r.links['next']['url'] else: done = True def content_exists(self, pid: PIDish, **req_args) -> bool: """Check if a content object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool(self._call(f'content/sha1_git:{_get_pid(pid).object_id}/', http_method='head', **req_args)) def directory_exists(self, pid: PIDish, **req_args) -> bool: """Check if a directory object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool(self._call(f'directory/{_get_pid(pid).object_id}/', http_method='head', **req_args)) def revision_exists(self, pid: PIDish, **req_args) -> bool: """Check if a revision object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool(self._call(f'revision/{_get_pid(pid).object_id}/', http_method='head', **req_args)) def release_exists(self, pid: PIDish, **req_args) -> bool: """Check if a release object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool(self._call(f'release/{_get_pid(pid).object_id}/', http_method='head', **req_args)) def snapshot_exists(self, pid: PIDish, **req_args) -> bool: """Check if a snapshot object exists in the archive Args: pid: object identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool(self._call(f'snapshot/{_get_pid(pid).object_id}/', http_method='head', **req_args)) def content_raw(self, pid: PIDish, **req_args) -> Generator[bytes, None, None]: """Iterate over the raw content of a content object Args: pid: object identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ r = self._call(f'content/sha1_git:{_get_pid(pid).object_id}/raw/', stream=True, **req_args) r.raise_for_status() yield from r.iter_content(chunk_size=None, decode_unicode=False) + + def authenticate(self, refresh_token: str): + """Authenticate API requests using OpenID Connect bearer token + + Args: + refresh_token: A refresh token retrieved using the + ``swh auth login`` command (see :ref:`swh-web-client-auth` + section in main documentation) + + Raises: + swh.web.client.auth.AuthenticationError: if authentication fails + + """ + now = datetime.now() + try: + self.oidc_profile = self.oidc_session.refresh(refresh_token) + assert self.oidc_profile + if 'expires_in' in self.oidc_profile: + expires_in = self.oidc_profile['expires_in'] + expires_at = now + timedelta(seconds=expires_in) + self.oidc_profile['expires_at'] = expires_at + except Exception as e: + raise AuthenticationError(str(e)) + if 'access_token' not in self.oidc_profile: + # JSON error response + raise AuthenticationError(self.oidc_profile) diff --git a/swh/web/client/tests/conftest.py b/swh/web/client/tests/conftest.py index 9f738ce..d708133 100644 --- a/swh/web/client/tests/conftest.py +++ b/swh/web/client/tests/conftest.py @@ -1,29 +1,30 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from .api_data import API_URL, API_DATA from swh.web.client import WebAPIClient @pytest.fixture def web_api_mock(requests_mock): for api_call, data in API_DATA.items(): headers = {} if api_call == "snapshot/cabcc7d7bf639bbe1cc3b41989e1806618dd5764/": # monkey patch the only URL that require a special response headers - # (to make the client insit and follow pagination) + # (to make the client init and follow pagination) headers = { "Link": f"<{API_URL}/{api_call}?branches_count=1000&branches_from=refs/tags/v3.0-rc7>; rel=\"next\"" # NoQA: E501 } requests_mock.get(f"{API_URL}/{api_call}", text=data, headers=headers) + return requests_mock @pytest.fixture def web_api_client(): # use the fake base API URL that matches API data return WebAPIClient(api_url=API_URL) diff --git a/swh/web/client/tests/test_cli.py b/swh/web/client/tests/test_cli.py index 1a50b13..35f70fc 100644 --- a/swh/web/client/tests/test_cli.py +++ b/swh/web/client/tests/test_cli.py @@ -1,67 +1,67 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from click.testing import CliRunner from swh.web.client.cli import auth runner = CliRunner() -_oidc_profile = { +oidc_profile = { 'access_token': 'some-access-token', 'expires_in': 600, 'refresh_expires_in': 0, 'refresh_token': 'some-refresh-token', 'token_type': 'bearer', 'session_state': 'some-state', 'scope': 'openid email profile offline_access', } def test_auth_login(mocker): mock_getpass = mocker.patch('swh.web.client.cli.getpass') mock_getpass.return_value = 'password' mock_oidc_session = mocker.patch('swh.web.client.cli.OpenIDConnectSession') mock_login = mock_oidc_session.return_value.login - mock_login.return_value = _oidc_profile + mock_login.return_value = oidc_profile result = runner.invoke(auth, ['login', 'username'], input='password\n') assert result.exit_code == 0 - assert json.loads(result.output) == _oidc_profile + assert json.loads(result.output) == oidc_profile mock_login.side_effect = Exception('Auth error') result = runner.invoke(auth, ['login', 'username'], input='password\n') assert result.exit_code == 1 def test_auth_refresh(mocker): mock_oidc_session = mocker.patch('swh.web.client.cli.OpenIDConnectSession') mock_refresh = mock_oidc_session.return_value.refresh - mock_refresh.return_value = _oidc_profile + mock_refresh.return_value = oidc_profile - result = runner.invoke(auth, ['refresh', _oidc_profile['refresh_token']]) + result = runner.invoke(auth, ['refresh', oidc_profile['refresh_token']]) assert result.exit_code == 0 - assert json.loads(result.stdout) == _oidc_profile['access_token'] + assert json.loads(result.stdout) == oidc_profile['access_token'] mock_refresh.side_effect = Exception('Auth error') - result = runner.invoke(auth, ['refresh', _oidc_profile['refresh_token']]) + result = runner.invoke(auth, ['refresh', oidc_profile['refresh_token']]) assert result.exit_code == 1 def test_auth_logout(mocker): mock_oidc_session = mocker.patch('swh.web.client.cli.OpenIDConnectSession') mock_logout = mock_oidc_session.return_value.logout - result = runner.invoke(auth, ['logout', _oidc_profile['refresh_token']]) + result = runner.invoke(auth, ['logout', oidc_profile['refresh_token']]) assert result.exit_code == 0 mock_logout.side_effect = Exception('Auth error') - result = runner.invoke(auth, ['logout', _oidc_profile['refresh_token']]) + result = runner.invoke(auth, ['logout', oidc_profile['refresh_token']]) assert result.exit_code == 1 diff --git a/swh/web/client/tests/test_web_api_client.py b/swh/web/client/tests/test_web_api_client.py index 4dab31e..05d00ff 100644 --- a/swh/web/client/tests/test_web_api_client.py +++ b/swh/web/client/tests/test_web_api_client.py @@ -1,104 +1,201 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from copy import copy +from datetime import datetime from dateutil.parser import parse as parse_date +from unittest.mock import call, Mock +import pytest + +from swh.web.client.auth import AuthenticationError from swh.model.identifiers import parse_persistent_identifier as parse_pid +from .test_cli import oidc_profile + def test_get_content(web_api_client, web_api_mock): pid = parse_pid("swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1") obj = web_api_client.get(pid) assert obj["length"] == 151810 for key in ("length", "status", "checksums", "data_url"): assert key in obj assert obj["checksums"]["sha1_git"] == str(pid).split(":")[3] assert obj["checksums"]["sha1"] == \ "dc2830a9e72f23c1dfebef4413003221baa5fb62" assert obj == web_api_client.content(pid) def test_get_directory(web_api_client, web_api_mock): pid = parse_pid("swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6") obj = web_api_client.get(pid) assert len(obj) == 35 # number of directory entries assert all(map(lambda entry: entry["dir_id"] == pid, obj)) dir_entry = obj[0] assert dir_entry["type"] == "file" assert dir_entry["target"] == parse_pid( "swh:1:cnt:58471109208922c9ee8c4b06135725f03ed16814") assert dir_entry["name"] == ".bzrignore" assert dir_entry["length"] == 582 assert obj == web_api_client.directory(pid) def test_get_release(web_api_client, web_api_mock): pid = parse_pid("swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342") obj = web_api_client.get(pid) assert obj["id"] == pid assert obj["author"]["fullname"] == "Paul Tagliamonte " assert obj["author"]["name"] == "Paul Tagliamonte" assert obj["date"] == parse_date("2013-07-06T19:34:11-04:00") assert obj["name"] == "0.9.9" assert obj["target_type"] == "revision" assert obj["target"] == parse_pid( "swh:1:rev:e005cb773c769436709ca6a1d625dc784dbc1636") assert not obj["synthetic"] assert obj == web_api_client.release(pid) def test_get_revision(web_api_client, web_api_mock): pid = parse_pid("swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6") obj = web_api_client.get(pid) assert obj["id"] == pid for role in ("author", "committer"): assert obj[role]["fullname"] == \ "Nicolas Dandrimont " assert obj[role]["name"] == "Nicolas Dandrimont" timestamp = parse_date("2014-08-18T18:18:25+02:00") assert obj["date"] == timestamp assert obj["committer_date"] == timestamp assert obj["message"].startswith("Merge branch") assert obj["merge"] assert len(obj["parents"]) == 2 assert obj["parents"][0]["id"] == parse_pid( "swh:1:rev:26307d261279861c2d9c9eca3bb38519f951bea4") assert obj["parents"][1]["id"] == parse_pid( "swh:1:rev:37fc9e08d0c4b71807a4f1ecb06112e78d91c283") assert obj == web_api_client.revision(pid) def test_get_snapshot(web_api_client, web_api_mock): # small snapshot, the one from Web API doc pid = parse_pid("swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a") obj = web_api_client.get(pid) assert len(obj) == 4 assert obj["refs/heads/master"]["target_type"] == "revision" assert obj["refs/heads/master"]["target"] == parse_pid( "swh:1:rev:83c20a6a63a7ebc1a549d367bc07a61b926cecf3") assert obj["refs/tags/dpkt-1.7"]["target_type"] == "revision" assert obj["refs/tags/dpkt-1.7"]["target"] == parse_pid( "swh:1:rev:0c9dbfbc0974ec8ac1d8253aa1092366a03633a8") def test_iter_snapshot(web_api_client, web_api_mock): # large snapshot from the Linux kernel, usually spanning two pages pid = parse_pid("swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764") obj = web_api_client.snapshot(pid) snp = {} for partial in obj: snp.update(partial) assert len(snp) == 1391 + + +def test_authenticate_success(web_api_client, web_api_mock): + + rel_id = 'b9db10d00835e9a43e2eebef2db1d04d4ae82342' + url = f'{web_api_client.api_url}/release/{rel_id}/' + + web_api_client.oidc_session = Mock() + web_api_client.oidc_session.refresh.return_value = copy(oidc_profile) + + access_token = oidc_profile['access_token'] + refresh_token = 'user-refresh-token' + + web_api_client.authenticate(refresh_token) + + assert 'expires_at' in web_api_client.oidc_profile + + pid = parse_pid(f'swh:1:rel:{rel_id}') + web_api_client.get(pid) + + web_api_client.oidc_session.refresh.assert_called_once_with(refresh_token) + + sent_request = web_api_mock._adapter.last_request + + assert sent_request.url == url + assert 'Authorization' in sent_request.headers + + assert sent_request.headers['Authorization'] == f'Bearer {access_token}' + + +def test_authenticate_refresh_token(web_api_client, web_api_mock): + + rel_id = 'b9db10d00835e9a43e2eebef2db1d04d4ae82342' + url = f'{web_api_client.api_url}/release/{rel_id}/' + + oidc_profile_cp = copy(oidc_profile) + + web_api_client.oidc_session = Mock() + web_api_client.oidc_session.refresh.return_value = oidc_profile_cp + + refresh_token = 'user-refresh-token' + web_api_client.authenticate(refresh_token) + + assert 'expires_at' in web_api_client.oidc_profile + + # simulate access token expiration + web_api_client.oidc_profile['expires_at'] = datetime.now() + + access_token = 'new-access-token' + oidc_profile_cp['access_token'] = access_token + + pid = parse_pid(f'swh:1:rel:{rel_id}') + web_api_client.get(pid) + + calls = [call(refresh_token), call(oidc_profile['refresh_token'])] + web_api_client.oidc_session.refresh.assert_has_calls(calls) + + sent_request = web_api_mock._adapter.last_request + + assert sent_request.url == url + assert 'Authorization' in sent_request.headers + + assert sent_request.headers['Authorization'] == f'Bearer {access_token}' + + +def test_authenticate_failure(web_api_client, web_api_mock): + msg = 'Authentication error' + web_api_client.oidc_session = Mock() + web_api_client.oidc_session.refresh.side_effect = Exception(msg) + + refresh_token = 'user-refresh-token' + + with pytest.raises(AuthenticationError) as e: + web_api_client.authenticate(refresh_token) + + assert e.match(msg) + + oidc_error_response = { + 'error': 'invalid_grant', + 'error_description': 'Invalid refresh token', + } + + web_api_client.oidc_session.refresh.side_effect = None + web_api_client.oidc_session.refresh.return_value = oidc_error_response + + with pytest.raises(AuthenticationError) as e: + web_api_client.authenticate(refresh_token) + + assert e.match(repr(oidc_error_response))