diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
index d448716..d0d416a 100644
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -1,343 +1,350 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 from itertools import product
 import string
 from typing import Dict, Optional, Union
 import warnings
 
 from azure.storage.blob import (
     ContainerClient,
     ContainerSasPermissions,
     generate_container_sas,
 )
-from azure.core.exceptions import ResourceNotFoundError
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
 
 from swh.objstorage.objstorage import (
     ObjStorage,
     compute_hash,
     compressors,
     decompressors,
 )
 from swh.objstorage.exc import ObjNotFoundError, Error
 from swh.model import hashutil
 
 
 def get_container_url(
     account_name: str,
     account_key: str,
     container_name: str,
     access_policy: str = "read_only",
     expiry: datetime.timedelta = datetime.timedelta(days=365),
     **kwargs,
 ) -> str:
     """Get the full url, for the given container on the given account, with a
     Shared Access Signature granting the specified access policy.
 
     Args:
         account_name: name of the storage account for which to generate the URL
         account_key: shared account key of the storage account used to generate the SAS
         container_name: name of the container for which to grant access in the
             storage account
         access_policy: one of ``read_only``, ``append_only``, ``full``
         expiry: the interval in the future with which the signature will expire
 
     Returns:
         the full URL of the container, with the shared access signature.
     """
 
     access_policies = {
         "read_only": ContainerSasPermissions(
             read=True, list=True, delete=False, write=False
         ),
         "append_only": ContainerSasPermissions(
             read=True, list=True, delete=False, write=True
         ),
         "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True),
     }
 
     current_time = datetime.datetime.utcnow()
 
     signature = generate_container_sas(
         account_name,
         container_name,
         account_key=account_key,
         permission=access_policies[access_policy],
         start=current_time + datetime.timedelta(minutes=-1),
         expiry=current_time + expiry,
     )
 
     return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}"
 
 
 class AzureCloudObjStorage(ObjStorage):
     """ObjStorage backend for Azure blob storage accounts.
 
     Args:
       container_url: the URL of the container in which the objects are stored.
       account_name: (deprecated) the name of the storage account under which
         objects are stored
       api_secret_key: (deprecated) the shared account key
       container_name: (deprecated) the name of the container under which objects
         are stored
       compression: the compression algorithm used to compress objects in storage
 
     Notes:
       The container url should contain the credentials via a "Shared Access
       Signature". The :func:`get_container_url` helper can be used to generate
       such a URL from the account's access keys. The ``account_name``,
       ``api_secret_key`` and ``container_name`` arguments are deprecated.
 
     """
 
     def __init__(
         self,
         container_url: Optional[str] = None,
         account_name: Optional[str] = None,
         api_secret_key: Optional[str] = None,
         container_name: Optional[str] = None,
         compression="gzip",
         **kwargs,
     ):
         if container_url is None:
             if account_name is None or api_secret_key is None or container_name is None:
                 raise ValueError(
                     "AzureCloudObjStorage must have a container_url or all three "
                     "account_name, api_secret_key and container_name"
                 )
             else:
                 warnings.warn(
                     "The Azure objstorage account secret key parameters are "
                     "deprecated, please use container URLs instead.",
                     DeprecationWarning,
                 )
                 container_url = get_container_url(
                     account_name=account_name,
                     account_key=api_secret_key,
                     container_name=container_name,
                     access_policy="full",
                 )
 
         super().__init__(**kwargs)
         self.container_client = ContainerClient.from_container_url(container_url)
         self.compression = compression
 
     def get_container_client(self, hex_obj_id):
         """Get the container client for the container that contains the object
         with internal id hex_obj_id
 
         This is used to allow the PrefixedAzureCloudObjStorage to dispatch the
         client according to the prefix of the object id.
 
         """
         return self.container_client
 
     def get_blob_client(self, hex_obj_id):
         """Get the azure blob client for the given hex obj id"""
         container_client = self.get_container_client(hex_obj_id)
 
         return container_client.get_blob_client(blob=hex_obj_id)
 
     def get_all_container_clients(self):
         """Get all active block_blob_services"""
         yield self.container_client
 
     def _internal_id(self, obj_id):
         """Internal id is the hex version in objstorage.
 
         """
         return hashutil.hash_to_hex(obj_id)
 
     def check_config(self, *, check_write):
         """Check the configuration for this object storage"""
         for container_client in self.get_all_container_clients():
             props = container_client.get_container_properties()
 
             # FIXME: check_write is ignored here
             if not props:
                 return False
         return True
 
     def __contains__(self, obj_id):
         """Does the storage contains the obj_id.
 
         """
         hex_obj_id = self._internal_id(obj_id)
         client = self.get_blob_client(hex_obj_id)
         try:
             client.get_blob_properties()
         except ResourceNotFoundError:
             return False
         else:
             return True
 
     def __iter__(self):
         """Iterate over the objects present in the storage.
 
         """
         for client in self.get_all_container_clients():
             for obj in client.list_blobs():
                 yield hashutil.hash_to_bytes(obj.name)
 
     def __len__(self):
         """Compute the number of objects in the current object storage.
 
         Returns:
             number of objects contained in the storage.
 
         """
         return sum(1 for i in self)
 
     def add(self, content, obj_id=None, check_presence=True):
         """Add an obj in storage if it's not there already.
 
         """
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)
 
         if check_presence and obj_id in self:
             return obj_id
 
         hex_obj_id = self._internal_id(obj_id)
 
         # Send the compressed content
         compressor = compressors[self.compression]()
         data = compressor.compress(content)
         data += compressor.flush()
 
         client = self.get_blob_client(hex_obj_id)
-        client.upload_blob(data=data, length=len(data))
+        try:
+            client.upload_blob(data=data, length=len(data))
+        except ResourceExistsError:
+            # There's a race condition between check_presence and upload_blob,
+            # that we can't get rid of as the azure api doesn't allow atomic
+            # replaces or renaming a blob. As the restore operation explicitly
+            # removes the blob, it should be safe to just ignore the error.
+            pass
 
         return obj_id
 
     def restore(self, content, obj_id=None):
         """Restore a content.
 
         """
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)
 
         if obj_id in self:
             self.delete(obj_id)
 
         return self.add(content, obj_id, check_presence=False)
 
     def get(self, obj_id):
         """Retrieve blob's content if found.
 
         """
         hex_obj_id = self._internal_id(obj_id)
         client = self.get_blob_client(hex_obj_id)
 
         try:
             download = client.download_blob()
         except ResourceNotFoundError:
             raise ObjNotFoundError(obj_id) from None
         else:
             data = download.content_as_bytes()
 
         decompressor = decompressors[self.compression]()
         ret = decompressor.decompress(data)
         if decompressor.unused_data:
             raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
 
         return ret
 
     def check(self, obj_id):
         """Check the content integrity.
 
         """
         obj_content = self.get(obj_id)
         content_obj_id = compute_hash(obj_content)
         if content_obj_id != obj_id:
             raise Error(obj_id)
 
     def delete(self, obj_id):
         """Delete an object."""
         super().delete(obj_id)  # Check delete permission
         hex_obj_id = self._internal_id(obj_id)
         client = self.get_blob_client(hex_obj_id)
         try:
             client.delete_blob()
         except ResourceNotFoundError:
             raise ObjNotFoundError(obj_id) from None
 
         return True
 
 
 class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
     """ObjStorage with azure capabilities, striped by prefix.
 
     accounts is a dict containing entries of the form:
         <prefix>: <container_url_for_prefix>
 
     """
 
     def __init__(
         self,
         accounts: Dict[str, Union[str, Dict[str, str]]],
         compression="gzip",
         **kwargs,
     ):
         # shortcut AzureCloudObjStorage __init__
         ObjStorage.__init__(self, **kwargs)
 
         self.compression = compression
 
         # Definition sanity check
         prefix_lengths = set(len(prefix) for prefix in accounts)
         if not len(prefix_lengths) == 1:
             raise ValueError(
                 "Inconsistent prefixes, found lengths %s"
                 % ", ".join(str(lst) for lst in sorted(prefix_lengths))
             )
         self.prefix_len = prefix_lengths.pop()
 
         expected_prefixes = set(
             "".join(letters)
             for letters in product(
                 set(string.hexdigits.lower()), repeat=self.prefix_len
             )
         )
         missing_prefixes = expected_prefixes - set(accounts)
         if missing_prefixes:
             raise ValueError(
                 "Missing prefixes %s" % ", ".join(sorted(missing_prefixes))
             )
 
         do_warning = False
 
         self.prefixes = {}
         for prefix, container_url in accounts.items():
             if isinstance(container_url, dict):
                 do_warning = True
                 container_url = get_container_url(
                     account_name=container_url["account_name"],
                     account_key=container_url["api_secret_key"],
                     container_name=container_url["container_name"],
                     access_policy="full",
                 )
             self.prefixes[prefix] = ContainerClient.from_container_url(container_url)
 
         if do_warning:
             warnings.warn(
                 "The Azure objstorage account secret key parameters are "
                 "deprecated, please use container URLs instead.",
                 DeprecationWarning,
             )
 
     def get_container_client(self, hex_obj_id):
         """Get the block_blob_service and container that contains the object
         with internal id hex_obj_id
         """
         return self.prefixes[hex_obj_id[: self.prefix_len]]
 
     def get_all_container_clients(self):
         """Get all active container clients"""
         # iterate on items() to sort blob services;
         # needed to be able to paginate in the list_content() method
         yield from (v for _, v in sorted(self.prefixes.items()))
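
The substantive change to add() above works around a race that cannot be
eliminated client-side: the Azure blob API offers neither an atomic
compare-and-swap nor a rename, so another writer can create the blob between
the check_presence lookup and upload_blob. Swallowing ResourceExistsError is
sound here because object ids are content hashes, so a racing writer
necessarily stored identical bytes. A minimal sketch of the pattern, assuming
an azure-storage-blob ContainerClient (the helper name is illustrative, not
part of the patch):

    from azure.core.exceptions import ResourceExistsError

    def upload_if_absent(container_client, blob_name: str, data: bytes) -> None:
        blob_client = container_client.get_blob_client(blob=blob_name)
        try:
            # upload_blob defaults to overwrite=False, so a blob created by a
            # concurrent writer surfaces as ResourceExistsError instead of
            # being silently replaced.
            blob_client.upload_blob(data=data, length=len(data))
        except ResourceExistsError:
            # Content-addressed naming: the existing blob was derived from
            # the same bytes, so there is nothing left to do.
            pass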
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
index 26ec0d1..2799d49 100644
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -1,223 +1,232 @@
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import time
 import collections
 
 from swh.objstorage import exc
 from swh.objstorage.objstorage import compute_hash
 
 
 class ObjStorageTestFixture:
     def hash_content(self, content):
         obj_id = compute_hash(content)
         return content, obj_id
 
     def assertContentMatch(self, obj_id, expected_content):  # noqa
         content = self.storage.get(obj_id)
         self.assertEqual(content, expected_content)
 
     def test_check_config(self):
         self.assertTrue(self.storage.check_config(check_write=False))
         self.assertTrue(self.storage.check_config(check_write=True))
 
     def test_contains(self):
         content_p, obj_id_p = self.hash_content(b"contains_present")
         content_m, obj_id_m = self.hash_content(b"contains_missing")
         self.storage.add(content_p, obj_id=obj_id_p)
         self.assertIn(obj_id_p, self.storage)
         self.assertNotIn(obj_id_m, self.storage)
 
     def test_add_get_w_id(self):
         content, obj_id = self.hash_content(b"add_get_w_id")
         r = self.storage.add(content, obj_id=obj_id)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)
 
+    def test_add_twice(self):
+        content, obj_id = self.hash_content(b"add_twice")
+        r = self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(obj_id, r)
+        self.assertContentMatch(obj_id, content)
+        r = self.storage.add(content, obj_id=obj_id, check_presence=False)
+        self.assertEqual(obj_id, r)
+        self.assertContentMatch(obj_id, content)
+
     def test_add_big(self):
         content, obj_id = self.hash_content(b"add_big" * 1024 * 1024)
         r = self.storage.add(content, obj_id=obj_id)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)
 
     def test_add_get_wo_id(self):
         content, obj_id = self.hash_content(b"add_get_wo_id")
         r = self.storage.add(content)
         self.assertEqual(obj_id, r)
         self.assertContentMatch(obj_id, content)
 
     def test_add_get_batch(self):
         content1, obj_id1 = self.hash_content(b"add_get_batch_1")
         content2, obj_id2 = self.hash_content(b"add_get_batch_2")
         self.storage.add(content1, obj_id1)
         self.storage.add(content2, obj_id2)
         cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2])
         self.assertEqual(cr1, content1)
         self.assertEqual(cr2, content2)
 
     def test_get_batch_unexisting_content(self):
         content, obj_id = self.hash_content(b"get_batch_unexisting_content")
         result = list(self.storage.get_batch([obj_id]))
         self.assertTrue(len(result) == 1)
         self.assertIsNone(result[0])
 
     def test_restore_content(self):
         self.storage.allow_delete = True
 
         valid_content, valid_obj_id = self.hash_content(b"restore_content")
         invalid_content = b"unexpected content"
         id_adding = self.storage.add(invalid_content, valid_obj_id)
         self.assertEqual(id_adding, valid_obj_id)
         with self.assertRaises(exc.Error):
             self.storage.check(id_adding)
         id_restore = self.storage.restore(valid_content, valid_obj_id)
         self.assertEqual(id_restore, valid_obj_id)
         self.assertContentMatch(valid_obj_id, valid_content)
 
     def test_get_missing(self):
         content, obj_id = self.hash_content(b"get_missing")
         with self.assertRaises(exc.ObjNotFoundError) as e:
             self.storage.get(obj_id)
 
         self.assertIn(obj_id, e.exception.args)
 
     def test_check_missing(self):
         content, obj_id = self.hash_content(b"check_missing")
         with self.assertRaises(exc.Error):
             self.storage.check(obj_id)
 
     def test_check_present(self):
         content, obj_id = self.hash_content(b"check_present")
         self.storage.add(content, obj_id)
         try:
             self.storage.check(obj_id)
         except exc.Error:
             self.fail("Integrity check failed")
 
     def test_delete_missing(self):
         self.storage.allow_delete = True
         content, obj_id = self.hash_content(b"missing_content_to_delete")
         with self.assertRaises(exc.Error):
             self.storage.delete(obj_id)
 
     def test_delete_present(self):
         self.storage.allow_delete = True
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         self.assertTrue(self.storage.delete(obj_id))
         with self.assertRaises(exc.Error):
             self.storage.get(obj_id)
 
     def test_delete_not_allowed(self):
         self.storage.allow_delete = False
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         with self.assertRaises(PermissionError):
             self.storage.delete(obj_id)
 
     def test_delete_not_allowed_by_default(self):
         content, obj_id = self.hash_content(b"content_to_delete")
         self.storage.add(content, obj_id=obj_id)
         with self.assertRaises(PermissionError):
             self.assertTrue(self.storage.delete(obj_id))
 
     def test_add_stream(self):
         content = [b"chunk1", b"chunk2"]
         _, obj_id = self.hash_content(b"".join(content))
         try:
             self.storage.add_stream(iter(content), obj_id=obj_id)
         except NotImplementedError:
             return
         self.assertContentMatch(obj_id, b"".join(content))
 
     def test_add_stream_sleep(self):
         def gen_content():
             yield b"chunk1"
             time.sleep(0.5)
             yield b"chunk42"
 
         _, obj_id = self.hash_content(b"placeholder_id")
         try:
             self.storage.add_stream(gen_content(), obj_id=obj_id)
         except NotImplementedError:
             return
         self.assertContentMatch(obj_id, b"chunk1chunk42")
 
     def test_get_stream(self):
         content = b"123456789"
         _, obj_id = self.hash_content(content)
         self.storage.add(content, obj_id=obj_id)
         r = self.storage.get(obj_id)
         self.assertEqual(r, content)
 
         try:
             r = self.storage.get_stream(obj_id, chunk_size=1)
         except NotImplementedError:
             return
         self.assertTrue(isinstance(r, collections.Iterator))
         r = list(r)
         self.assertEqual(b"".join(r), content)
 
     def test_add_batch(self):
         contents = {}
         expected_content_add = 0
         expected_content_add_bytes = 0
         for i in range(50):
             content = b"Test content %02d" % i
             content, obj_id = self.hash_content(content)
             contents[obj_id] = content
             expected_content_add_bytes += len(content)
             expected_content_add += 1
 
         ret = self.storage.add_batch(contents)
 
         self.assertEqual(
             ret,
             {
                 "object:add": expected_content_add,
                 "object:add:bytes": expected_content_add_bytes,
             },
         )
         for obj_id in contents:
             self.assertIn(obj_id, self.storage)
 
     def test_content_iterator(self):
         sto_obj_ids = iter(self.storage)
         sto_obj_ids = list(sto_obj_ids)
         self.assertFalse(sto_obj_ids)
 
         obj_ids = set()
         for i in range(100):
             content, obj_id = self.hash_content(b"content %d" % i)
             self.storage.add(content, obj_id=obj_id)
             obj_ids.add(obj_id)
 
         sto_obj_ids = set(self.storage)
         self.assertEqual(sto_obj_ids, obj_ids)
 
     def test_list_content(self):
         all_ids = []
         for i in range(1200):
             content = b"example %d" % i
             obj_id = compute_hash(content)
             self.storage.add(content, obj_id)
             all_ids.append(obj_id)
         all_ids.sort()
 
         ids = list(self.storage.list_content())
         self.assertEqual(len(ids), 1200)
         self.assertEqual(ids[0], all_ids[0])
         self.assertEqual(ids[100], all_ids[100])
         self.assertEqual(ids[999], all_ids[999])
 
         ids = list(self.storage.list_content(limit=10))
         self.assertEqual(len(ids), 10)
         self.assertEqual(ids[0], all_ids[0])
         self.assertEqual(ids[9], all_ids[9])
 
         ids = list(self.storage.list_content(last_obj_id=all_ids[999], limit=100))
         self.assertEqual(len(ids), 100)
         self.assertEqual(ids[0], all_ids[1000])
         self.assertEqual(ids[9], all_ids[1009])
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
index d4d4896..0b78865 100644
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -1,268 +1,268 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import base64
 from dataclasses import dataclass
 import unittest
 from unittest.mock import patch
 from urllib.parse import urlparse, parse_qs
 
-from azure.core.exceptions import ResourceNotFoundError
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
 import pytest
 
 from swh.model.hashutil import hash_to_hex
 import swh.objstorage.backends.azure
 from swh.objstorage.factory import get_objstorage
 from swh.objstorage.objstorage import decompressors
 from swh.objstorage.exc import Error
 
 from .objstorage_testing import ObjStorageTestFixture
 
 
 @dataclass
 class MockListedObject:
     name: str
 
 
 class MockDownloadClient:
     def __init__(self, blob_data):
         self.blob_data = blob_data
 
     def content_as_bytes(self):
         return self.blob_data
 
 
 class MockBlobClient:
     def __init__(self, container, blob):
         self.container = container
         self.blob = blob
 
     def get_blob_properties(self):
         if self.blob not in self.container.blobs:
             raise ResourceNotFoundError("Blob not found")
 
         return {"exists": True}
 
     def upload_blob(self, data, length=None):
         if self.blob in self.container.blobs:
-            raise ValueError("Blob already exists")
+            raise ResourceExistsError("Blob already exists")
 
         if length is not None and length != len(data):
             raise ValueError("Wrong length for blob data!")
 
         self.container.blobs[self.blob] = data
 
     def download_blob(self):
         if self.blob not in self.container.blobs:
             raise ResourceNotFoundError("Blob not found")
 
         return MockDownloadClient(self.container.blobs[self.blob])
 
     def delete_blob(self):
         if self.blob not in self.container.blobs:
             raise ResourceNotFoundError("Blob not found")
 
         del self.container.blobs[self.blob]
 
 
 class MockContainerClient:
     def __init__(self, container_url):
         self.container_url = container_url
         self.blobs = {}
 
     @classmethod
     def from_container_url(cls, container_url):
         return cls(container_url)
 
     def get_container_properties(self):
         return {"exists": True}
 
     def get_blob_client(self, blob):
         return MockBlobClient(self, blob)
 
     def list_blobs(self):
         for obj in sorted(self.blobs):
             yield MockListedObject(obj)
 
     def delete_blob(self, blob):
         self.get_blob_client(blob.name).delete_blob()
 
 
 class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
     compression = "none"
 
     def setUp(self):
         super().setUp()
         patcher = patch(
             "swh.objstorage.backends.azure.ContainerClient", MockContainerClient,
         )
         patcher.start()
         self.addCleanup(patcher.stop)
 
         self.storage = get_objstorage(
             "azure",
             {
                 "container_url": "https://bogus-container-url.example",
                 "compression": self.compression,
             },
         )
 
     def test_compression(self):
         content, obj_id = self.hash_content(b"test content is compressed")
         self.storage.add(content, obj_id=obj_id)
 
         internal_id = self.storage._internal_id(obj_id)
         blob_client = self.storage.get_blob_client(internal_id)
         raw_blob = blob_client.download_blob().content_as_bytes()
 
         d = decompressors[self.compression]()
         assert d.decompress(raw_blob) == content
         assert d.unused_data == b""
 
     def test_trailing_data_on_stored_blob(self):
         content, obj_id = self.hash_content(b"test content without garbage")
         self.storage.add(content, obj_id=obj_id)
 
         internal_id = self.storage._internal_id(obj_id)
         blob_client = self.storage.get_blob_client(internal_id)
         raw_blob = blob_client.download_blob().content_as_bytes()
         new_data = raw_blob + b"trailing garbage"
 
         blob_client.delete_blob()
         blob_client.upload_blob(data=new_data, length=len(new_data))
 
         if self.compression == "none":
             with self.assertRaises(Error) as e:
                 self.storage.check(obj_id)
         else:
             with self.assertRaises(Error) as e:
                 self.storage.get(obj_id)
             assert "trailing data" in e.exception.args[0]
 
 
 class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage):
     compression = "gzip"
 
 
 class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage):
     compression = "zlib"
 
 
 class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage):
     compression = "lzma"
 
 
 class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage):
     compression = "bz2"
 
 
 class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
     def setUp(self):
         super().setUp()
         patcher = patch(
             "swh.objstorage.backends.azure.ContainerClient", MockContainerClient
         )
         patcher.start()
         self.addCleanup(patcher.stop)
 
         self.accounts = {}
         for prefix in "0123456789abcdef":
             self.accounts[prefix] = "https://bogus-container-url.example/" + prefix
 
         self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})
 
     def test_prefixedazure_instantiation_missing_prefixes(self):
         del self.accounts["d"]
         del self.accounts["e"]
 
         with self.assertRaisesRegex(ValueError, "Missing prefixes"):
             get_objstorage("azure-prefixed", {"accounts": self.accounts})
 
     def test_prefixedazure_instantiation_inconsistent_prefixes(self):
         self.accounts["00"] = self.accounts["0"]
 
         with self.assertRaisesRegex(ValueError, "Inconsistent prefixes"):
             get_objstorage("azure-prefixed", {"accounts": self.accounts})
 
     def test_prefixedazure_sharding_behavior(self):
         for i in range(100):
             content, obj_id = self.hash_content(b"test_content_%02d" % i)
             self.storage.add(content, obj_id=obj_id)
             hex_obj_id = hash_to_hex(obj_id)
             prefix = hex_obj_id[0]
             self.assertTrue(
                 self.storage.prefixes[prefix]
                 .get_blob_client(hex_obj_id)
                 .get_blob_properties()
             )
 
 
 def test_get_container_url():
     # r=read, l=list, w=write, d=delete
     policy_map = {
         "read_only": "rl",
         "append_only": "rwl",
         "full": "rwdl",
     }
 
     for policy, expected in policy_map.items():
         ret = swh.objstorage.backends.azure.get_container_url(
             account_name="account_name",
             account_key=base64.b64encode(b"account_key"),
             container_name="container_name",
             access_policy=policy,
         )
 
         p = urlparse(ret)
         assert p.scheme == "https"
         assert p.netloc == "account_name.blob.core.windows.net"
         assert p.path == "/container_name"
 
         qs = parse_qs(p.query)
         # sp: permissions
         assert qs["sp"] == [expected]
         # sr: resource (c=container)
         assert qs["sr"] == ["c"]
         # st: start; se: expiry
         assert qs["st"][0] < qs["se"][0]
 
 
 def test_bwcompat_args(monkeypatch):
     monkeypatch.setattr(
         swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
     )
 
     with pytest.deprecated_call():
         objs = get_objstorage(
             "azure",
             {
                 "account_name": "account_name",
                 "api_secret_key": base64.b64encode(b"account_key"),
                 "container_name": "container_name",
             },
         )
 
     assert objs is not None
 
 
 def test_bwcompat_args_prefixed(monkeypatch):
     monkeypatch.setattr(
         swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
     )
 
     accounts = {
         prefix: {
             "account_name": f"account_name{prefix}",
             "api_secret_key": base64.b64encode(b"account_key"),
             "container_name": "container_name",
         }
         for prefix in "0123456789abcdef"
     }
 
     with pytest.deprecated_call():
         objs = get_objstorage("azure-prefixed", {"accounts": accounts})
 
     assert objs is not None
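
With MockBlobClient.upload_blob raising ResourceExistsError instead of
ValueError, the mock now mirrors the real azure-storage-blob client, so
test_add_twice drives the production except branch end to end. For context, a
hedged sketch of wiring a real container into the factory using the
get_container_url helper from this patch (account name, key and container are
placeholders):

    from swh.objstorage.backends.azure import get_container_url
    from swh.objstorage.factory import get_objstorage

    container_url = get_container_url(
        account_name="myaccount",                # placeholder storage account
        account_key="c2hhcmVkLWtleS1ieXRlcw==",  # placeholder base64 shared key
        container_name="contents",
        access_policy="append_only",  # read + list + write, no delete
    )
    storage = get_objstorage("azure", {"container_url": container_url})
    obj_id = storage.add(b"hello, objstorage")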