diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,4 @@ pytest apache-libcloud -azure-storage +azure-storage-blob >= 12.0 python-cephlibs diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,4 @@ # optional dependencies # apache-libcloud -# azure-storage +# azure-storage-blob >= 12.0 diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py --- a/swh/objstorage/backends/azure.py +++ b/swh/objstorage/backends/azure.py @@ -3,53 +3,146 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging +import datetime +from itertools import product import string -from itertools import dropwhile, islice, product +from typing import Dict, Optional, Union +import warnings -from azure.storage.blob import BlockBlobService -from azure.common import AzureMissingResourceHttpError -import requests +from azure.storage.blob import ( + ContainerClient, + ContainerSasPermissions, + generate_container_sas, +) +from azure.core.exceptions import ResourceNotFoundError from swh.objstorage.objstorage import ( ObjStorage, compute_hash, - DEFAULT_LIMIT, compressors, decompressors, ) from swh.objstorage.exc import ObjNotFoundError, Error from swh.model import hashutil -logging.getLogger("azure.storage").setLevel(logging.CRITICAL) +def get_container_url( + account_name: str, + account_key: str, + container_name: str, + access_policy: str = "read_only", + expiry: datetime.timedelta = datetime.timedelta(days=365), + **kwargs, +) -> str: + """Get the full url, for the given container on the given account, with a + Shared Access Signature granting the specified access policy. + + Args: + account_name: name of the storage account for which to generate the URL + account_key: shared account key of the storage account used to generate the SAS + container_name: name of the container for which to grant access in the storage + account + access_policy: one of ``read_only``, ``append_only``, ``full`` + expiry: the interval in the future with which the signature will expire + + Returns: + the full URL of the container, with the shared access signature. + """ -class AzureCloudObjStorage(ObjStorage): - """ObjStorage with azure abilities. + access_policies = { + "read_only": ContainerSasPermissions( + read=True, list=True, delete=False, write=False + ), + "append_only": ContainerSasPermissions( + read=True, list=True, delete=False, write=True + ), + "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True), + } + + current_time = datetime.datetime.utcnow() + + signature = generate_container_sas( + account_name, + container_name, + account_key=account_key, + permission=access_policies[access_policy], + start=current_time + datetime.timedelta(minutes=-1), + expiry=current_time + expiry, + ) + return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}" + + +class AzureCloudObjStorage(ObjStorage): + """ObjStorage backend for Azure blob storage accounts. + + Args: + container_url: the URL of the container in which the objects are stored. + account_name: (deprecated) the name of the storage account under which objects are + stored + api_secret_key: (deprecated) the shared account key + container_name: (deprecated) the name of the container under which objects are + stored + compression: the compression algorithm used to compress objects in storage + + Notes: + The container url should contain the credentials via a "Shared Access + Signature". The :func:`get_container_url` helper can be used to generate + such a URL from the account's access keys. The ``account_name``, + ``api_secret_key`` and ``container_name`` arguments are deprecated. """ def __init__( - self, account_name, api_secret_key, container_name, compression="gzip", **kwargs + self, + container_url: Optional[str] = None, + account_name: Optional[str] = None, + api_secret_key: Optional[str] = None, + container_name: Optional[str] = None, + compression="gzip", + **kwargs, ): + if container_url is None: + if account_name is None or api_secret_key is None or container_name is None: + raise ValueError( + "AzureCloudObjStorage must have a container_url or all three " + "account_name, api_secret_key and container_name" + ) + else: + warnings.warn( + "The Azure objstorage account secret key parameters are " + "deprecated, please use container URLs instead.", + DeprecationWarning, + ) + container_url = get_container_url( + account_name=account_name, + account_key=api_secret_key, + container_name=container_name, + access_policy="full", + ) + super().__init__(**kwargs) - self.block_blob_service = BlockBlobService( - account_name=account_name, - account_key=api_secret_key, - request_session=requests.Session(), - ) - self.container_name = container_name + self.container_client = ContainerClient.from_container_url(container_url) self.compression = compression - def get_blob_service(self, hex_obj_id): - """Get the block_blob_service and container that contains the object with + def get_container_client(self, hex_obj_id): + """Get the container client for the container that contains the object with internal id hex_obj_id + + This is used to allow the PrefixedAzureCloudObjStorage to dispatch the + client according to the prefix of the object id. + """ - return self.block_blob_service, self.container_name + return self.container_client + + def get_blob_client(self, hex_obj_id): + """Get the azure blob client for the given hex obj id""" + container_client = self.get_container_client(hex_obj_id) - def get_all_blob_services(self): + return container_client.get_blob_client(blob=hex_obj_id) + + def get_all_container_clients(self): """Get all active block_blob_services""" - yield self.block_blob_service, self.container_name + yield self.container_client def _internal_id(self, obj_id): """Internal id is the hex version in objstorage. @@ -59,8 +152,8 @@ def check_config(self, *, check_write): """Check the configuration for this object storage""" - for service, container in self.get_all_blob_services(): - props = service.get_container_properties(container) + for container_client in self.get_all_container_clients(): + props = container_client.get_container_properties() # FIXME: check_write is ignored here if not props: @@ -73,15 +166,20 @@ """ hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) - return service.exists(container_name=container, blob_name=hex_obj_id) + client = self.get_blob_client(hex_obj_id) + try: + client.get_blob_properties() + except ResourceNotFoundError: + return False + else: + return True def __iter__(self): """Iterate over the objects present in the storage. """ - for service, container in self.get_all_blob_services(): - for obj in service.list_blobs(container): + for client in self.get_all_container_clients(): + for obj in client.list_blobs(): yield hashutil.hash_to_bytes(obj.name) def __len__(self): @@ -108,12 +206,11 @@ # Send the compressed content compressor = compressors[self.compression]() - blob = [compressor.compress(content), compressor.flush()] + data = compressor.compress(content) + data += compressor.flush() - service, container = self.get_blob_service(hex_obj_id) - service.create_blob_from_bytes( - container_name=container, blob_name=hex_obj_id, blob=b"".join(blob), - ) + client = self.get_blob_client(hex_obj_id) + client.upload_blob(data=data, length=len(data)) return obj_id @@ -121,6 +218,13 @@ """Restore a content. """ + if obj_id is None: + # Checksum is missing, compute it on the fly. + obj_id = compute_hash(content) + + if obj_id in self: + self.delete(obj_id) + return self.add(content, obj_id, check_presence=False) def get(self, obj_id): @@ -128,16 +232,17 @@ """ hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) + try: - blob = service.get_blob_to_bytes( - container_name=container, blob_name=hex_obj_id - ) - except AzureMissingResourceHttpError: - raise ObjNotFoundError(obj_id) + download = client.download_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None + else: + data = download.content_as_bytes() decompressor = decompressors[self.compression]() - ret = decompressor.decompress(blob.content) + ret = decompressor.decompress(data) if decompressor.unused_data: raise Error("Corrupt object %s: trailing data found" % hex_obj_id) return ret @@ -155,47 +260,28 @@ """Delete an object.""" super().delete(obj_id) # Check delete permission hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) try: - service.delete_blob(container_name=container, blob_name=hex_obj_id) - except AzureMissingResourceHttpError: - raise ObjNotFoundError("Content {} not found!".format(hex_obj_id)) + client.delete_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None return True - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): - all_blob_services = self.get_all_blob_services() - if last_obj_id: - last_obj_id = self._internal_id(last_obj_id) - last_service, _ = self.get_blob_service(last_obj_id) - all_blob_services = dropwhile( - lambda srv: srv[0] != last_service, all_blob_services - ) - else: - last_service = None - - def iterate_blobs(): - for service, container in all_blob_services: - marker = last_obj_id if service == last_service else None - for obj in service.list_blobs( - container, marker=marker, maxresults=limit - ): - yield hashutil.hash_to_bytes(obj.name) - - return islice(iterate_blobs(), limit) - class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): """ObjStorage with azure capabilities, striped by prefix. accounts is a dict containing entries of the form: - : - account_name: - api_secret_key: - container_name: + : """ - def __init__(self, accounts, compression="gzip", **kwargs): + def __init__( + self, + accounts: Dict[str, Union[str, Dict[str, str]]], + compression="gzip", + **kwargs, + ): # shortcut AzureCloudObjStorage __init__ ObjStorage.__init__(self, **kwargs) @@ -223,26 +309,35 @@ "Missing prefixes %s" % ", ".join(sorted(missing_prefixes)) ) + do_warning = False + self.prefixes = {} - request_session = requests.Session() - for prefix, account in accounts.items(): - self.prefixes[prefix] = ( - BlockBlobService( - account_name=account["account_name"], - account_key=account["api_secret_key"], - request_session=request_session, - ), - account["container_name"], + for prefix, container_url in accounts.items(): + if isinstance(container_url, dict): + do_warning = True + container_url = get_container_url( + account_name=container_url["account_name"], + account_key=container_url["api_secret_key"], + container_name=container_url["container_name"], + access_policy="full", + ) + self.prefixes[prefix] = ContainerClient.from_container_url(container_url) + + if do_warning: + warnings.warn( + "The Azure objstorage account secret key parameters are " + "deprecated, please use container URLs instead.", + DeprecationWarning, ) - def get_blob_service(self, hex_obj_id): + def get_container_client(self, hex_obj_id): """Get the block_blob_service and container that contains the object with internal id hex_obj_id """ return self.prefixes[hex_obj_id[: self.prefix_len]] - def get_all_blob_services(self): - """Get all active block_blob_services""" + def get_all_container_clients(self): + """Get all active container clients""" # iterate on items() to sort blob services; # needed to be able to paginate in the list_content() method yield from (v for _, v in sorted(self.prefixes.items())) diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -64,6 +64,8 @@ self.assertIsNone(result[0]) def test_restore_content(self): + self.storage.allow_delete = True + valid_content, valid_obj_id = self.hash_content(b"restore_content") invalid_content = b"unexpected content" id_adding = self.storage.add(invalid_content, valid_obj_id) diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py --- a/swh/objstorage/tests/test_objstorage_azure.py +++ b/swh/objstorage/tests/test_objstorage_azure.py @@ -3,15 +3,18 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import base64 +from dataclasses import dataclass import unittest -from collections import defaultdict from unittest.mock import patch +from urllib.parse import urlparse, parse_qs -from typing import Any, Dict +from azure.core.exceptions import ResourceNotFoundError +import pytest -from azure.common import AzureMissingResourceHttpError from swh.model.hashutil import hash_to_hex +import swh.objstorage.backends.azure from swh.objstorage.factory import get_objstorage from swh.objstorage.objstorage import decompressors from swh.objstorage.exc import Error @@ -19,51 +22,73 @@ from .objstorage_testing import ObjStorageTestFixture -class MockBlob: - """ Libcloud object mock that replicates its API """ +@dataclass +class MockListedObject: + name: str - def __init__(self, name, content): - self.name = name - self.content = content +class MockDownloadClient: + def __init__(self, blob_data): + self.blob_data = blob_data -class MockBlockBlobService: - """Mock internal azure library which AzureCloudObjStorage depends upon. + def content_as_bytes(self): + return self.blob_data - """ - _data: Dict[str, Any] = {} +class MockBlobClient: + def __init__(self, container, blob): + self.container = container + self.blob = blob - def __init__(self, account_name, account_key, **kwargs): - # do not care for the account_name and the api_secret_key here - self._data = defaultdict(dict) + def get_blob_properties(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") - def get_container_properties(self, container_name): - self._data[container_name] - return container_name in self._data + return {"exists": True} - def create_blob_from_bytes(self, container_name, blob_name, blob): - self._data[container_name][blob_name] = blob + def upload_blob(self, data, length=None): + if self.blob in self.container.blobs: + raise ValueError("Blob already exists") - def get_blob_to_bytes(self, container_name, blob_name): - if blob_name not in self._data[container_name]: - raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404) - return MockBlob(name=blob_name, content=self._data[container_name][blob_name]) + if length is not None and length != len(data): + raise ValueError("Wrong length for blob data!") - def delete_blob(self, container_name, blob_name): - try: - self._data[container_name].pop(blob_name) - except KeyError: - raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404) - return True + self.container.blobs[self.blob] = data - def exists(self, container_name, blob_name): - return blob_name in self._data[container_name] + def download_blob(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") - def list_blobs(self, container_name, marker=None, maxresults=None): - for blob_name, content in sorted(self._data[container_name].items()): - if marker is None or blob_name > marker: - yield MockBlob(name=blob_name, content=content) + return MockDownloadClient(self.container.blobs[self.blob]) + + def delete_blob(self): + if self.blob not in self.container.blobs: + raise ResourceNotFoundError("Blob not found") + + del self.container.blobs[self.blob] + + +class MockContainerClient: + def __init__(self, container_url): + self.container_url = container_url + self.blobs = {} + + @classmethod + def from_container_url(cls, container_url): + return cls(container_url) + + def get_container_properties(self): + return {"exists": True} + + def get_blob_client(self, blob): + return MockBlobClient(self, blob) + + def list_blobs(self): + for obj in sorted(self.blobs): + yield MockListedObject(obj) + + def delete_blob(self, blob): + self.get_blob_client(blob.name).delete_blob() class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): @@ -72,7 +97,7 @@ def setUp(self): super().setUp() patcher = patch( - "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService, + "swh.objstorage.backends.azure.ContainerClient", MockContainerClient, ) patcher.start() self.addCleanup(patcher.stop) @@ -80,9 +105,7 @@ self.storage = get_objstorage( "azure", { - "account_name": "account-name", - "api_secret_key": "api-secret-key", - "container_name": "container-name", + "container_url": "https://bogus-container-url.example", "compression": self.compression, }, ) @@ -91,23 +114,25 @@ content, obj_id = self.hash_content(b"test content is compressed") self.storage.add(content, obj_id=obj_id) - blob_service, container = self.storage.get_blob_service(obj_id) internal_id = self.storage._internal_id(obj_id) - - raw_blob = blob_service.get_blob_to_bytes(container, internal_id) + blob_client = self.storage.get_blob_client(internal_id) + raw_blob = blob_client.download_blob().content_as_bytes() d = decompressors[self.compression]() - assert d.decompress(raw_blob.content) == content + assert d.decompress(raw_blob) == content assert d.unused_data == b"" def test_trailing_data_on_stored_blob(self): content, obj_id = self.hash_content(b"test content without garbage") self.storage.add(content, obj_id=obj_id) - blob_service, container = self.storage.get_blob_service(obj_id) internal_id = self.storage._internal_id(obj_id) + blob_client = self.storage.get_blob_client(internal_id) + raw_blob = blob_client.download_blob().content_as_bytes() + new_data = raw_blob + b"trailing garbage" - blob_service._data[container][internal_id] += b"trailing garbage" + blob_client.delete_blob() + blob_client.upload_blob(data=new_data, length=len(new_data)) if self.compression == "none": with self.assertRaises(Error) as e: @@ -138,18 +163,14 @@ def setUp(self): super().setUp() patcher = patch( - "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService, + "swh.objstorage.backends.azure.ContainerClient", MockContainerClient ) patcher.start() self.addCleanup(patcher.stop) self.accounts = {} for prefix in "0123456789abcdef": - self.accounts[prefix] = { - "account_name": "account_%s" % prefix, - "api_secret_key": "secret_key_%s" % prefix, - "container_name": "container_%s" % prefix, - } + self.accounts[prefix] = "https://bogus-container-url.example/" + prefix self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts}) @@ -173,7 +194,75 @@ hex_obj_id = hash_to_hex(obj_id) prefix = hex_obj_id[0] self.assertTrue( - self.storage.prefixes[prefix][0].exists( - self.accounts[prefix]["container_name"], hex_obj_id - ) + self.storage.prefixes[prefix] + .get_blob_client(hex_obj_id) + .get_blob_properties() ) + + +def test_get_container_url(): + # r=read, l=list, w=write, d=delete + policy_map = { + "read_only": "rl", + "append_only": "rwl", + "full": "rwdl", + } + + for policy, expected in policy_map.items(): + ret = swh.objstorage.backends.azure.get_container_url( + account_name="account_name", + account_key=base64.b64encode(b"account_key"), + container_name="container_name", + access_policy=policy, + ) + + p = urlparse(ret) + assert p.scheme == "https" + assert p.netloc == "account_name.blob.core.windows.net" + assert p.path == "/container_name" + + qs = parse_qs(p.query) + # sp: permissions + assert qs["sp"] == [expected] + # sr: resource (c=container) + assert qs["sr"] == ["c"] + # st: start; se: expiry + assert qs["st"][0] < qs["se"][0] + + +def test_bwcompat_args(monkeypatch): + monkeypatch.setattr( + swh.objstorage.backends.azure, "ContainerClient", MockContainerClient, + ) + + with pytest.deprecated_call(): + objs = get_objstorage( + "azure", + { + "account_name": "account_name", + "api_secret_key": base64.b64encode(b"account_key"), + "container_name": "container_name", + }, + ) + + assert objs is not None + + +def test_bwcompat_args_prefixed(monkeypatch): + monkeypatch.setattr( + swh.objstorage.backends.azure, "ContainerClient", MockContainerClient, + ) + + accounts = { + prefix: { + "account_name": f"account_name{prefix}", + "api_secret_key": base64.b64encode(b"account_key"), + "container_name": "container_name", + } + for prefix in "0123456789abcdef" + } + + with pytest.deprecated_call(): + objs = get_objstorage("azure-prefixed", {"accounts": accounts}) + + assert objs is not None