# Register the azure backends only when their module can be imported.
try:
    from swh.objstorage.cloud.objstorage_azure import (
        AzureCloudObjStorage,
        PrefixedAzureCloudObjStorage,
    )
except ImportError:
    # The import failed (for instance because a dependency of the azure
    # module is not installed): leave the azure backends unregistered.
    pass
else:
    # Registration happens outside of the ``try`` body so that only the
    # import itself is covered by the ``except ImportError`` clause.
    _STORAGE_CLASSES['azure'] = AzureCloudObjStorage
    _STORAGE_CLASSES['azure-prefixed'] = PrefixedAzureCloudObjStorage
    # Legacy alias: the backend used to be registered as 'azure-storage';
    # keep that name working for configurations written before the rename.
    _STORAGE_CLASSES['azure-storage'] = AzureCloudObjStorage
def get_blob_service(self, hex_obj_id):
    """Return the (block_blob_service, container_name) pair holding the
    object whose internal id is hex_obj_id.

    This storage has a single service and container, so the same pair is
    returned whatever hex_obj_id is.
    """
    pair = (self.block_blob_service, self.container_name)
    return pair


def get_all_blob_services(self):
    """Iterate over all the (block_blob_service, container_name) pairs
    used by this storage.
    """
    yield (self.block_blob_service, self.container_name)


def check_config(self, *, check_write):
    """Check the configuration for this object storage.

    Returns:
        False as soon as one container yields falsy properties, True
        otherwise.
    """
    # FIXME: check_write is ignored here
    return all(
        blob_service.get_container_properties(container_name)
        for blob_service, container_name in self.get_all_blob_services()
    )


def __contains__(self, obj_id):
    """Tell whether the storage contains the object obj_id."""
    blob_name = self._internal_id(obj_id)
    blob_service, container_name = self.get_blob_service(blob_name)
    return blob_service.exists(
        container_name=container_name,
        blob_name=blob_name)


def __iter__(self):
    """Iterate over the ids of the objects present in the storage."""
    for blob_service, container_name in self.get_all_blob_services():
        blobs = blob_service.list_blobs(container_name)
        yield from (hashutil.hash_to_bytes(blob.name) for blob in blobs)
class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
    """ObjStorage with azure capabilities, striped by prefix.

    Each object is stored in the (service, container) pair registered for
    the first ``prefix_len`` characters of its hexadecimal internal id.

    accounts is a dict containing entries of the form:
        <prefix>:
            account_name: <account_name>
            api_secret_key: <api_secret_key>
            container_name: <container_name>

    All prefixes must have the same length, and the set of prefixes must be
    exactly the set of lowercase hexadecimal strings of that length.

    Raises:
        ValueError: if the prefixes do not all have the same length (which
            includes an empty accounts dict), if some hexadecimal prefix is
            missing, or if a prefix is not a lowercase hexadecimal string.
    """
    def __init__(self, accounts, **kwargs):
        # Deliberately bypass AzureCloudObjStorage.__init__, which sets up
        # a single service and container; only the ObjStorage
        # initialisation is wanted here.
        ObjStorage.__init__(self, **kwargs)

        # Definition sanity check: one single prefix length
        prefix_lengths = {len(prefix) for prefix in accounts}
        if len(prefix_lengths) != 1:
            found_lengths = ', '.join(
                str(length) for length in sorted(prefix_lengths)
            ) or 'none (no account configured)'
            raise ValueError("Inconsistent prefixes, found lengths %s"
                             % found_lengths)

        (self.prefix_len,) = prefix_lengths

        # string.hexdigits contains both cases; internal ids are matched
        # against lowercase prefixes only.
        hex_letters = sorted(set(string.hexdigits.lower()))
        expected_prefixes = {
            ''.join(letters)
            for letters in itertools.product(
                hex_letters, repeat=self.prefix_len
            )
        }
        configured_prefixes = set(accounts)

        missing_prefixes = expected_prefixes - configured_prefixes
        if missing_prefixes:
            raise ValueError("Missing prefixes %s"
                             % ', '.join(sorted(missing_prefixes)))

        # A prefix outside of the expected set would never be selected by
        # get_blob_service, yet its account would still be visited by
        # get_all_blob_services: refuse it instead of silently keeping it.
        unexpected_prefixes = configured_prefixes - expected_prefixes
        if unexpected_prefixes:
            raise ValueError("Unexpected prefixes %s"
                             % ', '.join(sorted(unexpected_prefixes)))

        # The same requests session is handed to every service.
        request_session = requests.Session()
        self.prefixes = {
            prefix: (
                BlockBlobService(
                    account_name=account['account_name'],
                    account_key=account['api_secret_key'],
                    request_session=request_session,
                ),
                account['container_name'],
            )
            for prefix, account in accounts.items()
        }

    def get_blob_service(self, hex_obj_id):
        """Get the block_blob_service and container that contains the object
        with internal id hex_obj_id

        The pair is selected by the first ``prefix_len`` characters of
        hex_obj_id.
        """
        return self.prefixes[hex_obj_id[:self.prefix_len]]

    def get_all_blob_services(self):
        """Get all active block_blob_services, as (service, container)
        pairs"""
        yield from self.prefixes.values()
def _patch_block_blob_service(test_case):
    """Replace BlockBlobService by MockBlockBlobService for one test.

    The name is patched where the objstorage_azure module looks it up, and
    the patch is reverted through the cleanups of test_case.
    """
    patcher = patch(
        'swh.objstorage.cloud.objstorage_azure.BlockBlobService',
        MockBlockBlobService,
    )
    patcher.start()
    test_case.addCleanup(patcher.stop)


class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Apply ObjStorageTestFixture to the 'azure' objstorage, backed by the
    mocked blob service."""

    def setUp(self):
        super().setUp()
        _patch_block_blob_service(self)

        self.storage = get_objstorage('azure', {
            'account_name': 'account-name',
            'api_secret_key': 'api-secret-key',
            'container_name': 'container-name',
        })


class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture,
                                       unittest.TestCase):
    """Apply ObjStorageTestFixture to the 'azure-prefixed' objstorage, with
    one mocked account per one-character hexadecimal prefix."""

    def setUp(self):
        super().setUp()
        _patch_block_blob_service(self)

        self.accounts = {}
        for prefix in '0123456789abcdef':
            self.accounts[prefix] = {
                'account_name': 'account_%s' % prefix,
                'api_secret_key': 'secret_key_%s' % prefix,
                'container_name': 'container_%s' % prefix,
            }

        self.storage = get_objstorage('azure-prefixed', {
            'accounts': self.accounts
        })

    @istest
    def prefixedazure_instantiation_missing_prefixes(self):
        # Removing some prefixes leaves part of the id space without account
        del self.accounts['d']
        del self.accounts['e']

        with self.assertRaisesRegex(ValueError, 'Missing prefixes'):
            get_objstorage('azure-prefixed', {
                'accounts': self.accounts
            })

    @istest
    def prefixedazure_instantiation_inconsistent_prefixes(self):
        # A two-character prefix next to the one-character ones
        self.accounts['00'] = self.accounts['0']

        with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'):
            get_objstorage('azure-prefixed', {
                'accounts': self.accounts
            })

    @istest
    def prefixedazure_sharding_behavior(self):
        for i in range(100):
            content, obj_id = self.hash_content(b'test_content_%02d' % i)
            self.storage.add(content, obj_id=obj_id)
            hex_obj_id = hash_to_hex(obj_id)
            # Accounts are keyed by one-character prefixes in setUp
            prefix = hex_obj_id[0]
            service, _ = self.storage.prefixes[prefix]
            self.assertTrue(
                service.exists(
                    self.accounts[prefix]['container_name'], hex_obj_id
                ),
                'object %s not found in the account for prefix %s'
                % (hex_obj_id, prefix),
            )