D1276.diff (attached to D1276: Add a seaweedfs backend)
Index: swh/objstorage/__init__.py
===================================================================
--- swh/objstorage/__init__.py
+++ swh/objstorage/__init__.py
@@ -10,6 +10,7 @@
 from .multiplexer import MultiplexerObjStorage, StripingObjStorage
 from .multiplexer.filter import add_filters
+from swh.objstorage.objstorage_weed import WeedObjStorage
 
 
 __all__ = ['get_objstorage', 'ObjStorage']
 
@@ -18,6 +19,7 @@
     'pathslicing': PathSlicingObjStorage,
     'remote': RemoteObjStorage,
     'memory': InMemoryObjStorage,
+    'weed': WeedObjStorage,
 }
 
 _STORAGE_CLASSES_MISSING = {
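
With the 'weed' class registered in _STORAGE_CLASSES, the new backend becomes reachable through the get_objstorage factory. A minimal usage sketch, assuming the factory's (cls, args) calling convention in swh.objstorage at the time of this diff and a SeaweedFS filer listening on the patch's default URL:

    from swh.objstorage import get_objstorage

    # 'weed' now resolves to WeedObjStorage; the args dict is forwarded
    # to its constructor (url, compression, ...).
    objstorage = get_objstorage(cls='weed',
                                args={'url': 'http://127.0.0.1:8888/swh'})
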
Index: swh/objstorage/objstorage_weed.py
===================================================================
--- /dev/null
+++ swh/objstorage/objstorage_weed.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import io
+import logging
+from urllib.parse import urljoin
+
+import requests
+
+from swh.model import hashutil
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import compressors, decompressors
+
+from swh.objstorage.exc import ObjNotFoundError, Error
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+DEFAULT_LIMIT = 1000
+
+
+class WeedFiler(object):
+    """Simple client for the SeaweedFS filer HTTP API."""
+
+    def __init__(self, url):
+        self.url = url
+
+    def get(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Get file %s', url)
+        resp = requests.get(url)
+        # Do not return the filer's error page as object content.
+        resp.raise_for_status()
+        return resp.content
+
+    def exists(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Check file %s', url)
+        # HEAD, not POST: a POST would write to the filer.
+        return requests.head(url).status_code == 200
+
+    def put(self, fp, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Put file %s', url)
+        return requests.post(url, files={'file': fp})
+
+    def delete(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Delete file %s', url)
+        return requests.delete(url)
+
+    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
+        '''List the sub-folders and files of `dir`.
+
+        Returns the filer's JSON response as a dict whose 'Entries' key
+        holds the sub-folders and files, or None on error.
+
+        '''
+        d = dir if dir.endswith('/') else (dir + '/')
+        url = urljoin(self.url, d)
+        headers = {'Accept': 'application/json'}
+        params = {'limit': limit}
+        if last_file_name:
+            params['lastFileName'] = last_file_name
+
+        LOGGER.debug('List directory %s', url)
+        rsp = requests.get(url, params=params, headers=headers)
+        if rsp.ok:
+            return rsp.json()
+        else:
+            LOGGER.error('Error listing "%s". [HTTP %d]',
+                         url, rsp.status_code)
+
+
+class WeedObjStorage(ObjStorage):
+    """ObjStorage backend for a SeaweedFS filer service.
+    """
+    def __init__(self, url='http://127.0.0.1:8888/swh',
+                 compression=None, **kwargs):
+        super().__init__(**kwargs)
+        self.wf = WeedFiler(url)
+        self.root_path = '/swh'
+        self.compression = compression
+
+    def check_config(self, *, check_write):
+        """Check the configuration for this object storage"""
+        # FIXME: hopefully this blew up during instantiation
+        return True
+
+    def __contains__(self, obj_id):
+        # Look the object up under its hex path, as get() and delete() do.
+        return self.wf.exists(self.path(obj_id))
+
+    def __iter__(self):
+        """ Iterate over the objects present in the storage
+
+        Warning: Iteration over the contents of a cloud-based object
+        storage can be very inefficient: due to the very high number of
+        objects in it and the fact that it is remote, getting all the
+        contents of the current object storage may result in a lot of
+        network requests.
+
+        You almost certainly don't want to use this method in production.
+        """
+        obj_id = last_obj_id = None
+        while True:
+            for obj_id in self.list_content(last_obj_id=last_obj_id):
+                yield obj_id
+            # Stop when a page brings no new last object (listing is done).
+            if last_obj_id == obj_id:
+                break
+            last_obj_id = obj_id
+
+    def __len__(self):
+        """Compute the number of objects in the current object storage.
+
+        Warning: this currently uses `__iter__`, so its warning about bad
+        performance applies.
+
+        Returns:
+            number of objects contained in the storage.
+
+        """
+        return sum(1 for i in self)
+
+    def add(self, content, obj_id=None, check_presence=True):
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            obj_id = compute_hash(content)
+
+        if check_presence and obj_id in self:
+            return obj_id
+
+        self._put_object(content, obj_id)
+        return obj_id
+
+    def restore(self, content, obj_id=None):
+        return self.add(content, obj_id, check_presence=False)
+
+    def get(self, obj_id):
+        return self._get_object(obj_id)
+
+    def check(self, obj_id):
+        # _get_object raises ObjNotFoundError if the object is missing,
+        # and returns the content needed for the integrity check.
+        obj_content = self._get_object(obj_id)
+        content_obj_id = compute_hash(obj_content)
+        if content_obj_id != obj_id:
+            raise Error(obj_id)
+
+    def delete(self, obj_id):
+        super().delete(obj_id)  # Check delete permission
+        self.wf.delete(self.path(obj_id))
+
+    def _get_object(self, obj_id):
+        """Retrieve the content of the object with the given id from the
+        filer and decompress it.
+
+        Raises:
+            ObjNotFoundError: if the object could not be fetched.
+
+        """
+        try:
+            obj = self.wf.get(self.path(obj_id))
+        except Exception:
+            raise ObjNotFoundError(obj_id)
+        d = decompressors[self.compression]()
+        ret = d.decompress(obj)
+        if d.unused_data:
+            raise Error('Corrupt object %s: trailing data found' %
+                        hashutil.hash_to_hex(obj_id))
+        return ret
+
+    def _put_object(self, content, obj_id):
+        """Create an object in the remote storage.
+
+        The created object will contain the content and be referenced by
+        the given id.
+
+        """
+        def compressor(data):
+            comp = compressors[self.compression]()
+            for chunk in data:
+                yield comp.compress(chunk)
+            yield comp.flush()
+
+        if isinstance(content, bytes):
+            content = [content]
+        self.wf.put(io.BytesIO(b''.join(compressor(content))),
+                    self.path(obj_id))
+
+    def path(self, obj_id):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        return os.path.join(self.root_path, hex_obj_id)
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        if last_obj_id:
+            last_obj_id = hashutil.hash_to_hex(last_obj_id)
+        resp = self.wf.list(self.root_path, last_obj_id, limit)
+        if resp is not None:
+            entries = resp.get('Entries')
+            if entries:
+                for obj in entries:
+                    if obj is not None:
+                        bytehex = obj['FullPath'].rsplit('/', 1)[-1]
+                        yield hashutil.bytehex_to_hash(bytehex.encode())
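
For review purposes, here is a sketch of how the new backend is meant to be exercised end to end, assuming a SeaweedFS filer running at the patch's default URL (the object id is the hash computed by compute_hash):

    from swh.objstorage.objstorage_weed import WeedObjStorage

    storage = WeedObjStorage(url='http://127.0.0.1:8888/swh')
    obj_id = storage.add(b'hello seaweedfs')    # id is the content hash
    assert storage.get(obj_id) == b'hello seaweedfs'
    storage.check(obj_id)       # raises Error if the stored data is corrupt
    assert obj_id in storage    # __contains__ delegates to WeedFiler.exists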