diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -8,11 +8,7 @@ from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError from swh.objstorage.interface import ObjStorageInterface -from swh.objstorage.objstorage import ( - DEFAULT_CHUNK_SIZE, - DEFAULT_LIMIT, - ID_DIGEST_LENGTH, -) +from swh.objstorage.objstorage import DEFAULT_LIMIT, ID_DIGEST_LENGTH class RemoteObjStorage(RPCClient): @@ -35,15 +31,6 @@ def restore(self, content, obj_id=None): return self.add(content, obj_id, check_presence=False) - def add_stream(self, content_iter, obj_id, check_presence=True): - raise NotImplementedError - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - obj_id = hashutil.hash_to_hex(obj_id) - return self._get_stream( - "content/get_stream/{}".format(obj_id), chunk_size=chunk_size - ) - def __iter__(self): yield from self.list_content() diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -15,7 +15,6 @@ from swh.core.api import error_handler from swh.core.config import read as config_read from swh.core.statsd import statsd -from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError from swh.objstorage.factory import get_objstorage as get_swhobjstorage from swh.objstorage.interface import ObjStorageInterface @@ -92,20 +91,6 @@ return "SWH Objstorage API server" -# Streaming methods - - -@app.route("/content/get_stream/") -def get_stream(hex_id): - obj_id = hashutil.hash_to_bytes(hex_id) - - def generate(): - with timed_context("get_stream"): - yield from get_objstorage().get_stream(obj_id, 2 << 20) - - return app.response_class(generate()) - - @app.route("/content") def list_content(): last_obj_id = request.args.get("last_obj_id") @@ -114,7 +99,7 @@ limit = int(request.args.get("limit", DEFAULT_LIMIT)) def generate(): - with timed_context("get_stream"): + with timed_context("list_content"): yield from get_objstorage().list_content(last_obj_id, limit=limit) return app.response_class(generate()) diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py --- a/swh/objstorage/backends/generator.py +++ b/swh/objstorage/backends/generator.py @@ -1,10 +1,8 @@ -import functools -import io from itertools import count, islice, repeat import logging import random -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT, ObjStorage +from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage logger = logging.getLogger(__name__) @@ -213,11 +211,6 @@ def delete(self, obj_id, *args, **kwargs): return True - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - data = io.BytesIO(next(self.content_generator)) - reader = functools.partial(data.read, chunk_size) - yield from iter(reader, b"") - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): it = iter(self) if last_obj_id: diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py --- a/swh/objstorage/backends/in_memory.py +++ b/swh/objstorage/backends/in_memory.py @@ -3,11 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import functools -import io - from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, ObjStorage, compute_hash +from swh.objstorage.objstorage import ObjStorage, compute_hash class InMemoryObjStorage(ObjStorage): @@ -61,11 +58,3 @@ self.state.pop(obj_id) return True - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - data = io.BytesIO(self.state[obj_id]) - reader = functools.partial(data.read, chunk_size) - yield from iter(reader, b"") diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py --- a/swh/objstorage/backends/pathslicing.py +++ b/swh/objstorage/backends/pathslicing.py @@ -14,7 +14,6 @@ from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError from swh.objstorage.objstorage import ( - DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH, @@ -338,48 +337,6 @@ yield lambda c: f.write(compressor.compress(c)) f.write(compressor.flush()) - def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ - if check_presence and obj_id in self: - return obj_id - - with self.chunk_writer(obj_id) as writer: - for chunk in content_iter: - writer(chunk) - - return obj_id - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - decompressor = decompressors[self.compression]() - with open(self.slicer.get_path(hex_obj_id), "rb") as f: - while True: - raw = f.read(chunk_size) - if not raw: - break - r = decompressor.decompress(raw) - if not r: - continue - yield r - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): if last_obj_id: it = self.iter_from(last_obj_id) diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py --- a/swh/objstorage/interface.py +++ b/swh/objstorage/interface.py @@ -8,7 +8,7 @@ from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT +from swh.objstorage.objstorage import DEFAULT_LIMIT @runtime_checkable @@ -30,10 +30,6 @@ - get_random() get random object id of existing contents (used for the content integrity checker). - Some of the methods have available streaming equivalents: - - - get_stream() same as get() but returns a chunked iterator - Each implementation of this interface can have a different behavior and its own way to store the contents. """ @@ -203,23 +199,6 @@ """ ... - # Streaming methods - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - """Retrieve the content of a given object as a chunked iterator. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ - ... - def __iter__(self): ... diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -22,9 +22,6 @@ ID_DIGEST_LENGTH = 20 """Size in bytes of the hash""" -DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 -"""Size in bytes of the streaming chunks""" - DEFAULT_LIMIT = 10000 """Default number of results of ``list_content``.""" @@ -143,12 +140,6 @@ # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): - raise NotImplementedError - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - raise NotImplementedError - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): it = iter(self) if last_obj_id: diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -3,9 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections.abc import Iterator import inspect -import time from swh.objstorage import exc from swh.objstorage.interface import ObjStorageInterface @@ -164,43 +162,6 @@ with self.assertRaises(PermissionError): self.assertTrue(self.storage.delete(obj_id)) - def test_add_stream(self): - content = [b"chunk1", b"chunk2"] - _, obj_id = self.hash_content(b"".join(content)) - try: - self.storage.add_stream(iter(content), obj_id=obj_id) - except NotImplementedError: - return - self.assertContentMatch(obj_id, b"".join(content)) - - def test_add_stream_sleep(self): - def gen_content(): - yield b"chunk1" - time.sleep(0.5) - yield b"chunk42" - - _, obj_id = self.hash_content(b"placeholder_id") - try: - self.storage.add_stream(gen_content(), obj_id=obj_id) - except NotImplementedError: - return - self.assertContentMatch(obj_id, b"chunk1chunk42") - - def test_get_stream(self): - content = b"123456789" - _, obj_id = self.hash_content(content) - self.storage.add(content, obj_id=obj_id) - r = self.storage.get(obj_id) - self.assertEqual(r, content) - - try: - r = self.storage.get_stream(obj_id, chunk_size=1) - except NotImplementedError: - return - self.assertTrue(isinstance(r, Iterator)) - r = list(r) - self.assertEqual(b"".join(r), content) - def test_add_batch(self): contents = {} expected_content_add = 0 diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py --- a/swh/objstorage/tests/test_objstorage_random_generator.py +++ b/swh/objstorage/tests/test_objstorage_random_generator.py @@ -17,13 +17,6 @@ assert max(lengths) <= 55056238 -def test_random_generator_objstorage_get_stream(): - sto = get_objstorage("random", {}) - gen = sto.get_stream(None) - assert isinstance(gen, Iterator) - assert list(gen) # ensure the iterator can be consumed - - def test_random_generator_objstorage_list_content(): sto = get_objstorage("random", {"total": 100}) assert isinstance(sto.list_content(), Iterator)