Page MenuHomeSoftware Heritage

D3116.diff
No OneTemporary

D3116.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,4 @@
pytest
apache-libcloud
-azure-storage
+azure-storage-blob >= 12.0
python-cephlibs
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,4 @@
# optional dependencies
# apache-libcloud
-# azure-storage
+# azure-storage-blob >= 12.0
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -3,53 +3,146 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import logging
+import datetime
+from itertools import product
import string
-from itertools import dropwhile, islice, product
+from typing import Dict, Optional, Union
+import warnings
-from azure.storage.blob import BlockBlobService
-from azure.common import AzureMissingResourceHttpError
-import requests
+from azure.storage.blob import (
+ ContainerClient,
+ ContainerSasPermissions,
+ generate_container_sas,
+)
+from azure.core.exceptions import ResourceNotFoundError
from swh.objstorage.objstorage import (
ObjStorage,
compute_hash,
- DEFAULT_LIMIT,
compressors,
decompressors,
)
from swh.objstorage.exc import ObjNotFoundError, Error
from swh.model import hashutil
-logging.getLogger("azure.storage").setLevel(logging.CRITICAL)
+def get_container_url(
+ account_name: str,
+ account_key: str,
+ container_name: str,
+ access_policy: str = "read_only",
+ expiry: datetime.timedelta = datetime.timedelta(days=365),
+ **kwargs,
+) -> str:
+ """Get the full URL for the given container on the given account, with a
+ Shared Access Signature granting the specified access policy.
+
+ Args:
+ account_name: name of the storage account for which to generate the URL
+ account_key: shared account key of the storage account used to generate the SAS
+ container_name: name of the container for which to grant access in the storage
+ account
+ access_policy: one of ``read_only``, ``append_only``, ``full``
+ expiry: the interval in the future with which the signature will expire
+
+ Returns:
+ the full URL of the container, with the shared access signature.
+ """
-class AzureCloudObjStorage(ObjStorage):
- """ObjStorage with azure abilities.
+ access_policies = {
+ "read_only": ContainerSasPermissions(
+ read=True, list=True, delete=False, write=False
+ ),
+ "append_only": ContainerSasPermissions(
+ read=True, list=True, delete=False, write=True
+ ),
+ "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True),
+ }
+
+ current_time = datetime.datetime.utcnow()
+
+ signature = generate_container_sas(
+ account_name,
+ container_name,
+ account_key=account_key,
+ permission=access_policies[access_policy],
+ start=current_time + datetime.timedelta(minutes=-1),
+ expiry=current_time + expiry,
+ )
+ return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}"
+
+
+class AzureCloudObjStorage(ObjStorage):
+ """ObjStorage backend for Azure blob storage accounts.
+
+ Args:
+ container_url: the URL of the container in which the objects are stored.
+ account_name: (deprecated) the name of the storage account under which objects are
+ stored
+ api_secret_key: (deprecated) the shared account key
+ container_name: (deprecated) the name of the container under which objects are
+ stored
+ compression: the compression algorithm used to compress objects in storage
+
+ Notes:
+ The container url should contain the credentials via a "Shared Access
+ Signature". The :func:`get_container_url` helper can be used to generate
+ such a URL from the account's access keys. The ``account_name``,
+ ``api_secret_key`` and ``container_name`` arguments are deprecated.
"""
def __init__(
- self, account_name, api_secret_key, container_name, compression="gzip", **kwargs
+ self,
+ container_url: Optional[str] = None,
+ account_name: Optional[str] = None,
+ api_secret_key: Optional[str] = None,
+ container_name: Optional[str] = None,
+ compression="gzip",
+ **kwargs,
):
+ if container_url is None:
+ if account_name is None or api_secret_key is None or container_name is None:
+ raise ValueError(
+ "AzureCloudObjStorage must have a container_url or all three "
+ "account_name, api_secret_key and container_name"
+ )
+ else:
+ warnings.warn(
+ "The Azure objstorage account secret key parameters are "
+ "deprecated, please use container URLs instead.",
+ DeprecationWarning,
+ )
+ container_url = get_container_url(
+ account_name=account_name,
+ account_key=api_secret_key,
+ container_name=container_name,
+ access_policy="full",
+ )
+
super().__init__(**kwargs)
- self.block_blob_service = BlockBlobService(
- account_name=account_name,
- account_key=api_secret_key,
- request_session=requests.Session(),
- )
- self.container_name = container_name
+ self.container_client = ContainerClient.from_container_url(container_url)
self.compression = compression
- def get_blob_service(self, hex_obj_id):
- """Get the block_blob_service and container that contains the object with
+ def get_container_client(self, hex_obj_id):
+ """Get the container client for the container that contains the object with
internal id hex_obj_id
+
+ This is used to allow the PrefixedAzureCloudObjStorage to dispatch the
+ client according to the prefix of the object id.
+
"""
- return self.block_blob_service, self.container_name
+ return self.container_client
+
+ def get_blob_client(self, hex_obj_id):
+ """Get the Azure blob client for the given hex obj id."""
+ container_client = self.get_container_client(hex_obj_id)
- def get_all_blob_services(self):
+ return container_client.get_blob_client(blob=hex_obj_id)
+
+ def get_all_container_clients(self):
"""Get all active block_blob_services"""
- yield self.block_blob_service, self.container_name
+ yield self.container_client
def _internal_id(self, obj_id):
"""Internal id is the hex version in objstorage.
@@ -59,8 +152,8 @@
def check_config(self, *, check_write):
"""Check the configuration for this object storage"""
- for service, container in self.get_all_blob_services():
- props = service.get_container_properties(container)
+ for container_client in self.get_all_container_clients():
+ props = container_client.get_container_properties()
# FIXME: check_write is ignored here
if not props:
@@ -73,15 +166,20 @@
"""
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
- return service.exists(container_name=container, blob_name=hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
+ try:
+ client.get_blob_properties()
+ except ResourceNotFoundError:
+ return False
+ else:
+ return True
def __iter__(self):
"""Iterate over the objects present in the storage.
"""
- for service, container in self.get_all_blob_services():
- for obj in service.list_blobs(container):
+ for client in self.get_all_container_clients():
+ for obj in client.list_blobs():
yield hashutil.hash_to_bytes(obj.name)
def __len__(self):
@@ -108,12 +206,11 @@
# Send the compressed content
compressor = compressors[self.compression]()
- blob = [compressor.compress(content), compressor.flush()]
+ data = compressor.compress(content)
+ data += compressor.flush()
- service, container = self.get_blob_service(hex_obj_id)
- service.create_blob_from_bytes(
- container_name=container, blob_name=hex_obj_id, blob=b"".join(blob),
- )
+ client = self.get_blob_client(hex_obj_id)
+ client.upload_blob(data=data, length=len(data))
return obj_id
@@ -121,6 +218,13 @@
"""Restore a content.
"""
+ if obj_id is None:
+ # Checksum is missing, compute it on the fly.
+ obj_id = compute_hash(content)
+
+ if obj_id in self:
+ self.delete(obj_id)
+
return self.add(content, obj_id, check_presence=False)
def get(self, obj_id):
@@ -128,16 +232,17 @@
"""
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
+
try:
- blob = service.get_blob_to_bytes(
- container_name=container, blob_name=hex_obj_id
- )
- except AzureMissingResourceHttpError:
- raise ObjNotFoundError(obj_id)
+ download = client.download_blob()
+ except ResourceNotFoundError:
+ raise ObjNotFoundError(obj_id) from None
+ else:
+ data = download.content_as_bytes()
decompressor = decompressors[self.compression]()
- ret = decompressor.decompress(blob.content)
+ ret = decompressor.decompress(data)
if decompressor.unused_data:
raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
return ret
@@ -155,47 +260,28 @@
"""Delete an object."""
super().delete(obj_id) # Check delete permission
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
try:
- service.delete_blob(container_name=container, blob_name=hex_obj_id)
- except AzureMissingResourceHttpError:
- raise ObjNotFoundError("Content {} not found!".format(hex_obj_id))
+ client.delete_blob()
+ except ResourceNotFoundError:
+ raise ObjNotFoundError(obj_id) from None
return True
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
- all_blob_services = self.get_all_blob_services()
- if last_obj_id:
- last_obj_id = self._internal_id(last_obj_id)
- last_service, _ = self.get_blob_service(last_obj_id)
- all_blob_services = dropwhile(
- lambda srv: srv[0] != last_service, all_blob_services
- )
- else:
- last_service = None
-
- def iterate_blobs():
- for service, container in all_blob_services:
- marker = last_obj_id if service == last_service else None
- for obj in service.list_blobs(
- container, marker=marker, maxresults=limit
- ):
- yield hashutil.hash_to_bytes(obj.name)
-
- return islice(iterate_blobs(), limit)
-
class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
"""ObjStorage with azure capabilities, striped by prefix.
accounts is a dict containing entries of the form:
- <prefix>:
- account_name: <account_name>
- api_secret_key: <api_secret_key>
- container_name: <container_name>
+ <prefix>: <container_url_for_prefix>
"""
- def __init__(self, accounts, compression="gzip", **kwargs):
+ def __init__(
+ self,
+ accounts: Dict[str, Union[str, Dict[str, str]]],
+ compression="gzip",
+ **kwargs,
+ ):
# shortcut AzureCloudObjStorage __init__
ObjStorage.__init__(self, **kwargs)
@@ -223,26 +309,35 @@
"Missing prefixes %s" % ", ".join(sorted(missing_prefixes))
)
+ do_warning = False
+
self.prefixes = {}
- request_session = requests.Session()
- for prefix, account in accounts.items():
- self.prefixes[prefix] = (
- BlockBlobService(
- account_name=account["account_name"],
- account_key=account["api_secret_key"],
- request_session=request_session,
- ),
- account["container_name"],
+ for prefix, container_url in accounts.items():
+ if isinstance(container_url, dict):
+ do_warning = True
+ container_url = get_container_url(
+ account_name=container_url["account_name"],
+ account_key=container_url["api_secret_key"],
+ container_name=container_url["container_name"],
+ access_policy="full",
+ )
+ self.prefixes[prefix] = ContainerClient.from_container_url(container_url)
+
+ if do_warning:
+ warnings.warn(
+ "The Azure objstorage account secret key parameters are "
+ "deprecated, please use container URLs instead.",
+ DeprecationWarning,
)
- def get_blob_service(self, hex_obj_id):
+ def get_container_client(self, hex_obj_id):
"""Get the block_blob_service and container that contains the object with
internal id hex_obj_id
"""
return self.prefixes[hex_obj_id[: self.prefix_len]]
- def get_all_blob_services(self):
- """Get all active block_blob_services"""
+ def get_all_container_clients(self):
+ """Get all active container clients"""
# iterate on items() to sort blob services;
# needed to be able to paginate in the list_content() method
yield from (v for _, v in sorted(self.prefixes.items()))
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -64,6 +64,8 @@
self.assertIsNone(result[0])
def test_restore_content(self):
+ self.storage.allow_delete = True
+
valid_content, valid_obj_id = self.hash_content(b"restore_content")
invalid_content = b"unexpected content"
id_adding = self.storage.add(invalid_content, valid_obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -3,15 +3,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import base64
+from dataclasses import dataclass
import unittest
-from collections import defaultdict
from unittest.mock import patch
+from urllib.parse import urlparse, parse_qs
-from typing import Any, Dict
+from azure.core.exceptions import ResourceNotFoundError
+import pytest
-from azure.common import AzureMissingResourceHttpError
from swh.model.hashutil import hash_to_hex
+import swh.objstorage.backends.azure
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import decompressors
from swh.objstorage.exc import Error
@@ -19,51 +22,73 @@
from .objstorage_testing import ObjStorageTestFixture
-class MockBlob:
- """ Libcloud object mock that replicates its API """
+@dataclass
+class MockListedObject:
+ name: str
- def __init__(self, name, content):
- self.name = name
- self.content = content
+class MockDownloadClient:
+ def __init__(self, blob_data):
+ self.blob_data = blob_data
-class MockBlockBlobService:
- """Mock internal azure library which AzureCloudObjStorage depends upon.
+ def content_as_bytes(self):
+ return self.blob_data
- """
- _data: Dict[str, Any] = {}
+class MockBlobClient:
+ def __init__(self, container, blob):
+ self.container = container
+ self.blob = blob
- def __init__(self, account_name, account_key, **kwargs):
- # do not care for the account_name and the api_secret_key here
- self._data = defaultdict(dict)
+ def get_blob_properties(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
- def get_container_properties(self, container_name):
- self._data[container_name]
- return container_name in self._data
+ return {"exists": True}
- def create_blob_from_bytes(self, container_name, blob_name, blob):
- self._data[container_name][blob_name] = blob
+ def upload_blob(self, data, length=None):
+ if self.blob in self.container.blobs:
+ raise ValueError("Blob already exists")
- def get_blob_to_bytes(self, container_name, blob_name):
- if blob_name not in self._data[container_name]:
- raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
- return MockBlob(name=blob_name, content=self._data[container_name][blob_name])
+ if length is not None and length != len(data):
+ raise ValueError("Wrong length for blob data!")
- def delete_blob(self, container_name, blob_name):
- try:
- self._data[container_name].pop(blob_name)
- except KeyError:
- raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
- return True
+ self.container.blobs[self.blob] = data
- def exists(self, container_name, blob_name):
- return blob_name in self._data[container_name]
+ def download_blob(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
- def list_blobs(self, container_name, marker=None, maxresults=None):
- for blob_name, content in sorted(self._data[container_name].items()):
- if marker is None or blob_name > marker:
- yield MockBlob(name=blob_name, content=content)
+ return MockDownloadClient(self.container.blobs[self.blob])
+
+ def delete_blob(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
+
+ del self.container.blobs[self.blob]
+
+
+class MockContainerClient:
+ def __init__(self, container_url):
+ self.container_url = container_url
+ self.blobs = {}
+
+ @classmethod
+ def from_container_url(cls, container_url):
+ return cls(container_url)
+
+ def get_container_properties(self):
+ return {"exists": True}
+
+ def get_blob_client(self, blob):
+ return MockBlobClient(self, blob)
+
+ def list_blobs(self):
+ for obj in sorted(self.blobs):
+ yield MockListedObject(obj)
+
+ def delete_blob(self, blob):
+ self.get_blob_client(blob.name).delete_blob()
class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
@@ -72,7 +97,7 @@
def setUp(self):
super().setUp()
patcher = patch(
- "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+ "swh.objstorage.backends.azure.ContainerClient", MockContainerClient,
)
patcher.start()
self.addCleanup(patcher.stop)
@@ -80,9 +105,7 @@
self.storage = get_objstorage(
"azure",
{
- "account_name": "account-name",
- "api_secret_key": "api-secret-key",
- "container_name": "container-name",
+ "container_url": "https://bogus-container-url.example",
"compression": self.compression,
},
)
@@ -91,23 +114,25 @@
content, obj_id = self.hash_content(b"test content is compressed")
self.storage.add(content, obj_id=obj_id)
- blob_service, container = self.storage.get_blob_service(obj_id)
internal_id = self.storage._internal_id(obj_id)
-
- raw_blob = blob_service.get_blob_to_bytes(container, internal_id)
+ blob_client = self.storage.get_blob_client(internal_id)
+ raw_blob = blob_client.download_blob().content_as_bytes()
d = decompressors[self.compression]()
- assert d.decompress(raw_blob.content) == content
+ assert d.decompress(raw_blob) == content
assert d.unused_data == b""
def test_trailing_data_on_stored_blob(self):
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
- blob_service, container = self.storage.get_blob_service(obj_id)
internal_id = self.storage._internal_id(obj_id)
+ blob_client = self.storage.get_blob_client(internal_id)
+ raw_blob = blob_client.download_blob().content_as_bytes()
+ new_data = raw_blob + b"trailing garbage"
- blob_service._data[container][internal_id] += b"trailing garbage"
+ blob_client.delete_blob()
+ blob_client.upload_blob(data=new_data, length=len(new_data))
if self.compression == "none":
with self.assertRaises(Error) as e:
@@ -138,18 +163,14 @@
def setUp(self):
super().setUp()
patcher = patch(
- "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+ "swh.objstorage.backends.azure.ContainerClient", MockContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.accounts = {}
for prefix in "0123456789abcdef":
- self.accounts[prefix] = {
- "account_name": "account_%s" % prefix,
- "api_secret_key": "secret_key_%s" % prefix,
- "container_name": "container_%s" % prefix,
- }
+ self.accounts[prefix] = "https://bogus-container-url.example/" + prefix
self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})
@@ -173,7 +194,75 @@
hex_obj_id = hash_to_hex(obj_id)
prefix = hex_obj_id[0]
self.assertTrue(
- self.storage.prefixes[prefix][0].exists(
- self.accounts[prefix]["container_name"], hex_obj_id
- )
+ self.storage.prefixes[prefix]
+ .get_blob_client(hex_obj_id)
+ .get_blob_properties()
)
+
+
+def test_get_container_url():
+ # r=read, l=list, w=write, d=delete
+ policy_map = {
+ "read_only": "rl",
+ "append_only": "rwl",
+ "full": "rwdl",
+ }
+
+ for policy, expected in policy_map.items():
+ ret = swh.objstorage.backends.azure.get_container_url(
+ account_name="account_name",
+ account_key=base64.b64encode(b"account_key"),
+ container_name="container_name",
+ access_policy=policy,
+ )
+
+ p = urlparse(ret)
+ assert p.scheme == "https"
+ assert p.netloc == "account_name.blob.core.windows.net"
+ assert p.path == "/container_name"
+
+ qs = parse_qs(p.query)
+ # sp: permissions
+ assert qs["sp"] == [expected]
+ # sr: resource (c=container)
+ assert qs["sr"] == ["c"]
+ # st: start; se: expiry
+ assert qs["st"][0] < qs["se"][0]
+
+
+def test_bwcompat_args(monkeypatch):
+ monkeypatch.setattr(
+ swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
+ )
+
+ with pytest.deprecated_call():
+ objs = get_objstorage(
+ "azure",
+ {
+ "account_name": "account_name",
+ "api_secret_key": base64.b64encode(b"account_key"),
+ "container_name": "container_name",
+ },
+ )
+
+ assert objs is not None
+
+
+def test_bwcompat_args_prefixed(monkeypatch):
+ monkeypatch.setattr(
+ swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
+ )
+
+ accounts = {
+ prefix: {
+ "account_name": f"account_name{prefix}",
+ "api_secret_key": base64.b64encode(b"account_key"),
+ "container_name": "container_name",
+ }
+ for prefix in "0123456789abcdef"
+ }
+
+ with pytest.deprecated_call():
+ objs = get_objstorage("azure-prefixed", {"accounts": accounts})
+
+ assert objs is not None

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 5:09 PM (3 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223256

Event Timeline