Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9347211
D3116.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
D3116.diff
View Options
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,4 @@
pytest
apache-libcloud
-azure-storage
+azure-storage-blob >= 12.0
python-cephlibs
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,4 @@
# optional dependencies
# apache-libcloud
-# azure-storage
+# azure-storage-blob >= 12.0
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -3,53 +3,146 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import logging
+import datetime
+from itertools import product
import string
-from itertools import dropwhile, islice, product
+from typing import Dict, Optional, Union
+import warnings
-from azure.storage.blob import BlockBlobService
-from azure.common import AzureMissingResourceHttpError
-import requests
+from azure.storage.blob import (
+ ContainerClient,
+ ContainerSasPermissions,
+ generate_container_sas,
+)
+from azure.core.exceptions import ResourceNotFoundError
from swh.objstorage.objstorage import (
ObjStorage,
compute_hash,
- DEFAULT_LIMIT,
compressors,
decompressors,
)
from swh.objstorage.exc import ObjNotFoundError, Error
from swh.model import hashutil
-logging.getLogger("azure.storage").setLevel(logging.CRITICAL)
+def get_container_url(
+ account_name: str,
+ account_key: str,
+ container_name: str,
+ access_policy: str = "read_only",
+ expiry: datetime.timedelta = datetime.timedelta(days=365),
+ **kwargs,
+) -> str:
+    """Get the full URL for the given container on the given account, with a
+ Shared Access Signature granting the specified access policy.
+
+ Args:
+ account_name: name of the storage account for which to generate the URL
+ account_key: shared account key of the storage account used to generate the SAS
+ container_name: name of the container for which to grant access in the storage
+ account
+ access_policy: one of ``read_only``, ``append_only``, ``full``
+ expiry: the interval in the future with which the signature will expire
+
+ Returns:
+ the full URL of the container, with the shared access signature.
+ """
-class AzureCloudObjStorage(ObjStorage):
- """ObjStorage with azure abilities.
+ access_policies = {
+ "read_only": ContainerSasPermissions(
+ read=True, list=True, delete=False, write=False
+ ),
+ "append_only": ContainerSasPermissions(
+ read=True, list=True, delete=False, write=True
+ ),
+ "full": ContainerSasPermissions(read=True, list=True, delete=True, write=True),
+ }
+
+ current_time = datetime.datetime.utcnow()
+
+ signature = generate_container_sas(
+ account_name,
+ container_name,
+ account_key=account_key,
+ permission=access_policies[access_policy],
+ start=current_time + datetime.timedelta(minutes=-1),
+ expiry=current_time + expiry,
+ )
+ return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}"
+
+
+class AzureCloudObjStorage(ObjStorage):
+ """ObjStorage backend for Azure blob storage accounts.
+
+ Args:
+ container_url: the URL of the container in which the objects are stored.
+ account_name: (deprecated) the name of the storage account under which objects are
+ stored
+ api_secret_key: (deprecated) the shared account key
+ container_name: (deprecated) the name of the container under which objects are
+ stored
+ compression: the compression algorithm used to compress objects in storage
+
+ Notes:
+ The container url should contain the credentials via a "Shared Access
+ Signature". The :func:`get_container_url` helper can be used to generate
+ such a URL from the account's access keys. The ``account_name``,
+ ``api_secret_key`` and ``container_name`` arguments are deprecated.
"""
def __init__(
- self, account_name, api_secret_key, container_name, compression="gzip", **kwargs
+ self,
+ container_url: Optional[str] = None,
+ account_name: Optional[str] = None,
+ api_secret_key: Optional[str] = None,
+ container_name: Optional[str] = None,
+ compression="gzip",
+ **kwargs,
):
+ if container_url is None:
+ if account_name is None or api_secret_key is None or container_name is None:
+ raise ValueError(
+ "AzureCloudObjStorage must have a container_url or all three "
+ "account_name, api_secret_key and container_name"
+ )
+ else:
+ warnings.warn(
+ "The Azure objstorage account secret key parameters are "
+ "deprecated, please use container URLs instead.",
+ DeprecationWarning,
+ )
+ container_url = get_container_url(
+ account_name=account_name,
+ account_key=api_secret_key,
+ container_name=container_name,
+ access_policy="full",
+ )
+
super().__init__(**kwargs)
- self.block_blob_service = BlockBlobService(
- account_name=account_name,
- account_key=api_secret_key,
- request_session=requests.Session(),
- )
- self.container_name = container_name
+ self.container_client = ContainerClient.from_container_url(container_url)
self.compression = compression
- def get_blob_service(self, hex_obj_id):
- """Get the block_blob_service and container that contains the object with
+ def get_container_client(self, hex_obj_id):
+ """Get the container client for the container that contains the object with
internal id hex_obj_id
+
+ This is used to allow the PrefixedAzureCloudObjStorage to dispatch the
+ client according to the prefix of the object id.
+
"""
- return self.block_blob_service, self.container_name
+ return self.container_client
+
+ def get_blob_client(self, hex_obj_id):
+        """Get the Azure blob client for the given hex obj id"""
+ container_client = self.get_container_client(hex_obj_id)
- def get_all_blob_services(self):
+ return container_client.get_blob_client(blob=hex_obj_id)
+
+ def get_all_container_clients(self):
"""Get all active block_blob_services"""
- yield self.block_blob_service, self.container_name
+ yield self.container_client
def _internal_id(self, obj_id):
"""Internal id is the hex version in objstorage.
@@ -59,8 +152,8 @@
def check_config(self, *, check_write):
"""Check the configuration for this object storage"""
- for service, container in self.get_all_blob_services():
- props = service.get_container_properties(container)
+ for container_client in self.get_all_container_clients():
+ props = container_client.get_container_properties()
# FIXME: check_write is ignored here
if not props:
@@ -73,15 +166,20 @@
"""
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
- return service.exists(container_name=container, blob_name=hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
+ try:
+ client.get_blob_properties()
+ except ResourceNotFoundError:
+ return False
+ else:
+ return True
def __iter__(self):
"""Iterate over the objects present in the storage.
"""
- for service, container in self.get_all_blob_services():
- for obj in service.list_blobs(container):
+ for client in self.get_all_container_clients():
+ for obj in client.list_blobs():
yield hashutil.hash_to_bytes(obj.name)
def __len__(self):
@@ -108,12 +206,11 @@
# Send the compressed content
compressor = compressors[self.compression]()
- blob = [compressor.compress(content), compressor.flush()]
+ data = compressor.compress(content)
+ data += compressor.flush()
- service, container = self.get_blob_service(hex_obj_id)
- service.create_blob_from_bytes(
- container_name=container, blob_name=hex_obj_id, blob=b"".join(blob),
- )
+ client = self.get_blob_client(hex_obj_id)
+ client.upload_blob(data=data, length=len(data))
return obj_id
@@ -121,6 +218,13 @@
"""Restore a content.
"""
+ if obj_id is None:
+ # Checksum is missing, compute it on the fly.
+ obj_id = compute_hash(content)
+
+ if obj_id in self:
+ self.delete(obj_id)
+
return self.add(content, obj_id, check_presence=False)
def get(self, obj_id):
@@ -128,16 +232,17 @@
"""
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
+
try:
- blob = service.get_blob_to_bytes(
- container_name=container, blob_name=hex_obj_id
- )
- except AzureMissingResourceHttpError:
- raise ObjNotFoundError(obj_id)
+ download = client.download_blob()
+ except ResourceNotFoundError:
+ raise ObjNotFoundError(obj_id) from None
+ else:
+ data = download.content_as_bytes()
decompressor = decompressors[self.compression]()
- ret = decompressor.decompress(blob.content)
+ ret = decompressor.decompress(data)
if decompressor.unused_data:
raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
return ret
@@ -155,47 +260,28 @@
"""Delete an object."""
super().delete(obj_id) # Check delete permission
hex_obj_id = self._internal_id(obj_id)
- service, container = self.get_blob_service(hex_obj_id)
+ client = self.get_blob_client(hex_obj_id)
try:
- service.delete_blob(container_name=container, blob_name=hex_obj_id)
- except AzureMissingResourceHttpError:
- raise ObjNotFoundError("Content {} not found!".format(hex_obj_id))
+ client.delete_blob()
+ except ResourceNotFoundError:
+ raise ObjNotFoundError(obj_id) from None
return True
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
- all_blob_services = self.get_all_blob_services()
- if last_obj_id:
- last_obj_id = self._internal_id(last_obj_id)
- last_service, _ = self.get_blob_service(last_obj_id)
- all_blob_services = dropwhile(
- lambda srv: srv[0] != last_service, all_blob_services
- )
- else:
- last_service = None
-
- def iterate_blobs():
- for service, container in all_blob_services:
- marker = last_obj_id if service == last_service else None
- for obj in service.list_blobs(
- container, marker=marker, maxresults=limit
- ):
- yield hashutil.hash_to_bytes(obj.name)
-
- return islice(iterate_blobs(), limit)
-
class PrefixedAzureCloudObjStorage(AzureCloudObjStorage):
"""ObjStorage with azure capabilities, striped by prefix.
accounts is a dict containing entries of the form:
- <prefix>:
- account_name: <account_name>
- api_secret_key: <api_secret_key>
- container_name: <container_name>
+ <prefix>: <container_url_for_prefix>
"""
- def __init__(self, accounts, compression="gzip", **kwargs):
+ def __init__(
+ self,
+ accounts: Dict[str, Union[str, Dict[str, str]]],
+ compression="gzip",
+ **kwargs,
+ ):
# shortcut AzureCloudObjStorage __init__
ObjStorage.__init__(self, **kwargs)
@@ -223,26 +309,35 @@
"Missing prefixes %s" % ", ".join(sorted(missing_prefixes))
)
+ do_warning = False
+
self.prefixes = {}
- request_session = requests.Session()
- for prefix, account in accounts.items():
- self.prefixes[prefix] = (
- BlockBlobService(
- account_name=account["account_name"],
- account_key=account["api_secret_key"],
- request_session=request_session,
- ),
- account["container_name"],
+ for prefix, container_url in accounts.items():
+ if isinstance(container_url, dict):
+ do_warning = True
+ container_url = get_container_url(
+ account_name=container_url["account_name"],
+ account_key=container_url["api_secret_key"],
+ container_name=container_url["container_name"],
+ access_policy="full",
+ )
+ self.prefixes[prefix] = ContainerClient.from_container_url(container_url)
+
+ if do_warning:
+ warnings.warn(
+ "The Azure objstorage account secret key parameters are "
+ "deprecated, please use container URLs instead.",
+ DeprecationWarning,
)
- def get_blob_service(self, hex_obj_id):
+ def get_container_client(self, hex_obj_id):
"""Get the block_blob_service and container that contains the object with
internal id hex_obj_id
"""
return self.prefixes[hex_obj_id[: self.prefix_len]]
- def get_all_blob_services(self):
- """Get all active block_blob_services"""
+ def get_all_container_clients(self):
+ """Get all active container clients"""
# iterate on items() to sort blob services;
# needed to be able to paginate in the list_content() method
yield from (v for _, v in sorted(self.prefixes.items()))
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -64,6 +64,8 @@
self.assertIsNone(result[0])
def test_restore_content(self):
+ self.storage.allow_delete = True
+
valid_content, valid_obj_id = self.hash_content(b"restore_content")
invalid_content = b"unexpected content"
id_adding = self.storage.add(invalid_content, valid_obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -3,15 +3,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import base64
+from dataclasses import dataclass
import unittest
-from collections import defaultdict
from unittest.mock import patch
+from urllib.parse import urlparse, parse_qs
-from typing import Any, Dict
+from azure.core.exceptions import ResourceNotFoundError
+import pytest
-from azure.common import AzureMissingResourceHttpError
from swh.model.hashutil import hash_to_hex
+import swh.objstorage.backends.azure
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import decompressors
from swh.objstorage.exc import Error
@@ -19,51 +22,73 @@
from .objstorage_testing import ObjStorageTestFixture
-class MockBlob:
- """ Libcloud object mock that replicates its API """
+@dataclass
+class MockListedObject:
+ name: str
- def __init__(self, name, content):
- self.name = name
- self.content = content
+class MockDownloadClient:
+ def __init__(self, blob_data):
+ self.blob_data = blob_data
-class MockBlockBlobService:
- """Mock internal azure library which AzureCloudObjStorage depends upon.
+ def content_as_bytes(self):
+ return self.blob_data
- """
- _data: Dict[str, Any] = {}
+class MockBlobClient:
+ def __init__(self, container, blob):
+ self.container = container
+ self.blob = blob
- def __init__(self, account_name, account_key, **kwargs):
- # do not care for the account_name and the api_secret_key here
- self._data = defaultdict(dict)
+ def get_blob_properties(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
- def get_container_properties(self, container_name):
- self._data[container_name]
- return container_name in self._data
+ return {"exists": True}
- def create_blob_from_bytes(self, container_name, blob_name, blob):
- self._data[container_name][blob_name] = blob
+ def upload_blob(self, data, length=None):
+ if self.blob in self.container.blobs:
+ raise ValueError("Blob already exists")
- def get_blob_to_bytes(self, container_name, blob_name):
- if blob_name not in self._data[container_name]:
- raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
- return MockBlob(name=blob_name, content=self._data[container_name][blob_name])
+ if length is not None and length != len(data):
+ raise ValueError("Wrong length for blob data!")
- def delete_blob(self, container_name, blob_name):
- try:
- self._data[container_name].pop(blob_name)
- except KeyError:
- raise AzureMissingResourceHttpError("Blob %s not found" % blob_name, 404)
- return True
+ self.container.blobs[self.blob] = data
- def exists(self, container_name, blob_name):
- return blob_name in self._data[container_name]
+ def download_blob(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
- def list_blobs(self, container_name, marker=None, maxresults=None):
- for blob_name, content in sorted(self._data[container_name].items()):
- if marker is None or blob_name > marker:
- yield MockBlob(name=blob_name, content=content)
+ return MockDownloadClient(self.container.blobs[self.blob])
+
+ def delete_blob(self):
+ if self.blob not in self.container.blobs:
+ raise ResourceNotFoundError("Blob not found")
+
+ del self.container.blobs[self.blob]
+
+
+class MockContainerClient:
+ def __init__(self, container_url):
+ self.container_url = container_url
+ self.blobs = {}
+
+ @classmethod
+ def from_container_url(cls, container_url):
+ return cls(container_url)
+
+ def get_container_properties(self):
+ return {"exists": True}
+
+ def get_blob_client(self, blob):
+ return MockBlobClient(self, blob)
+
+ def list_blobs(self):
+ for obj in sorted(self.blobs):
+ yield MockListedObject(obj)
+
+ def delete_blob(self, blob):
+ self.get_blob_client(blob.name).delete_blob()
class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
@@ -72,7 +97,7 @@
def setUp(self):
super().setUp()
patcher = patch(
- "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+ "swh.objstorage.backends.azure.ContainerClient", MockContainerClient,
)
patcher.start()
self.addCleanup(patcher.stop)
@@ -80,9 +105,7 @@
self.storage = get_objstorage(
"azure",
{
- "account_name": "account-name",
- "api_secret_key": "api-secret-key",
- "container_name": "container-name",
+ "container_url": "https://bogus-container-url.example",
"compression": self.compression,
},
)
@@ -91,23 +114,25 @@
content, obj_id = self.hash_content(b"test content is compressed")
self.storage.add(content, obj_id=obj_id)
- blob_service, container = self.storage.get_blob_service(obj_id)
internal_id = self.storage._internal_id(obj_id)
-
- raw_blob = blob_service.get_blob_to_bytes(container, internal_id)
+ blob_client = self.storage.get_blob_client(internal_id)
+ raw_blob = blob_client.download_blob().content_as_bytes()
d = decompressors[self.compression]()
- assert d.decompress(raw_blob.content) == content
+ assert d.decompress(raw_blob) == content
assert d.unused_data == b""
def test_trailing_data_on_stored_blob(self):
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
- blob_service, container = self.storage.get_blob_service(obj_id)
internal_id = self.storage._internal_id(obj_id)
+ blob_client = self.storage.get_blob_client(internal_id)
+ raw_blob = blob_client.download_blob().content_as_bytes()
+ new_data = raw_blob + b"trailing garbage"
- blob_service._data[container][internal_id] += b"trailing garbage"
+ blob_client.delete_blob()
+ blob_client.upload_blob(data=new_data, length=len(new_data))
if self.compression == "none":
with self.assertRaises(Error) as e:
@@ -138,18 +163,14 @@
def setUp(self):
super().setUp()
patcher = patch(
- "swh.objstorage.backends.azure.BlockBlobService", MockBlockBlobService,
+ "swh.objstorage.backends.azure.ContainerClient", MockContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.accounts = {}
for prefix in "0123456789abcdef":
- self.accounts[prefix] = {
- "account_name": "account_%s" % prefix,
- "api_secret_key": "secret_key_%s" % prefix,
- "container_name": "container_%s" % prefix,
- }
+ self.accounts[prefix] = "https://bogus-container-url.example/" + prefix
self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})
@@ -173,7 +194,75 @@
hex_obj_id = hash_to_hex(obj_id)
prefix = hex_obj_id[0]
self.assertTrue(
- self.storage.prefixes[prefix][0].exists(
- self.accounts[prefix]["container_name"], hex_obj_id
- )
+ self.storage.prefixes[prefix]
+ .get_blob_client(hex_obj_id)
+ .get_blob_properties()
)
+
+
+def test_get_container_url():
+ # r=read, l=list, w=write, d=delete
+ policy_map = {
+ "read_only": "rl",
+ "append_only": "rwl",
+ "full": "rwdl",
+ }
+
+ for policy, expected in policy_map.items():
+ ret = swh.objstorage.backends.azure.get_container_url(
+ account_name="account_name",
+ account_key=base64.b64encode(b"account_key"),
+ container_name="container_name",
+ access_policy=policy,
+ )
+
+ p = urlparse(ret)
+ assert p.scheme == "https"
+ assert p.netloc == "account_name.blob.core.windows.net"
+ assert p.path == "/container_name"
+
+ qs = parse_qs(p.query)
+ # sp: permissions
+ assert qs["sp"] == [expected]
+ # sr: resource (c=container)
+ assert qs["sr"] == ["c"]
+ # st: start; se: expiry
+ assert qs["st"][0] < qs["se"][0]
+
+
+def test_bwcompat_args(monkeypatch):
+ monkeypatch.setattr(
+ swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
+ )
+
+ with pytest.deprecated_call():
+ objs = get_objstorage(
+ "azure",
+ {
+ "account_name": "account_name",
+ "api_secret_key": base64.b64encode(b"account_key"),
+ "container_name": "container_name",
+ },
+ )
+
+ assert objs is not None
+
+
+def test_bwcompat_args_prefixed(monkeypatch):
+ monkeypatch.setattr(
+ swh.objstorage.backends.azure, "ContainerClient", MockContainerClient,
+ )
+
+ accounts = {
+ prefix: {
+ "account_name": f"account_name{prefix}",
+ "api_secret_key": base64.b64encode(b"account_key"),
+ "container_name": "container_name",
+ }
+ for prefix in "0123456789abcdef"
+ }
+
+ with pytest.deprecated_call():
+ objs = get_objstorage("azure-prefixed", {"accounts": accounts})
+
+ assert objs is not None
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 5:09 PM (3 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223256
Attached To
D3116: Reimplement the azure objstorage on top of recent azure-storage-blob
Event Timeline
Log In to Comment