diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py index 3e138f2..3d06f01 100644 --- a/swh/objstorage/backends/azure.py +++ b/swh/objstorage/backends/azure.py @@ -1,227 +1,239 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import gzip import string from itertools import dropwhile, islice, product from azure.storage.blob import BlockBlobService from azure.common import AzureMissingResourceHttpError import requests -from swh.objstorage.objstorage import ObjStorage, compute_hash, DEFAULT_LIMIT +from swh.objstorage.objstorage import (ObjStorage, compute_hash, DEFAULT_LIMIT, + compressors, decompressors) from swh.objstorage.exc import ObjNotFoundError, Error from swh.model import hashutil class AzureCloudObjStorage(ObjStorage): """ObjStorage with azure abilities. """ - def __init__(self, account_name, api_secret_key, container_name, **kwargs): + def __init__(self, account_name, api_secret_key, container_name, + compression='gzip', **kwargs): super().__init__(**kwargs) self.block_blob_service = BlockBlobService( account_name=account_name, account_key=api_secret_key, request_session=requests.Session(), ) self.container_name = container_name + self.compression = compression def get_blob_service(self, hex_obj_id): """Get the block_blob_service and container that contains the object with internal id hex_obj_id """ return self.block_blob_service, self.container_name def get_all_blob_services(self): """Get all active block_blob_services""" yield self.block_blob_service, self.container_name def _internal_id(self, obj_id): """Internal id is the hex version in objstorage. """ return hashutil.hash_to_hex(obj_id) def check_config(self, *, check_write): """Check the configuration for this object storage""" for service, container in self.get_all_blob_services(): props = service.get_container_properties(container) # FIXME: check_write is ignored here if not props: return False return True def __contains__(self, obj_id): """Does the storage contains the obj_id. """ hex_obj_id = self._internal_id(obj_id) service, container = self.get_blob_service(hex_obj_id) return service.exists( container_name=container, blob_name=hex_obj_id) def __iter__(self): """Iterate over the objects present in the storage. """ for service, container in self.get_all_blob_services(): for obj in service.list_blobs(container): yield hashutil.hash_to_bytes(obj.name) def __len__(self): """Compute the number of objects in the current object storage. Returns: number of objects contained in the storage. """ return sum(1 for i in self) def add(self, content, obj_id=None, check_presence=True): """Add an obj in storage if it's not there already. """ if obj_id is None: # Checksum is missing, compute it on the fly. obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id hex_obj_id = self._internal_id(obj_id) - # Send the gzipped content + # Send the compressed content + compressor = compressors[self.compression]() + blob = [compressor.compress(content), compressor.flush()] + service, container = self.get_blob_service(hex_obj_id) service.create_blob_from_bytes( container_name=container, blob_name=hex_obj_id, - blob=gzip.compress(content)) + blob=b''.join(blob), + ) return obj_id def restore(self, content, obj_id=None): """Restore a content. """ return self.add(content, obj_id, check_presence=False) def get(self, obj_id): """Retrieve blob's content if found. """ hex_obj_id = self._internal_id(obj_id) service, container = self.get_blob_service(hex_obj_id) try: blob = service.get_blob_to_bytes( container_name=container, blob_name=hex_obj_id) except AzureMissingResourceHttpError: raise ObjNotFoundError(obj_id) - return gzip.decompress(blob.content) + decompressor = decompressors[self.compression]() + ret = decompressor.decompress(blob.content) + if decompressor.unused_data: + raise Error('Corrupt object %s: trailing data found' % hex_obj_id) + return ret def check(self, obj_id): """Check the content integrity. """ obj_content = self.get(obj_id) content_obj_id = compute_hash(obj_content) if content_obj_id != obj_id: raise Error(obj_id) def delete(self, obj_id): """Delete an object.""" super().delete(obj_id) # Check delete permission hex_obj_id = self._internal_id(obj_id) service, container = self.get_blob_service(hex_obj_id) try: service.delete_blob( container_name=container, blob_name=hex_obj_id) except AzureMissingResourceHttpError: raise ObjNotFoundError('Content {} not found!'.format(hex_obj_id)) return True def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): all_blob_services = self.get_all_blob_services() if last_obj_id: last_obj_id = self._internal_id(last_obj_id) last_service, _ = self.get_blob_service(last_obj_id) all_blob_services = dropwhile( lambda srv: srv[0] != last_service, all_blob_services) else: last_service = None def iterate_blobs(): for service, container in all_blob_services: marker = last_obj_id if service == last_service else None for obj in service.list_blobs( container, marker=marker, maxresults=limit): yield hashutil.hash_to_bytes(obj.name) return islice(iterate_blobs(), limit) class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): """ObjStorage with azure capabilities, striped by prefix. accounts is a dict containing entries of the form: : account_name: api_secret_key: container_name: """ - def __init__(self, accounts, **kwargs): + def __init__(self, accounts, compression='gzip', **kwargs): # shortcut AzureCloudObjStorage __init__ ObjStorage.__init__(self, **kwargs) + self.compression = compression + # Definition sanity check prefix_lengths = set(len(prefix) for prefix in accounts) if not len(prefix_lengths) == 1: raise ValueError("Inconsistent prefixes, found lengths %s" % ', '.join( str(l) for l in sorted(prefix_lengths) )) self.prefix_len = prefix_lengths.pop() expected_prefixes = set( ''.join(letters) for letters in product( set(string.hexdigits.lower()), repeat=self.prefix_len ) ) missing_prefixes = expected_prefixes - set(accounts) if missing_prefixes: raise ValueError("Missing prefixes %s" % ', '.join(sorted(missing_prefixes))) self.prefixes = {} request_session = requests.Session() for prefix, account in accounts.items(): self.prefixes[prefix] = ( BlockBlobService( account_name=account['account_name'], account_key=account['api_secret_key'], request_session=request_session, ), account['container_name'], ) def get_blob_service(self, hex_obj_id): """Get the block_blob_service and container that contains the object with internal id hex_obj_id """ return self.prefixes[hex_obj_id[:self.prefix_len]] def get_all_blob_services(self): """Get all active block_blob_services""" # iterate on items() to sort blob services; # needed to be able to paginate in the list_content() method yield from (v for _, v in sorted(self.prefixes.items())) diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py index a2dd7bb..69bc383 100644 --- a/swh/objstorage/tests/test_objstorage_azure.py +++ b/swh/objstorage/tests/test_objstorage_azure.py @@ -1,135 +1,168 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from collections import defaultdict from unittest.mock import patch -from typing import Any, Dict +from typing import Any, Dict, Optional from azure.common import AzureMissingResourceHttpError from swh.model.hashutil import hash_to_hex + from swh.objstorage import get_objstorage +from swh.objstorage.objstorage import decompressors from .objstorage_testing import ObjStorageTestFixture class MockBlob(): """ Libcloud object mock that replicates its API """ def __init__(self, name, content): self.name = name self.content = content class MockBlockBlobService(): """Mock internal azure library which AzureCloudObjStorage depends upon. """ _data = {} # type: Dict[str, Any] def __init__(self, account_name, account_key, **kwargs): # do not care for the account_name and the api_secret_key here self._data = defaultdict(dict) def get_container_properties(self, container_name): self._data[container_name] return container_name in self._data def create_blob_from_bytes(self, container_name, blob_name, blob): self._data[container_name][blob_name] = blob def get_blob_to_bytes(self, container_name, blob_name): if blob_name not in self._data[container_name]: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return MockBlob(name=blob_name, content=self._data[container_name][blob_name]) def delete_blob(self, container_name, blob_name): try: self._data[container_name].pop(blob_name) except KeyError: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return True def exists(self, container_name, blob_name): return blob_name in self._data[container_name] def list_blobs(self, container_name, marker=None, maxresults=None): for blob_name, content in sorted(self._data[container_name].items()): if marker is None or blob_name > marker: yield MockBlob(name=blob_name, content=content) class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): + compression = None # type: Optional[str] def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.backends.azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.storage = get_objstorage('azure', { 'account_name': 'account-name', 'api_secret_key': 'api-secret-key', 'container_name': 'container-name', + 'compression': self.compression, }) + def test_compression(self): + content, obj_id = self.hash_content(b'test content is compressed') + self.storage.add(content, obj_id=obj_id) + + blob_service, container = self.storage.get_blob_service(obj_id) + internal_id = self.storage._internal_id(obj_id) + + raw_blob = blob_service.get_blob_to_bytes(container, internal_id) + + d = decompressors[self.compression]() + assert d.decompress(raw_blob.content) == content + assert d.unused_data == b'' + + +class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage): + compression = 'gzip' + + +class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage): + compression = 'zlib' + + +class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage): + compression = 'lzma' + + +class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage): + compression = 'bz2' + class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.backends.azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.accounts = {} for prefix in '0123456789abcdef': self.accounts[prefix] = { 'account_name': 'account_%s' % prefix, 'api_secret_key': 'secret_key_%s' % prefix, 'container_name': 'container_%s' % prefix, } self.storage = get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_instantiation_missing_prefixes(self): del self.accounts['d'] del self.accounts['e'] with self.assertRaisesRegex(ValueError, 'Missing prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_instantiation_inconsistent_prefixes(self): self.accounts['00'] = self.accounts['0'] with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_sharding_behavior(self): for i in range(100): content, obj_id = self.hash_content(b'test_content_%02d' % i) self.storage.add(content, obj_id=obj_id) hex_obj_id = hash_to_hex(obj_id) prefix = hex_obj_id[0] self.assertTrue( self.storage.prefixes[prefix][0].exists( self.accounts[prefix]['container_name'], hex_obj_id ))