diff --git a/requirements-test.txt b/requirements-test.txt index 6366880..b878758 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,9 +1,8 @@ pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use -pytest-asyncio pytest-mock requests_mock[fixture] >= 1.9 requests_toolbelt types-pyyaml types-requests pytest-postgresql < 4.0.0 # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400 diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py index be7aeaa..029dab3 100644 --- a/swh/objstorage/backends/azure.py +++ b/swh/objstorage/backends/azure.py @@ -1,442 +1,433 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import datetime from itertools import product import string from typing import Dict, Optional, Union import warnings from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError from azure.storage.blob import ( ContainerClient, ContainerSasPermissions, generate_container_sas, ) from azure.storage.blob.aio import ContainerClient as AsyncContainerClient from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError from swh.objstorage.objstorage import ( ObjStorage, compressors, compute_hash, decompressors, ) +from swh.objstorage.utils import call_async def get_container_url( account_name: str, account_key: str, container_name: str, access_policy: str = "read_only", expiry: datetime.timedelta = datetime.timedelta(days=365), **kwargs, ) -> str: """Get the full url, for the given container on the given account, with a Shared Access Signature granting the specified access policy. Args: account_name: name of the storage account for which to generate the URL account_key: shared account key of the storage account used to generate the SAS container_name: name of the container for which to grant access in the storage account access_policy: one of ``read_only``, ``append_only``, ``full`` expiry: the interval in the future with which the signature will expire Returns: the full URL of the container, with the shared access signature. """ access_policies = { "read_only": ContainerSasPermissions( read=True, list=True, delete=False, write=False ), "append_only": ContainerSasPermissions( read=True, list=True, delete=False, write=True ), "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True), } current_time = datetime.datetime.utcnow() signature = generate_container_sas( account_name, container_name, account_key=account_key, permission=access_policies[access_policy], start=current_time + datetime.timedelta(minutes=-1), expiry=current_time + expiry, ) return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}" -def call_async(f, *args): - """Calls an async coroutine from a synchronous function.""" - loop = asyncio.new_event_loop() - try: - return loop.run_until_complete(f(*args)) - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() - - class AzureCloudObjStorage(ObjStorage): """ObjStorage backend for Azure blob storage accounts. Args: container_url: the URL of the container in which the objects are stored. account_name: (deprecated) the name of the storage account under which objects are stored api_secret_key: (deprecated) the shared account key container_name: (deprecated) the name of the container under which objects are stored compression: the compression algorithm used to compress objects in storage Notes: The container url should contain the credentials via a "Shared Access Signature". The :func:`get_container_url` helper can be used to generate such a URL from the account's access keys. The ``account_name``, ``api_secret_key`` and ``container_name`` arguments are deprecated. """ def __init__( self, container_url: Optional[str] = None, account_name: Optional[str] = None, api_secret_key: Optional[str] = None, container_name: Optional[str] = None, compression="gzip", **kwargs, ): if container_url is None: if account_name is None or api_secret_key is None or container_name is None: raise ValueError( "AzureCloudObjStorage must have a container_url or all three " "account_name, api_secret_key and container_name" ) else: warnings.warn( "The Azure objstorage account secret key parameters are " "deprecated, please use container URLs instead.", DeprecationWarning, ) container_url = get_container_url( account_name=account_name, account_key=api_secret_key, container_name=container_name, access_policy="full", ) super().__init__(**kwargs) self.container_url = container_url self.compression = compression def get_container_client(self, hex_obj_id): """Get the container client for the container that contains the object with internal id hex_obj_id This is used to allow the PrefixedAzureCloudObjStorage to dispatch the client according to the prefix of the object id. """ return ContainerClient.from_container_url(self.container_url) @contextlib.asynccontextmanager async def get_async_container_clients(self): """Returns a collection of container clients, to be passed to ``get_async_blob_client``. Each container may not be used in more than one asyncio loop.""" client = AsyncContainerClient.from_container_url(self.container_url) async with client: yield {"": client} def get_blob_client(self, hex_obj_id): """Get the azure blob client for the given hex obj id""" container_client = self.get_container_client(hex_obj_id) return container_client.get_blob_client(blob=hex_obj_id) def get_async_blob_client(self, hex_obj_id, container_clients): """Get the azure blob client for the given hex obj id and a collection yielded by ``get_async_container_clients``.""" return container_clients[""].get_blob_client(blob=hex_obj_id) def get_all_container_clients(self): """Get all active block_blob_services""" yield self.get_container_client("") def _internal_id(self, obj_id): """Internal id is the hex version in objstorage. """ return hashutil.hash_to_hex(obj_id) def check_config(self, *, check_write): """Check the configuration for this object storage""" for container_client in self.get_all_container_clients(): props = container_client.get_container_properties() # FIXME: check_write is ignored here if not props: return False return True def __contains__(self, obj_id): """Does the storage contains the obj_id. """ hex_obj_id = self._internal_id(obj_id) client = self.get_blob_client(hex_obj_id) try: client.get_blob_properties() except ResourceNotFoundError: return False else: return True def __iter__(self): """Iterate over the objects present in the storage. """ for client in self.get_all_container_clients(): for obj in client.list_blobs(): yield hashutil.hash_to_bytes(obj.name) def __len__(self): """Compute the number of objects in the current object storage. Returns: number of objects contained in the storage. """ return sum(1 for i in self) def add(self, content, obj_id=None, check_presence=True): """Add an obj in storage if it's not there already. """ if obj_id is None: # Checksum is missing, compute it on the fly. obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id hex_obj_id = self._internal_id(obj_id) # Send the compressed content compressor = compressors[self.compression]() data = compressor.compress(content) data += compressor.flush() client = self.get_blob_client(hex_obj_id) try: client.upload_blob(data=data, length=len(data)) except ResourceExistsError: # There's a race condition between check_presence and upload_blob, # that we can't get rid of as the azure api doesn't allow atomic # replaces or renaming a blob. As the restore operation explicitly # removes the blob, it should be safe to just ignore the error. pass return obj_id def restore(self, content, obj_id=None): """Restore a content. """ if obj_id is None: # Checksum is missing, compute it on the fly. obj_id = compute_hash(content) if obj_id in self: self.delete(obj_id) return self.add(content, obj_id, check_presence=False) def get(self, obj_id): """retrieve blob's content if found. """ return call_async(self._get_async, obj_id) async def _get_async(self, obj_id, container_clients=None): """Coroutine implementing ``get(obj_id)`` using azure-storage-blob's asynchronous implementation. While ``get(obj_id)`` does not need asynchronicity, this is useful to ``get_batch(obj_ids)``, as it can run multiple ``_get_async`` tasks concurrently.""" if container_clients is None: # If the container_clients argument is not passed, create a new # collection of container_clients and restart the function with it. async with self.get_async_container_clients() as container_clients: return await self._get_async(obj_id, container_clients) hex_obj_id = self._internal_id(obj_id) client = self.get_async_blob_client(hex_obj_id, container_clients) try: download = await client.download_blob() except ResourceNotFoundError: raise ObjNotFoundError(obj_id) from None else: data = await download.content_as_bytes() decompressor = decompressors[self.compression]() ret = decompressor.decompress(data) if decompressor.unused_data: raise Error("Corrupt object %s: trailing data found" % hex_obj_id) return ret async def _get_async_or_none(self, obj_id, container_clients): """Like ``get_async(obj_id)``, but returns None instead of raising ResourceNotFoundError. Used by ``get_batch`` so other blobs can be returned even if one is missing.""" try: return await self._get_async(obj_id, container_clients) except ObjNotFoundError: return None async def _get_batch_async(self, obj_ids): async with self.get_async_container_clients() as container_clients: return await asyncio.gather( *[ self._get_async_or_none(obj_id, container_clients) for obj_id in obj_ids ] ) def get_batch(self, obj_ids): """Retrieve objects' raw content in bulk from storage, concurrently.""" return call_async(self._get_batch_async, obj_ids) def check(self, obj_id): """Check the content integrity. """ obj_content = self.get(obj_id) content_obj_id = compute_hash(obj_content) if content_obj_id != obj_id: raise Error(obj_id) def delete(self, obj_id): """Delete an object.""" super().delete(obj_id) # Check delete permission hex_obj_id = self._internal_id(obj_id) client = self.get_blob_client(hex_obj_id) try: client.delete_blob() except ResourceNotFoundError: raise ObjNotFoundError(obj_id) from None return True class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): """ObjStorage with azure capabilities, striped by prefix. accounts is a dict containing entries of the form: : """ def __init__( self, accounts: Dict[str, Union[str, Dict[str, str]]], compression="gzip", **kwargs, ): # shortcut AzureCloudObjStorage __init__ ObjStorage.__init__(self, **kwargs) self.compression = compression # Definition sanity check prefix_lengths = set(len(prefix) for prefix in accounts) if not len(prefix_lengths) == 1: raise ValueError( "Inconsistent prefixes, found lengths %s" % ", ".join(str(lst) for lst in sorted(prefix_lengths)) ) self.prefix_len = prefix_lengths.pop() expected_prefixes = set( "".join(letters) for letters in product( set(string.hexdigits.lower()), repeat=self.prefix_len ) ) missing_prefixes = expected_prefixes - set(accounts) if missing_prefixes: raise ValueError( "Missing prefixes %s" % ", ".join(sorted(missing_prefixes)) ) do_warning = False self.container_urls = {} for prefix, container_url in accounts.items(): if isinstance(container_url, dict): do_warning = True container_url = get_container_url( account_name=container_url["account_name"], account_key=container_url["api_secret_key"], container_name=container_url["container_name"], access_policy="full", ) self.container_urls[prefix] = container_url if do_warning: warnings.warn( "The Azure objstorage account secret key parameters are " "deprecated, please use container URLs instead.", DeprecationWarning, ) def get_container_client(self, hex_obj_id): """Get the block_blob_service and container that contains the object with internal id hex_obj_id """ prefix = hex_obj_id[: self.prefix_len] return ContainerClient.from_container_url(self.container_urls[prefix]) @contextlib.asynccontextmanager async def get_async_container_clients(self): # This is equivalent to: # client1 = AsyncContainerClient.from_container_url(url1) # ... # client16 = AsyncContainerClient.from_container_url(url16) # async with client1, ..., client16: # yield {prefix1: client1, ..., prefix16: client16} clients = { prefix: AsyncContainerClient.from_container_url(url) for (prefix, url) in self.container_urls.items() } async with contextlib.AsyncExitStack() as stack: for client in clients.values(): await stack.enter_async_context(client) yield clients def get_async_blob_client(self, hex_obj_id, container_clients): """Get the azure blob client for the given hex obj id and a collection yielded by ``get_async_container_clients``.""" prefix = hex_obj_id[: self.prefix_len] return container_clients[prefix].get_blob_client(blob=hex_obj_id) def get_all_container_clients(self): """Get all active container clients""" # iterate on items() to sort blob services; # needed to be able to paginate in the list_content() method yield from ( self.get_container_client(prefix) for prefix in sorted(self.container_urls) ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py index b39b176..018fce7 100644 --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -1,402 +1,401 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import time import pytest import sh from swh.objstorage import exc from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack from swh.objstorage.backends.winery.stats import Stats from swh.objstorage.backends.winery.throttler import ( BandwidthCalculator, IOThrottler, LeakyBucket, Throttler, ) from swh.objstorage.factory import get_objstorage +from swh.objstorage.utils import call_async from .winery_benchmark import Bench, work from .winery_testing_helpers import PoolHelper, SharedBaseHelper @pytest.fixture def needs_ceph(): try: sh.ceph("--version") except sh.CommandNotFound: pytest.skip("the ceph CLI was not found") @pytest.fixture def ceph_pool(needs_ceph): pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() yield pool pool.images_clobber() pool.clobber() @pytest.fixture def storage(request, postgresql): marker = request.node.get_closest_marker("shard_max_size") if marker is None: shard_max_size = 1024 else: shard_max_size = marker.args[0] dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size, throttle_write=200 * 1024 * 1024, throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() # # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": DatabaseAdmin(dsn, database).drop_database() @pytest.fixture def winery(storage): return storage.winery def test_winery_sharedbase(winery): base = winery.base shard1 = base.whoami assert shard1 is not None assert shard1 == base.whoami id1 = base.id assert id1 is not None assert id1 == base.id def test_winery_add_get(winery): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert ( obj_id.hex() == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" ) assert winery.add(content=content, obj_id=obj_id) == obj_id assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id assert winery.base.whoami == shard assert winery.get(obj_id) == content with pytest.raises(exc.ObjNotFoundError): winery.get(b"unknown") winery.shard.drop() @pytest.mark.shard_max_size(1) def test_winery_add_and_pack(winery, mocker): mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert ( obj_id.hex() == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" ) assert winery.base.whoami != shard assert len(winery.packers) == 1 packer = winery.packers[0] packer.join() assert packer.exitcode == 0 def test_winery_delete(storage): with pytest.raises(PermissionError): storage.delete(None) def test_winery_get_shard_info(winery): assert winery.base.get_shard_info(1234) is None assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_packer(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" winery.add(content=content) winery.base.shard_packing_starts() packer = Packer(shard, **winery.args) try: assert packer.run() is True finally: packer.uninit() readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) assert readonly is True assert packing is False @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_get_object(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) winery.base.shard_packing_starts() assert pack(shard, **winery.args) is True assert winery.get(obj_id) == content def test_winery_ceph_pool(needs_ceph): name = "IMAGE" pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() pool.image_create(name) p = pool.image_path(name) assert p.endswith(name) something = "SOMETHING" open(p, "w").write(something) assert open(p).read(len(something)) == something assert pool.image_list() == [name] pool.image_remap_ro(name) pool.images_clobber() assert pool.image_list() == [name] pool.clobber() assert pool.image_list() == [] @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_bench_work(winery, ceph_pool, tmpdir): # # rw worker creates a shard # whoami = winery.base.whoami shards_info = list(winery.base.list_shards()) assert len(shards_info) == 1 shard, readonly, packing = shards_info[0] assert (readonly, packing) == (False, False) winery.args["dir"] = str(tmpdir) assert work("rw", winery.args) == "rw" shards_info = { name: (readonly, packing) for name, readonly, packing in winery.base.list_shards() } assert len(shards_info) == 2 assert shards_info[whoami] == (True, False) # # ro worker reads a shard # winery.args["ro_worker_max_request"] = 1 assert work("ro", winery.args) == "ro" -@pytest.mark.asyncio -async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): +def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) kwargs = { "output_dir": pytestconfig.getoption("--winery-bench-output-directory"), "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), "ro_worker_max_request": pytestconfig.getoption( "--winery-bench-ro-worker-max-request" ), "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } - count = await Bench(kwargs).run() + count = call_async(Bench(kwargs).run) assert count > 0 -@pytest.mark.asyncio -async def test_winery_bench_fake(pytestconfig, mocker): +def test_winery_bench_fake(pytestconfig, mocker): kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "duration": pytestconfig.getoption("--winery-bench-duration"), } def run(kind): time.sleep(kwargs["duration"] * 2) return kind mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) - assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] + assert call_async(Bench(kwargs).run) == kwargs["rw_workers"] + kwargs["ro_workers"] def test_winery_leaky_bucket_tick(mocker): total = 100 half = 50 b = LeakyBucket(total) sleep = mocker.spy(time, "sleep") assert b.current == b.total sleep.assert_not_called() # # Bucket is at 100, add(50) => drops to 50 # b.add(half) assert b.current == half sleep.assert_not_called() # # Bucket is at 50, add(50) => drops to 0 # b.add(half) assert b.current == 0 sleep.assert_not_called() # # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0 # b.add(half) assert b.current == 0 sleep.assert_called_once() # # Sleep more than one second, bucket is full again, i.e. at 100 # time.sleep(2) mocker.resetall() b.add(0) assert b.current == total sleep.assert_not_called() # # Bucket is full at 100 and and waits when requesting 150 which is # more than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() # # Bucket is empty and and waits when requesting 150 which is more # than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() def test_winery_leaky_bucket_reset(): b = LeakyBucket(100) assert b.total == 100 assert b.current == b.total b.reset(50) assert b.total == 50 assert b.current == b.total b.reset(100) assert b.total == 100 assert b.current == 50 def test_winery_bandwidth_calculator(mocker): now = 1 def monotonic(): return now mocker.patch("time.monotonic", side_effect=monotonic) b = BandwidthCalculator() assert b.get() == 0 count = 100 * 1024 * 1024 going_up = [] for t in range(b.duration): now += 1 b.add(count) going_up.append(b.get()) assert b.get() == count going_down = [] for t in range(b.duration - 1): now += 1 b.add(0) going_down.append(b.get()) going_down.reverse() assert going_up[:-1] == going_down assert len(b.history) == b.duration - 1 def test_winery_io_throttler(postgresql, mocker): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) DatabaseAdmin(dsn, "throttler").create_database() sleep = mocker.spy(time, "sleep") speed = 100 i = IOThrottler("read", base_dsn=dsn, throttle_read=100) count = speed i.add(count) sleep.assert_not_called() i.add(count) sleep.assert_called_once() # # Force slow down # mocker.resetall() i.sync_interval = 0 i.max_speed = 1 assert i.max_speed != i.bucket.total i.add(2) assert i.max_speed == i.bucket.total sleep.assert_called_once() def test_winery_throttler(postgresql): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100) base = {} key = "KEY" content = "CONTENT" def reader(k): return base[k] def writer(k, v): base[k] = v return True assert t.throttle_add(writer, key, content) is True assert t.throttle_get(reader, key) == content def test_winery_stats(tmpdir): s = Stats(None) assert s.stats_active is False s = Stats(tmpdir / "stats") assert s.stats_active is True assert os.path.exists(s.stats_filename) size = os.path.getsize(s.stats_filename) s._stats_flush_interval = 0 k = "KEY" v = "CONTENT" s.stats_read(k, v) s.stats_write(k, v) s.stats_read(k, v) s.stats_write(k, v) s.__del__() assert os.path.getsize(s.stats_filename) > size diff --git a/swh/objstorage/utils.py b/swh/objstorage/utils.py new file mode 100644 index 0000000..0d84a11 --- /dev/null +++ b/swh/objstorage/utils.py @@ -0,0 +1,16 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import asyncio + + +def call_async(f, *args): + """Calls an async coroutine from a synchronous function.""" + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(f(*args)) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close()