Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
index ef8624b..0682dcb 100644
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,103 +1,105 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from .objstorage import ObjStorage
from .objstorage_pathslicing import PathSlicingObjStorage
from .objstorage_in_memory import InMemoryObjStorage
from .api.client import RemoteObjStorage
from .multiplexer import MultiplexerObjStorage, StripingObjStorage
from .multiplexer.filter import add_filters
+from swh.objstorage.objstorage_weed import WeedObjStorage
__all__ = ['get_objstorage', 'ObjStorage']
_STORAGE_CLASSES = {
'pathslicing': PathSlicingObjStorage,
'remote': RemoteObjStorage,
'memory': InMemoryObjStorage,
+ 'weed': WeedObjStorage,
}
_STORAGE_CLASSES_MISSING = {
}
# Optional backends: each depends on extra third-party libraries.  On
# ImportError we record the reason instead of failing, so the rest of the
# registry stays usable and get_objstorage() can explain the absence.
try:
    from swh.objstorage.cloud.objstorage_azure import (
        AzureCloudObjStorage,
        PrefixedAzureCloudObjStorage,
    )
    _STORAGE_CLASSES['azure'] = AzureCloudObjStorage
    _STORAGE_CLASSES['azure-prefixed'] = PrefixedAzureCloudObjStorage
except ImportError as e:
    # e.args[0] carries the human-readable import failure message
    _STORAGE_CLASSES_MISSING['azure'] = e.args[0]
    _STORAGE_CLASSES_MISSING['azure-prefixed'] = e.args[0]

try:
    from swh.objstorage.objstorage_rados import RADOSObjStorage
    _STORAGE_CLASSES['rados'] = RADOSObjStorage
except ImportError as e:
    _STORAGE_CLASSES_MISSING['rados'] = e.args[0]

try:
    from swh.objstorage.cloud.objstorage_cloud import (
        AwsCloudObjStorage,
        OpenStackCloudObjStorage,
    )
    _STORAGE_CLASSES['s3'] = AwsCloudObjStorage
    _STORAGE_CLASSES['swift'] = OpenStackCloudObjStorage
except ImportError as e:
    _STORAGE_CLASSES_MISSING['s3'] = e.args[0]
    _STORAGE_CLASSES_MISSING['swift'] = e.args[0]
def get_objstorage(cls, args):
    """Instantiate the objstorage implementation registered under `cls`.

    Args:
        cls (str): objstorage class unique key contained in the
            _STORAGE_CLASSES dict.
        args (dict): keyword arguments forwarded verbatim to the
            implementation's `__init__`.

    Returns:
        subclass of ObjStorage matching the given `cls` key.

    Raises:
        ValueError: if `cls` is not a valid objstorage key (including
            backends whose import failed at module load time).
    """
    # Guard clause: unknown or unavailable backend -> explain why.
    if cls not in _STORAGE_CLASSES:
        raise ValueError('Storage class {} is not available: {}'.format(
            cls,
            _STORAGE_CLASSES_MISSING.get(cls, 'unknown name')))
    return _STORAGE_CLASSES[cls](**args)
def _construct_filtered_objstorage(storage_conf, filters_conf):
    """Build the wrapped storage, then stack the configured filters on it."""
    base_storage = get_objstorage(**storage_conf)
    return add_filters(base_storage, filters_conf)


_STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
def _construct_multiplexer_objstorage(objstorages):
    """Instantiate one storage per configuration entry and multiplex them."""
    children = [get_objstorage(**conf) for conf in objstorages]
    return MultiplexerObjStorage(children)


_STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
def _construct_striping_objstorage(objstorages):
    """Instantiate one storage per configuration entry and stripe over them."""
    children = [get_objstorage(**conf) for conf in objstorages]
    return StripingObjStorage(children)


_STORAGE_CLASSES['striping'] = _construct_striping_objstorage
diff --git a/swh/objstorage/objstorage_weed.py b/swh/objstorage/objstorage_weed.py
new file mode 100644
index 0000000..9d4f0cf
--- /dev/null
+++ b/swh/objstorage/objstorage_weed.py
@@ -0,0 +1,202 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import io
+import logging
+from urllib.parse import urljoin, urlparse
+
+import requests
+
+from swh.model import hashutil
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import compressors, decompressors
+from swh.objstorage.objstorage import DEFAULT_LIMIT
+
+from swh.objstorage.exc import ObjNotFoundError, Error
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+
class WeedFiler(object):
    """Minimal HTTP client for a seaweedfs filer service.

    Remote paths are resolved relative to the filer `url` given at
    construction time.

    TODO: handle errors
    """

    def __init__(self, url):
        self.url = url

    def get(self, remote_path):
        """Fetch and return the raw body of `remote_path`."""
        url = urljoin(self.url, remote_path)
        LOGGER.debug('Get file %s', url)
        return requests.get(url).content

    def exists(self, remote_path):
        """Return True if `remote_path` exists on the filer (HEAD == 200)."""
        url = urljoin(self.url, remote_path)
        LOGGER.debug('Check file %s', url)
        rsp = requests.head(url)
        return rsp.status_code == 200

    def put(self, fp, remote_path):
        """Upload the file object `fp` to `remote_path`."""
        url = urljoin(self.url, remote_path)
        LOGGER.debug('Put file %s', url)
        return requests.post(url, files={'file': fp})

    def delete(self, remote_path):
        """Delete `remote_path` on the filer."""
        url = urljoin(self.url, remote_path)
        LOGGER.debug('Delete file %s', url)
        return requests.delete(url)

    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
        """List up to `limit` sub-folders and files of directory `dir`.

        `last_file_name`, when given, acts as a pagination cursor: only
        entries sorting after it are returned.

        Returns:
            the decoded JSON response (a dict of entries) on success,
            None on HTTP error (the error is logged).
        """
        # The filer expects a trailing slash on directory URLs.
        if not dir.endswith('/'):
            dir = dir + '/'
        url = urljoin(self.url, dir)
        params = {'limit': limit}
        if last_file_name:
            params['lastFileName'] = last_file_name

        LOGGER.debug('List directory %s', url)
        rsp = requests.get(url, params=params,
                           headers={'Accept': 'application/json'})
        if not rsp.ok:
            LOGGER.error('Error listing "%s". [HTTP %d]' % (
                url, rsp.status_code))
            return None
        return rsp.json()
+
+
class WeedObjStorage(ObjStorage):
    """ObjStorage with seaweedfs abilities, using the Filer API.

    https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
    """
    def __init__(self, url='http://127.0.0.1:8888/swh',
                 compression=None, **kwargs):
        """
        Args:
            url: base URL of the seaweedfs filer, including the root
                directory under which objects are stored.
            compression: key into the `compressors`/`decompressors`
                registries selecting the codec; None means no compression.
        """
        super().__init__(**kwargs)
        self.wf = WeedFiler(url)
        self.root_path = urlparse(url).path
        self.compression = compression

    def check_config(self, *, check_write):
        """Check the configuration for this object storage"""
        # FIXME: hopefully this blew up during instantiation
        return True

    def __contains__(self, obj_id):
        return self.wf.exists(self._path(obj_id))

    def __iter__(self):
        """ Iterate over the objects present in the storage

        Warning: Iteration over the contents of a cloud-based object storage
        may have bad efficiency: due to the very high amount of objects in it
        and the fact that it is remote, get all the contents of the current
        object storage may result in a lot of network requests.

        You almost certainly don't want to use this method in production.
        """
        obj_id = last_obj_id = None
        while True:
            for obj_id in self.list_content(last_obj_id=last_obj_id):
                yield obj_id
            # An empty page leaves obj_id == last_obj_id: nothing past the
            # cursor, so we are done.
            if last_obj_id == obj_id:
                break
            last_obj_id = obj_id

    def __len__(self):
        """Compute the number of objects in the current object storage.

        Warning: this currently uses `__iter__`, its warning about bad
        performance applies.

        Returns:
            number of objects contained in the storage.

        """
        return sum(1 for i in self)

    def add(self, content, obj_id=None, check_presence=True):
        """Add `content` to the storage, stored under its hash `obj_id`.

        Args:
            content: bytes, or an iterable of byte chunks.
            obj_id: content hash; computed on the fly when None.
            check_presence: when True, skip the upload if the object is
                already stored.

        Returns:
            the object's id.
        """
        if obj_id is None:
            # Checksum is missing, compute it on the fly.
            obj_id = compute_hash(content)

        if check_presence and obj_id in self:
            return obj_id

        # Delegate the compress-and-upload to the shared helper instead of
        # duplicating its logic here.
        self._put_object(content, obj_id)
        return obj_id

    def restore(self, content, obj_id=None):
        """Re-upload `content` unconditionally (no presence check)."""
        return self.add(content, obj_id, check_presence=False)

    def get(self, obj_id):
        """Return the (decompressed) content stored under `obj_id`.

        Raises:
            ObjNotFoundError: if the object cannot be fetched from the filer.
        """
        # Only wrap the network fetch in the try/except: a failure to
        # *decompress* is data corruption and must not be misreported as a
        # missing object.
        try:
            obj = self.wf.get(self._path(obj_id))
        except Exception:
            raise ObjNotFoundError(obj_id)
        # `decompressors` maps compression keys to no-arg decompressor
        # factories (symmetric with `compressors` used in _put_object), so
        # instantiate one and feed it the fetched bytes; calling the factory
        # with the payload, as the previous code did, is wrong.
        d = decompressors[self.compression]()
        return d.decompress(obj)

    def check(self, obj_id):
        """Verify integrity: the stored content must hash back to obj_id.

        Raises:
            Error: if the content is corrupted.
        """
        obj_content = self.get(obj_id)
        content_obj_id = compute_hash(obj_content)
        if content_obj_id != obj_id:
            raise Error(obj_id)

    def delete(self, obj_id):
        super().delete(obj_id)  # Check delete permission
        if obj_id not in self:
            raise ObjNotFoundError(obj_id)
        self.wf.delete(self._path(obj_id))
        return True

    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
        """Yield up to `limit` object ids (bytes) stored after `last_obj_id`."""
        if last_obj_id:
            # The filer cursor is the hex file name, not the raw hash.
            last_obj_id = hashutil.hash_to_hex(last_obj_id)
        resp = self.wf.list(self.root_path, last_obj_id, limit)
        # resp is None on listing error (already logged by WeedFiler.list)
        if resp is not None:
            entries = resp['Entries']
            if entries:
                for obj in entries:
                    if obj is not None:
                        bytehex = obj['FullPath'].rsplit('/', 1)[-1]
                        yield hashutil.bytehex_to_hash(bytehex.encode())

    # internal methods
    def _put_object(self, content, obj_id):
        """Compress `content` and upload it under the path for `obj_id`.

        `content` may be bytes or an iterable of byte chunks.
        """
        def compressor(data):
            comp = compressors[self.compression]()
            for chunk in data:
                yield comp.compress(chunk)
            yield comp.flush()

        if isinstance(content, bytes):
            content = [content]

        # XXX should handle streaming correctly...
        self.wf.put(io.BytesIO(b''.join(compressor(content))),
                    self._path(obj_id))

    def _path(self, obj_id):
        # Objects are stored flat under the root, named by their hex id.
        return hashutil.hash_to_hex(obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
new file mode 100644
index 0000000..5a3484a
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
import unittest

# NOTE(review): this change creates the backend at
# swh/objstorage/objstorage_weed.py (which also has DEFAULT_LIMIT in scope);
# `swh.objstorage.backends.seaweed` does not exist in this diff.
from swh.objstorage.objstorage_weed import WeedObjStorage, DEFAULT_LIMIT
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+
+
class MockWeedFiler:
    """In-memory stand-in for WeedFiler, replicating its public API."""

    def __init__(self, url):
        self.url = url
        # remote_path -> raw bytes
        self.content = {}

    def get(self, remote_path):
        return self.content[remote_path]

    def put(self, fp, remote_path):
        self.content[remote_path] = fp.read()

    def exists(self, remote_path):
        return remote_path in self.content

    def delete(self, remote_path):
        del self.content[remote_path]

    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
        """Return one page of entries sorting strictly after last_file_name."""
        keys = sorted(self.content)
        start = 0 if last_file_name is None else keys.index(last_file_name) + 1
        page = keys[start:start + limit]
        return {'Entries': [{'FullPath': name} for name in page]}
+
+
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Run the shared objstorage test suite against WeedObjStorage, with the
    HTTP filer client replaced by an in-memory mock."""

    def setUp(self):
        super().setUp()
        self.url = 'http://127.0.0.1/test'
        storage = WeedObjStorage(url=self.url)
        # Swap the real filer client for the mock before any test touches it.
        storage.wf = MockWeedFiler(self.url)
        self.storage = storage

File Metadata

Mime Type
text/x-diff
Expires
Mon, Aug 18, 11:13 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3316094

Event Timeline