diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,4 @@ pytest apache-libcloud -azure-storage +azure-storage-blob >= 12.0 python-cephlibs diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,4 @@ # optional dependencies # apache-libcloud -# azure-storage +# azure-storage-blob >= 12.0 diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py --- a/swh/objstorage/backends/azure.py +++ b/swh/objstorage/backends/azure.py @@ -3,53 +3,47 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging import string -from itertools import dropwhile, islice, product +from itertools import product -from azure.storage.blob import BlockBlobService -from azure.common import AzureMissingResourceHttpError -import requests +from azure.storage.blob import ContainerClient +from azure.core.exceptions import ResourceNotFoundError from swh.objstorage.objstorage import ( ObjStorage, compute_hash, - DEFAULT_LIMIT, compressors, decompressors, ) from swh.objstorage.exc import ObjNotFoundError, Error from swh.model import hashutil -logging.getLogger("azure.storage").setLevel(logging.CRITICAL) - class AzureCloudObjStorage(ObjStorage): """ObjStorage with azure abilities. """ - def __init__( - self, account_name, api_secret_key, container_name, compression="gzip", **kwargs - ): + def __init__(self, container_url, compression="gzip", **kwargs): super().__init__(**kwargs) - self.block_blob_service = BlockBlobService( - account_name=account_name, - account_key=api_secret_key, - request_session=requests.Session(), - ) - self.container_name = container_name + self.container_client = ContainerClient.from_container_url(container_url) self.compression = compression - def get_blob_service(self, hex_obj_id): - """Get the block_blob_service and container that contains the object with + def get_container_client(self, hex_obj_id): + """Get the container client for the container that contains the object with internal id hex_obj_id """ - return self.block_blob_service, self.container_name + return self.container_client - def get_all_blob_services(self): + def get_blob_client(self, hex_obj_id): + """Get the azure blob client for the given hex obj id""" + container_client = self.get_container_client(hex_obj_id) + + return container_client.get_blob_client(blob=hex_obj_id) + + def get_all_container_clients(self): """Get all active block_blob_services""" - yield self.block_blob_service, self.container_name + yield self.container_client def _internal_id(self, obj_id): """Internal id is the hex version in objstorage. @@ -59,8 +53,8 @@ def check_config(self, *, check_write): """Check the configuration for this object storage""" - for service, container in self.get_all_blob_services(): - props = service.get_container_properties(container) + for container_client in self.get_all_container_clients(): + props = container_client.get_container_properties() # FIXME: check_write is ignored here if not props: @@ -73,15 +67,20 @@ """ hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) - return service.exists(container_name=container, blob_name=hex_obj_id) + client = self.get_blob_client(hex_obj_id) + try: + client.get_blob_properties() + except ResourceNotFoundError: + return False + else: + return True def __iter__(self): """Iterate over the objects present in the storage. """ - for service, container in self.get_all_blob_services(): - for obj in service.list_blobs(container): + for client in self.get_all_container_clients(): + for obj in client.list_blobs(): yield hashutil.hash_to_bytes(obj.name) def __len__(self): @@ -108,12 +107,11 @@ # Send the compressed content compressor = compressors[self.compression]() - blob = [compressor.compress(content), compressor.flush()] + data = compressor.compress(content) + data += compressor.flush() - service, container = self.get_blob_service(hex_obj_id) - service.create_blob_from_bytes( - container_name=container, blob_name=hex_obj_id, blob=b"".join(blob), - ) + client = self.get_blob_client(hex_obj_id) + client.upload_blob(data=data, length=len(data)) return obj_id @@ -121,6 +119,13 @@ """Restore a content. """ + if obj_id is None: + # Checksum is missing, compute it on the fly. + obj_id = compute_hash(content) + + if obj_id in self: + self.delete(obj_id) + return self.add(content, obj_id, check_presence=False) def get(self, obj_id): @@ -128,16 +133,17 @@ """ hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) + try: - blob = service.get_blob_to_bytes( - container_name=container, blob_name=hex_obj_id - ) - except AzureMissingResourceHttpError: - raise ObjNotFoundError(obj_id) + download = client.download_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None + else: + data = download.content_as_bytes() decompressor = decompressors[self.compression]() - ret = decompressor.decompress(blob.content) + ret = decompressor.decompress(data) if decompressor.unused_data: raise Error("Corrupt object %s: trailing data found" % hex_obj_id) return ret @@ -155,44 +161,20 @@ """Delete an object.""" super().delete(obj_id) # Check delete permission hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) try: - service.delete_blob(container_name=container, blob_name=hex_obj_id) - except AzureMissingResourceHttpError: - raise ObjNotFoundError("Content {} not found!".format(hex_obj_id)) + client.delete_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None return True - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): - all_blob_services = self.get_all_blob_services() - if last_obj_id: - last_obj_id = self._internal_id(last_obj_id) - last_service, _ = self.get_blob_service(last_obj_id) - all_blob_services = dropwhile( - lambda srv: srv[0] != last_service, all_blob_services - ) - else: - last_service = None - - def iterate_blobs(): - for service, container in all_blob_services: - marker = last_obj_id if service == last_service else None - for obj in service.list_blobs( - container, marker=marker, maxresults=limit - ): - yield hashutil.hash_to_bytes(obj.name) - - return islice(iterate_blobs(), limit) - class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): """ObjStorage with azure capabilities, striped by prefix. accounts is a dict containing entries of the form: - : - account_name: - api_secret_key: - container_name: + : """ def __init__(self, accounts, compression="gzip", **kwargs): @@ -224,25 +206,17 @@ ) self.prefixes = {} - request_session = requests.Session() - for prefix, account in accounts.items(): - self.prefixes[prefix] = ( - BlockBlobService( - account_name=account["account_name"], - account_key=account["api_secret_key"], - request_session=request_session, - ), - account["container_name"], - ) + for prefix, container_url in accounts.items(): + self.prefixes[prefix] = ContainerClient.from_container_url(container_url) - def get_blob_service(self, hex_obj_id): + def get_container_client(self, hex_obj_id): """Get the block_blob_service and container that contains the object with internal id hex_obj_id """ return self.prefixes[hex_obj_id[: self.prefix_len]] - def get_all_blob_services(self): - """Get all active block_blob_services""" + def get_all_container_clients(self): + """Get all active container clients""" # iterate on items() to sort blob services; # needed to be able to paginate in the list_content() method yield from (v for _, v in sorted(self.prefixes.items())) diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -64,6 +64,8 @@ self.assertIsNone(result[0]) def test_restore_content(self): + self.storage.allow_delete = True + valid_content, valid_obj_id = self.hash_content(b"restore_content") invalid_content = b"unexpected content" id_adding = self.storage.add(invalid_content, valid_obj_id) diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py --- a/swh/objstorage/tests/test_objstorage_azure.py +++ b/swh/objstorage/tests/test_objstorage_azure.py @@ -3,15 +3,15 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from dataclasses import dataclass + import unittest -from collections import defaultdict from unittest.mock import patch -from typing import Any, Dict - -from azure.common import AzureMissingResourceHttpError from swh.model.hashutil import hash_to_hex +from azure.core.exceptions import ResourceNotFoundError + from swh.objstorage.factory import get_objstorage from swh.objstorage.objstorage import decompressors from swh.objstorage.exc import Error @@ -19,51 +19,73 @@ from .objstorage_testing import ObjStorageTestFixture -class MockBlob: - """ Libcloud object mock that replicates its API """ +@dataclass +class MockListedObject: + name: str + + +class MockDownloadClient: + def __init__(self, blob_data): + self.blob_data = blob_data + + def content_as_bytes(self): + return self.blob_data + - def __init__(self, name, content): - self.name = name - self.content = content +class MockBlobClient: + def __init__(self, container, blob): + self.container = container + self.blob = blob + def get_blob_properties(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") -class MockBlockBlobService: - """Mock internal azure library which AzureCloudObjStorage depends upon. + return {"exists": True} - """ + def upload_blob(self, data, length=None): + if self.blob in self.container.blobs: + raise ValueError("Blob already exists") - _data: Dict[str, Any] = {} + if length is not None and length != len(data): + raise ValueError("Wrong length for blob data!") - def __init__(self, account_name, account_key, **kwargs): - # do not care for the account_name and the api_secret_key here - self._data = defaultdict(dict) + self.container.blobs[self.blob] = data - def get_container_properties(self, container_name): - self._data[container_name] - return container_name in self._data + def download_blob(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") - def create_blob_from_bytes(self, container_name, blob_name, blob): - self._data[container_name][blob_name] = blob + return MockDownloadClient(self.container.blobs[self.blob]) - def get_blob_to_bytes(self, container_name, blob_name): - if blob_name not in self._data[container_name]: - raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404) - return MockBlob(name=blob_name, content=self._data[container_name][blob_name]) + def delete_blob(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") - def delete_blob(self, container_name, blob_name): - try: - self._data[container_name].pop(blob_name) - except KeyError: - raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404) - return True + del self.container.blobs[self.blob] - def exists(self, container_name, blob_name): - return blob_name in self._data[container_name] - def list_blobs(self, container_name, marker=None, maxresults=None): - for blob_name, content in sorted(self._data[container_name].items()): - if marker is None or blob_name > marker: - yield MockBlob(name=blob_name, content=content) +class MockContainerClient: + def __init__(self, container_url): + self.container_url = [] + self.blobs = {} + + @classmethod + def from_container_url(cls, container_url): + return cls(container_url) + + def get_container_properties(self): + return {"exists": True} + + def get_blob_client(self, blob): + return MockBlobClient(self, blob) + + def list_blobs(self): + for obj in sorted(self.blobs): + yield MockListedObject(obj) + + def delete_blob(self, blob): + self.get_blob_client(blob.name).delete_blob() class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): @@ -72,7 +94,7 @@ def setUp(self): super().setUp() patcher = patch( - "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService, + "swh.objstorage.backends.azure.ContainerClient", MockContainerClient, ) patcher.start() self.addCleanup(patcher.stop) @@ -80,9 +102,7 @@ self.storage = get_objstorage( "azure", { - "account_name": "account-name", - "api_secret_key": "api-secret-key", - "container_name": "container-name", + "container_url": "https://bogus-container-url.example", "compression": self.compression, }, ) @@ -91,23 +111,25 @@ content, obj_id = self.hash_content(b"test content is compressed") self.storage.add(content, obj_id=obj_id) - blob_service, container = self.storage.get_blob_service(obj_id) internal_id = self.storage._internal_id(obj_id) - - raw_blob = blob_service.get_blob_to_bytes(container, internal_id) + blob_client = self.storage.get_blob_client(internal_id) + raw_blob = blob_client.download_blob().content_as_bytes() d = decompressors[self.compression]() - assert d.decompress(raw_blob.content) == content + assert d.decompress(raw_blob) == content assert d.unused_data == b"" def test_trailing_data_on_stored_blob(self): content, obj_id = self.hash_content(b"test content without garbage") self.storage.add(content, obj_id=obj_id) - blob_service, container = self.storage.get_blob_service(obj_id) internal_id = self.storage._internal_id(obj_id) + blob_client = self.storage.get_blob_client(internal_id) + raw_blob = blob_client.download_blob().content_as_bytes() + new_data = raw_blob + b"trailing garbage" - blob_service._data[container][internal_id] += b"trailing garbage" + blob_client.delete_blob() + blob_client.upload_blob(data=new_data, length=len(new_data)) if self.compression == "none": with self.assertRaises(Error) as e: @@ -138,18 +160,14 @@ def setUp(self): super().setUp() patcher = patch( - "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService, + "swh.objstorage.backends.azure.ContainerClient", MockContainerClient ) patcher.start() self.addCleanup(patcher.stop) self.accounts = {} for prefix in "0123456789abcdef": - self.accounts[prefix] = { - "account_name": "account_%s" % prefix, - "api_secret_key": "secret_key_%s" % prefix, - "container_name": "container_%s" % prefix, - } + self.accounts[prefix] = "https://bogus-container-url.example/" + prefix self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts}) @@ -173,7 +191,7 @@ hex_obj_id = hash_to_hex(obj_id) prefix = hex_obj_id[0] self.assertTrue( - self.storage.prefixes[prefix][0].exists( - self.accounts[prefix]["container_name"], hex_obj_id - ) + self.storage.prefixes[prefix] + .get_blob_client(hex_obj_id) + .get_blob_properties() )