Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/azure.py
# Copyright (C) 2016-2021 The Software Heritage developers | # Copyright (C) 2016-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | import asyncio | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
from itertools import product | from itertools import product | ||||
import string | import string | ||||
from typing import Dict, Optional, Union | from typing import Dict, Iterator, List, Optional, Union | ||||
import warnings | import warnings | ||||
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError | from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError | ||||
from azure.storage.blob import ( | from azure.storage.blob import ( | ||||
ContainerClient, | ContainerClient, | ||||
ContainerSasPermissions, | ContainerSasPermissions, | ||||
generate_container_sas, | generate_container_sas, | ||||
) | ) | ||||
from azure.storage.blob.aio import ContainerClient as AsyncContainerClient | from azure.storage.blob.aio import ContainerClient as AsyncContainerClient | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
ObjStorage, | ObjStorage, | ||||
compressors, | compressors, | ||||
compute_hash, | compute_hash, | ||||
decompressors, | decompressors, | ||||
) | ) | ||||
from swh.objstorage.utils import call_async | from swh.objstorage.utils import call_async | ||||
▲ Show 20 Lines • Show All 168 Lines • ▼ Show 20 Lines | def __len__(self): | ||||
"""Compute the number of objects in the current object storage. | """Compute the number of objects in the current object storage. | ||||
Returns: | Returns: | ||||
number of objects contained in the storage. | number of objects contained in the storage. | ||||
""" | """ | ||||
return sum(1 for i in self) | return sum(1 for i in self) | ||||
def add(self, content, obj_id, check_presence=True): | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | ||||
"""Add an obj in storage if it's not there already.""" | """Add an obj in storage if it's not there already.""" | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
return obj_id | return obj_id | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
# Send the compressed content | # Send the compressed content | ||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
data = compressor.compress(content) | data = compressor.compress(content) | ||||
data += compressor.flush() | data += compressor.flush() | ||||
client = self.get_blob_client(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
try: | try: | ||||
client.upload_blob(data=data, length=len(data)) | client.upload_blob(data=data, length=len(data)) | ||||
except ResourceExistsError: | except ResourceExistsError: | ||||
# There's a race condition between check_presence and upload_blob, | # There's a race condition between check_presence and upload_blob, | ||||
# that we can't get rid of as the azure api doesn't allow atomic | # that we can't get rid of as the azure api doesn't allow atomic | ||||
# replaces or renaming a blob. As the restore operation explicitly | # replaces or renaming a blob. As the restore operation explicitly | ||||
# removes the blob, it should be safe to just ignore the error. | # removes the blob, it should be safe to just ignore the error. | ||||
pass | pass | ||||
return obj_id | return obj_id | ||||
def restore(self, content, obj_id): | def restore(self, content: bytes, obj_id: ObjId): | ||||
"""Restore a content.""" | """Restore a content.""" | ||||
if obj_id in self: | if obj_id in self: | ||||
self.delete(obj_id) | self.delete(obj_id) | ||||
return self.add(content, obj_id, check_presence=False) | return self.add(content, obj_id, check_presence=False) | ||||
def get(self, obj_id): | def get(self, obj_id: ObjId) -> bytes: | ||||
"""retrieve blob's content if found.""" | """retrieve blob's content if found.""" | ||||
return call_async(self._get_async, obj_id) | return call_async(self._get_async, obj_id) | ||||
async def _get_async(self, obj_id, container_clients=None): | async def _get_async(self, obj_id, container_clients=None): | ||||
"""Coroutine implementing ``get(obj_id)`` using azure-storage-blob's | """Coroutine implementing ``get(obj_id)`` using azure-storage-blob's | ||||
asynchronous implementation. | asynchronous implementation. | ||||
While ``get(obj_id)`` does not need asynchronicity, this is useful to | While ``get(obj_id)`` does not need asynchronicity, this is useful to | ||||
``get_batch(obj_ids)``, as it can run multiple ``_get_async`` tasks | ``get_batch(obj_ids)``, as it can run multiple ``_get_async`` tasks | ||||
Show All 33 Lines | async def _get_batch_async(self, obj_ids): | ||||
async with self.get_async_container_clients() as container_clients: | async with self.get_async_container_clients() as container_clients: | ||||
return await asyncio.gather( | return await asyncio.gather( | ||||
*[ | *[ | ||||
self._get_async_or_none(obj_id, container_clients) | self._get_async_or_none(obj_id, container_clients) | ||||
for obj_id in obj_ids | for obj_id in obj_ids | ||||
] | ] | ||||
) | ) | ||||
def get_batch(self, obj_ids): | def get_batch(self, obj_ids: List[ObjId]) -> Iterator[Optional[bytes]]: | ||||
"""Retrieve objects' raw content in bulk from storage, concurrently.""" | """Retrieve objects' raw content in bulk from storage, concurrently.""" | ||||
return call_async(self._get_batch_async, obj_ids) | return call_async(self._get_batch_async, obj_ids) | ||||
def check(self, obj_id): | def check(self, obj_id: ObjId) -> None: | ||||
"""Check the content integrity.""" | """Check the content integrity.""" | ||||
obj_content = self.get(obj_id) | obj_content = self.get(obj_id) | ||||
content_obj_id = compute_hash(obj_content) | content_obj_id = compute_hash(obj_content) | ||||
if content_obj_id != obj_id: | if content_obj_id != obj_id: | ||||
raise Error(obj_id) | raise Error(obj_id) | ||||
def delete(self, obj_id): | def delete(self, obj_id: ObjId): | ||||
"""Delete an object.""" | """Delete an object.""" | ||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
client = self.get_blob_client(hex_obj_id) | client = self.get_blob_client(hex_obj_id) | ||||
try: | try: | ||||
client.delete_blob() | client.delete_blob() | ||||
except ResourceNotFoundError: | except ResourceNotFoundError: | ||||
raise ObjNotFoundError(obj_id) from None | raise ObjNotFoundError(obj_id) from None | ||||
▲ Show 20 Lines • Show All 103 Lines • Show Last 20 Lines |