Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9337765
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
View Options
diff --git a/swh/web/client/client.py b/swh/web/client/client.py
index 4a495f3..6ea2481 100644
--- a/swh/web/client/client.py
+++ b/swh/web/client/client.py
@@ -1,509 +1,515 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Python client for the Software Heritage Web API
Light wrapper around requests for the archive API, taking care of data
conversions and pagination.
.. code-block:: python
from swh.web.client.client import WebAPIClient
cli = WebAPIClient()
# retrieve any archived object via its SWHID
cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6')
# same, but for specific object types
cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6')
# get() always retrieve entire objects, following pagination
# WARNING: this might *not* be what you want for large objects
cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a')
# type-specific methods support explicit iteration through pages
next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764'))
"""
+from datetime import datetime
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
import dateutil.parser
import requests
from swh.model.identifiers import (
CONTENT,
DIRECTORY,
RELEASE,
REVISION,
SNAPSHOT,
SWHID,
parse_swhid,
)
SWHIDish = Union[SWHID, str]
ORIGIN_VISIT = "origin_visit"
def _get_swhid(swhidish: SWHIDish) -> SWHID:
"""Parse string to SWHID if needed"""
if isinstance(swhidish, str):
return parse_swhid(swhidish)
else:
return swhidish
def typify_json(data: Any, obj_type: str) -> Any:
"""Type API responses using pythonic types where appropriate
The following conversions are performed:
- identifiers are converted from strings to SWHID instances
- timestamps are converted from strings to datetime.datetime objects
"""
- def to_swhid(object_type, s):
+ def to_swhid(object_type: str, s: Any) -> SWHID:
return SWHID(object_type=object_type, object_id=s)
- def to_date(s):
- return dateutil.parser.parse(s)
+ def to_date(date: str) -> datetime:
+ return dateutil.parser.parse(date)
+
+ def to_optional_date(date: Optional[str]) -> Optional[datetime]:
+ return None if date is None else to_date(date)
+
+ # The date attribute is optional for Revision and Release object
def obj_type_of_entry_type(s):
if s == "file":
return CONTENT
elif s == "dir":
return DIRECTORY
elif s == "rev":
return REVISION
else:
raise ValueError(f"invalid directory entry type: {s}")
if obj_type == SNAPSHOT:
for name, target in data.items():
if target["target_type"] != "alias":
# alias targets do not point to objects via SWHIDs; others do
target["target"] = to_swhid(target["target_type"], target["target"])
elif obj_type == REVISION:
data["id"] = to_swhid(obj_type, data["id"])
data["directory"] = to_swhid(DIRECTORY, data["directory"])
for key in ("date", "committer_date"):
- data[key] = to_date(data[key])
+ data[key] = to_optional_date(data[key])
for parent in data["parents"]:
parent["id"] = to_swhid(REVISION, parent["id"])
elif obj_type == RELEASE:
data["id"] = to_swhid(obj_type, data["id"])
- data["date"] = to_date(data["date"])
+ data["date"] = to_optional_date(data["date"])
data["target"] = to_swhid(data["target_type"], data["target"])
elif obj_type == DIRECTORY:
dir_swhid = None
for entry in data:
dir_swhid = dir_swhid or to_swhid(obj_type, entry["dir_id"])
entry["dir_id"] = dir_swhid
entry["target"] = to_swhid(
obj_type_of_entry_type(entry["type"]), entry["target"]
)
elif obj_type == CONTENT:
pass # nothing to do for contents
elif obj_type == ORIGIN_VISIT:
data["date"] = to_date(data["date"])
if data["snapshot"] is not None:
data["snapshot"] = to_swhid(SNAPSHOT, data["snapshot"])
else:
raise ValueError(f"invalid object type: {obj_type}")
return data
class WebAPIClient:
"""Client for the Software Heritage archive Web API, see
https://archive.softwareheritage.org/api/
"""
def __init__(
self,
api_url: str = "https://archive.softwareheritage.org/api/1",
bearer_token: Optional[str] = None,
):
"""Create a client for the Software Heritage Web API
See: https://archive.softwareheritage.org/api/
Args:
api_url: base URL for API calls (default:
"https://archive.softwareheritage.org/api/1")
bearer_token: optional bearer token to do authenticated API calls
"""
api_url = api_url.rstrip("/")
u = urlparse(api_url)
self.api_url = api_url
self.api_path = u.path
self.bearer_token = bearer_token
self._getters: Dict[str, Callable[[SWHIDish, bool], Any]] = {
CONTENT: self.content,
DIRECTORY: self.directory,
RELEASE: self.release,
REVISION: self.revision,
SNAPSHOT: self._get_snapshot,
}
def _call(
self, query: str, http_method: str = "get", **req_args
) -> requests.models.Response:
"""Dispatcher for archive API invocation
Args:
query: API method to be invoked, rooted at api_url
http_method: HTTP method to be invoked, one of: 'get', 'head'
req_args: extra keyword arguments for requests.get()/.head()
Raises:
requests.HTTPError: if HTTP request fails and http_method is 'get'
"""
url = None
if urlparse(query).scheme: # absolute URL
url = query
else: # relative URL; prepend base API URL
url = "/".join([self.api_url, query])
r = None
headers = {}
if self.bearer_token is not None:
headers = {"Authorization": f"Bearer {self.bearer_token}"}
if http_method == "get":
r = requests.get(url, **req_args, headers=headers)
r.raise_for_status()
elif http_method == "head":
r = requests.head(url, **req_args, headers=headers)
else:
raise ValueError(f"unsupported HTTP method: {http_method}")
return r
def _get_snapshot(self, swhid: SWHIDish, typify: bool = True) -> Dict[str, Any]:
"""Analogous to self.snapshot(), but zipping through partial snapshots,
merging them together before returning
"""
snapshot = {}
for snp in self.snapshot(swhid, typify):
snapshot.update(snp)
return snapshot
def get(self, swhid: SWHIDish, typify: bool = True, **req_args) -> Any:
"""Retrieve information about an object of any kind
Dispatcher method over the more specific methods content(),
directory(), etc.
Note that this method will buffer the entire output in case of long,
iterable output (e.g., for snapshot()), see the iter() method for
streaming.
"""
swhid_ = _get_swhid(swhid)
return self._getters[swhid_.object_type](swhid_, typify)
def iter(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> Iterator[Dict[str, Any]]:
"""Stream over the information about an object of any kind
Streaming variant of get()
"""
swhid_ = _get_swhid(swhid)
obj_type = swhid_.object_type
if obj_type == SNAPSHOT:
yield from self.snapshot(swhid_, typify)
elif obj_type == REVISION:
yield from [self.revision(swhid_, typify)]
elif obj_type == RELEASE:
yield from [self.release(swhid_, typify)]
elif obj_type == DIRECTORY:
yield from self.directory(swhid_, typify)
elif obj_type == CONTENT:
yield from [self.content(swhid_, typify)]
else:
raise ValueError(f"invalid object type: {obj_type}")
def content(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> Dict[str, Any]:
"""Retrieve information about a content object
Args:
swhid: object persistent identifier
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Raises:
requests.HTTPError: if HTTP request fails
"""
json = self._call(
f"content/sha1_git:{_get_swhid(swhid).object_id}/", **req_args
).json()
return typify_json(json, CONTENT) if typify else json
def directory(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> List[Dict[str, Any]]:
"""Retrieve information about a directory object
Args:
swhid: object persistent identifier
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Raises:
requests.HTTPError: if HTTP request fails
"""
json = self._call(
f"directory/{_get_swhid(swhid).object_id}/", **req_args
).json()
return typify_json(json, DIRECTORY) if typify else json
def revision(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> Dict[str, Any]:
"""Retrieve information about a revision object
Args:
swhid: object persistent identifier
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Raises:
requests.HTTPError: if HTTP request fails
"""
json = self._call(f"revision/{_get_swhid(swhid).object_id}/", **req_args).json()
return typify_json(json, REVISION) if typify else json
def release(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> Dict[str, Any]:
"""Retrieve information about a release object
Args:
swhid: object persistent identifier
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Raises:
requests.HTTPError: if HTTP request fails
"""
json = self._call(f"release/{_get_swhid(swhid).object_id}/", **req_args).json()
return typify_json(json, RELEASE) if typify else json
def snapshot(
self, swhid: SWHIDish, typify: bool = True, **req_args
) -> Iterator[Dict[str, Any]]:
"""Retrieve information about a snapshot object
Args:
swhid: object persistent identifier
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Returns:
an iterator over partial snapshots (dictionaries mapping branch
names to information about where they point to), each containing a
subset of available branches
Raises:
requests.HTTPError: if HTTP request fails
"""
done = False
r = None
query = f"snapshot/{_get_swhid(swhid).object_id}/"
while not done:
r = self._call(query, http_method="get", **req_args)
json = r.json()["branches"]
yield typify_json(json, SNAPSHOT) if typify else json
if "next" in r.links and "url" in r.links["next"]:
query = r.links["next"]["url"]
else:
done = True
def visits(
self,
origin: str,
per_page: Optional[int] = None,
last_visit: Optional[int] = None,
typify: bool = True,
**req_args,
) -> Iterator[Dict[str, Any]]:
"""List visits of an origin
Args:
origin: the URL of a software origin
per_page: the number of visits to list
last_visit: visit to start listing from
typify: if True, convert return value to pythonic types wherever
possible, otherwise return raw JSON types (default: True)
req_args: extra keyword arguments for requests.get()
Returns:
an iterator over visits of the origin
Raises:
requests.HTTPError: if HTTP request fails
"""
done = False
r = None
params = []
if last_visit is not None:
params.append(("last_visit", last_visit))
if per_page is not None:
params.append(("per_page", per_page))
query = f"origin/{origin}/visits/"
while not done:
r = self._call(query, http_method="get", params=params, **req_args)
yield from [typify_json(v, ORIGIN_VISIT) if typify else v for v in r.json()]
if "next" in r.links and "url" in r.links["next"]:
params = []
query = r.links["next"]["url"]
else:
done = True
def content_exists(self, swhid: SWHIDish, **req_args) -> bool:
"""Check if a content object exists in the archive
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.head()
Raises:
requests.HTTPError: if HTTP request fails
"""
return bool(
self._call(
f"content/sha1_git:{_get_swhid(swhid).object_id}/",
http_method="head",
**req_args,
)
)
def directory_exists(self, swhid: SWHIDish, **req_args) -> bool:
"""Check if a directory object exists in the archive
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.head()
Raises:
requests.HTTPError: if HTTP request fails
"""
return bool(
self._call(
f"directory/{_get_swhid(swhid).object_id}/",
http_method="head",
**req_args,
)
)
def revision_exists(self, swhid: SWHIDish, **req_args) -> bool:
"""Check if a revision object exists in the archive
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.head()
Raises:
requests.HTTPError: if HTTP request fails
"""
return bool(
self._call(
f"revision/{_get_swhid(swhid).object_id}/",
http_method="head",
**req_args,
)
)
def release_exists(self, swhid: SWHIDish, **req_args) -> bool:
"""Check if a release object exists in the archive
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.head()
Raises:
requests.HTTPError: if HTTP request fails
"""
return bool(
self._call(
f"release/{_get_swhid(swhid).object_id}/",
http_method="head",
**req_args,
)
)
def snapshot_exists(self, swhid: SWHIDish, **req_args) -> bool:
"""Check if a snapshot object exists in the archive
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.head()
Raises:
requests.HTTPError: if HTTP request fails
"""
return bool(
self._call(
f"snapshot/{_get_swhid(swhid).object_id}/",
http_method="head",
**req_args,
)
)
def content_raw(self, swhid: SWHIDish, **req_args) -> Iterator[bytes]:
"""Iterate over the raw content of a content object
Args:
swhid: object persistent identifier
req_args: extra keyword arguments for requests.get()
Raises:
requests.HTTPError: if HTTP request fails
"""
r = self._call(
f"content/sha1_git:{_get_swhid(swhid).object_id}/raw/",
stream=True,
**req_args,
)
r.raise_for_status()
yield from r.iter_content(chunk_size=None, decode_unicode=False)
diff --git a/swh/web/client/tests/test_web_api_client.py b/swh/web/client/tests/test_web_api_client.py
index 79842df..0353ae3 100644
--- a/swh/web/client/tests/test_web_api_client.py
+++ b/swh/web/client/tests/test_web_api_client.py
@@ -1,174 +1,189 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from dateutil.parser import parse as parse_date
-from swh.model.identifiers import parse_swhid
+from swh.model.identifiers import REVISION, parse_swhid
+from swh.web.client.client import typify_json
from .api_data import API_DATA
def test_get_content(web_api_client, web_api_mock):
swhid = parse_swhid("swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1")
obj = web_api_client.get(swhid)
assert obj["length"] == 151810
for key in ("length", "status", "checksums", "data_url"):
assert key in obj
assert obj["checksums"]["sha1_git"] == str(swhid).split(":")[3]
assert obj["checksums"]["sha1"] == "dc2830a9e72f23c1dfebef4413003221baa5fb62"
assert obj == web_api_client.content(swhid)
def test_get_directory(web_api_client, web_api_mock):
swhid = parse_swhid("swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6")
obj = web_api_client.get(swhid)
assert len(obj) == 35 # number of directory entries
assert all(map(lambda entry: entry["dir_id"] == swhid, obj))
dir_entry = obj[0]
assert dir_entry["type"] == "file"
assert dir_entry["target"] == parse_swhid(
"swh:1:cnt:58471109208922c9ee8c4b06135725f03ed16814"
)
assert dir_entry["name"] == ".bzrignore"
assert dir_entry["length"] == 582
assert obj == web_api_client.directory(swhid)
def test_get_release(web_api_client, web_api_mock):
swhid = parse_swhid("swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342")
obj = web_api_client.get(swhid)
assert obj["id"] == swhid
assert obj["author"]["fullname"] == "Paul Tagliamonte <tag@pault.ag>"
assert obj["author"]["name"] == "Paul Tagliamonte"
assert obj["date"] == parse_date("2013-07-06T19:34:11-04:00")
assert obj["name"] == "0.9.9"
assert obj["target_type"] == "revision"
assert obj["target"] == parse_swhid(
"swh:1:rev:e005cb773c769436709ca6a1d625dc784dbc1636"
)
assert not obj["synthetic"]
assert obj == web_api_client.release(swhid)
def test_get_revision(web_api_client, web_api_mock):
swhid = parse_swhid("swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6")
obj = web_api_client.get(swhid)
assert obj["id"] == swhid
for role in ("author", "committer"):
assert (
obj[role]["fullname"] == "Nicolas Dandrimont <nicolas.dandrimont@crans.org>"
)
assert obj[role]["name"] == "Nicolas Dandrimont"
timestamp = parse_date("2014-08-18T18:18:25+02:00")
assert obj["date"] == timestamp
assert obj["committer_date"] == timestamp
assert obj["message"].startswith("Merge branch")
assert obj["merge"]
assert len(obj["parents"]) == 2
assert obj["parents"][0]["id"] == parse_swhid(
"swh:1:rev:26307d261279861c2d9c9eca3bb38519f951bea4"
)
assert obj["parents"][1]["id"] == parse_swhid(
"swh:1:rev:37fc9e08d0c4b71807a4f1ecb06112e78d91c283"
)
assert obj == web_api_client.revision(swhid)
def test_get_snapshot(web_api_client, web_api_mock):
# small snapshot, the one from Web API doc
swhid = parse_swhid("swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a")
obj = web_api_client.get(swhid)
assert len(obj) == 4
assert obj["refs/heads/master"]["target_type"] == "revision"
assert obj["refs/heads/master"]["target"] == parse_swhid(
"swh:1:rev:83c20a6a63a7ebc1a549d367bc07a61b926cecf3"
)
assert obj["refs/tags/dpkt-1.7"]["target_type"] == "revision"
assert obj["refs/tags/dpkt-1.7"]["target"] == parse_swhid(
"swh:1:rev:0c9dbfbc0974ec8ac1d8253aa1092366a03633a8"
)
def test_iter_snapshot(web_api_client, web_api_mock):
# large snapshot from the Linux kernel, usually spanning two pages
swhid = parse_swhid("swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764")
obj = web_api_client.snapshot(swhid)
snp = {}
for partial in obj:
snp.update(partial)
assert len(snp) == 1391
def test_authentication(web_api_client, web_api_mock):
rel_id = "b9db10d00835e9a43e2eebef2db1d04d4ae82342"
url = f"{web_api_client.api_url}/release/{rel_id}/"
refresh_token = "user-refresh-token"
web_api_client.bearer_token = refresh_token
swhid = parse_swhid(f"swh:1:rel:{rel_id}")
web_api_client.get(swhid)
sent_request = web_api_mock._adapter.last_request
assert sent_request.url == url
assert "Authorization" in sent_request.headers
assert sent_request.headers["Authorization"] == f"Bearer {refresh_token}"
def test_get_visits(web_api_client, web_api_mock):
obj = web_api_client.visits(
"https://github.com/NixOS/nixpkgs", last_visit=50, per_page=10
)
visits = [v for v in obj]
assert len(visits) == 20
timestamp = parse_date("2018-07-31 04:34:23.298931+00:00")
assert visits[0]["date"] == timestamp
assert visits[0]["snapshot"] is None
snapshot_swhid = "swh:1:snp:456550ea74af4e2eecaa406629efaaf0b9b5f976"
assert visits[7]["snapshot"] == parse_swhid(snapshot_swhid)
def test_get_json(web_api_client, web_api_mock):
swhids = [
"swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1",
"swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6",
"swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342",
"swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6",
"swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a",
]
for swhid in swhids:
actual = web_api_client.get(swhid, typify=False)
expected = None
# Fetch raw JSON data from the generated API_DATA
for url, data in API_DATA.items():
object_id = swhid[len("swh:1:XXX:") :]
if object_id in url:
expected = json.loads(data)
# Special case: snapshots response differs slightly from the Web API
if swhid.startswith("swh:1:snp:"):
expected = expected["branches"]
break
assert actual == expected
+
+
+def test_typify_json_minimal_revision():
+ revision_data = {
+ "id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ "directory": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ "date": None,
+ "committer_date": None,
+ "parents": [],
+ }
+ revision_typed = typify_json(revision_data, REVISION)
+ pid = "swh:1:rev:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+ assert revision_typed["id"] == parse_swhid(pid)
+ assert revision_typed["date"] is None
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 8:17 AM (9 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3266834
Attached To
rDWCLI Web client
Event Timeline
Log In to Comment