Page Menu · Home · Software Heritage

D7272.id26349.diff
Owner: No One · Visibility: Temporary

D7272.id26349.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,6 @@
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
# remote storage API server
-aiohttp >= 3
click
requests
psycopg2
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,12 +7,13 @@
from swh.core.utils import iter_chunks
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError
+from swh.objstorage.interface import ObjStorageInterface
from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT
SHA1_SIZE = 20
-class RemoteObjStorage:
+class RemoteObjStorage(RPCClient):
"""Proxy to a remote object storage.
This class allows to connect to an object storage server via
@@ -25,60 +26,19 @@
"""
- def __init__(self, **kwargs):
- self._proxy = RPCClient(
- api_exception=ObjStorageAPIError,
- reraise_exceptions=[ObjNotFoundError, Error],
- **kwargs,
- )
-
- def check_config(self, *, check_write):
- return self._proxy.post("check_config", {"check_write": check_write})
-
- def __contains__(self, obj_id):
- return self._proxy.post("content/contains", {"obj_id": obj_id})
-
- def add(self, content, obj_id=None, check_presence=True):
- return self._proxy.post(
- "content/add",
- {"content": content, "obj_id": obj_id, "check_presence": check_presence},
- )
-
- def add_batch(self, contents, check_presence=True):
- return self._proxy.post(
- "content/add/batch",
- {"contents": contents, "check_presence": check_presence,},
- )
+ api_exception = ObjStorageAPIError
+ reraise_exceptions = [ObjNotFoundError, Error]
+ backend_class = ObjStorageInterface
def restore(self, content, obj_id=None, *args, **kwargs):
return self.add(content, obj_id, check_presence=False)
- def get(self, obj_id):
- return self._proxy.post("content/get", {"obj_id": obj_id})
-
- def get_batch(self, obj_ids):
- return self._proxy.post("content/get/batch", {"obj_ids": obj_ids})
-
- def check(self, obj_id):
- return self._proxy.post("content/check", {"obj_id": obj_id})
-
- def delete(self, obj_id):
- # deletion permission are checked server-side
- return self._proxy.post("content/delete", {"obj_id": obj_id})
-
- # Management methods
-
- def get_random(self, batch_size):
- return self._proxy.post("content/get/random", {"batch_size": batch_size})
-
- # Streaming methods
-
def add_stream(self, content_iter, obj_id, check_presence=True):
raise NotImplementedError
def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
obj_id = hashutil.hash_to_hex(obj_id)
- return self._proxy.get_stream(
+ return self._get_stream(
"content/get_stream/{}".format(obj_id), chunk_size=chunk_size
)
@@ -90,5 +50,5 @@
if last_obj_id:
params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id)
yield from iter_chunks(
- self._proxy.get_stream("content", params=params), chunk_size=SHA1_SIZE
+ self._get_stream("content", params=params), chunk_size=SHA1_SIZE
)
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -1,163 +1,122 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import contextlib
+import functools
+import logging
import os
-import aiohttp.web
+from flask import request
-from swh.core.api.asynchronous import RPCServerApp, decode_request
-from swh.core.api.asynchronous import encode_data_server as encode_data
+from swh.core.api import RPCServerApp
+from swh.core.api import encode_data_server as encode_data
+from swh.core.api import error_handler
from swh.core.config import read as config_read
from swh.core.statsd import statsd
-from swh.core.utils import grouper
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.factory import get_objstorage
+from swh.objstorage.factory import get_objstorage as get_swhobjstorage
+from swh.objstorage.interface import ObjStorageInterface
from swh.objstorage.objstorage import DEFAULT_LIMIT
def timed(f):
- async def w(*a, **kw):
+ @functools.wraps(f)
+ def w(*a, **kw):
with statsd.timed(
"swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__}
):
- return await f(*a, **kw)
+ return f(*a, **kw)
return w
-@timed
-async def index(request):
- return aiohttp.web.Response(body="SWH Objstorage API server")
-
-
-@timed
-async def check_config(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].check_config(**req))
-
-
-@timed
-async def contains(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].__contains__(**req))
-
-
-@timed
-async def add_bytes(request):
- req = await decode_request(request)
- statsd.increment(
- "swh_objstorage_in_bytes_total",
- len(req["content"]),
- tags={"endpoint": "add_bytes"},
- )
- return encode_data(request.app["objstorage"].add(**req))
-
+@contextlib.contextmanager
+def timed_context(f_name):
+ with statsd.timed(
+ "swh_objstorage_request_duration_seconds", tags={"endpoint": f_name}
+ ):
+ yield
-@timed
-async def add_batch(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].add_batch(**req))
+def get_objstorage():
+ global objstorage
+ if not objstorage:
+ objstorage = get_swhobjstorage(**app.config["objstorage"])
-@timed
-async def get_bytes(request):
- req = await decode_request(request)
+ return objstorage
- ret = request.app["objstorage"].get(**req)
- statsd.increment(
- "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"}
- )
- return encode_data(ret)
+class ObjStorageServerApp(RPCServerApp):
+ client_exception_classes = (ObjNotFoundError, Error)
+ method_decorators = [timed]
+ def pre_add(self, kw):
+ """Called before the 'add' method."""
+ statsd.increment(
+ "swh_objstorage_in_bytes_total",
+ len(kw["content"]),
+ tags={"endpoint": "add_bytes"},
+ )
-@timed
-async def get_batch(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].get_batch(**req))
+ def post_get(self, ret, kw):
+ """Called after the 'get' method."""
+ statsd.increment(
+ "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"}
+ )
-@timed
-async def check(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].check(**req))
+app = ObjStorageServerApp(
+ __name__, backend_class=ObjStorageInterface, backend_factory=get_objstorage,
+)
+objstorage = None
-@timed
-async def delete(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].delete(**req))
+@app.errorhandler(Error)
+def argument_error_handler(exception):
+ return error_handler(exception, encode_data, status_code=400)
-# Management methods
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
+@app.route("/")
@timed
-async def get_random_contents(request):
- req = await decode_request(request)
- return encode_data(request.app["objstorage"].get_random(**req))
+def index():
+ return "SWH Objstorage API server"
# Streaming methods
-@timed
-async def get_stream(request):
- hex_id = request.match_info["hex_id"]
+@app.route("/content/get_stream/<hex_id>")
+def get_stream(hex_id):
obj_id = hashutil.hash_to_bytes(hex_id)
- response = aiohttp.web.StreamResponse()
- await response.prepare(request)
- for chunk in request.app["objstorage"].get_stream(obj_id, 2 << 20):
- await response.write(chunk)
- await response.write_eof()
- return response
+ def generate():
+ with timed_context("get_stream"):
+ yield from objstorage.get_stream(obj_id, 2 << 20)
-@timed
-async def list_content(request):
- last_obj_id = request.query.get("last_obj_id")
+ return app.response_class(generate())
+
+
+@app.route("/content")
+def list_content():
+ last_obj_id = request.args.get("last_obj_id")
if last_obj_id:
last_obj_id = bytes.fromhex(last_obj_id)
- limit = int(request.query.get("limit", DEFAULT_LIMIT))
- response = aiohttp.web.StreamResponse()
- response.enable_chunked_encoding()
- await response.prepare(request)
- for group in grouper(
- request.app["objstorage"].list_content(last_obj_id, limit=limit), 100
- ):
- await response.write(b"".join(group))
- await response.write_eof()
- return response
-
+ limit = int(request.args.get("limit", DEFAULT_LIMIT))
-def make_app(config):
- """Initialize the remote api application.
+ def generate():
+ yield b""
+ with timed_context("get_stream"):
+ yield from objstorage.list_content(last_obj_id, limit=limit)
- """
- client_max_size = config.get("client_max_size", 1024 * 1024 * 1024)
- app = RPCServerApp(client_max_size=client_max_size)
- app.client_exception_classes = (ObjNotFoundError, Error)
-
- # retro compatibility configuration settings
- app["config"] = config
- app["objstorage"] = get_objstorage(**config["objstorage"])
-
- app.router.add_route("GET", "/", index)
- app.router.add_route("POST", "/check_config", check_config)
- app.router.add_route("POST", "/content/contains", contains)
- app.router.add_route("POST", "/content/add", add_bytes)
- app.router.add_route("POST", "/content/add/batch", add_batch)
- app.router.add_route("POST", "/content/get", get_bytes)
- app.router.add_route("POST", "/content/get/batch", get_batch)
- app.router.add_route("POST", "/content/get/random", get_random_contents)
- app.router.add_route("POST", "/content/check", check)
- app.router.add_route("POST", "/content/delete", delete)
- app.router.add_route("GET", "/content", list_content)
- app.router.add_route("GET", "/content/get_stream/{hex_id}", get_stream)
- return app
+ return app.response_class(generate())
def load_and_check_config(config_file):
@@ -230,8 +189,10 @@
"""
config_file = os.environ.get("SWH_CONFIG_FILENAME")
- config = load_and_check_config(config_file)
- return make_app(config=config)
+ app.config = load_and_check_config(config_file)
+ handler = logging.StreamHandler()
+ app.logger.addHandler(handler)
+ return app
if __name__ == "__main__":
diff --git a/swh/objstorage/cli.py b/swh/objstorage/cli.py
--- a/swh/objstorage/cli.py
+++ b/swh/objstorage/cli.py
@@ -66,20 +66,24 @@
show_default=True,
help="Binding port of the server",
)
+@click.option(
+ "--debug/--no-debug",
+ default=True,
+ help="Indicates if the server should run in debug mode",
+)
@click.pass_context
-def serve(ctx, host, port):
+def serve(ctx, host, port, debug):
"""Run a standalone objstorage server.
This is not meant to be run on production systems.
"""
- import aiohttp.web
-
- from swh.objstorage.api.server import make_app, validate_config
+ from swh.objstorage.api.server import app, validate_config
- app = make_app(validate_config(ctx.obj["config"]))
- if ctx.obj["log_level"] == "DEBUG":
- app.update(debug=True)
- aiohttp.web.run_app(app, host=host, port=int(port))
+ if "log_level" in ctx.obj:
+ logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
+ validate_config(ctx.obj["config"])
+ app.config.update(ctx.obj["config"])
+ app.run(host, port=int(port), debug=bool(debug))
@objstorage_cli_group.command("import")
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/interface.py
copy from swh/objstorage/objstorage.py
copy to swh/objstorage/interface.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/interface.py
@@ -1,78 +1,13 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import abc
-import bz2
-from itertools import dropwhile, islice
-import lzma
-from typing import Dict
-import zlib
+from swh.core.api import remote_api_endpoint
+from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT
-from swh.model import hashutil
-from .exc import ObjNotFoundError
-
-ID_HASH_ALGO = "sha1"
-ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation.
-DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 # Size in bytes of the streaming chunks
-DEFAULT_LIMIT = 10000
-
-
-def compute_hash(content):
- """Compute the content's hash.
-
- Args:
- content (bytes): The raw content to hash
- hash_name (str): Hash's name (default to ID_HASH_ALGO)
-
- Returns:
- The ID_HASH_ALGO for the content
-
- """
- return (
- hashutil.MultiHash.from_data(content, hash_names=[ID_HASH_ALGO],)
- .digest()
- .get(ID_HASH_ALGO)
- )
-
-
-class NullCompressor:
- def compress(self, data):
- return data
-
- def flush(self):
- return b""
-
-
-class NullDecompressor:
- def decompress(self, data):
- return data
-
- @property
- def unused_data(self):
- return b""
-
-
-decompressors = {
- "bz2": bz2.BZ2Decompressor,
- "lzma": lzma.LZMADecompressor,
- "gzip": lambda: zlib.decompressobj(wbits=31),
- "zlib": zlib.decompressobj,
- "none": NullDecompressor,
-}
-
-compressors = {
- "bz2": bz2.BZ2Compressor,
- "lzma": lzma.LZMACompressor,
- "gzip": lambda: zlib.compressobj(wbits=31),
- "zlib": zlib.compressobj,
- "none": NullCompressor,
-}
-
-
-class ObjStorage(metaclass=abc.ABCMeta):
+class ObjStorageInterface:
""" High-level API to manipulate the Software Heritage object storage.
Conceptually, the object storage offers the following methods:
@@ -92,20 +27,13 @@
Some of the methods have available streaming equivalents:
- - add_stream() same as add() but with a chunked iterator
- - restore_stream() same as add_stream() but erase already existing content
- get_stream() same as get() but returns a chunked iterator
Each implementation of this interface can have a different behavior and
its own way to store the contents.
"""
- def __init__(self, *, allow_delete=False, **kwargs):
- # A more complete permission system could be used in place of that if
- # it becomes needed
- self.allow_delete = allow_delete
-
- @abc.abstractmethod
+ @remote_api_endpoint("check_config")
def check_config(self, *, check_write):
"""Check whether the object storage is properly configured.
@@ -116,10 +44,10 @@
Returns:
True if the configuration check worked, an exception if it didn't.
"""
- pass
+ ...
- @abc.abstractmethod
- def __contains__(self, obj_id, *args, **kwargs):
+ @remote_api_endpoint("content/contains")
+ def __contains__(self, obj_id):
"""Indicate if the given object is present in the storage.
Args:
@@ -130,10 +58,10 @@
storage.
"""
- pass
+ ...
- @abc.abstractmethod
- def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
+ @remote_api_endpoint("content/add")
+ def add(self, content, obj_id=None, check_presence=True):
"""Add a new object to the object storage.
Args:
@@ -149,9 +77,10 @@
the id (bytes) of the object into the storage.
"""
- pass
+ ...
- def add_batch(self, contents, check_presence=True) -> Dict:
+ @remote_api_endpoint("content/add/batch")
+ def add_batch(self, contents, check_presence=True):
"""Add a batch of new objects to the object storage.
Args:
@@ -162,16 +91,9 @@
count of bytes object)
"""
- summary = {"object:add": 0, "object:add:bytes": 0}
- for obj_id, content in contents.items():
- if check_presence and obj_id in self:
- continue
- self.add(content, obj_id, check_presence=False)
- summary["object:add"] += 1
- summary["object:add:bytes"] += len(content)
- return summary
-
- def restore(self, content, obj_id=None, *args, **kwargs):
+ ...
+
+ def restore(self, content, obj_id=None):
"""Restore a content that have been corrupted.
This function is identical to add but does not check if
@@ -187,11 +109,10 @@
the fly.
"""
- # check_presence to false will erase the potential previous content.
- return self.add(content, obj_id, check_presence=False)
+ ...
- @abc.abstractmethod
- def get(self, obj_id, *args, **kwargs):
+ @remote_api_endpoint("content/get")
+ def get(self, obj_id):
"""Retrieve the content of a given object.
Args:
@@ -204,9 +125,10 @@
ObjNotFoundError: if the requested object is missing.
"""
- pass
+ ...
- def get_batch(self, obj_ids, *args, **kwargs):
+ @remote_api_endpoint("content/get/batch")
+ def get_batch(self, obj_ids):
"""Retrieve objects' raw content in bulk from storage.
Note: This function does have a default implementation in
@@ -225,14 +147,10 @@
one content will not cancel the whole request.
"""
- for obj_id in obj_ids:
- try:
- yield self.get(obj_id)
- except ObjNotFoundError:
- yield None
-
- @abc.abstractmethod
- def check(self, obj_id, *args, **kwargs):
+ ...
+
+ @remote_api_endpoint("content/check")
+ def check(self, obj_id):
"""Perform an integrity check for a given object.
Verify that the file object is in place and that the content matches
@@ -246,10 +164,10 @@
Error: if the request object is corrupted.
"""
- pass
+ ...
- @abc.abstractmethod
- def delete(self, obj_id, *args, **kwargs):
+ @remote_api_endpoint("content/delete")
+ def delete(self, obj_id):
"""Delete an object.
Args:
@@ -259,12 +177,12 @@
ObjNotFoundError: if the requested object is missing.
"""
- if not self.allow_delete:
- raise PermissionError("Delete is not allowed.")
+ ...
# Management methods
- def get_random(self, batch_size, *args, **kwargs):
+ @remote_api_endpoint("content/get/random")
+ def get_random(self, batch_size):
"""Get random ids of existing contents.
This method is used in order to get random ids to perform
@@ -278,46 +196,10 @@
current object storage.
"""
- pass
+ ...
# Streaming methods
- def add_stream(self, content_iter, obj_id, check_presence=True):
- """Add a new object to the object storage using streaming.
-
- This function is identical to add() except it takes a generator that
- yields the chunked content instead of the whole content at once.
-
- Args:
- content (bytes): chunked generator that yields the object's raw
- content to add in storage.
- obj_id (bytes): object identifier
- check_presence (bool): indicate if the presence of the
- content should be verified before adding the file.
-
- Returns:
- the id (bytes) of the object into the storage.
-
- """
- raise NotImplementedError
-
- def restore_stream(self, content_iter, obj_id=None):
- """Restore a content that have been corrupted using streaming.
-
- This function is identical to restore() except it takes a generator
- that yields the chunked content instead of the whole content at once.
- The default implementation provided by the current class is
- suitable for most cases.
-
- Args:
- content (bytes): chunked generator that yields the object's raw
- content to add in storage.
- obj_id (bytes): object identifier
-
- """
- # check_presence to false will erase the potential previous content.
- return self.add_stream(content_iter, obj_id, check_presence=False)
-
def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
"""Retrieve the content of a given object as a chunked iterator.
@@ -331,7 +213,10 @@
ObjNotFoundError: if the requested object is missing.
"""
- raise NotImplementedError
+ ...
+
+ def __iter__(self):
+ ...
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
"""Generates known object ids.
@@ -344,7 +229,4 @@
Generates:
obj_id (bytes): object ids.
"""
- it = iter(self)
- if last_obj_id:
- it = dropwhile(lambda x: x <= last_obj_id, it)
- return islice(it, limit)
+ ...
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -73,33 +73,6 @@
class ObjStorage(metaclass=abc.ABCMeta):
- """ High-level API to manipulate the Software Heritage object storage.
-
- Conceptually, the object storage offers the following methods:
-
- - check_config() check if the object storage is properly configured
- - __contains__() check if an object is present, by object id
- - add() add a new object, returning an object id
- - restore() same as add() but erase an already existed content
- - get() retrieve the content of an object, by object id
- - check() check the integrity of an object, by object id
- - delete() remove an object
-
- And some management methods:
-
- - get_random() get random object id of existing contents (used for the
- content integrity checker).
-
- Some of the methods have available streaming equivalents:
-
- - add_stream() same as add() but with a chunked iterator
- - restore_stream() same as add_stream() but erase already existing content
- - get_stream() same as get() but returns a chunked iterator
-
- Each implementation of this interface can have a different behavior and
- its own way to store the contents.
- """
-
def __init__(self, *, allow_delete=False, **kwargs):
# A more complete permission system could be used in place of that if
# it becomes needed
@@ -107,61 +80,17 @@
@abc.abstractmethod
def check_config(self, *, check_write):
- """Check whether the object storage is properly configured.
-
- Args:
- check_write (bool): if True, check if writes to the object storage
- can succeed.
-
- Returns:
- True if the configuration check worked, an exception if it didn't.
- """
pass
@abc.abstractmethod
def __contains__(self, obj_id, *args, **kwargs):
- """Indicate if the given object is present in the storage.
-
- Args:
- obj_id (bytes): object identifier.
-
- Returns:
- True if and only if the object is present in the current object
- storage.
-
- """
pass
@abc.abstractmethod
def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
- """Add a new object to the object storage.
-
- Args:
- content (bytes): object's raw content to add in storage.
- obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO]
- algorithm. When given, obj_id will be trusted to match
- the bytes. If missing, obj_id will be computed on the
- fly.
- check_presence (bool): indicate if the presence of the
- content should be verified before adding the file.
-
- Returns:
- the id (bytes) of the object into the storage.
-
- """
pass
def add_batch(self, contents, check_presence=True) -> Dict:
- """Add a batch of new objects to the object storage.
-
- Args:
- contents: mapping from obj_id to object contents
-
- Returns:
- the summary of objects added to the storage (count of object,
- count of bytes object)
-
- """
summary = {"object:add": 0, "object:add:bytes": 0}
for obj_id, content in contents.items():
if check_presence and obj_id in self:
@@ -172,59 +101,14 @@
return summary
def restore(self, content, obj_id=None, *args, **kwargs):
- """Restore a content that have been corrupted.
-
- This function is identical to add but does not check if
- the object id is already in the file system.
- The default implementation provided by the current class is
- suitable for most cases.
-
- Args:
- content (bytes): object's raw content to add in storage
- obj_id (bytes): checksum of `bytes` as computed by
- ID_HASH_ALGO. When given, obj_id will be trusted to
- match bytes. If missing, obj_id will be computed on
- the fly.
-
- """
# check_presence to false will erase the potential previous content.
return self.add(content, obj_id, check_presence=False)
@abc.abstractmethod
def get(self, obj_id, *args, **kwargs):
- """Retrieve the content of a given object.
-
- Args:
- obj_id (bytes): object id.
-
- Returns:
- the content of the requested object as bytes.
-
- Raises:
- ObjNotFoundError: if the requested object is missing.
-
- """
pass
def get_batch(self, obj_ids, *args, **kwargs):
- """Retrieve objects' raw content in bulk from storage.
-
- Note: This function does have a default implementation in
- ObjStorage that is suitable for most cases.
-
- For object storages that needs to do the minimal number of
- requests possible (ex: remote object storages), that method
- can be overridden to perform a more efficient operation.
-
- Args:
- obj_ids ([bytes]: list of object ids.
-
- Returns:
- list of resulting contents, or None if the content could
- not be retrieved. Do not raise any exception as a fail for
- one content will not cancel the whole request.
-
- """
for obj_id in obj_ids:
try:
yield self.get(obj_id)
@@ -233,117 +117,27 @@
@abc.abstractmethod
def check(self, obj_id, *args, **kwargs):
- """Perform an integrity check for a given object.
-
- Verify that the file object is in place and that the content matches
- the object id.
-
- Args:
- obj_id (bytes): object identifier.
-
- Raises:
- ObjNotFoundError: if the requested object is missing.
- Error: if the request object is corrupted.
-
- """
pass
@abc.abstractmethod
def delete(self, obj_id, *args, **kwargs):
- """Delete an object.
-
- Args:
- obj_id (bytes): object identifier.
-
- Raises:
- ObjNotFoundError: if the requested object is missing.
-
- """
if not self.allow_delete:
raise PermissionError("Delete is not allowed.")
# Management methods
def get_random(self, batch_size, *args, **kwargs):
- """Get random ids of existing contents.
-
- This method is used in order to get random ids to perform
- content integrity verifications on random contents.
-
- Args:
- batch_size (int): Number of ids that will be given
-
- Yields:
- An iterable of ids (bytes) of contents that are in the
- current object storage.
-
- """
pass
# Streaming methods
def add_stream(self, content_iter, obj_id, check_presence=True):
- """Add a new object to the object storage using streaming.
-
- This function is identical to add() except it takes a generator that
- yields the chunked content instead of the whole content at once.
-
- Args:
- content (bytes): chunked generator that yields the object's raw
- content to add in storage.
- obj_id (bytes): object identifier
- check_presence (bool): indicate if the presence of the
- content should be verified before adding the file.
-
- Returns:
- the id (bytes) of the object into the storage.
-
- """
raise NotImplementedError
- def restore_stream(self, content_iter, obj_id=None):
- """Restore a content that have been corrupted using streaming.
-
- This function is identical to restore() except it takes a generator
- that yields the chunked content instead of the whole content at once.
- The default implementation provided by the current class is
- suitable for most cases.
-
- Args:
- content (bytes): chunked generator that yields the object's raw
- content to add in storage.
- obj_id (bytes): object identifier
-
- """
- # check_presence to false will erase the potential previous content.
- return self.add_stream(content_iter, obj_id, check_presence=False)
-
def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
- """Retrieve the content of a given object as a chunked iterator.
-
- Args:
- obj_id (bytes): object id.
-
- Returns:
- the content of the requested object as bytes.
-
- Raises:
- ObjNotFoundError: if the requested object is missing.
-
- """
raise NotImplementedError
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
- """Generates known object ids.
-
- Args:
- last_obj_id (bytes): object id from which to iterate from
- (excluded).
- limit (int): max number of object ids to generate.
-
- Generates:
- obj_id (bytes): object ids.
- """
it = iter(self)
if last_obj_id:
it = dropwhile(lambda x: x <= last_obj_id, it)
diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py
--- a/swh/objstorage/tests/test_objstorage_api.py
+++ b/swh/objstorage/tests/test_objstorage_api.py
@@ -9,15 +9,13 @@
import pytest
-from swh.core.api.tests.server_testing import ServerTestFixtureAsync
-from swh.objstorage.api.server import make_app
+from swh.core.api.tests.server_testing import ServerTestFixture
+from swh.objstorage.api.server import app
from swh.objstorage.factory import get_objstorage
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
-class TestRemoteObjStorage(
- ServerTestFixtureAsync, ObjStorageTestFixture, unittest.TestCase
-):
+class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase):
""" Test the remote archive API.
"""
@@ -33,7 +31,7 @@
"client_max_size": 8 * 1024 * 1024,
}
- self.app = make_app(self.config)
+ self.app = app
super().setUp()
self.storage = get_objstorage("remote", {"url": self.url()})

File Metadata

Mime Type
text/plain
Expires
Dec 19 2024, 4:09 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228280

Event Timeline