diff --git a/requirements-test.txt b/requirements-test.txt
index fc79011..b72b790 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,4 @@
 pytest
 apache-libcloud
-azure-storage
+azure-storage-blob >= 12.0
 python-cephlibs
diff --git a/requirements.txt b/requirements.txt
index fe1bbd4..66045f9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,12 +1,12 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 vcversioner

 # remote storage API server
 aiohttp >= 3
 click

 # optional dependencies
 # apache-libcloud
-# azure-storage
+# azure-storage-blob >= 12.0
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
index 6ae16c7..424ac85 100644
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -1,248 +1,343 @@
 # Copyright (C) 2016-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import logging
+import datetime
+from itertools import product
 import string
-from itertools import dropwhile, islice, product
+from typing import Dict, Optional, Union
+import warnings

-from azure.storage.blob import BlockBlobService
-from azure.common import AzureMissingResourceHttpError
-import requests
+from azure.storage.blob import (
+    ContainerClient,
+    ContainerSasPermissions,
+    generate_container_sas,
+)
+from azure.core.exceptions import ResourceNotFoundError

 from swh.objstorage.objstorage import (
     ObjStorage,
     compute_hash,
-    DEFAULT_LIMIT,
     compressors,
     decompressors,
 )
 from swh.objstorage.exc import ObjNotFoundError, Error
 from swh.model import hashutil

-logging.getLogger("azure.storage").setLevel(logging.CRITICAL)
+
+def get_container_url(
+    account_name: str,
+    account_key: str,
+    container_name: str,
+    access_policy: str = "read_only",
+    expiry: datetime.timedelta = datetime.timedelta(days=365),
+    **kwargs,
+) -> str:
+    """Get the full URL for the given container on the given account, with a
+    Shared Access Signature granting the specified access policy.
+
+    Args:
+        account_name: name of the storage account for which to generate the URL
+        account_key: shared account key of the storage account used to generate the SAS
+        container_name: name of the container for which to grant access in the storage
+            account
+        access_policy: one of ``read_only``, ``append_only``, ``full``
+        expiry: how far in the future the signature expires
+
+    Returns:
+        the full URL of the container, with the shared access signature.
+    """
-class AzureCloudObjStorage(ObjStorage):
-    """ObjStorage with azure abilities.
+    access_policies = {
+        "read_only": ContainerSasPermissions(
+            read=True, list=True, delete=False, write=False
+        ),
+        "append_only": ContainerSasPermissions(
+            read=True, list=True, delete=False, write=True
+        ),
+        "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True),
+    }
+
+    current_time = datetime.datetime.utcnow()
+
+    signature = generate_container_sas(
+        account_name,
+        container_name,
+        account_key=account_key,
+        permission=access_policies[access_policy],
+        start=current_time + datetime.timedelta(minutes=-1),
+        expiry=current_time + expiry,
+    )
+    return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}"
+
+
+class AzureCloudObjStorage(ObjStorage):
+    """ObjStorage backend for Azure blob storage accounts.
+
+    Args:
+        container_url: the URL of the container in which the objects are stored.
+        account_name: (deprecated) the name of the storage account under which
+            objects are stored
+        api_secret_key: (deprecated) the shared account key
+        container_name: (deprecated) the name of the container under which
+            objects are stored
+        compression: the compression algorithm used to compress objects in storage
+
+    Notes:
+        The container URL should contain the credentials via a "Shared Access
+        Signature". The :func:`get_container_url` helper can be used to generate
+        such a URL from the account's access keys. The ``account_name``,
+        ``api_secret_key`` and ``container_name`` arguments are deprecated.

     """

     def __init__(
-        self, account_name, api_secret_key, container_name, compression="gzip", **kwargs
+        self,
+        container_url: Optional[str] = None,
+        account_name: Optional[str] = None,
+        api_secret_key: Optional[str] = None,
+        container_name: Optional[str] = None,
+        compression="gzip",
+        **kwargs,
     ):
+        if container_url is None:
+            if account_name is None or api_secret_key is None or container_name is None:
+                raise ValueError(
+                    "AzureCloudObjStorage must have a container_url or all three "
+                    "account_name, api_secret_key and container_name"
+                )
+            else:
+                warnings.warn(
+                    "The Azure objstorage account secret key parameters are "
+                    "deprecated, please use container URLs instead.",
+                    DeprecationWarning,
+                )
+                container_url = get_container_url(
+                    account_name=account_name,
+                    account_key=api_secret_key,
+                    container_name=container_name,
+                    access_policy="full",
+                )
+
         super().__init__(**kwargs)
-        self.block_blob_service = BlockBlobService(
-            account_name=account_name,
-            account_key=api_secret_key,
-            request_session=requests.Session(),
-        )
-        self.container_name = container_name
+        self.container_client = ContainerClient.from_container_url(container_url)
         self.compression = compression

-    def get_blob_service(self, hex_obj_id):
-        """Get the block_blob_service and container that contains the object with
+    def get_container_client(self, hex_obj_id):
+        """Get the container client for the container that contains the object with
         internal id hex_obj_id
+
+        This is used to allow the PrefixedAzureCloudObjStorage to dispatch the
+        client according to the prefix of the object id.
+
         """
-        return self.block_blob_service, self.container_name
+        return self.container_client
+
+    def get_blob_client(self, hex_obj_id):
+        """Get the azure blob client for the given hex obj id"""
+        container_client = self.get_container_client(hex_obj_id)

-    def get_all_blob_services(self):
+        return container_client.get_blob_client(blob=hex_obj_id)
+
+    def get_all_container_clients(self):
         """Get all active block_blob_services"""
-        yield self.block_blob_service, self.container_name
+        yield self.container_client

     def _internal_id(self, obj_id):
         """Internal id is the hex version in objstorage.

         """
         return hashutil.hash_to_hex(obj_id)

     def check_config(self, *, check_write):
         """Check the configuration for this object storage"""
-        for service, container in self.get_all_blob_services():
-            props = service.get_container_properties(container)
+        for container_client in self.get_all_container_clients():
+            props = container_client.get_container_properties()

             # FIXME: check_write is ignored here
             if not props:
                 return False

         return True

     def __contains__(self, obj_id):
         """Does the storage contains the obj_id.

         """
         hex_obj_id = self._internal_id(obj_id)
-        service, container = self.get_blob_service(hex_obj_id)
-        return service.exists(container_name=container, blob_name=hex_obj_id)
+        client = self.get_blob_client(hex_obj_id)
+        try:
+            client.get_blob_properties()
+        except ResourceNotFoundError:
+            return False
+        else:
+            return True

     def __iter__(self):
         """Iterate over the objects present in the storage.

         """
-        for service, container in self.get_all_blob_services():
-            for obj in service.list_blobs(container):
+        for client in self.get_all_container_clients():
+            for obj in client.list_blobs():
                 yield hashutil.hash_to_bytes(obj.name)

     def __len__(self):
         """Compute the number of objects in the current object storage.

         Returns:
             number of objects contained in the storage.

         """
         return sum(1 for i in self)

     def add(self, content, obj_id=None, check_presence=True):
         """Add an obj in storage if it's not there already.

         """
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)

         if check_presence and obj_id in self:
             return obj_id

         hex_obj_id = self._internal_id(obj_id)

         # Send the compressed content
         compressor = compressors[self.compression]()
-        blob = [compressor.compress(content), compressor.flush()]
+        data = compressor.compress(content)
+        data += compressor.flush()

-        service, container = self.get_blob_service(hex_obj_id)
-        service.create_blob_from_bytes(
-            container_name=container, blob_name=hex_obj_id, blob=b"".join(blob),
-        )
+        client = self.get_blob_client(hex_obj_id)
+        client.upload_blob(data=data, length=len(data))

         return obj_id

     def restore(self, content, obj_id=None):
         """Restore a content.

         """
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            obj_id = compute_hash(content)
+
+        if obj_id in self:
+            self.delete(obj_id)
+
         return self.add(content, obj_id, check_presence=False)

     def get(self, obj_id):
         """Retrieve blob's content if found.
""" hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) + try: - blob = service.get_blob_to_bytes( - container_name=container, blob_name=hex_obj_id - ) - except AzureMissingResourceHttpError: - raise ObjNotFoundError(obj_id) + download = client.download_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None + else: + data = download.content_as_bytes() decompressor = decompressors[self.compression]() - ret = decompressor.decompress(blob.content) + ret = decompressor.decompress(data) if decompressor.unused_data: raise Error("Corrupt object %s: trailing data found" % hex_obj_id) return ret def check(self, obj_id): """Check the content integrity. """ obj_content = self.get(obj_id) content_obj_id = compute_hash(obj_content) if content_obj_id != obj_id: raise Error(obj_id) def delete(self, obj_id): """Delete an object.""" super().delete(obj_id) # Check delete permission hex_obj_id = self._internal_id(obj_id) - service, container = self.get_blob_service(hex_obj_id) + client = self.get_blob_client(hex_obj_id) try: - service.delete_blob(container_name=container, blob_name=hex_obj_id) - except AzureMissingResourceHttpError: - raise ObjNotFoundError("Content {} not found!".format(hex_obj_id)) + client.delete_blob() + except ResourceNotFoundError: + raise ObjNotFoundError(obj_id) from None return True - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): - all_blob_services = self.get_all_blob_services() - if last_obj_id: - last_obj_id = self._internal_id(last_obj_id) - last_service, _ = self.get_blob_service(last_obj_id) - all_blob_services = dropwhile( - lambda srv: srv[0] != last_service, all_blob_services - ) - else: - last_service = None - - def iterate_blobs(): - for service, container in all_blob_services: - marker = last_obj_id if service == last_service else None - for obj in service.list_blobs( - container, marker=marker, maxresults=limit - ): - yield hashutil.hash_to_bytes(obj.name) - - return islice(iterate_blobs(), limit) - class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): """ObjStorage with azure capabilities, striped by prefix. 
     accounts is a dict containing entries of the form:
-        <prefix>:
-            account_name: <account_name>
-            api_secret_key: <api_secret_key>
-            container_name: <container_name>
+        <prefix>: <container_url>

     """

-    def __init__(self, accounts, compression="gzip", **kwargs):
+    def __init__(
+        self,
+        accounts: Dict[str, Union[str, Dict[str, str]]],
+        compression="gzip",
+        **kwargs,
+    ):
         # shortcut AzureCloudObjStorage __init__
         ObjStorage.__init__(self, **kwargs)

         self.compression = compression

         # Definition sanity check
         prefix_lengths = set(len(prefix) for prefix in accounts)
         if not len(prefix_lengths) == 1:
             raise ValueError(
                 "Inconsistent prefixes, found lengths %s"
                 % ", ".join(str(l) for l in sorted(prefix_lengths))
             )
         self.prefix_len = prefix_lengths.pop()

         expected_prefixes = set(
             "".join(letters)
             for letters in product(
                 set(string.hexdigits.lower()), repeat=self.prefix_len
             )
         )
         missing_prefixes = expected_prefixes - set(accounts)
         if missing_prefixes:
             raise ValueError(
                 "Missing prefixes %s" % ", ".join(sorted(missing_prefixes))
             )

+        do_warning = False
+
         self.prefixes = {}
-        request_session = requests.Session()
-        for prefix, account in accounts.items():
-            self.prefixes[prefix] = (
-                BlockBlobService(
-                    account_name=account["account_name"],
-                    account_key=account["api_secret_key"],
-                    request_session=request_session,
-                ),
-                account["container_name"],
+        for prefix, container_url in accounts.items():
+            if isinstance(container_url, dict):
+                do_warning = True
+                container_url = get_container_url(
+                    account_name=container_url["account_name"],
+                    account_key=container_url["api_secret_key"],
+                    container_name=container_url["container_name"],
+                    access_policy="full",
+                )
+            self.prefixes[prefix] = ContainerClient.from_container_url(container_url)
+
+        if do_warning:
+            warnings.warn(
+                "The Azure objstorage account secret key parameters are "
+                "deprecated, please use container URLs instead.",
+                DeprecationWarning,
             )

-    def get_blob_service(self, hex_obj_id):
+    def get_container_client(self, hex_obj_id):
         """Get the block_blob_service and container that contains the object with
         internal id hex_obj_id
         """
         return self.prefixes[hex_obj_id[: self.prefix_len]]

-    def get_all_blob_services(self):
-        """Get all active block_blob_services"""
+    def get_all_container_clients(self):
+        """Get all active container clients"""
         # iterate on items() to sort blob services;
         # needed to be able to paginate in the list_content() method
         yield from (v for _, v in sorted(self.prefixes.items()))
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
index df28879..26ec0d1 100644
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -1,221 +1,223 @@
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import time
 import collections

 from swh.objstorage import exc
 from swh.objstorage.objstorage import compute_hash


 class ObjStorageTestFixture:
     def hash_content(self, content):
         obj_id = compute_hash(content)
         return content, obj_id

     def assertContentMatch(self, obj_id, expected_content):  # noqa
         content = self.storage.get(obj_id)
         self.assertEqual(content, expected_content)

     def test_check_config(self):
         self.assertTrue(self.storage.check_config(check_write=False))
         self.assertTrue(self.storage.check_config(check_write=True))

     def test_contains(self):
         content_p, obj_id_p = self.hash_content(b"contains_present")
         content_m, obj_id_m = self.hash_content(b"contains_missing")
         self.storage.add(content_p, obj_id=obj_id_p)
         self.assertIn(obj_id_p, self.storage)
         self.assertNotIn(obj_id_m, self.storage)

     def test_add_get_w_id(self):
         content, obj_id = self.hash_content(b"add_get_w_id")
         r = self.storage.add(content, obj_id=obj_id)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)

     def test_add_big(self):
         content, obj_id = self.hash_content(b"add_big" * 1024 * 1024)
         r = self.storage.add(content, obj_id=obj_id)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)

     def test_add_get_wo_id(self):
         content, obj_id = self.hash_content(b"add_get_wo_id")
         r = self.storage.add(content)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)

     def test_add_get_batch(self):
         content1, obj_id1 = self.hash_content(b"add_get_batch_1")
         content2, obj_id2 = self.hash_content(b"add_get_batch_2")
         self.storage.add(content1, obj_id1)
         self.storage.add(content2, obj_id2)
         cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2])
         self.assertEqual(cr1, content1)
         self.assertEqual(cr2, content2)

     def test_get_batch_unexisting_content(self):
         content, obj_id = self.hash_content(b"get_batch_unexisting_content")
         result = list(self.storage.get_batch([obj_id]))
         self.assertTrue(len(result) == 1)
         self.assertIsNone(result[0])

     def test_restore_content(self):
+        self.storage.allow_delete = True
+
         valid_content, valid_obj_id = self.hash_content(b"restore_content")
         invalid_content = b"unexpected content"
         id_adding = self.storage.add(invalid_content, valid_obj_id)
         self.assertEqual(id_adding, valid_obj_id)
         with self.assertRaises(exc.Error):
             self.storage.check(id_adding)
         id_restore = self.storage.restore(valid_content, valid_obj_id)
         self.assertEqual(id_restore, valid_obj_id)
         self.assertContentMatch(valid_obj_id, valid_content)

     def test_get_missing(self):
         content, obj_id = self.hash_content(b"get_missing")
         with self.assertRaises(exc.ObjNotFoundError) as e:
             self.storage.get(obj_id)
         self.assertIn(obj_id, e.exception.args)

     def test_check_missing(self):
         content, obj_id = self.hash_content(b"check_missing")
         with self.assertRaises(exc.Error):
             self.storage.check(obj_id)

     def test_check_present(self):
         content, obj_id = self.hash_content(b"check_present")
         self.storage.add(content, obj_id)
         try:
             self.storage.check(obj_id)
         except exc.Error:
             self.fail("Integrity check failed")

     def test_delete_missing(self):
         self.storage.allow_delete = True
         content, obj_id = self.hash_content(b"missing_content_to_delete")
         with self.assertRaises(exc.Error):
             self.storage.delete(obj_id)

     def test_delete_present(self):
         self.storage.allow_delete = True
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         self.assertTrue(self.storage.delete(obj_id))
         with self.assertRaises(exc.Error):
             self.storage.get(obj_id)

     def test_delete_not_allowed(self):
         self.storage.allow_delete = False
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         with self.assertRaises(PermissionError):
             self.storage.delete(obj_id)

     def test_delete_not_allowed_by_default(self):
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         with self.assertRaises(PermissionError):
             self.assertTrue(self.storage.delete(obj_id))

     def test_add_stream(self):
         content = [b"chunk1", b"chunk2"]
         _, obj_id = self.hash_content(b"".join(content))
         try:
             self.storage.add_stream(iter(content), obj_id=obj_id)
         except NotImplementedError:
             return
         self.assertContentMatch(obj_id, b"".join(content))
     def test_add_stream_sleep(self):
         def gen_content():
             yield b"chunk1"
             time.sleep(0.5)
             yield b"chunk42"

         _, obj_id = self.hash_content(b"placeholder_id")
         try:
             self.storage.add_stream(gen_content(), obj_id=obj_id)
         except NotImplementedError:
             return
         self.assertContentMatch(obj_id, b"chunk1chunk42")

     def test_get_stream(self):
         content = b"123456789"
         _, obj_id = self.hash_content(content)
         self.storage.add(content, obj_id=obj_id)
         r = self.storage.get(obj_id)
         self.assertEqual(r, content)

         try:
             r = self.storage.get_stream(obj_id, chunk_size=1)
         except NotImplementedError:
             return
         self.assertTrue(isinstance(r, collections.Iterator))
         r = list(r)
         self.assertEqual(b"".join(r), content)

     def test_add_batch(self):
         contents = {}
         expected_content_add = 0
         expected_content_add_bytes = 0
         for i in range(50):
             content = b"Test content %02d" % i
             content, obj_id = self.hash_content(content)
             contents[obj_id] = content
             expected_content_add_bytes += len(content)
             expected_content_add += 1

         ret = self.storage.add_batch(contents)

         self.assertEqual(
             ret,
             {
                 "object:add": expected_content_add,
                 "object:add:bytes": expected_content_add_bytes,
             },
         )

         for obj_id in contents:
             self.assertIn(obj_id, self.storage)

     def test_content_iterator(self):
         sto_obj_ids = iter(self.storage)
         sto_obj_ids = list(sto_obj_ids)
         self.assertFalse(sto_obj_ids)

         obj_ids = set()
         for i in range(100):
             content, obj_id = self.hash_content(b"content %d" % i)
             self.storage.add(content, obj_id=obj_id)
             obj_ids.add(obj_id)

         sto_obj_ids = set(self.storage)
         self.assertEqual(sto_obj_ids, obj_ids)

     def test_list_content(self):
         all_ids = []
         for i in range(1200):
             content = b"example %d" % i
             obj_id = compute_hash(content)
             self.storage.add(content, obj_id)
             all_ids.append(obj_id)
         all_ids.sort()

         ids = list(self.storage.list_content())
         self.assertEqual(len(ids), 1200)
         self.assertEqual(ids[0], all_ids[0])
         self.assertEqual(ids[100], all_ids[100])
         self.assertEqual(ids[999], all_ids[999])

         ids = list(self.storage.list_content(limit=10))
         self.assertEqual(len(ids), 10)
         self.assertEqual(ids[0], all_ids[0])
         self.assertEqual(ids[9], all_ids[9])

         ids = list(self.storage.list_content(last_obj_id=all_ids[999], limit=100))
         self.assertEqual(len(ids), 100)
         self.assertEqual(ids[0], all_ids[1000])
         self.assertEqual(ids[9], all_ids[1009])
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
index a42ff30..d4d4896 100644
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -1,179 +1,268 @@
 # Copyright (C) 2016-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import base64
+from dataclasses import dataclass
 import unittest
-from collections import defaultdict
 from unittest.mock import patch
+from urllib.parse import urlparse, parse_qs

-from typing import Any, Dict
+from azure.core.exceptions import ResourceNotFoundError
+import pytest

-from azure.common import AzureMissingResourceHttpError
 from swh.model.hashutil import hash_to_hex
+import swh.objstorage.backends.azure
 from swh.objstorage.factory import get_objstorage
 from swh.objstorage.objstorage import decompressors
 from swh.objstorage.exc import Error

 from .objstorage_testing import ObjStorageTestFixture


-class MockBlob:
-    """ Libcloud object mock that replicates its API """
+@dataclass
+class MockListedObject:
+    name: str

-    def __init__(self, name, content):
-        self.name = name
-        self.content = content

+class MockDownloadClient:
+    def __init__(self, blob_data):
+        self.blob_data = blob_data

-class MockBlockBlobService:
-    """Mock internal azure library which AzureCloudObjStorage depends upon.
+    def content_as_bytes(self):
+        return self.blob_data

-    """
-    _data: Dict[str, Any] = {}

+class MockBlobClient:
+    def __init__(self, container, blob):
+        self.container = container
+        self.blob = blob

-    def __init__(self, account_name, account_key, **kwargs):
-        # do not care for the account_name and the api_secret_key here
-        self._data = defaultdict(dict)
+    def get_blob_properties(self):
+        if self.blob not in self.container.blobs:
+            raise ResourceNotFoundError("Blob not found")

-    def get_container_properties(self, container_name):
-        self._data[container_name]
-        return container_name in self._data
+        return {"exists": True}

-    def create_blob_from_bytes(self, container_name, blob_name, blob):
-        self._data[container_name][blob_name] = blob
+    def upload_blob(self, data, length=None):
+        if self.blob in self.container.blobs:
+            raise ValueError("Blob already exists")

-    def get_blob_to_bytes(self, container_name, blob_name):
-        if blob_name not in self._data[container_name]:
-            raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
-        return MockBlob(name=blob_name, content=self._data[container_name][blob_name])
+        if length is not None and length != len(data):
+            raise ValueError("Wrong length for blob data!")

-    def delete_blob(self, container_name, blob_name):
-        try:
-            self._data[container_name].pop(blob_name)
-        except KeyError:
-            raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
-        return True
+        self.container.blobs[self.blob] = data

-    def exists(self, container_name, blob_name):
-        return blob_name in self._data[container_name]
+    def download_blob(self):
+        if self.blob not in self.container.blobs:
+            raise ResourceNotFoundError("Blob not found")

-    def list_blobs(self, container_name, marker=None, maxresults=None):
-        for blob_name, content in sorted(self._data[container_name].items()):
-            if marker is None or blob_name > marker:
-                yield MockBlob(name=blob_name, content=content)
+        return MockDownloadClient(self.container.blobs[self.blob])
+
+    def delete_blob(self):
+        if self.blob not in self.container.blobs:
+            raise ResourceNotFoundError("Blob not found")
+
+        del self.container.blobs[self.blob]
+
+
+class MockContainerClient:
+    def __init__(self, container_url):
+        self.container_url = container_url
+        self.blobs = {}
+
+    @classmethod
+    def from_container_url(cls, container_url):
+        return cls(container_url)
+
+    def get_container_properties(self):
+        return {"exists": True}
+
+    def get_blob_client(self, blob):
+        return MockBlobClient(self, blob)
+
+    def list_blobs(self):
+        for obj in sorted(self.blobs):
+            yield MockListedObject(obj)
+
+    def delete_blob(self, blob):
+        self.get_blob_client(blob.name).delete_blob()


 class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
     compression = "none"

     def setUp(self):
         super().setUp()
         patcher = patch(
-            "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+            "swh.objstorage.backends.azure.ContainerClient", MockContainerClient,
         )
         patcher.start()
         self.addCleanup(patcher.stop)

         self.storage = get_objstorage(
             "azure",
             {
-                "account_name": "account-name",
-                "api_secret_key": "api-secret-key",
-                "container_name": "container-name",
+                "container_url": "https://bogus-container-url.example",
                 "compression": self.compression,
             },
         )

     def test_compression(self):
         content, obj_id = self.hash_content(b"test content is compressed")
         self.storage.add(content, obj_id=obj_id)

-        blob_service, container = self.storage.get_blob_service(obj_id)
         internal_id = self.storage._internal_id(obj_id)
-
-        raw_blob = blob_service.get_blob_to_bytes(container, internal_id)
+        blob_client = self.storage.get_blob_client(internal_id)
+        raw_blob = blob_client.download_blob().content_as_bytes()

         d = decompressors[self.compression]()
-        assert d.decompress(raw_blob.content) == content
+        assert d.decompress(raw_blob) == content
         assert d.unused_data == b""

     def test_trailing_data_on_stored_blob(self):
         content, obj_id = self.hash_content(b"test content without garbage")
         self.storage.add(content, obj_id=obj_id)

-        blob_service, container = self.storage.get_blob_service(obj_id)
         internal_id = self.storage._internal_id(obj_id)
+        blob_client = self.storage.get_blob_client(internal_id)
+        raw_blob = blob_client.download_blob().content_as_bytes()
+        new_data = raw_blob + b"trailing garbage"

-        blob_service._data[container][internal_id] += b"trailing garbage"
+        blob_client.delete_blob()
+        blob_client.upload_blob(data=new_data, length=len(new_data))

         if self.compression == "none":
             with self.assertRaises(Error) as e:
                 self.storage.check(obj_id)
         else:
             with self.assertRaises(Error) as e:
                 self.storage.get(obj_id)
             assert "trailing data" in e.exception.args[0]


 class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage):
     compression = "gzip"


 class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage):
     compression = "zlib"


 class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage):
     compression = "lzma"


 class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage):
     compression = "bz2"


 class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
     def setUp(self):
         super().setUp()
         patcher = patch(
-            "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+            "swh.objstorage.backends.azure.ContainerClient", MockContainerClient
         )
         patcher.start()
         self.addCleanup(patcher.stop)

         self.accounts = {}
         for prefix in "0123456789abcdef":
-            self.accounts[prefix] = {
-                "account_name": "account_%s" % prefix,
-                "api_secret_key": "secret_key_%s" % prefix,
-                "container_name": "container_%s" % prefix,
-            }
+            self.accounts[prefix] = "https://bogus-container-url.example/" + prefix

         self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})

     def test_prefixedazure_instantiation_missing_prefixes(self):
         del self.accounts["d"]
         del self.accounts["e"]

         with self.assertRaisesRegex(ValueError, "Missing prefixes"):
             get_objstorage("azure-prefixed", {"accounts": self.accounts})

     def test_prefixedazure_instantiation_inconsistent_prefixes(self):
         self.accounts["00"] = self.accounts["0"]

         with self.assertRaisesRegex(ValueError, "Inconsistent prefixes"):
             get_objstorage("azure-prefixed", {"accounts": self.accounts})

     def test_prefixedazure_sharding_behavior(self):
         for i in range(100):
             content, obj_id = self.hash_content(b"test_content_%02d" % i)
             self.storage.add(content, obj_id=obj_id)
             hex_obj_id = hash_to_hex(obj_id)
             prefix = hex_obj_id[0]
             self.assertTrue(
-                self.storage.prefixes[prefix][0].exists(
-                    self.accounts[prefix]["container_name"], hex_obj_id
-                )
+                self.storage.prefixes[prefix]
+                .get_blob_client(hex_obj_id)
+                .get_blob_properties()
             )


+def test_get_container_url():
+    # r=read, l=list, w=write, d=delete
+    policy_map = {
+        "read_only": "rl",
+        "append_only": "rwl",
+        "full": "rwdl",
+    }
+
+    for policy, expected in policy_map.items():
+        ret = swh.objstorage.backends.azure.get_container_url(
account_name="account_name", + account_key=base64.b64encode(b"account_key"), + container_name="container_name", + access_policy=policy, + ) + + p = urlparse(ret) + assert p.scheme == "https" + assert p.netloc == "account_name.blob.core.windows.net" + assert p.path == "/container_name" + + qs = parse_qs(p.query) + # sp: permissions + assert qs["sp"] == [expected] + # sr: resource (c=container) + assert qs["sr"] == ["c"] + # st: start; se: expiry + assert qs["st"][0] < qs["se"][0] + + +def test_bwcompat_args(monkeypatch): + monkeypatch.setattr( + swh.objstorage.backends.azure, "ContainerClient", MockContainerClient, + ) + + with pytest.deprecated_call(): + objs = get_objstorage( + "azure", + { + "account_name": "account_name", + "api_secret_key": base64.b64encode(b"account_key"), + "container_name": "container_name", + }, + ) + + assert objs is not None + + +def test_bwcompat_args_prefixed(monkeypatch): + monkeypatch.setattr( + swh.objstorage.backends.azure, "ContainerClient", MockContainerClient, + ) + + accounts = { + prefix: { + "account_name": f"account_name{prefix}", + "api_secret_key": base64.b64encode(b"account_key"), + "container_name": "container_name", + } + for prefix in "0123456789abcdef" + } + + with pytest.deprecated_call(): + objs = get_objstorage("azure-prefixed", {"accounts": accounts}) + + assert objs is not None