Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/azure.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import gzip | |||||
import string | import string | ||||
from itertools import dropwhile, islice, product | from itertools import dropwhile, islice, product | ||||
from azure.storage.blob import BlockBlobService | from azure.storage.blob import BlockBlobService | ||||
from azure.common import AzureMissingResourceHttpError | from azure.common import AzureMissingResourceHttpError | ||||
import requests | import requests | ||||
from swh.objstorage.objstorage import ObjStorage, compute_hash, DEFAULT_LIMIT | from swh.objstorage.objstorage import (ObjStorage, compute_hash, DEFAULT_LIMIT, | ||||
compressors, decompressors) | |||||
from swh.objstorage.exc import ObjNotFoundError, Error | from swh.objstorage.exc import ObjNotFoundError, Error | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
class AzureCloudObjStorage(ObjStorage): | class AzureCloudObjStorage(ObjStorage): | ||||
"""ObjStorage with azure abilities. | """ObjStorage with azure abilities. | ||||
""" | """ | ||||
def __init__(self, account_name, api_secret_key, container_name, **kwargs): | def __init__(self, account_name, api_secret_key, container_name, | ||||
compression='gzip', **kwargs): | |||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
self.block_blob_service = BlockBlobService( | self.block_blob_service = BlockBlobService( | ||||
account_name=account_name, | account_name=account_name, | ||||
account_key=api_secret_key, | account_key=api_secret_key, | ||||
request_session=requests.Session(), | request_session=requests.Session(), | ||||
) | ) | ||||
self.container_name = container_name | self.container_name = container_name | ||||
self.compression = compression | |||||
def get_blob_service(self, hex_obj_id): | def get_blob_service(self, hex_obj_id): | ||||
"""Get the block_blob_service and container that contains the object with | """Get the block_blob_service and container that contains the object with | ||||
internal id hex_obj_id | internal id hex_obj_id | ||||
""" | """ | ||||
return self.block_blob_service, self.container_name | return self.block_blob_service, self.container_name | ||||
def get_all_blob_services(self): | def get_all_blob_services(self): | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def add(self, content, obj_id=None, check_presence=True): | ||||
# Checksum is missing, compute it on the fly. | # Checksum is missing, compute it on the fly. | ||||
obj_id = compute_hash(content) | obj_id = compute_hash(content) | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
return obj_id | return obj_id | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
# Send the gzipped content | # Send the compressed content | ||||
compressor = compressors[self.compression]() | |||||
blob = [compressor.compress(content), compressor.flush()] | |||||
service, container = self.get_blob_service(hex_obj_id) | service, container = self.get_blob_service(hex_obj_id) | ||||
service.create_blob_from_bytes( | service.create_blob_from_bytes( | ||||
container_name=container, | container_name=container, | ||||
blob_name=hex_obj_id, | blob_name=hex_obj_id, | ||||
blob=gzip.compress(content)) | blob=b''.join(blob), | ||||
) | |||||
return obj_id | return obj_id | ||||
def restore(self, content, obj_id=None): | def restore(self, content, obj_id=None): | ||||
"""Restore a content. | """Restore a content. | ||||
""" | """ | ||||
return self.add(content, obj_id, check_presence=False) | return self.add(content, obj_id, check_presence=False) | ||||
def get(self, obj_id): | def get(self, obj_id): | ||||
"""Retrieve blob's content if found. | """Retrieve blob's content if found. | ||||
""" | """ | ||||
hex_obj_id = self._internal_id(obj_id) | hex_obj_id = self._internal_id(obj_id) | ||||
service, container = self.get_blob_service(hex_obj_id) | service, container = self.get_blob_service(hex_obj_id) | ||||
try: | try: | ||||
blob = service.get_blob_to_bytes( | blob = service.get_blob_to_bytes( | ||||
container_name=container, | container_name=container, | ||||
blob_name=hex_obj_id) | blob_name=hex_obj_id) | ||||
except AzureMissingResourceHttpError: | except AzureMissingResourceHttpError: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
return gzip.decompress(blob.content) | decompressor = decompressors[self.compression]() | ||||
ret = decompressor.decompress(blob.content) | |||||
if decompressor.unused_data: | |||||
raise Error('Corrupt object %s: trailing data found' % hex_obj_id) | |||||
return ret | |||||
def check(self, obj_id): | def check(self, obj_id): | ||||
"""Check the content integrity. | """Check the content integrity. | ||||
""" | """ | ||||
obj_content = self.get(obj_id) | obj_content = self.get(obj_id) | ||||
content_obj_id = compute_hash(obj_content) | content_obj_id = compute_hash(obj_content) | ||||
if content_obj_id != obj_id: | if content_obj_id != obj_id: | ||||
Show All 36 Lines | class PrefixedAzureCloudObjStorage(AzureCloudObjStorage): | ||||
"""ObjStorage with azure capabilities, striped by prefix. | """ObjStorage with azure capabilities, striped by prefix. | ||||
accounts is a dict containing entries of the form: | accounts is a dict containing entries of the form: | ||||
<prefix>: | <prefix>: | ||||
account_name: <account_name> | account_name: <account_name> | ||||
api_secret_key: <api_secret_key> | api_secret_key: <api_secret_key> | ||||
container_name: <container_name> | container_name: <container_name> | ||||
""" | """ | ||||
def __init__(self, accounts, **kwargs): | def __init__(self, accounts, compression='gzip', **kwargs): | ||||
# shortcut AzureCloudObjStorage __init__ | # shortcut AzureCloudObjStorage __init__ | ||||
ObjStorage.__init__(self, **kwargs) | ObjStorage.__init__(self, **kwargs) | ||||
self.compression = compression | |||||
# Definition sanity check | # Definition sanity check | ||||
prefix_lengths = set(len(prefix) for prefix in accounts) | prefix_lengths = set(len(prefix) for prefix in accounts) | ||||
if not len(prefix_lengths) == 1: | if not len(prefix_lengths) == 1: | ||||
raise ValueError("Inconsistent prefixes, found lengths %s" | raise ValueError("Inconsistent prefixes, found lengths %s" | ||||
% ', '.join( | % ', '.join( | ||||
str(l) for l in sorted(prefix_lengths) | str(l) for l in sorted(prefix_lengths) | ||||
)) | )) | ||||
Show All 36 Lines |