Changeset View
Changeset View
Standalone View
Standalone View
swh/web/client/client.py
Show All 22 Lines | .. code-block:: python | ||||
# WARNING: this might *not* be what you want for large objects | # WARNING: this might *not* be what you want for large objects | ||||
cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') | cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') | ||||
# type-specific methods support explicit iteration through pages | # type-specific methods support explicit iteration through pages | ||||
next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) | next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) | ||||
""" | """ | ||||
from typing import Any, Callable, Dict, Generator, List, Union | from datetime import datetime, timedelta | ||||
from typing import Any, Callable, Dict, Generator, List, Optional, Union | |||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
import dateutil.parser | import dateutil.parser | ||||
import requests | import requests | ||||
from swh.model.identifiers import \ | from swh.model.identifiers import \ | ||||
SNAPSHOT, REVISION, RELEASE, DIRECTORY, CONTENT | SNAPSHOT, REVISION, RELEASE, DIRECTORY, CONTENT | ||||
from swh.model.identifiers import PersistentId as PID | from swh.model.identifiers import PersistentId as PID | ||||
from swh.model.identifiers import parse_persistent_identifier as parse_pid | from swh.model.identifiers import parse_persistent_identifier as parse_pid | ||||
from .auth import ( | |||||
AuthenticationError, OpenIDConnectSession, SWH_OIDC_SERVER_URL | |||||
) | |||||
PIDish = Union[PID, str] | PIDish = Union[PID, str] | ||||
def _get_pid(pidish: PIDish) -> PID: | def _get_pid(pidish: PIDish) -> PID: | ||||
"""Parse string to PID if needed""" | """Parse string to PID if needed""" | ||||
if isinstance(pidish, str): | if isinstance(pidish, str): | ||||
return parse_pid(pidish) | return parse_pid(pidish) | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | |||||
class WebAPIClient: | class WebAPIClient: | ||||
"""Client for the Software Heritage archive Web API, see | """Client for the Software Heritage archive Web API, see | ||||
https://archive.softwareheritage.org/api/ | https://archive.softwareheritage.org/api/ | ||||
""" | """ | ||||
def __init__(self, api_url='https://archive.softwareheritage.org/api/1',
             auth_url=SWH_OIDC_SERVER_URL):
    """Set up a client for the Software Heritage Web API

    See: https://archive.softwareheritage.org/api/

    Args:
        api_url: base URL for API calls (default:
            "https://archive.softwareheritage.org/api/1")
        auth_url: URL passed to OpenIDConnectSession as its
            ``oidc_server_url`` (default: SWH_OIDC_SERVER_URL)

    """
    # normalize once: relative queries are later joined with '/'
    base_url = api_url.rstrip('/')
    self.api_url = base_url
    self.api_path = urlparse(base_url).path

    self.oidc_session = OpenIDConnectSession(oidc_server_url=auth_url)
    # stays None until authenticate() stores a profile here
    self.oidc_profile: Optional[Dict[str, Any]] = None

    # object type -> method fetching an object of that type
    getters: Dict[str, Callable[[PIDish], Any]] = {}
    getters[CONTENT] = self.content
    getters[DIRECTORY] = self.directory
    getters[RELEASE] = self.release
    getters[REVISION] = self.revision
    getters[SNAPSHOT] = self._get_snapshot
    self._getters = getters
Show All 13 Lines | def _call(self, query: str, http_method: str = 'get', | ||||
""" | """ | ||||
url = None | url = None | ||||
if urlparse(query).scheme: # absolute URL | if urlparse(query).scheme: # absolute URL | ||||
url = query | url = query | ||||
else: # relative URL; prepend base API URL | else: # relative URL; prepend base API URL | ||||
url = '/'.join([self.api_url, query]) | url = '/'.join([self.api_url, query]) | ||||
r = None | r = None | ||||
headers = {} | |||||
if self.oidc_profile is not None: | |||||
# use bearer token authentication | |||||
if datetime.now() > self.oidc_profile['expires_at']: | |||||
# refresh access token if it has expired | |||||
self.authenticate(self.oidc_profile['refresh_token']) | |||||
access_token = self.oidc_profile['access_token'] | |||||
headers = {'Authorization': f'Bearer {access_token}'} | |||||
if http_method == 'get': | if http_method == 'get': | ||||
r = requests.get(url, **req_args) | r = requests.get(url, **req_args, headers=headers) | ||||
r.raise_for_status() | r.raise_for_status() | ||||
elif http_method == 'head': | elif http_method == 'head': | ||||
r = requests.head(url, **req_args) | r = requests.head(url, **req_args, headers=headers) | ||||
else: | else: | ||||
raise ValueError(f'unsupported HTTP method: {http_method}') | raise ValueError(f'unsupported HTTP method: {http_method}') | ||||
return r | return r | ||||
def _get_snapshot(self, pid: PIDish) -> Dict[str, Any]: | def _get_snapshot(self, pid: PIDish) -> Dict[str, Any]: | ||||
"""Analogous to self.snapshot(), but zipping through partial snapshots, | """Analogous to self.snapshot(), but zipping through partial snapshots, | ||||
merging them together before returning | merging them together before returning | ||||
▲ Show 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | def content_raw(self, pid: PIDish, | ||||
requests.HTTPError: if HTTP request fails | requests.HTTPError: if HTTP request fails | ||||
""" | """ | ||||
r = self._call(f'content/sha1_git:{_get_pid(pid).object_id}/raw/', | r = self._call(f'content/sha1_git:{_get_pid(pid).object_id}/raw/', | ||||
stream=True, **req_args) | stream=True, **req_args) | ||||
r.raise_for_status() | r.raise_for_status() | ||||
yield from r.iter_content(chunk_size=None, decode_unicode=False) | yield from r.iter_content(chunk_size=None, decode_unicode=False) | ||||
def authenticate(self, refresh_token: str):
    """Authenticate API requests using OpenID Connect bearer token

    The refresh token is exchanged through ``self.oidc_session.refresh()``.
    The resulting profile is stored in ``self.oidc_profile`` only once it
    has been validated, so a failed attempt leaves any previously stored
    profile untouched. The stored profile always carries an
    ``expires_at`` entry.

    Args:
        refresh_token: A refresh token retrieved using the
            ``swh auth login`` command (see :ref:`swh-web-client-auth`
            section in main documentation)

    Raises:
        swh.web.client.auth.AuthenticationError: if authentication fails

    """
    # sample the clock before the round-trip so that the computed expiry
    # can only be earlier, never later, than the real one
    now = datetime.now()
    try:
        profile = self.oidc_session.refresh(refresh_token)
    except Exception as e:
        raise AuthenticationError(str(e)) from e

    if not profile or 'access_token' not in profile:
        # empty or JSON error response
        raise AuthenticationError(profile)

    try:
        expires_at = now + timedelta(seconds=profile['expires_in'])
    except KeyError:
        # NOTE(review): no lifetime advertised by the server; treat the
        # token as non-expiring so that code comparing against
        # 'expires_at' does not fail with KeyError -- confirm this is the
        # desired policy
        expires_at = datetime.max
    except (TypeError, ValueError, OverflowError) as e:
        # malformed 'expires_in' value
        raise AuthenticationError(str(e)) from e
    profile['expires_at'] = expires_at

    # publish only a fully validated profile
    self.oidc_profile = profile