D1276.diff

Index: swh/objstorage/__init__.py
===================================================================
--- swh/objstorage/__init__.py
+++ swh/objstorage/__init__.py
@@ -10,6 +10,7 @@
from .multiplexer import MultiplexerObjStorage, StripingObjStorage
from .multiplexer.filter import add_filters
+from .objstorage_weed import WeedObjStorage
__all__ = ['get_objstorage', 'ObjStorage']
@@ -18,6 +19,7 @@
'pathslicing': PathSlicingObjStorage,
'remote': RemoteObjStorage,
'memory': InMemoryObjStorage,
+ 'weed': WeedObjStorage,
}
_STORAGE_CLASSES_MISSING = {
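
With the 'weed' key registered above, the new backend becomes reachable
through the get_objstorage factory like the other classes. A minimal sketch of
the client side (the filer URL is an assumption, matching the default of the
new class):

    from swh.objstorage import get_objstorage

    objstorage = get_objstorage('weed', {'url': 'http://127.0.0.1:8888/swh'})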
Index: swh/objstorage/objstorage_weed.py
===================================================================
--- /dev/null
+++ swh/objstorage/objstorage_weed.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import io
+import logging
+from urllib.parse import urljoin, urlparse
+
+import requests
+
+from swh.model import hashutil
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import compressors, decompressors
+
+from swh.objstorage.exc import ObjNotFoundError, Error
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+DEFAULT_LIMIT = 1000
+
+
+class WeedFiler(object):
+    """Simple wrapper around the SeaweedFS filer HTTP API."""
+
+    def __init__(self, url):
+        self.url = url
+
+    def get(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Get file %s', url)
+        # Raise on non-2xx responses so a missing object does not silently
+        # return the filer's error page as content.
+        resp = requests.get(url)
+        resp.raise_for_status()
+        return resp.content
+
+    def exists(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Check file %s', url)
+        # HEAD only checks for existence, without fetching the content
+        return requests.head(url).status_code == 200
+
+    def put(self, fp, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Put file %s', url)
+        return requests.post(url, files={'file': fp})
+
+    def delete(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Delete file %s', url)
+        return requests.delete(url)
+
+    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
+        """List sub-folders and files of @dir.
+
+        Returns a dict describing the sub-folders and files, or None on
+        error.
+        """
+        d = dir if dir.endswith('/') else (dir + '/')
+        url = urljoin(self.url, d)
+        headers = {'Accept': 'application/json'}
+        params = {'limit': limit}
+        if last_file_name:
+            params['lastFileName'] = last_file_name
+
+        LOGGER.debug('List directory %s', url)
+        rsp = requests.get(url, params=params, headers=headers)
+        if rsp.ok:
+            return rsp.json()
+        else:
+            LOGGER.error('Error listing "%s". [HTTP %d]',
+                         url, rsp.status_code)
+
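+# A quick illustration of the WeedFiler surface (the URL and path below are
+# made up for the example; they assume a filer listening on localhost):
+#
+#   wf = WeedFiler('http://127.0.0.1:8888/swh')
+#   wf.put(io.BytesIO(b'some content'), '/swh/some-object')
+#   assert wf.exists('/swh/some-object')
+#   assert wf.get('/swh/some-object') == b'some content'
+#   wf.delete('/swh/some-object')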
+
+class WeedObjStorage(ObjStorage):
+    """ObjStorage that uses a seaweedfs filer as backend, via its HTTP API.
+    """
+    def __init__(self, url='http://127.0.0.1:8888/swh',
+                 compression=None, **kwargs):
+        super().__init__(**kwargs)
+        self.wf = WeedFiler(url)
+        # Derive the root path from the filer URL instead of hardcoding it,
+        # so that a non-default path also works
+        self.root_path = urlparse(url).path
+        self.compression = compression
+
+    def check_config(self, *, check_write):
+        """Check the configuration for this object storage"""
+        # FIXME: hopefully this blew up during instantiation
+        return True
+
+    def __contains__(self, obj_id):
+        # exists() expects a filer path, not a raw object id
+        return self.wf.exists(self.path(obj_id))
+
+    def __iter__(self):
+        """Iterate over the objects present in the storage.
+
+        Warning: iterating over the contents of a remote object storage may
+        be very inefficient: because of the high number of objects in it and
+        the fact that it is remote, listing all of them results in a lot of
+        network requests.
+
+        You almost certainly don't want to use this method in production.
+        """
+        obj_id = last_obj_id = None
+        while True:
+            for obj_id in self.list_content(last_obj_id=last_obj_id):
+                yield obj_id
+            # The filer pagination is keyed on the last file name seen: when
+            # a page ends on the same object as the previous one, there is
+            # nothing left to list.
+            if last_obj_id == obj_id:
+                break
+            last_obj_id = obj_id
+
+    def __len__(self):
+        """Compute the number of objects in the current object storage.
+
+        Warning: this currently uses `__iter__`, its warning about bad
+        performance applies.
+
+        Returns:
+            number of objects contained in the storage.
+
+        """
+        return sum(1 for i in self)
+
+    def add(self, content, obj_id=None, check_presence=True):
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            obj_id = compute_hash(content)
+
+        if check_presence and obj_id in self:
+            return obj_id
+
+        self._put_object(content, obj_id)
+        return obj_id
+
+    def restore(self, content, obj_id=None):
+        return self.add(content, obj_id, check_presence=False)
+
+    def get(self, obj_id):
+        return self._get_object(obj_id)
+
+    def check(self, obj_id):
+        # _get_object raises ObjNotFoundError if the object is missing, so a
+        # single fetch both checks existence and retrieves the content.
+        obj_content = self._get_object(obj_id)
+        # Check the content integrity
+        content_obj_id = compute_hash(obj_content)
+        if content_obj_id != obj_id:
+            raise Error(obj_id)
+
+    def delete(self, obj_id):
+        super().delete(obj_id)  # Check delete permission
+        self.wf.delete(self.path(obj_id))
+
+    def _get_object(self, obj_id):
+        """Retrieve the content of an object from the filer and decompress
+        it.
+
+        Raises ObjNotFoundError if the object could not be fetched.
+
+        """
+        try:
+            obj = self.wf.get(self.path(obj_id))
+        except Exception:
+            raise ObjNotFoundError(obj_id)
+        # The entries of `decompressors` are factories, not functions: build
+        # a decompressor object, then feed it the compressed payload.
+        d = decompressors[self.compression]()
+        ret = d.decompress(obj)
+        if d.unused_data:
+            raise Error('Corrupt object %s: trailing data found' %
+                        hashutil.hash_to_hex(obj_id))
+        return ret
+
+    def _put_object(self, content, obj_id):
+        """Create an object in the filer.
+
+        The created object will contain the (possibly compressed) content
+        and be referenced by the given id.
+
+        """
+        def compressor(data):
+            comp = compressors[self.compression]()
+            for chunk in data:
+                yield comp.compress(chunk)
+            yield comp.flush()
+
+        if isinstance(content, bytes):
+            content = [content]
+        self.wf.put(io.BytesIO(b''.join(compressor(content))),
+                    self.path(obj_id))
+
+    def path(self, obj_id):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        return os.path.join(self.root_path, hex_obj_id)
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        if last_obj_id:
+            last_obj_id = hashutil.hash_to_hex(last_obj_id)
+        resp = self.wf.list(self.root_path, last_obj_id, limit)
+        if resp is not None:
+            entries = resp.get('Entries') or []
+            for obj in entries:
+                if obj is not None:
+                    # The object id is the hex basename of the entry path
+                    bytehex = obj['FullPath'].rsplit('/', 1)[-1]
+                    yield hashutil.bytehex_to_hash(bytehex.encode())
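
Taken together, and assuming a SeaweedFS filer reachable at the default URL
with writes allowed, the new backend could be exercised end to end roughly as
follows (the URL and content are illustrative only):

    from swh.objstorage import get_objstorage

    storage = get_objstorage('weed', {'url': 'http://127.0.0.1:8888/swh'})
    obj_id = storage.add(b'hello seaweedfs')  # sha1 of the content
    assert obj_id in storage
    assert storage.get(obj_id) == b'hello seaweedfs'
    storage.check(obj_id)  # raises Error if the stored content is corrupt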
