diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
index d1c37e4..50c95df 100644
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,73 +1,81 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from .objstorage import ObjStorage
 from .objstorage_pathslicing import PathSlicingObjStorage
 from .objstorage_in_memory import InMemoryObjStorage
 from .api.client import RemoteObjStorage
 from .multiplexer import MultiplexerObjStorage
 from .multiplexer.filter import add_filters
 
 
 __all__ = ['get_objstorage', 'ObjStorage']
 
 
 _STORAGE_CLASSES = {
     'pathslicing': PathSlicingObjStorage,
     'remote': RemoteObjStorage,
     'in-memory': InMemoryObjStorage,
 }
 
 try:
-    from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage
+    from swh.objstorage.cloud.objstorage_azure import (
+        AzureCloudObjStorage,
+        PrefixedAzureCloudObjStorage,
+    )
     _STORAGE_CLASSES['azure-storage'] = AzureCloudObjStorage
+    _STORAGE_CLASSES['azure-storage-prefixed'] = PrefixedAzureCloudObjStorage
 except ImportError:
     pass
 
 try:
     from swh.objstorage.objstorage_rados import RADOSObjStorage
     _STORAGE_CLASSES['rados'] = RADOSObjStorage
 except ImportError:
     pass
 
 
 def get_objstorage(cls, args):
     """ Create an ObjStorage using the given implementation class.
 
     Args:
         cls (str): objstorage class unique key contained in the
             _STORAGE_CLASSES dict.
         args (dict): arguments for the required class of objstorage
             that must match exactly the one in the `__init__` method of the
             class.
     Returns:
         subclass of ObjStorage that match the given `storage_class` argument.
     Raises:
         ValueError: if the given storage class is not a valid objstorage
             key.
     """
     try:
-        return _STORAGE_CLASSES[cls](**args)
+        storage_class = _STORAGE_CLASSES[cls]
     except KeyError:
         raise ValueError('Storage class %s does not exist' % cls)
+
+    # Instantiate outside the try block so that a KeyError raised by the
+    # constructor itself is not misreported as an unknown storage class.
+    return storage_class(**args)
 
 
 def _construct_filtered_objstorage(storage_conf, filters_conf):
     return add_filters(
         get_objstorage(**storage_conf),
         filters_conf
     )
 
 
 _STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
 
 
 def _construct_multiplexer_objstorage(objstorages):
     storages = [get_objstorage(**conf)
                 for conf in objstorages]
     return MultiplexerObjStorage(storages)
 
 
 _STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
diff --git a/swh/objstorage/cloud/objstorage_azure.py b/swh/objstorage/cloud/objstorage_azure.py
index f0012db..1f4bdaa 100644
--- a/swh/objstorage/cloud/objstorage_azure.py
+++ b/swh/objstorage/cloud/objstorage_azure.py
@@ -1,148 +1,207 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
+# Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import gzip
+import itertools
+import string
 
 from azure.storage.blob import BlockBlobService
 from azure.common import AzureMissingResourceHttpError
 import requests
 
 from swh.objstorage.objstorage import ObjStorage, compute_hash
 from swh.objstorage.exc import ObjNotFoundError, Error
 from swh.model import hashutil
 
 
 class AzureCloudObjStorage(ObjStorage):
     """ObjStorage with azure abilities.
 
     """
     def __init__(self, account_name, api_secret_key, container_name, **kwargs):
         super().__init__(**kwargs)
         self.block_blob_service = BlockBlobService(
             account_name=account_name,
             account_key=api_secret_key,
             request_session=requests.Session(),
         )
         self.container_name = container_name
 
     def get_blob_service(self, hex_obj_id):
         """Get the block_blob_service and container that contains the object with
         internal id hex_obj_id
         """
         return self.block_blob_service, self.container_name
 
     def get_all_blob_services(self):
         """Get all active block_blob_services"""
         yield self.block_blob_service, self.container_name
 
     def _internal_id(self, obj_id):
         """Internal id is the hex version in objstorage.
 
         """
         return hashutil.hash_to_hex(obj_id)
 
     def check_config(self, *, check_write):
         """Check the configuration for this object storage"""
         for service, container in self.get_all_blob_services():
             props = service.get_container_properties(container)
 
             # FIXME: check_write is ignored here
             if not props:
                 return False
 
         return True
 
     def __contains__(self, obj_id):
         """Does the storage contains the obj_id.
 
         """
         hex_obj_id = self._internal_id(obj_id)
         service, container = self.get_blob_service(hex_obj_id)
         return service.exists(
             container_name=container,
             blob_name=hex_obj_id)
 
     def __iter__(self):
         """Iterate over the objects present in the storage.
 
         """
         for service, container in self.get_all_blob_services():
             for obj in service.list_blobs(container):
                 yield hashutil.hash_to_bytes(obj.name)
 
     def __len__(self):
         """Compute the number of objects in the current object storage.
 
         Returns:
             number of objects contained in the storage.
 
         """
         return sum(1 for i in self)
 
     def add(self, content, obj_id=None, check_presence=True):
         """Add an obj in storage if it's not there already.
 
         """
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)
 
         if check_presence and obj_id in self:
             return obj_id
 
         hex_obj_id = self._internal_id(obj_id)
 
         # Send the gzipped content
         service, container = self.get_blob_service(hex_obj_id)
         service.create_blob_from_bytes(
             container_name=container,
             blob_name=hex_obj_id,
             blob=gzip.compress(content))
 
         return obj_id
 
     def restore(self, content, obj_id=None):
         """Restore a content.
 
         """
         return self.add(content, obj_id, check_presence=False)
 
     def get(self, obj_id):
         """Retrieve blob's content if found.
 
         """
         hex_obj_id = self._internal_id(obj_id)
         service, container = self.get_blob_service(hex_obj_id)
         try:
             blob = service.get_blob_to_bytes(
                 container_name=container,
                 blob_name=hex_obj_id)
         except AzureMissingResourceHttpError:
             raise ObjNotFoundError(obj_id)
 
         return gzip.decompress(blob.content)
 
     def check(self, obj_id):
         """Check the content integrity.
 
         """
         obj_content = self.get(obj_id)
         content_obj_id = compute_hash(obj_content)
         if content_obj_id != obj_id:
             raise Error(obj_id)
 
     def delete(self, obj_id):
         """Delete an object."""
         super().delete(obj_id)  # Check delete permission
         hex_obj_id = self._internal_id(obj_id)
         service, container = self.get_blob_service(hex_obj_id)
         try:
             service.delete_blob(
                 container_name=container,
                 blob_name=hex_obj_id)
         except AzureMissingResourceHttpError:
             raise ObjNotFoundError('Content {} not found!'.format(hex_obj_id))
 
         return True
+
+
+class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
+    """ObjStorage with azure capabilities, striped by prefix.
+
+    accounts is a dict containing entries of the form:
+        <prefix>:
+          account_name: <account_name>
+          api_secret_key: <api_secret_key>
+          container_name: <container_name>
+    """
+    def __init__(self, accounts, **kwargs):
+        # shortcut AzureCloudObjStorage __init__
+        ObjStorage.__init__(self, **kwargs)
+
+        # Definition sanity check
+        prefix_lengths = set(len(prefix) for prefix in accounts)
+        if len(prefix_lengths) != 1:
+            raise ValueError("Inconsistent prefixes, found lengths %s"
+                             % ', '.join(
+                                 map(str, sorted(prefix_lengths))
+                             ))
+
+        self.prefix_len = prefix_lengths.pop()
+
+        expected_prefixes = set(
+            ''.join(letters)
+            for letters in itertools.product(
+                set(string.hexdigits.lower()), repeat=self.prefix_len
+            )
+        )
+        missing_prefixes = expected_prefixes - set(accounts)
+        if missing_prefixes:
+            raise ValueError("Missing prefixes %s"
+                             % ', '.join(sorted(missing_prefixes)))
+
+        self.prefixes = {}
+        request_session = requests.Session()
+        for prefix, account in accounts.items():
+            self.prefixes[prefix] = (
+                BlockBlobService(
+                    account_name=account['account_name'],
+                    account_key=account['api_secret_key'],
+                    request_session=request_session,
+                ),
+                account['container_name'],
+            )
+
+    def get_blob_service(self, hex_obj_id):
+        """Get the block_blob_service and container that contains the object with
+        internal id hex_obj_id
+        """
+        return self.prefixes[hex_obj_id[:self.prefix_len]]
+
+    def get_all_blob_services(self):
+        """Get all active block_blob_services"""
+        yield from self.prefixes.values()
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
index 16fc9e1..5a20671 100644
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -1,80 +1,138 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import unittest
 from unittest.mock import patch
 
+from nose.tools import istest
+
 from azure.common import AzureMissingResourceHttpError
 
+from swh.model.hashutil import hash_to_hex
 from swh.objstorage import get_objstorage
 
 from objstorage_testing import ObjStorageTestFixture
 
 
 class MockBlob():
     """ Libcloud object mock that replicates its API """
     def __init__(self, name, content):
         self.name = name
         self.content = content
 
 
 class MockBlockBlobService():
     """Mock internal azure library which AzureCloudObjStorage depends upon.
 
     """
     data = {}
 
     def __init__(self, account_name, account_key, **kwargs):
         # do not care for the account_name and the api_secret_key here
         self.data = defaultdict(dict)
 
     def get_container_properties(self, container_name):
         self.data[container_name]
         return container_name in self.data
 
     def create_blob_from_bytes(self, container_name, blob_name, blob):
         self.data[container_name][blob_name] = blob
 
     def get_blob_to_bytes(self, container_name, blob_name):
         if blob_name not in self.data[container_name]:
             raise AzureMissingResourceHttpError(
                 'Blob %s not found' % blob_name,
                 404)
         return MockBlob(name=blob_name,
                         content=self.data[container_name][blob_name])
 
     def delete_blob(self, container_name, blob_name):
         try:
             self.data[container_name].pop(blob_name)
         except KeyError:
             raise AzureMissingResourceHttpError(
                 'Blob %s not found' % blob_name, 404)
         return True
 
     def exists(self, container_name, blob_name):
         return blob_name in self.data[container_name]
 
     def list_blobs(self, container_name):
         for blob_name, content in self.data[container_name].items():
             yield MockBlob(name=blob_name, content=content)
 
 
 class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
 
     def setUp(self):
         super().setUp()
         patcher = patch(
             'swh.objstorage.cloud.objstorage_azure.BlockBlobService',
             MockBlockBlobService,
         )
         patcher.start()
         self.addCleanup(patcher.stop)
 
         self.storage = get_objstorage('azure-storage', {
             'account_name': 'account-name',
             'api_secret_key': 'api-secret-key',
             'container_name': 'container-name',
         })
+
+
+class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture,
+                                       unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        patcher = patch(
+            'swh.objstorage.cloud.objstorage_azure.BlockBlobService',
+            MockBlockBlobService,
+        )
+        patcher.start()
+        self.addCleanup(patcher.stop)
+
+        self.accounts = {}
+        for prefix in '0123456789abcdef':
+            self.accounts[prefix] = {
+                'account_name': 'account_%s' % prefix,
+                'api_secret_key': 'secret_key_%s' % prefix,
+                'container_name': 'container_%s' % prefix,
+            }
+
+        self.storage = get_objstorage('azure-storage-prefixed', {
+            'accounts': self.accounts
+        })
+
+    @istest
+    def prefixedazure_instantiation_missing_prefixes(self):
+        del self.accounts['d']
+        del self.accounts['e']
+
+        with self.assertRaisesRegex(ValueError, 'Missing prefixes'):
+            get_objstorage('azure-storage-prefixed', {
+                'accounts': self.accounts
+            })
+
+    @istest
+    def prefixedazure_instantiation_inconsistent_prefixes(self):
+        self.accounts['00'] = self.accounts['0']
+
+        with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'):
+            get_objstorage('azure-storage-prefixed', {
+                'accounts': self.accounts
+            })
+
+    @istest
+    def prefixedazure_sharding_behavior(self):
+        for i in range(100):
+            content, obj_id = self.hash_content(b'test_content_%02d' % i)
+            self.storage.add(content, obj_id=obj_id)
+            hex_obj_id = hash_to_hex(obj_id)
+            prefix = hex_obj_id[0]
+            self.assertTrue(
+                self.storage.prefixes[prefix][0].exists(
+                    self.accounts[prefix]['container_name'], hex_obj_id
+                ))