diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@
 vcversioner
 
 # remote storage API server
-aiohttp >= 2.1.0
+aiohttp >= 3
 click
 
 # optional dependencies
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -3,23 +3,14 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import abc
-
-from swh.core.api import SWHRemoteAPI, MetaSWHRemoteAPI
+from swh.core.api import SWHRemoteAPI
 from swh.model import hashutil
 
-from ..objstorage import ObjStorage, DEFAULT_CHUNK_SIZE
+from ..objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT
 from ..exc import ObjNotFoundError, ObjStorageAPIError
 
 
-class MetaRemoteObjStorage(MetaSWHRemoteAPI, abc.ABCMeta):
-    """Hackish class to make multiple inheritance with different metaclasses
-    work."""
-    pass
-
-
-class RemoteObjStorage(ObjStorage, SWHRemoteAPI,
-                       metaclass=MetaRemoteObjStorage):
+class RemoteObjStorage:
     """Proxy to a remote object storage.
 
     This class allows to connect to an object storage server via
@@ -32,56 +23,71 @@
 
     """
 
-    def __init__(self, url, **kwargs):
-        super().__init__(api_exception=ObjStorageAPIError, url=url, **kwargs)
+    def __init__(self, **kwargs):
+        self._proxy = SWHRemoteAPI(api_exception=ObjStorageAPIError, **kwargs)
 
     def check_config(self, *, check_write):
-        return self.post('check_config', {'check_write': check_write})
+        return self._proxy.post('check_config', {'check_write': check_write})
 
     def __contains__(self, obj_id):
-        return self.post('content/contains', {'obj_id': obj_id})
+        return self._proxy.post('content/contains', {'obj_id': obj_id})
 
     def add(self, content, obj_id=None, check_presence=True):
-        return self.post('content/add', {'content': content, 'obj_id': obj_id,
-                                         'check_presence': check_presence})
+        return self._proxy.post('content/add', {
+            'content': content, 'obj_id': obj_id,
+            'check_presence': check_presence})
 
     def add_batch(self, contents, check_presence=True):
-        return self.post('content/add/batch', {
+        return self._proxy.post('content/add/batch', {
             'contents': contents,
             'check_presence': check_presence,
         })
 
+    def restore(self, content, obj_id=None, *args, **kwargs):
+        return self.add(content, obj_id, check_presence=False)
+
     def get(self, obj_id):
-        ret = self.post('content/get', {'obj_id': obj_id})
+        ret = self._proxy.post('content/get', {'obj_id': obj_id})
         if ret is None:
             raise ObjNotFoundError(obj_id)
         else:
             return ret
 
     def get_batch(self, obj_ids):
-        return self.post('content/get/batch', {'obj_ids': obj_ids})
+        return self._proxy.post('content/get/batch', {'obj_ids': obj_ids})
 
     def check(self, obj_id):
-        return self.post('content/check', {'obj_id': obj_id})
+        return self._proxy.post('content/check', {'obj_id': obj_id})
 
     def delete(self, obj_id):
-        super().delete(obj_id)  # Check delete permission
-        return self.post('content/delete', {'obj_id': obj_id})
+        # deletion permissions are checked server-side
+        return self._proxy.post('content/delete', {'obj_id': obj_id})
 
     # Management methods
 
     def get_random(self, batch_size):
-        return self.post('content/get/random', {'batch_size': batch_size})
+        return self._proxy.post('content/get/random',
+                                {'batch_size': batch_size})
 
     # Streaming methods
 
     def add_stream(self, content_iter, obj_id, check_presence=True):
         obj_id = hashutil.hash_to_hex(obj_id)
-        return self.post_stream('content/add_stream/{}'.format(obj_id),
-                                params={'check_presence': check_presence},
-                                data=content_iter)
+        return self._proxy.post_stream(
+            'content/add_stream/{}'.format(obj_id),
+            params={'check_presence': check_presence},
+            data=content_iter)
 
     def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
         obj_id = hashutil.hash_to_hex(obj_id)
-        return super().get_stream('content/get_stream/{}'.format(obj_id),
-                                  chunk_size=chunk_size)
+        return self._proxy.get_stream('content/get_stream/{}'.format(obj_id),
+                                      chunk_size=chunk_size)
+
+    def __iter__(self):
+        yield from self._proxy.get_stream('content')
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        params = {'limit': limit}
+        if last_obj_id:
+            params['lastObjId'] = hashutil.hash_to_hex(last_obj_id)
+        yield from self._proxy.get_stream('content', params=params)
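The metaclass shim deleted above existed only to reconcile ObjStorage's abc.ABCMeta with SWHRemoteAPI's metaclass; delegating to a SWHRemoteAPI instance sidesteps the conflict altogether. A standalone sketch of the error the shim worked around, with stand-in classes (nothing here comes from the repo):

    import abc


    class MetaA(type):
        pass


    class A(metaclass=MetaA):
        pass


    # Deriving from A and an ABC fails: the derived class' metaclass
    # must subclass the metaclasses of all its bases, and MetaA and
    # abc.ABCMeta are unrelated.
    try:
        class B(A, abc.ABC):
            pass
    except TypeError as err:
        print(err)  # "metaclass conflict: ..."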
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -16,6 +16,7 @@
 
 from swh.model import hashutil
 from swh.objstorage import get_objstorage
+from swh.objstorage.objstorage import DEFAULT_LIMIT
 from swh.objstorage.exc import ObjNotFoundError
 
 
@@ -120,8 +121,24 @@
     response = aiohttp.web.StreamResponse()
     await response.prepare(request)
     for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20):
-        response.write(chunk)
-        await response.drain()
+        await response.write(chunk)
+    await response.write_eof()
+    return response
+
+
+async def list_content(request):
+    last_obj_id = request.query.get('lastObjId')
+    if last_obj_id:
+        last_obj_id = bytes.fromhex(last_obj_id)
+    limit = int(request.query.get('limit', DEFAULT_LIMIT))
+    response = aiohttp.web.StreamResponse()
+    response.enable_chunked_encoding()
+    await response.prepare(request)
+
+    for obj_id in request.app['objstorage'].list_content(
+            last_obj_id, limit=limit):
+        await response.write(obj_id)
+    await response.write_eof()
     return response
 
 
@@ -146,6 +163,7 @@
     app.router.add_route('POST', '/content/get/random', get_random_contents)
     app.router.add_route('POST', '/content/check', check)
     app.router.add_route('POST', '/content/delete', delete)
+    app.router.add_route('GET', '/content', list_content)
     app.router.add_route('POST', '/content/add_stream/{hex_id}', add_stream)
     app.router.add_route('GET', '/content/get_stream/{hex_id}', get_stream)
     return app
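The get_stream hunk tracks the aiohttp >= 3 bump in requirements.txt: StreamResponse.write() became a coroutine and drain() was removed, and the new list_content handler uses the same chunked-streaming idiom. A self-contained toy handler showing that pattern (route and payload are illustrative, not part of the patch):

    import aiohttp.web


    async def stream_numbers(request):
        response = aiohttp.web.StreamResponse()
        response.enable_chunked_encoding()
        await response.prepare(request)
        for i in range(3):
            await response.write(b'%d\n' % i)  # coroutine since aiohttp 3
        await response.write_eof()
        return response


    app = aiohttp.web.Application()
    app.router.add_route('GET', '/numbers', stream_numbers)
    # aiohttp.web.run_app(app, port=8080)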
diff --git a/swh/objstorage/cloud/objstorage_azure.py b/swh/objstorage/cloud/objstorage_azure.py
--- a/swh/objstorage/cloud/objstorage_azure.py
+++ b/swh/objstorage/cloud/objstorage_azure.py
@@ -4,14 +4,14 @@
 # See top-level LICENSE file for more information
 
 import gzip
-import itertools
 import string
+from itertools import dropwhile, islice, product
 
 from azure.storage.blob import BlockBlobService
 from azure.common import AzureMissingResourceHttpError
 import requests
 
-from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import ObjStorage, compute_hash, DEFAULT_LIMIT
 from swh.objstorage.exc import ObjNotFoundError, Error
 from swh.model import hashutil
 
@@ -149,6 +149,24 @@
 
         return True
 
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        all_blob_services = self.get_all_blob_services()
+        if last_obj_id:
+            last_obj_id = self._internal_id(last_obj_id)
+            last_service, _ = self.get_blob_service(last_obj_id)
+            all_blob_services = dropwhile(
+                lambda srv: srv[0] != last_service, all_blob_services)
+        else:
+            last_service = None
+
+        def iterate_blobs():
+            for service, container in all_blob_services:
+                marker = last_obj_id if service == last_service else None
+                for obj in service.list_blobs(
+                        container, marker=marker, maxresults=limit):
+                    yield hashutil.hash_to_bytes(obj.name)
+        return islice(iterate_blobs(), limit)
+
 
 class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
     """ObjStorage with azure capabilities, striped by prefix.
@@ -175,7 +193,7 @@
 
         expected_prefixes = set(
             ''.join(letters)
-            for letters in itertools.product(
+            for letters in product(
                 set(string.hexdigits.lower()), repeat=self.prefix_len
             )
         )
@@ -204,4 +222,4 @@
 
     def get_all_blob_services(self):
         """Get all active block_blob_services"""
-        yield from self.prefixes.values()
+        yield from (v for k, v in sorted(self.prefixes.items()))
diff --git a/swh/objstorage/cloud/objstorage_cloud.py b/swh/objstorage/cloud/objstorage_cloud.py
--- a/swh/objstorage/cloud/objstorage_cloud.py
+++ b/swh/objstorage/cloud/objstorage_cloud.py
@@ -88,8 +88,8 @@
 
         You almost certainly don't want to use this method in production.
         """
-        yield from map(lambda obj: obj.name,
-                       self.driver.iterate_container_objects(self.container))
+        yield from (hashutil.bytehex_to_hash(obj.name.encode()) for obj in
+                    self.driver.iterate_container_objects(self.container))
 
     def __len__(self):
         """Compute the number of objects in the current object storage.
@@ -158,6 +158,9 @@
         self.driver.upload_object_via_stream(iter(content), self.container,
                                              hex_obj_id)
 
+    def list_content(self):
+        return iter(self)
+
 
 class AwsCloudObjStorage(CloudObjStorage):
     """ Amazon's S3 Cloud-based object storage
diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py
--- a/swh/objstorage/multiplexer/multiplexer_objstorage.py
+++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py
@@ -209,6 +209,12 @@
                 return True
         return False
 
+    def __iter__(self):
+        def obj_iterator():
+            for storage in self.storages:
+                yield from storage
+        return obj_iterator()
+
     def add(self, content, obj_id=None, check_presence=True):
         """ Add a new object to the object storage.
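The Azure list_content resumes a listing striped over several blob services: services sorting before the one holding last_obj_id are skipped wholesale with dropwhile, and the service-native marker is only passed to that first service. The same logic on made-up data, with plain lists standing in for blob services:

    from itertools import dropwhile, islice

    # (service name, sorted blob names) pairs standing in for
    # get_all_blob_services(); all names are invented.
    services = [('a', ['a1', 'a2']), ('b', ['b1', 'b2']), ('c', ['c1'])]


    def list_from(last=None, limit=3):
        svcs = iter(services)
        last_service = last[0] if last else None
        if last:
            svcs = dropwhile(lambda srv: srv[0] != last_service, svcs)

        def gen():
            for name, blobs in svcs:
                marker = last if name == last_service else None
                for blob in blobs:
                    if marker is None or blob > marker:
                        yield blob
        return islice(gen(), limit)


    print(list(list_from()))      # ['a1', 'a2', 'b1']
    print(list(list_from('b1')))  # ['b2', 'c1']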
""" - def __init__(self, *, allow_delete=False, **kwargs): + def __init__(self, allow_delete=False, **kwargs): # A more complete permission system could be used in place of that if # it becomes needed - super().__init__(**kwargs) self.allow_delete = allow_delete @abc.abstractmethod @@ -286,3 +287,9 @@ """ raise NotImplementedError + + def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): + it = iter(self) + if last_obj_id: + it = dropwhile(lambda x: x <= last_obj_id, it) + return islice(it, limit) diff --git a/swh/objstorage/objstorage_in_memory.py b/swh/objstorage/objstorage_in_memory.py --- a/swh/objstorage/objstorage_in_memory.py +++ b/swh/objstorage/objstorage_in_memory.py @@ -24,6 +24,9 @@ def __contains__(self, obj_id, *args, **kwargs): return obj_id in self.state + def __iter__(self): + return iter(sorted(self.state)) + def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): if obj_id is None: obj_id = objstorage.compute_hash(content) diff --git a/swh/objstorage/objstorage_pathslicing.py b/swh/objstorage/objstorage_pathslicing.py --- a/swh/objstorage/objstorage_pathslicing.py +++ b/swh/objstorage/objstorage_pathslicing.py @@ -8,13 +8,15 @@ import gzip import tempfile import random +import collections +from itertools import islice from contextlib import contextmanager from swh.model import hashutil from .objstorage import (ObjStorage, compute_hash, ID_HASH_ALGO, - ID_HASH_LENGTH, DEFAULT_CHUNK_SIZE) + ID_HASH_LENGTH, DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT) from .exc import ObjNotFoundError, Error @@ -172,7 +174,8 @@ # XXX hackish: it does not verify that the depth of found files # matches the slicing depth of the storage for root, _dirs, files in os.walk(self.root): - for f in files: + _dirs.sort() + for f in sorted(files): yield bytes.fromhex(f) return obj_iterator() @@ -218,12 +221,13 @@ def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) - if check_presence and obj_id in self: # If the object is already present, return immediately. 
diff --git a/swh/objstorage/objstorage_pathslicing.py b/swh/objstorage/objstorage_pathslicing.py
--- a/swh/objstorage/objstorage_pathslicing.py
+++ b/swh/objstorage/objstorage_pathslicing.py
@@ -8,13 +8,15 @@
 import gzip
 import tempfile
 import random
+import collections.abc
+from itertools import islice
 
 from contextlib import contextmanager
 
 from swh.model import hashutil
 
 from .objstorage import (ObjStorage, compute_hash, ID_HASH_ALGO,
-                         ID_HASH_LENGTH, DEFAULT_CHUNK_SIZE)
+                         ID_HASH_LENGTH, DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT)
 from .exc import ObjNotFoundError, Error
 
@@ -172,7 +174,8 @@
             # XXX hackish: it does not verify that the depth of found files
             # matches the slicing depth of the storage
             for root, _dirs, files in os.walk(self.root):
-                for f in files:
+                _dirs.sort()
+                for f in sorted(files):
                     yield bytes.fromhex(f)
 
         return obj_iterator()
@@ -218,12 +221,13 @@
     def add(self, content, obj_id=None, check_presence=True):
         if obj_id is None:
             obj_id = compute_hash(content)
-
         if check_presence and obj_id in self:
             # If the object is already present, return immediately.
             return obj_id
 
         hex_obj_id = hashutil.hash_to_hex(obj_id)
+        if isinstance(content, collections.abc.Iterator):
+            content = b''.join(content)
         with _write_obj_file(hex_obj_id, self) as f:
             f.write(content)
 
@@ -338,3 +342,32 @@
         with _read_obj_file(hex_obj_id, self) as f:
             reader = functools.partial(f.read, chunk_size)
             yield from iter(reader, b'')
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        if last_obj_id:
+            it = self.iter_from(last_obj_id)
+        else:
+            it = iter(self)
+        return islice(it, limit)
+
+    def iter_from(self, obj_id, n_leaf=False):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        slices = [hex_obj_id[bound] for bound in self.bounds]
+        rlen = len(self.root.split('/'))
+
+        i = 0
+        for root, dirs, files in os.walk(self.root):
+            if not dirs:
+                i += 1
+            level = len(root.split('/')) - rlen
+            dirs.sort()
+            if dirs and root == os.path.join(self.root, *slices[:level]):
+                cslice = slices[level]
+                for d in dirs[:]:
+                    if d < cslice:
+                        dirs.remove(d)
+            for f in sorted(files):
+                if f > hex_obj_id:
+                    yield bytes.fromhex(f)
+        if n_leaf:
+            yield i
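iter_from makes resumption cheap on disk: os.walk honors in-place edits of its dirs list, so sibling directories sorting before the resume path are pruned instead of walked. A hypothetical round trip, assuming the get_objstorage factory accepts a class name plus an argument dict and the usual 0:2/2:4/4:6 slicing:

    import tempfile

    from swh.objstorage import get_objstorage

    storage = get_objstorage('pathslicing', {
        'root': tempfile.mkdtemp(),
        'slicing': '0:2/2:4/4:6',
    })
    ids = sorted(storage.add(b'content %d' % i) for i in range(5))

    # ids strictly greater than ids[1], capped at two results
    print(list(storage.list_content(last_obj_id=ids[1], limit=2)))

    # with n_leaf=True, the number of leaf directories walked is
    # appended as the iterator's final item
    *after, n_leaf = storage.iter_from(ids[1], n_leaf=True)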
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -170,3 +170,43 @@
         self.assertEqual(len(contents), ret)
         for obj_id in contents:
             self.assertIn(obj_id, self.storage)
+
+    def test_content_iterator(self):
+        sto_obj_ids = iter(self.storage)
+        sto_obj_ids = list(sto_obj_ids)
+        self.assertFalse(sto_obj_ids)
+
+        obj_ids = set()
+        for i in range(100):
+            content, obj_id = self.hash_content(b'content %d' % i)
+            self.storage.add(content, obj_id=obj_id)
+            obj_ids.add(obj_id)
+
+        sto_obj_ids = set(self.storage)
+        self.assertEqual(sto_obj_ids, obj_ids)
+
+    def test_list_content(self):
+        all_ids = []
+        for i in range(1200):
+            content = b'example %d' % i
+            obj_id = compute_hash(content)
+            self.storage.add(content, obj_id)
+            all_ids.append(obj_id)
+        all_ids.sort()
+
+        ids = list(self.storage.list_content())
+        self.assertEqual(len(ids), 1200)
+        self.assertEqual(ids[0], all_ids[0])
+        self.assertEqual(ids[100], all_ids[100])
+        self.assertEqual(ids[999], all_ids[999])
+
+        ids = list(self.storage.list_content(limit=10))
+        self.assertEqual(len(ids), 10)
+        self.assertEqual(ids[0], all_ids[0])
+        self.assertEqual(ids[9], all_ids[9])
+
+        ids = list(self.storage.list_content(
+            last_obj_id=all_ids[999], limit=100))
+        self.assertEqual(len(ids), 100)
+        self.assertEqual(ids[0], all_ids[1000])
+        self.assertEqual(ids[9], all_ids[1009])
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -57,9 +57,10 @@
     def exists(self, container_name, blob_name):
         return blob_name in self.data[container_name]
 
-    def list_blobs(self, container_name):
-        for blob_name, content in self.data[container_name].items():
-            yield MockBlob(name=blob_name, content=content)
+    def list_blobs(self, container_name, marker=None, maxresults=None):
+        for blob_name, content in sorted(self.data[container_name].items()):
+            if marker is None or blob_name > marker:
+                yield MockBlob(name=blob_name, content=content)
 
 
 class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
diff --git a/swh/objstorage/tests/test_objstorage_cloud.py b/swh/objstorage/tests/test_objstorage_cloud.py
--- a/swh/objstorage/tests/test_objstorage_cloud.py
+++ b/swh/objstorage/tests/test_objstorage_cloud.py
@@ -50,7 +50,7 @@
 
     def iterate_container_objects(self, container):
         self._check_credentials()
-        yield from container.values()
+        yield from (v for k, v in sorted(container.items()))
 
     def get_object(self, container_name, obj_id):
         self._check_credentials()
@@ -79,8 +79,8 @@
 class MockCloudObjStorage(CloudObjStorage):
     """ Cloud object storage that uses a mocked driver """
 
-    def _get_driver(self, api_key, api_secret_key):
-        return MockLibcloudDriver(api_key, api_secret_key)
+    def _get_driver(self, **kwargs):
+        return MockLibcloudDriver(**kwargs)
 
     def _get_provider(self):
         # Implement this for the abc requirement, but behavior is defined in
@@ -92,5 +92,7 @@
 
     def setUp(self):
         super().setUp()
-        self.storage = MockCloudObjStorage(API_KEY, API_SECRET_KEY,
-                                           CONTAINER_NAME)
+        self.storage = MockCloudObjStorage(
+            CONTAINER_NAME,
+            api_key=API_KEY, api_secret_key=API_SECRET_KEY,
+        )
diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py
--- a/swh/objstorage/tests/test_objstorage_pathslicing.py
+++ b/swh/objstorage/tests/test_objstorage_pathslicing.py
@@ -8,7 +8,7 @@
 import unittest
 
 from swh.model import hashutil
-from swh.objstorage import exc, get_objstorage
+from swh.objstorage import exc, get_objstorage, ID_HASH_LENGTH
 
 from .objstorage_testing import ObjStorageTestFixture
 
@@ -66,3 +66,32 @@
         random_contents = list(self.storage.get_random(1))
         self.assertEqual(1, len(random_contents))
         self.assertIn(obj_id, random_contents)
+
+    def test_iterate_from(self):
+        all_ids = []
+        for i in range(100):
+            content, obj_id = self.hash_content(b'content %d' % i)
+            self.storage.add(content, obj_id=obj_id)
+            all_ids.append(obj_id)
+        all_ids.sort()
+
+        ids = list(self.storage.iter_from(b'\x00' * (ID_HASH_LENGTH // 2)))
+        self.assertEqual(len(ids), len(all_ids))
+        self.assertEqual(ids, all_ids)
+
+        ids = list(self.storage.iter_from(all_ids[0]))
+        self.assertEqual(len(ids), len(all_ids) - 1)
+        self.assertEqual(ids, all_ids[1:])
+
+        ids = list(self.storage.iter_from(all_ids[-1], n_leaf=True))
+        n_leaf = ids[-1]
+        ids = ids[:-1]
+        self.assertEqual(n_leaf, 1)
+        self.assertEqual(len(ids), 0)
+
+        ids = list(self.storage.iter_from(all_ids[-2], n_leaf=True))
+        n_leaf = ids[-1]
+        ids = ids[:-1]
+        self.assertEqual(n_leaf, 2)  # beware, this depends on the hash algo
+        self.assertEqual(len(ids), 1)
+        self.assertEqual(ids, all_ids[-1:])
diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py
--- a/swh/objstorage/tests/test_objstorage_striping.py
+++ b/swh/objstorage/tests/test_objstorage_striping.py
@@ -78,3 +78,6 @@
             self.assertIn(obj_id, self.storage)
             storage.delete(obj_id)
             self.assertNotIn(obj_id, self.storage)
+
+    def test_list_content(self):
+        self.skipTest('Quite a challenge to make it work')
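For reference, the id conversions this patch round-trips between the wire, the filesystem, and the Python API all come from swh.model.hashutil; the hex id below is an arbitrary sha1:

    from swh.model import hashutil

    obj_id = bytes.fromhex('8a9152b4c7cb41e5ee43c06532c11dbbdeaed9c1')
    hex_id = hashutil.hash_to_hex(obj_id)  # str, as used in URLs and paths
    assert hashutil.hash_to_bytes(hex_id) == obj_id
    assert hashutil.bytehex_to_hash(hex_id.encode()) == obj_id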