diff --git a/PKG-INFO b/PKG-INFO
index 60ab70c..ba30403 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.objstorage
-Version: 0.0.12
+Version: 0.0.13
 Summary: Software Heritage Object Storage
 Home-page: https://forge.softwareheritage.org/diffusion/DOBJS
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/bin/swh-objstorage-azure b/bin/swh-objstorage-azure
index bdf4ac7..9cfed40 100755
--- a/bin/swh-objstorage-azure
+++ b/bin/swh-objstorage-azure
@@ -1,114 +1,113 @@
 #!/usr/bin/env python3

 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # NOT FOR PRODUCTION

 import click

 from swh.objstorage import get_objstorage
 from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage
 from swh.core import config, hashutil


 class AzureAccess(config.SWHConfig):
     """This is an orchestration class to try and check objstorage_azure
     implementation."""

     DEFAULT_CONFIG = {
         # Output storage
-        'storage_account_name': ('str', 'account-name-as-access-key'),
-        'storage_secret_key': ('str', 'associated-secret-key'),
-        'container_name': ('str', 'sample-container'),
-
+        'storage_azure': ('dict',
+                          {'cls': 'pathslicing',
+                           'args': {'root': '/srv/softwareheritage/objects',
+                                    'slicing': '0:2/2:4/4:6'}}),
         # Input storage
-        'storage': ('dict',
-                    {'cls': 'pathslicing',
-                     'args': {'root': '/srv/softwareheritage/objects',
-                              'slicing': '0:2/2:4/4:6'}}),
+        'storage_local': ('dict',
+                          {'cls': 'pathslicing',
+                           'args': {'root': '/srv/softwareheritage/objects',
+                                    'slicing': '0:2/2:4/4:6'}}),
     }

     CONFIG_BASE_FILENAME = 'objstorage/azure'

     def __init__(self):
         super().__init__()
         self.config = self.parse_config_file()
-        container_name = self.config['container_name']
-
-        self.azure_cloud_storage = AzureCloudObjStorage(
-            account_name=self.config['storage_account_name'],
-            api_secret_key=self.config['storage_secret_key'],
-            container_name=container_name)
-
-        self.read_objstorage = get_objstorage(**self.config['storage'])
-
-    def _to_id(self, hex_obj_id):
-        return hashutil.hex_to_hash(hex_obj_id)
+        container_name = 'contents'
+        self.azure_cloud_storage = get_objstorage(
+            **self.config['storage_azure'])
+        self.read_objstorage = get_objstorage(
+            **self.config['storage_local'])

-    def list_contents(self):
+    def list_contents(self, limit=10):
+        count = 0
         for c in self.azure_cloud_storage:
-            print(c)
+            count += 1
+            yield c
+            if count >= limit:
+                return

-    def send_one_content(self, hex_obj_id):
-        obj_id = self._to_id(hex_obj_id)
+    def send_one_content(self, obj_id):
         obj_content = self.read_objstorage.get(obj_id)
         self.azure_cloud_storage.add(content=obj_content,
                                      obj_id=obj_id)

-    def check_integrity(self, hex_obj_id):
-        obj_id = self._to_id(hex_obj_id)
+    def check_integrity(self, obj_id):
         self.azure_cloud_storage.check(obj_id)  # will raise if problem

-    def check_presence(self, hex_obj_id):
-        obj_id = self._to_id(hex_obj_id)
+    def check_presence(self, obj_id):
         return obj_id in self.azure_cloud_storage

-    def download(self, hex_obj_id):
-        obj_id = self._to_id(hex_obj_id)
+    def download(self, obj_id):
         return self.azure_cloud_storage.get(obj_id)


 @click.command()
 def tryout():
     obj_azure = AzureAccess()

-    # hex_sample_id = '00000008e22217b439f3e582813bd875e7141a0e'
-    hex_sample_id = '0001001d2879dd009fc11d0c5f0691940989a76b'
+    hex_sample_id = '00000085c856b32f0709a4f5d669bb4faa3a0ce9'
+    sample_id = hashutil.hex_to_hash(hex_sample_id)

-    check_presence = obj_azure.check_presence(hex_sample_id)
+    check_presence = obj_azure.check_presence(sample_id)
     print('presence first time should be False:', check_presence)

-    obj_azure.send_one_content(hex_sample_id)
+    obj_azure.send_one_content(sample_id)

-    check_presence = obj_azure.check_presence(hex_sample_id)
+    check_presence = obj_azure.check_presence(sample_id)
     print('presence True:', check_presence)
-    check_presence = obj_azure.check_presence('dfeffffeffff17b439f3e582813bd875e7141a0e')
+
+    hex_sample_2 = 'dfeffffeffff17b439f3e582813bd875e7141a0e'
+    sample_2 = hashutil.hex_to_hash(hex_sample_2)
+    check_presence = obj_azure.check_presence(sample_2)
     print('presence False:', check_presence)

     print()
     print('Download a blob')
-    blob_content = obj_azure.download(hex_sample_id)
+    blob_content = obj_azure.download(sample_id)
     print(blob_content)

     print()
     try:
-        obj_azure.download(hex_sample_id.replace('0', 'f'))
+        not_found_hex_id = hex_sample_id.replace('0', 'f')
+        not_found_id = hashutil.hex_to_hash(not_found_hex_id)
+        obj_azure.download(not_found_id)
     except:
         print('Expected `blob does not exist`!')

-    print()
-    print('blobs:')
-    obj_azure.list_contents()
+    # print()
+    # print('blobs:')
+    # print(list(obj_azure.list_contents()))

-    print()
-    print('content of %s' % hex_sample_id)
-    print(obj_azure.download(hex_sample_id))
+    # print()
+    # print('content of %s' % hex_sample_id)
+    # print(obj_azure.download(hex_sample_id))

-    obj_azure.check_integrity(hex_sample_id)
+    obj_azure.check_integrity(sample_id)


 if __name__ == '__main__':
     tryout()
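The reworked script above now drives both storages through get_objstorage and its ('dict', {'cls': ..., 'args': ...}) config entries, so the copy flow can be exercised locally without Azure at all. A minimal sketch of that flow, assuming two throwaway pathslicing roots under /tmp (the paths and payload are illustrative, not taken from the diff):

import os

from swh.core import hashutil
from swh.objstorage import get_objstorage

src_root, dst_root = '/tmp/objs-src', '/tmp/objs-dst'
for root in (src_root, dst_root):
    os.makedirs(root, exist_ok=True)  # pathslicing requires an existing root

src = get_objstorage(cls='pathslicing',
                     args={'root': src_root, 'slicing': '0:2/2:4/4:6'})
dst = get_objstorage(cls='pathslicing',
                     args={'root': dst_root, 'slicing': '0:2/2:4/4:6'})

obj_id = src.add(b'some content')      # returns the binary sha1 id
# Same shape as AzureAccess.send_one_content: read from one storage,
# write into the other under the same id.
dst.add(content=src.get(obj_id), obj_id=obj_id)
assert obj_id in dst and dst.get(obj_id) == b'some content'
print(hashutil.hash_to_hex(obj_id))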
+ + """ + hex_obj_id = self._internal_id(obj_id) return self.block_blob_service.exists( container_name=self.container_name, blob_name=hex_obj_id) def __iter__(self): - """ Iterate over the objects present in the storage + """Iterate over the objects present in the storage. """ for obj in self.block_blob_service.list_blobs(self.container_name): - yield obj.name + yield hashutil.hex_to_hash(obj.name) def __len__(self): """Compute the number of objects in the current object storage. Returns: number of objects contained in the storage. """ return sum(1 for i in self) def add(self, content, obj_id=None, check_presence=True): """Add an obj in storage if it's not there already. """ if obj_id is None: # Checksum is missing, compute it on the fly. obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = self._internal_id(obj_id) # Send the gzipped content self.block_blob_service.create_blob_from_bytes( container_name=self.container_name, blob_name=hex_obj_id, blob=gzip.compress(content)) return obj_id def restore(self, content, obj_id=None): + """Restore a content. + + """ return self.add(content, obj_id, check_presence=False) def get(self, obj_id): - hex_obj_id = hashutil.hash_to_hex(obj_id) + """Retrieve blob's content if found. + + """ + hex_obj_id = self._internal_id(obj_id) try: blob = self.block_blob_service.get_blob_to_bytes( container_name=self.container_name, blob_name=hex_obj_id) except AzureMissingResourceHttpError: raise ObjNotFoundError('Content %s not found!' % hex_obj_id) return gzip.decompress(blob.content) def check(self, obj_id): - # Check the content integrity + """Check the content integrity. + + """ obj_content = self.get(obj_id) content_obj_id = compute_hash(obj_content) if content_obj_id != obj_id: raise Error(obj_id) diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py index 90d3d70..0f173ef 100644 --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -1,165 +1,167 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from swh.core import hashutil from .exc import ObjNotFoundError ID_HASH_ALGO = 'sha1' ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. def compute_hash(content): return hashutil.hashdata( content, algorithms=[ID_HASH_ALGO] ).get(ID_HASH_ALGO) class ObjStorage(metaclass=abc.ABCMeta): """ High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers 5 methods: - __contains__() check if an object is present, by object id - add() add a new object, returning an object id - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id And some management methods: - get_random() get random object id of existing contents (used for the content integrity checker). Each implementation of this interface can have a different behavior and its own way to store the contents. """ @abc.abstractmethod def __contains__(self, obj_id, *args, **kwargs): - """ Indicates if the given object is present in the storage + """Indicate if the given object is present in the storage. + + Args: + obj_id (bytes): object identifier. 
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
index 90d3d70..0f173ef 100644
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -1,165 +1,167 @@
 # Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc

 from swh.core import hashutil

 from .exc import ObjNotFoundError


 ID_HASH_ALGO = 'sha1'

 ID_HASH_LENGTH = 40  # Size in bytes of the hash hexadecimal representation.


 def compute_hash(content):
     return hashutil.hashdata(
         content,
         algorithms=[ID_HASH_ALGO]
     ).get(ID_HASH_ALGO)


 class ObjStorage(metaclass=abc.ABCMeta):
     """ High-level API to manipulate the Software Heritage object storage.

     Conceptually, the object storage offers 5 methods:

     - __contains__()  check if an object is present, by object id
     - add()           add a new object, returning an object id
     - restore()       same as add() but erase an already existed content
     - get()           retrieve the content of an object, by object id
     - check()         check the integrity of an object, by object id

     And some management methods:

     - get_random()    get random object id of existing contents (used for the
                       content integrity checker).

     Each implementation of this interface can have a different behavior and
     its own way to store the contents.
     """

     @abc.abstractmethod
     def __contains__(self, obj_id, *args, **kwargs):
-        """ Indicates if the given object is present in the storage
+        """Indicate if the given object is present in the storage.
+
+        Args:
+            obj_id (bytes): object identifier.

         Returns:
             True iff the object is present in the current object storage.
+
         """
-        raise NotImplementedError(
-            "Implementations of ObjStorage must have a '__contains__' method"
-        )
+        pass

     @abc.abstractmethod
     def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
-        """ Add a new object to the object storage.
+        """Add a new object to the object storage.

         Args:
-            content: content of the object to be added to the storage.
-            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
-                given, obj_id will be trusted to match the bytes. If missing,
-                obj_id will be computed on the fly.
-            check_presence: indicate if the presence of the content should be
-                verified before adding the file.
+            content (bytes): object's raw content to add in storage.
+            obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO]
+                algorithm. When given, obj_id will be trusted to match
+                the bytes. If missing, obj_id will be computed on the
+                fly.
+            check_presence (bool): indicate if the presence of the
+                content should be verified before adding the file.

         Returns:
-            the id of the object into the storage.
+            the id (bytes) of the object into the storage.
+
         """
-        raise NotImplementedError(
-            "Implementations of ObjStorage must have a 'add' method"
-        )
+        pass

     def restore(self, content, obj_id=None, *args, **kwargs):
-        """ Restore a content that have been corrupted.
+        """Restore a content that has been corrupted.

         This function is identical to add_bytes but does not check if
         the object id is already in the file system.

         The default implementation provided by the current class is
         suitable for most cases.

         Args:
-            content: content of the object to be added to the storage
-            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
-                given, obj_id will be trusted to match bytes. If missing,
-                obj_id will be computed on the fly.
+            content (bytes): object's raw content to add in storage
+            obj_id (bytes): checksum of `bytes` as computed by
+                ID_HASH_ALGO. When given, obj_id will be trusted to
+                match bytes. If missing, obj_id will be computed on
+                the fly.
+
         """
         # check_presence to false will erase the potential previous content.
         return self.add(content, obj_id, check_presence=False)

     @abc.abstractmethod
     def get(self, obj_id, *args, **kwargs):
-        """ Retrieve the content of a given object.
+        """Retrieve the content of a given object.

         Args:
-            obj_id: object id.
+            obj_id (bytes): object id.

         Returns:
             the content of the requested object as bytes.

         Raises:
             ObjNotFoundError: if the requested object is missing.
+
         """
-        raise NotImplementedError(
-            "Implementations of ObjStorage must have a 'get' method"
-        )
+        pass

     def get_batch(self, obj_ids, *args, **kwargs):
-        """ Retrieve content in bulk.
+        """Retrieve objects' raw content in bulk from storage.
+
+        Note: This function does have a default implementation in
+        ObjStorage that is suitable for most cases.

-        Note: This function does have a default implementation in ObjStorage
-        that is suitable for most cases.
-        For object storages that needs to do the minimal number of requests
-        possible (ex: remote object storages), that method can be overriden
-        to perform a more efficient operation.
+        For object storages that need to do the minimal number of
+        requests possible (ex: remote object storages), that method
+        can be overridden to perform a more efficient operation.

         Args:
-            obj_ids: list of object ids.
+            obj_ids ([bytes]): list of object ids.

         Returns:
-            list of resulting contents, or None if the content could not
-            be retrieved. Do not raise any exception as a fail for one content
-            will not cancel the whole request.
+            list of resulting contents, or None if the content could
+            not be retrieved. Do not raise any exception as a fail for
+            one content will not cancel the whole request.
+
         """
         for obj_id in obj_ids:
             try:
                 yield self.get(obj_id)
             except ObjNotFoundError:
                 yield None

     @abc.abstractmethod
     def check(self, obj_id, *args, **kwargs):
-        """ Perform an integrity check for a given object.
+        """Perform an integrity check for a given object.

         Verify that the file object is in place and that the gziped content
         matches the object id.

         Args:
-            obj_id: object id.
+            obj_id (bytes): object identifier.

         Raises:
             ObjNotFoundError: if the requested object is missing.
             Error: if the request object is corrupted.
+
         """
-        raise NotImplementedError(
-            "Implementations of ObjStorage must have a 'check' method"
-        )
+        pass

     def get_random(self, batch_size, *args, **kwargs):
-        """ Get random ids of existing contents
+        """Get random ids of existing contents.

         This method is used in order to get random ids to perform
         content integrity verifications on random contents.

         Args:
             batch_size (int): Number of ids that will be given

         Yields:
-            An iterable of ids of contents that are in the current object
-            storage.
+            An iterable of ids (bytes) of contents that are in the
+            current object storage.
+
         """
-        raise NotImplementedError(
-            "The current implementation of ObjStorage does not support "
-            "'get_random' operation"
-        )
+        pass
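With the abstract methods now reduced to pass, a backend only has to override __contains__, add, get and check; restore and get_batch come for free from the base class. A toy in-memory sketch of that contract (not part of the package, purely illustrative):

from swh.objstorage.objstorage import ObjStorage, compute_hash
from swh.objstorage.exc import ObjNotFoundError, Error


class MemObjStorage(ObjStorage):
    """Minimal dict-backed implementation of the abstract interface."""
    def __init__(self):
        self._objects = {}

    def __contains__(self, obj_id):
        return obj_id in self._objects

    def add(self, content, obj_id=None, check_presence=True):
        if obj_id is None:
            obj_id = compute_hash(content)  # sha1 of the raw content
        if check_presence and obj_id in self:
            return obj_id
        self._objects[obj_id] = content
        return obj_id

    def get(self, obj_id):
        try:
            return self._objects[obj_id]
        except KeyError:
            raise ObjNotFoundError(obj_id)

    def check(self, obj_id):
        if compute_hash(self.get(obj_id)) != obj_id:
            raise Error(obj_id)


storage = MemObjStorage()
obj_id = storage.add(b'hello')
# Default get_batch yields None for missing ids instead of raising.
assert list(storage.get_batch([obj_id, b'\x00' * 20])) == [b'hello', None]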
diff --git a/swh/objstorage/objstorage_pathslicing.py b/swh/objstorage/objstorage_pathslicing.py
index ca1ac3b..34697b7 100644
--- a/swh/objstorage/objstorage_pathslicing.py
+++ b/swh/objstorage/objstorage_pathslicing.py
@@ -1,276 +1,279 @@
 # Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import gzip
 import tempfile
 import random

 from contextlib import contextmanager

 from swh.core import hashutil

 from .objstorage import ObjStorage, compute_hash, ID_HASH_ALGO, ID_HASH_LENGTH
 from .exc import ObjNotFoundError, Error


 GZIP_BUFSIZ = 1048576

 DIR_MODE = 0o755
 FILE_MODE = 0o644


 @contextmanager
 def _write_obj_file(hex_obj_id, objstorage):
     """ Context manager for writing object files to the object storage.

     During writing, data are written to a temporary file, which is atomically
     renamed to the right file name after closing. This context manager also
     takes care of (gzip) compressing the data on the fly.

     Usage sample:
         with _write_obj_file(hex_obj_id, objstorage):
             f.write(obj_data)

     Yields:
         a file-like object open for writing bytes.
     """
     # Get the final paths and create the directory if absent.
     dir = objstorage._obj_dir(hex_obj_id)
     if not os.path.isdir(dir):
         os.makedirs(dir, DIR_MODE, exist_ok=True)
     path = os.path.join(dir, hex_obj_id)

     # Create a temporary file.
     (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.',
                                        dir=dir)

     # Open the file and yield it for writing.
     tmp_f = os.fdopen(tmp, 'wb')
     with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f:
         yield f

     # Then close the temporary file and move it to the right directory.
     tmp_f.close()
     os.chmod(tmp_path, FILE_MODE)
     os.rename(tmp_path, path)


 @contextmanager
 def _read_obj_file(hex_obj_id, objstorage):
     """ Context manager for reading object file in the object storage.

     Usage sample:
         with _read_obj_file(hex_obj_id, objstorage) as f:
             b = f.read()

     Yields:
         a file-like object open for reading bytes.
     """
     path = objstorage._obj_path(hex_obj_id)
     with gzip.GzipFile(path, 'rb') as f:
         yield f


 class PathSlicingObjStorage(ObjStorage):
     """ Implementation of the ObjStorage API based on the hash of the content.

     On disk, an object storage is a directory tree containing files named
     after their object IDs. An object ID is a checksum of its content,
     depending on the value of the ID_HASH_ALGO constant (see hashutil for its
     meaning).

     To avoid directories that contain too many files, the object storage has
     a given slicing. Each slicing correspond to a directory that is named
     according to the hash of its content.

     So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689
     will be stored in the given object storages :

     - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689
     - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689

     The files in the storage are stored in gzipped compressed format.

     Attributes:
         root (string): path to the root directory of the storage on the disk.
         bounds: list of tuples that indicates the beginning and the end of
             each subdirectory for a content.
     """

     def __init__(self, root, slicing):
         """ Create an object to access a hash-slicing based object storage.

         Args:
             root (string): path to the root directory of the storage on
                 the disk.
             slicing (string): string that indicates the slicing to perform
                 on the hash of the content to know the path where it should
                 be stored.
         """
         if not os.path.isdir(root):
             raise ValueError(
                 'PathSlicingObjStorage root "%s" is not a directory' % root
             )

         self.root = root
         # Make a list of tuples where each tuple contains the beginning
         # and the end of each slicing.
         self.bounds = [
             slice(*map(int, sbounds.split(':')))
             for sbounds in slicing.split('/')
             if sbounds
         ]

         max_endchar = max(map(lambda bound: bound.stop, self.bounds))
         if ID_HASH_LENGTH < max_endchar:
             raise ValueError(
                 'Algorithm %s has too short hash for slicing to char %d'
                 % (ID_HASH_ALGO, max_endchar)
             )

     def __contains__(self, obj_id):
         hex_obj_id = hashutil.hash_to_hex(obj_id)
         return os.path.exists(self._obj_path(hex_obj_id))

     def __iter__(self):
-        """iterate over the object identifiers currently available in the storage
+        """Iterate over the object identifiers currently available in the
+        storage.

-        Warning: with the current implementation of the object storage, this
-        method will walk the filesystem to list objects, meaning that listing
-        all objects will be very slow for large storages. You almost certainly
-        don't want to use this method in production.
+        Warning: with the current implementation of the object
+        storage, this method will walk the filesystem to list objects,
+        meaning that listing all objects will be very slow for large
+        storages. You almost certainly don't want to use this method
+        in production.

         Return:
-            iterator over object IDs
+            Iterator over object IDs
+
         """
         def obj_iterator():
             # XXX hackish: it does not verify that the depth of found files
             # matches the slicing depth of the storage
             for root, _dirs, files in os.walk(self.root):
                 for f in files:
                     yield bytes.fromhex(f)

         return obj_iterator()

     def __len__(self):
-        """compute the number of objects available in the storage
+        """Compute the number of objects available in the storage.

         Warning: this currently uses `__iter__`, its warning about bad
         performances applies

         Return:
             number of objects contained in the storage

         """
         return sum(1 for i in self)

     def _obj_dir(self, hex_obj_id):
         """ Compute the storage directory of an object.

         See also: PathSlicingObjStorage::_obj_path

         Args:
             hex_obj_id: object id as hexlified string.

         Returns:
             Path to the directory that contains the required object.
         """
         slices = [hex_obj_id[bound] for bound in self.bounds]
         return os.path.join(self.root, *slices)

     def _obj_path(self, hex_obj_id):
         """ Compute the full path to an object into the current storage.

         See also: PathSlicingObjStorage::_obj_dir

         Args:
             hex_obj_id: object id as hexlified string.

         Returns:
             Path to the actual object corresponding to the given id.
         """
         return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id)

     def add(self, content, obj_id=None, check_presence=True):
         if obj_id is None:
             obj_id = compute_hash(content)

         if check_presence and obj_id in self:
             # If the object is already present, return immediately.
             return obj_id

         hex_obj_id = hashutil.hash_to_hex(obj_id)
         with _write_obj_file(hex_obj_id, self) as f:
             f.write(content)

         return obj_id

     def get(self, obj_id):
         if obj_id not in self:
             raise ObjNotFoundError(obj_id)

         # Open the file and return its content as bytes
         hex_obj_id = hashutil.hash_to_hex(obj_id)
         with _read_obj_file(hex_obj_id, self) as f:
             return f.read()

     def check(self, obj_id):
         if obj_id not in self:
             raise ObjNotFoundError(obj_id)

         hex_obj_id = hashutil.hash_to_hex(obj_id)

         try:
             with gzip.open(self._obj_path(hex_obj_id)) as f:
                 length = None
                 if ID_HASH_ALGO.endswith('_git'):
                     # if the hashing algorithm is git-like, we need to know
                     # the content size to hash on the fly. Do a first pass
                     # here to compute the size
                     length = 0
                     while True:
                         chunk = f.read(GZIP_BUFSIZ)
                         length += len(chunk)
                         if not chunk:
                             break
                     f.rewind()

                 checksums = hashutil._hash_file_obj(f, length,
                                                     algorithms=[ID_HASH_ALGO])
                 actual_obj_id = checksums[ID_HASH_ALGO]
                 if obj_id != actual_obj_id:
                     raise Error(
                         'Corrupt object %s should have id %s'
                         % (hashutil.hash_to_hex(obj_id),
                            hashutil.hash_to_hex(actual_obj_id))
                     )
         except (OSError, IOError):
             # IOError is for compatibility with older python versions
             raise Error('Corrupt object %s is not a gzip file' % obj_id)

     def get_random(self, batch_size):
         def get_random_content(self, batch_size):
             """ Get a batch of content inside a single directory.

             Returns:
                 a tuple (batch size, batch).
             """
             dirs = []
             for level in range(len(self.bounds)):
                 path = os.path.join(self.root, *dirs)
                 dir_list = next(os.walk(path))[1]
                 if 'tmp' in dir_list:
                     dir_list.remove('tmp')
                 dirs.append(random.choice(dir_list))

             path = os.path.join(self.root, *dirs)
             content_list = next(os.walk(path))[2]
             length = min(batch_size, len(content_list))
             return length, map(hashutil.hex_to_hash,
                                random.sample(content_list, length))

         while batch_size:
             length, it = get_random_content(self, batch_size)
             batch_size = batch_size - length
             yield from it
diff --git a/version.txt b/version.txt
index 93bb0e4..820f3ac 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.12-0-g0bfc6b7
\ No newline at end of file
+v0.0.13-0-g1d4d89a
\ No newline at end of file
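As a closing illustration of the slicing scheme documented in PathSlicingObjStorage above: a standalone sketch of how a slicing string maps a hex object id to its on-disk path, mirroring _obj_dir and _obj_path (the root directory is illustrative):

import os

hex_obj_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'

def sliced_path(root, slicing, hex_obj_id):
    # Same bounds computation as PathSlicingObjStorage.__init__
    bounds = [slice(*map(int, sbounds.split(':')))
              for sbounds in slicing.split('/') if sbounds]
    slices = [hex_obj_id[bound] for bound in bounds]   # as in _obj_dir
    return os.path.join(root, *slices, hex_obj_id)     # as in _obj_path

print(sliced_path('/srv/objects', '0:2/2:4/4:6', hex_obj_id))
# /srv/objects/34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689
print(sliced_path('/srv/objects', '0:1/0:5', hex_obj_id))
# /srv/objects/3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689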