Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/azure.py
# Copyright (C) 2016-2020 The Software Heritage developers | # Copyright (C) 2016-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import datetime | ||||
from itertools import product | |||||
import string | import string | ||||
from itertools import dropwhile, islice, product | from typing import Dict, Optional, Union | ||||
import warnings | |||||
from azure.storage.blob import BlockBlobService | from azure.storage.blob import ( | ||||
from azure.common import AzureMissingResourceHttpError | ContainerClient, | ||||
import requests | ContainerSasPermissions, | ||||
generate_container_sas, | |||||
) | |||||
from azure.core.exceptions import ResourceNotFoundError | |||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
ObjStorage, | ObjStorage, | ||||
compute_hash, | compute_hash, | ||||
DEFAULT_LIMIT, | |||||
compressors, | compressors, | ||||
decompressors, | decompressors, | ||||
) | ) | ||||
from swh.objstorage.exc import ObjNotFoundError, Error | from swh.objstorage.exc import ObjNotFoundError, Error | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
logging.getLogger("azure.storage").setLevel(logging.CRITICAL) | |||||
def get_container_url( | |||||
account_name: str, | |||||
account_key: str, | |||||
container_name: str, | |||||
access_policy: str = "read_only", | |||||
expiry: datetime.timedelta = datetime.timedelta(days=365), | |||||
**kwargs, | |||||
) -> str: | |||||
"""Get the full url, for the given container on the given account, with a | |||||
Shared Access Signature granting the specified access policy. | |||||
Args: | |||||
account_name: name of the storage account for which to generate the URL | |||||
account_key: shared account key of the storage account used to generate the SAS | |||||
container_name: name of the container for which to grant access in the storage | |||||
account | |||||
access_policy: one of ``read_only``, ``append_only``, ``full`` | |||||
expiry: the interval in the future with which the signature will expire | |||||
Returns: | |||||
the full URL of the container, with the shared access signature. | |||||
""" | |||||
access_policies = { | |||||
"read_only": ContainerSasPermissions( | |||||
read=True, list=True, delete=False, write=False | |||||
), | |||||
"append_only": ContainerSasPermissions( | |||||
read=True, list=True, delete=False, write=True | |||||
), | |||||
"full": ContainerSasPermissions(read=True, list=True, delete=True, write=True), | |||||
} | |||||
current_time = datetime.datetime.utcnow() | |||||
signature = generate_container_sas( | |||||
account_name, | |||||
container_name, | |||||
account_key=account_key, | |||||
permission=access_policies[access_policy], | |||||
start=current_time + datetime.timedelta(minutes=-1), | |||||
expiry=current_time + expiry, | |||||
) | |||||
return f"https://{account_name}.blob.core.windows.net/{container_name}?{signature}" | |||||
class AzureCloudObjStorage(ObjStorage): | class AzureCloudObjStorage(ObjStorage): | ||||
"""ObjStorage with azure abilities. | """ObjStorage backend for Azure blob storage accounts. | ||||
Args: | |||||
container_url: the URL of the container in which the objects are stored. | |||||
account_name: (deprecated) the name of the storage account under which objects are | |||||
stored | |||||
api_secret_key: (deprecated) the shared account key | |||||
container_name: (deprecated) the name of the container under which objects are | |||||
stored | |||||
compression: the compression algorithm used to compress objects in storage | |||||
Notes: | |||||
The container url should contain the credentials via a "Shared Access | |||||
Signature". The :func:`get_container_url` helper can be used to generate | |||||
such a URL from the account's access keys. The ``account_name``, | |||||
``api_secret_key`` and ``container_name`` arguments are deprecated. | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, account_name, api_secret_key, container_name, compression="gzip", **kwargs | self, | ||||
container_url: Optional[str] = None, | |||||
account_name: Optional[str] = None, | |||||
api_secret_key: Optional[str] = None, | |||||
container_name: Optional[str] = None, | |||||
compression="gzip", | |||||
**kwargs, | |||||
): | ): | ||||
super().__init__(**kwargs) | if container_url is None: | ||||
self.block_blob_service = BlockBlobService( | if account_name is None or api_secret_key is None or container_name is None: | ||||
raise ValueError( | |||||
"AzureCloudObjStorage must have a container_url or all three " | |||||
"account_name, api_secret_key and container_name" | |||||
) | |||||
else: | |||||
warnings.warn( | |||||
"The Azure objstorage account secret key parameters are " | |||||
"deprecated, please use container URLs instead.", | |||||
DeprecationWarning, | |||||
) | |||||
container_url = get_container_url( | |||||
account_name=account_name, | account_name=account_name, | ||||
account_key=api_secret_key, | account_key=api_secret_key, | ||||
request_session=requests.Session(), | container_name=container_name, | ||||
access_policy="full", | |||||
) | ) | ||||
self.container_name = container_name | |||||
super().__init__(**kwargs) | |||||
self.container_client = ContainerClient.from_container_url(container_url) | |||||
self.compression = compression | self.compression = compression | ||||
def get_blob_service(self, hex_obj_id): | def get_container_client(self, hex_obj_id): | ||||
"""Get the block_blob_service and container that contains the object with | """Get the container client for the container that contains the object with | ||||
internal id hex_obj_id | internal id hex_obj_id | ||||
This is used to allow the PrefixedAzureCloudObjStorage to dispatch the | |||||
client according to the prefix of the object id. | |||||
""" | """ | ||||
return self.block_blob_service, self.container_name | return self.container_client | ||||
def get_blob_client(self, hex_obj_id): | |||||
"""Get the azure blob client for the given hex obj id""" | |||||
container_client = self.get_container_client(hex_obj_id) | |||||
douardda: this method looks weird as is. Why does it exists? (not read the whole file yet, but my bet is… | |||||
def get_all_blob_services(self): | return container_client.get_blob_client(blob=hex_obj_id) | ||||
def get_all_container_clients(self): | |||||
"""Get all active block_blob_services""" | """Get all active block_blob_services""" | ||||
yield self.block_blob_service, self.container_name | yield self.container_client | ||||
def _internal_id(self, obj_id): | def _internal_id(self, obj_id): | ||||
"""Internal id is the hex version in objstorage. | """Internal id is the hex version in objstorage. | ||||
""" | """ | ||||
return hashutil.hash_to_hex(obj_id) | return hashutil.hash_to_hex(obj_id) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
"""Check the configuration for this object storage""" | """Check the configuration for this object storage""" | ||||
for service, container in self.get_all_blob_services(): | for container_client in self.get_all_container_clients(): | ||||
props = service.get_container_properties(container) | props = container_client.get_container_properties() | ||||
# FIXME: check_write is ignored here | # FIXME: check_write is ignored here | ||||
if not props: | if not props: | ||||
return False | return False | ||||
return True | return True | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
"""Does the storage contains the obj_id. | """Does the storage contains the obj_id. | ||||
""" | """ | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
service, container = self.get_blob_service(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
return service.exists(container_name=container, blob_name=hex_obj_id) | try: | ||||
client.get_blob_properties() | |||||
except ResourceNotFoundError: | |||||
return False | |||||
else: | |||||
return True | |||||
def __iter__(self): | def __iter__(self): | ||||
"""Iterate over the objects present in the storage. | """Iterate over the objects present in the storage. | ||||
""" | """ | ||||
for service, container in self.get_all_blob_services(): | for client in self.get_all_container_clients(): | ||||
for obj in service.list_blobs(container): | for obj in client.list_blobs(): | ||||
yield hashutil.hash_to_bytes(obj.name) | yield hashutil.hash_to_bytes(obj.name) | ||||
def __len__(self): | def __len__(self): | ||||
"""Compute the number of objects in the current object storage. | """Compute the number of objects in the current object storage. | ||||
Returns: | Returns: | ||||
number of objects contained in the storage. | number of objects contained in the storage. | ||||
Show All 10 Lines | def add(self, content, obj_id=None, check_presence=True): | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
return obj_id | return obj_id | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
# Send the compressed content | # Send the compressed content | ||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
blob = [compressor.compress(content), compressor.flush()] | data = compressor.compress(content) | ||||
data += compressor.flush() | |||||
service, container = self.get_blob_service(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
service.create_blob_from_bytes( | client.upload_blob(data=data, length=len(data)) | ||||
container_name=container, blob_name=hex_obj_id, blob=b"".join(blob), | |||||
) | |||||
return obj_id | return obj_id | ||||
def restore(self, content, obj_id=None): | def restore(self, content, obj_id=None): | ||||
"""Restore a content. | """Restore a content. | ||||
""" | """ | ||||
if obj_id is None: | |||||
# Checksum is missing, compute it on the fly. | |||||
obj_id = compute_hash(content) | |||||
if obj_id in self: | |||||
self.delete(obj_id) | |||||
return self.add(content, obj_id, check_presence=False) | return self.add(content, obj_id, check_presence=False) | ||||
def get(self, obj_id): | def get(self, obj_id): | ||||
"""Retrieve blob's content if found. | """Retrieve blob's content if found. | ||||
""" | """ | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
service, container = self.get_blob_service(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
try: | try: | ||||
blob = service.get_blob_to_bytes( | download = client.download_blob() | ||||
container_name=container, blob_name=hex_obj_id | except ResourceNotFoundError: | ||||
) | raise ObjNotFoundError(obj_id) from None | ||||
except AzureMissingResourceHttpError: | else: | ||||
raise ObjNotFoundError(obj_id) | data = download.content_as_bytes() | ||||
decompressor = decompressors[self.compression]() | decompressor = decompressors[self.compression]() | ||||
ret = decompressor.decompress(blob.content) | ret = decompressor.decompress(data) | ||||
if decompressor.unused_data: | if decompressor.unused_data: | ||||
raise Error("Corrupt object %s: trailing data found" % hex_obj_id) | raise Error("Corrupt object %s: trailing data found" % hex_obj_id) | ||||
return ret | return ret | ||||
def check(self, obj_id): | def check(self, obj_id): | ||||
"""Check the content integrity. | """Check the content integrity. | ||||
""" | """ | ||||
obj_content = self.get(obj_id) | obj_content = self.get(obj_id) | ||||
content_obj_id = compute_hash(obj_content) | content_obj_id = compute_hash(obj_content) | ||||
if content_obj_id != obj_id: | if content_obj_id != obj_id: | ||||
raise Error(obj_id) | raise Error(obj_id) | ||||
def delete(self, obj_id): | def delete(self, obj_id): | ||||
"""Delete an object.""" | """Delete an object.""" | ||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
service, container = self.get_blob_service(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
try: | try: | ||||
service.delete_blob(container_name=container, blob_name=hex_obj_id) | client.delete_blob() | ||||
except AzureMissingResourceHttpError: | except ResourceNotFoundError: | ||||
raise ObjNotFoundError("Content {} not found!".format(hex_obj_id)) | raise ObjNotFoundError(obj_id) from None | ||||
return True | return True | ||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | |||||
all_blob_services = self.get_all_blob_services() | |||||
if last_obj_id: | |||||
last_obj_id = self._internal_id(last_obj_id) | |||||
last_service, _ = self.get_blob_service(last_obj_id) | |||||
all_blob_services = dropwhile( | |||||
lambda srv: srv[0] != last_service, all_blob_services | |||||
) | |||||
else: | |||||
last_service = None | |||||
def iterate_blobs(): | |||||
for service, container in all_blob_services: | |||||
marker = last_obj_id if service == last_service else None | |||||
for obj in service.list_blobs( | |||||
container, marker=marker, maxresults=limit | |||||
): | |||||
yield hashutil.hash_to_bytes(obj.name) | |||||
return islice(iterate_blobs(), limit) | |||||
class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): | class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): | ||||
"""ObjStorage with azure capabilities, striped by prefix. | """ObjStorage with azure capabilities, striped by prefix. | ||||
accounts is a dict containing entries of the form: | accounts is a dict containing entries of the form: | ||||
<prefix>: | <prefix>: <container_url_for_prefix> | ||||
account_name: <account_name> | |||||
api_secret_key: <api_secret_key> | |||||
container_name: <container_name> | |||||
""" | """ | ||||
def __init__(self, accounts, compression="gzip", **kwargs): | def __init__( | ||||
self, | |||||
accounts: Dict[str, Union[str, Dict[str, str]]], | |||||
compression="gzip", | |||||
**kwargs, | |||||
): | |||||
# shortcut AzureCloudObjStorage __init__ | # shortcut AzureCloudObjStorage __init__ | ||||
ObjStorage.__init__(self, **kwargs) | ObjStorage.__init__(self, **kwargs) | ||||
self.compression = compression | self.compression = compression | ||||
# Definition sanity check | # Definition sanity check | ||||
prefix_lengths = set(len(prefix) for prefix in accounts) | prefix_lengths = set(len(prefix) for prefix in accounts) | ||||
if not len(prefix_lengths) == 1: | if not len(prefix_lengths) == 1: | ||||
Show All 11 Lines | ): | ||||
) | ) | ||||
) | ) | ||||
missing_prefixes = expected_prefixes - set(accounts) | missing_prefixes = expected_prefixes - set(accounts) | ||||
if missing_prefixes: | if missing_prefixes: | ||||
raise ValueError( | raise ValueError( | ||||
"Missing prefixes %s" % ", ".join(sorted(missing_prefixes)) | "Missing prefixes %s" % ", ".join(sorted(missing_prefixes)) | ||||
) | ) | ||||
do_warning = False | |||||
self.prefixes = {} | self.prefixes = {} | ||||
request_session = requests.Session() | for prefix, container_url in accounts.items(): | ||||
for prefix, account in accounts.items(): | if isinstance(container_url, dict): | ||||
self.prefixes[prefix] = ( | do_warning = True | ||||
BlockBlobService( | container_url = get_container_url( | ||||
account_name=account["account_name"], | account_name=container_url["account_name"], | ||||
account_key=account["api_secret_key"], | account_key=container_url["api_secret_key"], | ||||
request_session=request_session, | container_name=container_url["container_name"], | ||||
), | access_policy="full", | ||||
account["container_name"], | |||||
) | ) | ||||
self.prefixes[prefix] = ContainerClient.from_container_url(container_url) | |||||
def get_blob_service(self, hex_obj_id): | if do_warning: | ||||
warnings.warn( | |||||
"The Azure objstorage account secret key parameters are " | |||||
"deprecated, please use container URLs instead.", | |||||
DeprecationWarning, | |||||
) | |||||
def get_container_client(self, hex_obj_id): | |||||
"""Get the block_blob_service and container that contains the object with | """Get the block_blob_service and container that contains the object with | ||||
internal id hex_obj_id | internal id hex_obj_id | ||||
""" | """ | ||||
return self.prefixes[hex_obj_id[: self.prefix_len]] | return self.prefixes[hex_obj_id[: self.prefix_len]] | ||||
def get_all_blob_services(self): | def get_all_container_clients(self): | ||||
"""Get all active block_blob_services""" | """Get all active container clients""" | ||||
# iterate on items() to sort blob services; | # iterate on items() to sort blob services; | ||||
# needed to be able to paginate in the list_content() method | # needed to be able to paginate in the list_content() method | ||||
yield from (v for _, v in sorted(self.prefixes.items())) | yield from (v for _, v in sorted(self.prefixes.items())) |
this method looks weird as is. Why does it exists? (not read the whole file yet, but my bet is a dispatching mechanism implemented in a subclass.
I've noted this was already like this before, nonetheless, it could benefit from a comment or something IMHO.