Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/deposit/client.py b/swh/deposit/client.py
index 9418d03a..9cd1a4fd 100644
--- a/swh/deposit/client.py
+++ b/swh/deposit/client.py
@@ -1,658 +1,651 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining an swh-deposit client
"""
import hashlib
import os
+import logging
import requests
import xmltodict
-import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Dict
from urllib.parse import urljoin
-from swh.core.config import SWHConfig
-
+from swh.core.config import read_raw_config, config_basepath
logger = logging.getLogger(__name__)
class MaintenanceError(ValueError):
    """Signals that the deposit api answered with a maintenance notice.

    The public client raises it when a 503 response carries both a
    ``summary`` and a ``sword:verboseDescription`` entry; the message is
    ``"<summary>: <verboseDescription>"``.
    """
def _parse(stream, encoding="utf-8"):
    """Parse an xml payload into a dict.

    Args:
        stream (bytes/text): the xml payload; bytes are decoded first
        encoding (str): encoding used to decode a bytes payload, also
                        handed to the xml parser

    Returns:
        dict: the parsed document. An ``entry`` root element is unwrapped
        first; if the resulting mapping then holds a ``sword:error``
        element, that element's content is returned instead.
    """
    text = stream.decode(encoding) if isinstance(stream, bytes) else stream
    parsed = xmltodict.parse(text, encoding=encoding, process_namespaces=False)
    # Unwrap successively: the Atom entry envelope, then a SWORD error
    # element looked up in the already-unwrapped mapping.
    for wrapper in ("entry", "sword:error"):
        if wrapper in parsed:
            parsed = parsed[wrapper]
    return dict(parsed)
def _parse_with_filter(stream, encoding="utf-8", keys=()):
    """Given a xml stream, parse the result and filter with keys.

    Args:
        stream (bytes/text): The stream to parse
        encoding (str): The encoding to use if to decode the bytes
            stream
        keys (iterable of str): Keys to keep from the parsed result

    Returns:
        A dict with exactly the requested keys; keys absent from the
        parsed xml map to None.
    """
    # keys defaults to an immutable tuple rather than a shared list.
    data = _parse(stream, encoding=encoding)
    return {key: data.get(key) for key in keys}
class BaseApiDepositClient:
    """Deposit client base class.

    Holds the deposit server's base url and optional basic-auth
    credentials, and issues http requests relative to that base url.
    """

    def __init__(self, config=None, _client=requests):
        """
        Args:
            config (dict/None): client configuration with a ``url`` key and
                an optional ``auth`` dict holding ``username``/``password``.
                When None, the configuration is read from the file named by
                the ``SWH_CONFIG_FILENAME`` environment variable.
            _client: object exposing http methods (``get``, ``post``, ...)
                as attributes; defaults to the ``requests`` module.
        """
        if config is None:
            config_file = os.environ["SWH_CONFIG_FILENAME"]
            self.config: Dict[str, Any] = read_raw_config(
                config_basepath(config_file)
            )
        else:
            self.config = config

        self._client = _client
        self.base_url = self.config["url"].strip("/") + "/"
        # "auth" is optional: a missing, null or empty value means the
        # requests are sent without authentication.
        auth = self.config.get("auth")
        if not auth:
            self.auth = None
        else:
            self.auth = (auth["username"], auth["password"])

    def do(self, method, url, *args, **kwargs):
        """Internal method to deal with requests, possibly with basic http
        authentication.

        Args:
            method (str): name of an http method attribute of the
                underlying client (e.g. "get", "post", "put")
            url (str): url resolved relative to ``self.base_url``
            *args, **kwargs: forwarded to the client's method

        Raises:
            ValueError: if the underlying client has no such method

        Returns:
            The request's execution
        """
        if hasattr(self._client, method):
            method_fn = getattr(self._client, method)
        else:
            raise ValueError("Development error, unsupported method %s" % (method))

        if self.auth:
            kwargs["auth"] = self.auth

        full_url = urljoin(self.base_url, url.lstrip("/"))
        return method_fn(full_url, *args, **kwargs)
class PrivateApiDepositClient(BaseApiDepositClient):
    """Private API deposit client to:

    - read a given deposit's archive(s)
    - read a given deposit's metadata
    - update a given deposit's status
    - check a given deposit's associated data

    """

    def archive_get(self, archive_update_url, archive):
        """Retrieve the archive from the deposit to a local directory.

        Args:
            archive_update_url (str): The full deposit archive(s)'s raw
                content url to retrieve locally
            archive (str): the local archive's path where to store
                the raw content

        Returns:
            The archive path to the local archive to load.

        Raises:
            ValueError: if the server does not answer with an ok status.
        """
        r = self.do("get", archive_update_url, stream=True)
        if r.ok:
            with open(archive, "wb") as f:
                # iter_content() defaults to 1-byte chunks; use bigger ones
                # so that large archives are not written byte by byte.
                for chunk in r.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)

            return archive

        msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,)
        logger.error(msg)
        raise ValueError(msg)

    def metadata_get(self, metadata_url):
        """Retrieve the metadata information on a given deposit.

        Args:
            metadata_url (str): The full deposit metadata url to retrieve
                locally

        Returns:
            The dictionary of metadata for that deposit (decoded json).

        Raises:
            ValueError: if the server does not answer with an ok status.
        """
        r = self.do("get", metadata_url)
        if r.ok:
            return r.json()

        msg = "Problem when retrieving metadata at %s" % metadata_url
        logger.error(msg)
        raise ValueError(msg)

    def status_update(
        self,
        update_status_url,
        status,
        revision_id=None,
        directory_id=None,
        origin_url=None,
    ):
        """Update the deposit's status.

        Only the truthy optional identifiers are sent in the json payload.
        The server's response is not inspected.

        Args:
            update_status_url (str): the full deposit's status update url
            status (str): The status to update the deposit with
            revision_id (str/None): the revision's identifier to update to
            directory_id (str/None): the directory's identifier to update to
            origin_url (str/None): deposit's associated origin url

        """
        payload = {"status": status}
        if revision_id:
            payload["revision_id"] = revision_id
        if directory_id:
            payload["directory_id"] = directory_id
        if origin_url:
            payload["origin_url"] = origin_url

        self.do("put", update_status_url, json=payload)

    def check(self, check_url):
        """Check the deposit's associated data (metadata, archive(s))

        Args:
            check_url (str): the full deposit's check url

        Returns:
            The ``status`` value of the server's json answer.

        Raises:
            ValueError: if the server does not answer with an ok status.
        """
        r = self.do("get", check_url)
        if r.ok:
            data = r.json()
            return data["status"]

        msg = "Problem when checking deposit %s" % check_url
        logger.error(msg)
        raise ValueError(msg)
class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta):
    """Base Deposit client to access the public api.

    A subclass describes one api call through :meth:`compute_url`,
    :meth:`compute_method` and :meth:`parse_result_ok` (optionally also
    :meth:`compute_information` / :meth:`do_execute`); :meth:`execute`
    performs the call and normalizes its outcome into a dict.
    """

    def __init__(self, config, error_msg=None, empty_result=None):
        """
        Args:
            config (dict): client configuration, passed to
                BaseApiDepositClient
            error_msg (str/None): %-format string with two ``%s``
                placeholders (url, exception) used when the http call raises
            empty_result (dict/None): default keys merged into error results
        """
        super().__init__(config)
        self.error_msg = error_msg
        # Keep a private copy: neither a shared default nor a caller's dict
        # may be altered through this instance.
        self.empty_result = dict(empty_result) if empty_result else {}

    @abstractmethod
    def compute_url(self, *args, **kwargs):
        """Compute api url endpoint to query."""
        pass

    @abstractmethod
    def compute_method(self, *args, **kwargs):
        """Http method to use on the url"""
        pass

    @abstractmethod
    def parse_result_ok(self, xml_content):
        """Given an xml result from the api endpoint, parse it and returns a
        dict.

        """
        pass

    def compute_information(self, *args, **kwargs):
        """Compute some more information given the inputs (e.g http headers,
        ...)

        """
        return {}

    def parse_result_error(self, xml_content):
        """Given an error response in xml, parse it into a dict.

        Returns:
            dict with the following keys (None when absent):

                'summary': The error message
                'detail': Some more detail about the error if any
                'sword:verboseDescription': verbose error description

        """
        return _parse_with_filter(
            xml_content, keys=["summary", "detail", "sword:verboseDescription"]
        )

    def do_execute(self, method, url, info):
        """Execute the http query to url using method and info information.

        By default, execute a simple query to url with the http
        method. Override this in daughter class to improve the
        default behavior if needed.

        """
        return self.do(method, url)

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Main endpoint to prepare and execute the http query to the api.

        Raises:
            MaintenanceError if some api maintenance is happening.

        Returns:
            Dict of computed api data. On failure it holds the
            ``empty_result`` keys plus either an ``error`` message (the
            request raised) or the parsed error and the http ``status``
            (the server answered with a non-ok status).

        """
        url = self.compute_url(*args, **kwargs)
        method = self.compute_method(*args, **kwargs)
        info = self.compute_information(*args, **kwargs)

        try:
            r = self.do_execute(method, url, info)
        except Exception as e:
            error_msg = self.error_msg or "Deposit api failure at %s: %s"
            # Build a fresh dict: updating self.empty_result in place would
            # leak this "error" into every later result of this client.
            result = dict(self.empty_result)
            result["error"] = error_msg % (url, e)
            return result

        if r.ok:
            if int(r.status_code) == 204:  # 204 returns no body
                return {"status": r.status_code}
            return self.parse_result_ok(r.text)

        error = self.parse_result_error(r.text)
        error.update(self.empty_result)
        if r.status_code == 503:
            summary = error.get("summary")
            detail = error.get("sword:verboseDescription")
            # Maintenance error
            if summary and detail:
                raise MaintenanceError(f"{summary}: {detail}")
        error["status"] = r.status_code
        return error
class ServiceDocumentDepositClient(BaseDepositClient):
    """Fetch the deposit server's service document."""

    def __init__(self, config):
        super().__init__(
            config,
            empty_result={"collection": None},
            error_msg="Service document failure at %s: %s",
        )

    def compute_url(self, *args, **kwargs):
        # The service document endpoint takes no argument.
        return "/servicedocument/"

    def compute_method(self, *args, **kwargs):
        return "get"

    def parse_result_ok(self, xml_content):
        """Return the whole parsed service document as a dict."""
        return _parse(xml_content)
class StatusDepositClient(BaseDepositClient):
    """Fetch the status of an existing deposit."""

    def __init__(self, config):
        defaults = dict.fromkeys(
            ("deposit_status", "deposit_status_detail", "deposit_swh_id")
        )
        super().__init__(
            config, error_msg="Status check failure at %s: %s", empty_result=defaults
        )

    def compute_url(self, collection, deposit_id):
        return f"/{collection}/{deposit_id}/status/"

    def compute_method(self, *args, **kwargs):
        return "get"

    def parse_result_ok(self, xml_content):
        """Extract the deposit status fields from the xml answer.

        Keys missing from the answer map to None.
        """
        wanted = [
            "deposit_id",
            "deposit_status",
            "deposit_status_detail",
            "deposit_swh_id",
            "deposit_swh_id_context",
            "deposit_external_id",
        ]
        return _parse_with_filter(xml_content, keys=wanted)
class BaseCreateDepositClient(BaseDepositClient):
    """Deposit client base class to post new deposit.

    Subclasses relying on the default :meth:`compute_information` must
    provide a ``compute_headers(info)`` method.
    """

    def __init__(self, config):
        super().__init__(
            config,
            error_msg="Post Deposit failure at %s: %s",
            empty_result={"deposit_id": None, "deposit_status": None,},
        )

    def compute_url(self, collection, *args, **kwargs):
        return "/%s/" % collection

    def compute_method(self, *args, **kwargs):
        return "post"

    def parse_result_ok(self, xml_content):
        """Given an xml content as string, returns a deposit dict.

        """
        return _parse_with_filter(
            xml_content,
            keys=[
                "deposit_id",
                "deposit_status",
                "deposit_status_detail",
                "deposit_date",
            ],
        )

    def _compute_information(
        self, collection, filepath, in_progress, slug, is_archive=True
    ):
        """Given a filepath, compute necessary information on that file.

        Args:
            collection (str): target collection (not used here)
            filepath (str): Path to a file
            in_progress (bool): whether more parts will follow
            slug (str): the deposit's slug
            is_archive (bool): is it an archive or not?

        Returns:
            dict with keys:
                'slug', 'in_progress': copied from the arguments
                'content-type': content type associated (None if not an
                    archive)
                'md5sum': md5 sum (None if not an archive)
                'filename': filename
                'filepath': the given path

        """
        filename = os.path.basename(filepath)

        if is_archive:
            # Hash incrementally inside a with-block: the file is closed
            # deterministically and never fully loaded in memory.
            md5 = hashlib.md5()
            with open(filepath, "rb") as archive_file:
                for chunk in iter(lambda: archive_file.read(64 * 1024), b""):
                    md5.update(chunk)
            md5sum = md5.hexdigest()
            extension = filename.split(".")[-1]
            if "zip" in extension:
                content_type = "application/zip"
            else:
                content_type = "application/x-tar"
        else:
            content_type = None
            md5sum = None

        return {
            "slug": slug,
            "in_progress": in_progress,
            "content-type": content_type,
            "md5sum": md5sum,
            "filename": filename,
            "filepath": filepath,
        }

    def compute_information(
        self, collection, filepath, in_progress, slug, is_archive=True, **kwargs
    ):
        info = self._compute_information(
            collection, filepath, in_progress, slug, is_archive=is_archive
        )
        info["headers"] = self.compute_headers(info)
        return info

    def do_execute(self, method, url, info):
        # Stream the file as the request body; it is closed once sent.
        with open(info["filepath"], "rb") as f:
            return self.do(method, url, data=f, headers=info["headers"])
class CreateArchiveDepositClient(BaseCreateDepositClient):
    """Post an archive (binary) deposit client."""

    def compute_headers(self, info):
        """Build the http headers announcing a binary archive upload."""
        disposition = "attachment; filename=%s" % (info["filename"],)
        headers = {
            "SLUG": info["slug"],
            "CONTENT_MD5": info["md5sum"],
            "IN-PROGRESS": str(info["in_progress"]),
            "CONTENT-TYPE": info["content-type"],
        }
        headers["CONTENT-DISPOSITION"] = disposition
        return headers
class UpdateArchiveDepositClient(CreateArchiveDepositClient):
    """Update (add/replace) an archive (binary) deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        # Targets the deposit's media endpoint.
        return f"/{collection}/{deposit_id}/media/"

    def compute_method(self, *args, replace=False, **kwargs):
        # PUT replaces the existing content, POST adds to it.
        if replace:
            return "put"
        return "post"
class CreateMetadataDepositClient(BaseCreateDepositClient):
    """Post a metadata deposit client."""

    def compute_headers(self, info):
        """Build the http headers announcing an Atom entry upload."""
        in_progress = str(info["in_progress"])
        return {
            "SLUG": info["slug"],
            "IN-PROGRESS": in_progress,
            "CONTENT-TYPE": "application/atom+xml;type=entry",
        }
class UpdateMetadataDepositClient(CreateMetadataDepositClient):
    """Update (add/replace) a metadata deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        # Targets the deposit's metadata endpoint.
        return f"/{collection}/{deposit_id}/metadata/"

    def compute_method(self, *args, replace=False, **kwargs):
        # PUT replaces the existing metadata, POST adds to it.
        if replace:
            return "put"
        return "post"
class CreateMultipartDepositClient(BaseCreateDepositClient):
    """Create a multipart deposit client."""

    def _multipart_info(self, info, info_meta):
        """Open both files and build the multipart ``files`` list and headers.

        The returned file objects stay open; :meth:`do_execute` closes them.
        """
        archive_file = open(info["filepath"], "rb")
        try:
            metadata_file = open(info_meta["filepath"], "rb")
        except BaseException:
            # Do not leak the archive handle if the metadata file cannot be
            # opened.
            archive_file.close()
            raise

        files = [
            ("file", (info["filename"], archive_file, info["content-type"]),),
            (
                "atom",
                (info_meta["filename"], metadata_file, "application/atom+xml"),
            ),
        ]

        headers = {
            "SLUG": info["slug"],
            "CONTENT_MD5": info["md5sum"],
            "IN-PROGRESS": str(info["in_progress"]),
        }

        return files, headers

    def compute_information(
        self, collection, archive, metadata, in_progress, slug, **kwargs
    ):
        info = self._compute_information(collection, archive, in_progress, slug)
        info_meta = self._compute_information(
            collection, metadata, in_progress, slug, is_archive=False
        )
        files, headers = self._multipart_info(info, info_meta)
        return {"files": files, "headers": headers}

    def do_execute(self, method, url, info):
        """Send the multipart request, then close the uploaded files."""
        try:
            return self.do(method, url, files=info["files"], headers=info["headers"])
        finally:
            # The file objects were opened by _multipart_info; release them
            # whether the request succeeded or raised.
            for _field, (_name, fileobj, _content_type) in info["files"]:
                fileobj.close()
class UpdateMultipartDepositClient(CreateMultipartDepositClient):
    """Update a multipart deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        # Multipart updates are sent to the deposit's metadata endpoint.
        return f"/{collection}/{deposit_id}/metadata/"

    def compute_method(self, *args, replace=False, **kwargs):
        # PUT replaces, POST adds.
        if replace:
            return "put"
        return "post"
class PublicApiDepositClient(BaseApiDepositClient):
    """Public api deposit client.

    Each operation delegates to a dedicated single-call client built from
    this client's configuration.
    """

    def service_document(self):
        """Retrieve service document endpoint's information."""
        client = ServiceDocumentDepositClient(self.config)
        return client.execute()

    def deposit_status(self, collection, deposit_id):
        """Retrieve status information on a deposit."""
        client = StatusDepositClient(self.config)
        return client.execute(collection, deposit_id)

    def deposit_create(
        self, collection, slug, archive=None, metadata=None, in_progress=False
    ):
        """Create a new deposit (archive, metadata, both as multipart).

        An archive alone or metadata alone are sent as such; any other
        combination goes through the multipart client.
        """
        if archive and not metadata:
            client = CreateArchiveDepositClient(self.config)
            return client.execute(collection, archive, in_progress, slug)

        if not archive and metadata:
            client = CreateMetadataDepositClient(self.config)
            return client.execute(
                collection, metadata, in_progress, slug, is_archive=False
            )

        client = CreateMultipartDepositClient(self.config)
        return client.execute(collection, archive, metadata, in_progress, slug)

    def deposit_update(
        self,
        collection,
        deposit_id,
        slug,
        archive=None,
        metadata=None,
        in_progress=False,
        replace=False,
    ):
        """Update (add/replace) existing deposit (archive, metadata, both).

        Only deposits whose status is 'partial' may be updated. On success
        the deposit's fresh status is returned; any error dict is returned
        as is.
        """
        current = self.deposit_status(collection, deposit_id)
        if "error" in current:
            return current

        status = current["deposit_status"]
        if status != "partial":
            return {
                "error": "You can only act on deposit with status 'partial'",
                "detail": "The deposit %s has status '%s'" % (deposit_id, status),
                "deposit_status": status,
                "deposit_id": deposit_id,
            }

        options = {"deposit_id": deposit_id, "replace": replace}
        if archive and not metadata:
            client = UpdateArchiveDepositClient(self.config)
            outcome = client.execute(collection, archive, in_progress, slug, **options)
        elif not archive and metadata:
            client = UpdateMetadataDepositClient(self.config)
            outcome = client.execute(
                collection, metadata, in_progress, slug, **options
            )
        else:
            client = UpdateMultipartDepositClient(self.config)
            outcome = client.execute(
                collection, archive, metadata, in_progress, slug, **options
            )

        if "error" in outcome:
            return outcome
        return self.deposit_status(collection, deposit_id)
diff --git a/swh/deposit/loader/checker.py b/swh/deposit/loader/checker.py
index bb054529..c9d9a9c2 100644
--- a/swh/deposit/loader/checker.py
+++ b/swh/deposit/loader/checker.py
@@ -1,51 +1,45 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
import logging
-from typing import Mapping
+from typing import Any, Dict
-from swh.core.config import SWHConfig
+from swh.core import config
from swh.deposit.client import PrivateApiDepositClient
logger = logging.getLogger(__name__)
class DepositChecker:
    """Deposit checker implementation.

    Triggers a deposit's checks through the private api. The configuration
    is read from the file named by the ``SWH_CONFIG_FILENAME`` environment
    variable; its ``deposit`` entry configures the private api client.
    """

    def __init__(self):
        config_path = config.config_basepath(os.environ["SWH_CONFIG_FILENAME"])
        self.config: Dict[str, Any] = config.read_raw_config(config_path)
        self.client = PrivateApiDepositClient(config=self.config["deposit"])

    def check(self, collection: str, deposit_id: str) -> Dict[str, str]:
        """Check one deposit.

        Returns:
            ``{"status": "eventful"}`` when the server answers "verified",
            ``{"status": "failed"}`` otherwise, including when the check
            request raises.
        """
        deposit_check_url = f"/{collection}/{deposit_id}/check/"
        logger.debug("deposit-check-url: %s", deposit_check_url)
        try:
            answer = self.client.check(deposit_check_url)
            logger.debug("Check result: %s", answer)
            status = "eventful" if answer == "verified" else "failed"
        except Exception:
            # Any failure during the check is reported as a failed check.
            logger.exception("Failure during check on '%s'", deposit_check_url)
            status = "failed"
        logger.debug("Check status: %s", status)
        return {"status": status}
diff --git a/swh/deposit/tests/loader/conftest.py b/swh/deposit/tests/loader/conftest.py
index aa1bb4b2..d4642852 100644
--- a/swh/deposit/tests/loader/conftest.py
+++ b/swh/deposit/tests/loader/conftest.py
@@ -1,56 +1,37 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
-import os
import pytest
-import yaml
from functools import partial
from swh.core.pytest_plugin import get_response_cb
from swh.deposit.loader.checker import DepositChecker
@pytest.fixture
def deposit_config(tmp_path):
    """Checker configuration: a single ``deposit`` section holding the
    private api url and empty (disabled) authentication.
    """
    # NOTE(review): tmp_path is requested but not used by this fixture.
    private_api = {
        "url": "https://deposit.softwareheritage.org/1/private/",
        "auth": {},
    }
    return {"deposit": private_api}
@pytest.fixture
def deposit_checker(deposit_config_path):
    """A DepositChecker reading its configuration from the environment.

    ``deposit_config_path`` is requested only for its side effect. It is
    presumably defined in a parent conftest and points SWH_CONFIG_FILENAME
    at a file holding ``deposit_config``; verify there.
    """
    checker = DepositChecker()
    return checker
@pytest.fixture
def requests_mock_datadir(datadir, requests_mock_datadir):
    """Extend the default datadir-based requests mock so that PUT requests
    to any https url are also answered from the data directory.
    """
    put_callback = partial(get_response_cb, datadir=datadir)
    requests_mock_datadir.put(re.compile("https://"), body=put_callback)
    return requests_mock_datadir
diff --git a/swh/deposit/tests/loader/test_checker.py b/swh/deposit/tests/loader/test_checker.py
index c299b3bd..60d451ef 100644
--- a/swh/deposit/tests/loader/test_checker.py
+++ b/swh/deposit/tests/loader/test_checker.py
@@ -1,32 +1,32 @@
-# Copyright (C) 2017-2019 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from unittest.mock import patch
def test_checker_deposit_ready(requests_mock_datadir, deposit_checker):
    """Checking a valid deposit (id 1 in the mocked data) yields 'eventful'
    """
    result = deposit_checker.check(collection="test", deposit_id=1)
    assert result == {"status": "eventful"}
def test_checker_deposit_rejected(requests_mock_datadir, deposit_checker):
    """Checking an invalid deposit (id 2 in the mocked data) yields 'failed'
    """
    result = deposit_checker.check(collection="test", deposit_id=2)
    assert result == {"status": "failed"}
@patch("swh.deposit.client.requests.get")
def test_checker_deposit_rejected_exception(mock_requests, deposit_checker):
    """A request raising during the check yields 'failed'
    """
    mock_requests.side_effect = ValueError("simulated problem when checking")
    result = deposit_checker.check(collection="test", deposit_id=3)
    assert result == {"status": "failed"}
diff --git a/swh/deposit/tests/loader/test_tasks.py b/swh/deposit/tests/loader/test_tasks.py
index 90e20e6a..5f85ebcd 100644
--- a/swh/deposit/tests/loader/test_tasks.py
+++ b/swh/deposit/tests/loader/test_tasks.py
@@ -1,75 +1,75 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
@pytest.mark.db
def test_task_check_eventful(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """A 'verified' check answer makes the task report an eventful status
    """
    mock_check = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check"
    )
    mock_check.return_value = "verified"
    collection, deposit_id = "collection", 42

    res = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id]
    )
    assert res
    res.wait()
    assert res.successful()

    assert res.result == {"status": "eventful"}
    mock_check.assert_called_once_with(f"/{collection}/{deposit_id}/check/")
@pytest.mark.db
def test_task_check_failure(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """A non-'verified' check answer makes the task report a failed status
    """
    mock_check = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check"
    )
    mock_check.return_value = "not-verified"  # will make the status "failed"
    collection, deposit_id = "collec", 666

    res = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id]
    )
    assert res
    res.wait()
    assert res.successful()

    assert res.result == {"status": "failed"}
    mock_check.assert_called_once_with(f"/{collection}/{deposit_id}/check/")
@pytest.mark.db
def test_task_check_3(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """An exception raised by the check client makes the task report failure
    """
    mock_check = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check"
    )
    mock_check.side_effect = ValueError("unexpected failure will make it fail")
    collection, deposit_id = "another-collection", 999

    res = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id]
    )
    assert res
    res.wait()
    assert res.successful()

    assert res.result == {"status": "failed"}
    mock_check.assert_called_once_with(f"/{collection}/{deposit_id}/check/")

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 7:59 AM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214162

Event Timeline