Page MenuHomeSoftware Heritage

test_objstorage_azure.py
No OneTemporary

test_objstorage_azure.py

# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import base64
import collections
from dataclasses import dataclass
import unittest
from unittest.mock import patch
from urllib.parse import parse_qs, urlparse
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
import pytest
from swh.model.hashutil import hash_to_hex
import swh.objstorage.backends.azure
from swh.objstorage.exc import Error
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import decompressors
from .objstorage_testing import ObjStorageTestFixture
@dataclass
class MockListedObject:
    """Stand-in for an entry yielded when listing the blobs of a container."""

    # Name (key) of the listed blob.
    name: str
class MockAsyncDownloadClient:
    """Mock of an asynchronous blob download whose data is already known."""

    def __init__(self, blob_data):
        """Remember ``blob_data``, the payload handed back on download."""
        self.blob_data = blob_data

    def content_as_bytes(self):
        """Return an already-resolved future carrying the blob data.

        The returned future can be awaited; since its result is set up front,
        awaiting it yields the data without suspending.
        """
        future = asyncio.Future()
        future.set_result(self.blob_data)
        return future
class MockDownloadClient:
    """Mock of a blob download usable both directly and through ``await``."""

    def __init__(self, blob_data):
        """Remember ``blob_data``, the payload handed back on download."""
        self.blob_data = blob_data

    def content_as_bytes(self):
        """Return the blob data directly (synchronous usage)."""
        return self.blob_data

    def __await__(self):
        """Make ``await download`` evaluate to a ``MockAsyncDownloadClient``.

        ``__await__`` must return an iterator: the empty ``yield from`` only
        turns this method into a generator and never actually suspends.
        """
        yield from ()
        return MockAsyncDownloadClient(self.blob_data)
class MockBlobClient:
    """Mock of a client bound to a single blob of a mock container.

    Blob contents live in ``container.blobs``, a mapping from blob name to
    data; this object only keeps a reference to the container and the name.
    """

    def __init__(self, container, blob):
        """Bind the client to the blob named ``blob`` inside ``container``."""
        self.container = container
        self.blob = blob

    def get_blob_properties(self):
        """Return dummy properties for the blob.

        Raises:
            ResourceNotFoundError: if the blob is not in the container.
        """
        if self.blob not in self.container.blobs:
            raise ResourceNotFoundError("Blob not found")

        return {"exists": True}

    def upload_blob(self, data, length=None):
        """Store ``data`` under this client's blob name.

        Raises:
            ResourceExistsError: if the blob is already stored (this mock
                never overwrites).
            ValueError: if ``length`` is given and differs from ``len(data)``.
        """
        if self.blob in self.container.blobs:
            raise ResourceExistsError("Blob already exists")

        if length is not None and length != len(data):
            raise ValueError("Wrong length for blob data!")

        self.container.blobs[self.blob] = data

    def download_blob(self):
        """Return a ``MockDownloadClient`` wrapping the stored data.

        Raises:
            ResourceNotFoundError: if the blob is not in the container.
        """
        if self.blob not in self.container.blobs:
            raise ResourceNotFoundError("Blob not found")

        return MockDownloadClient(self.container.blobs[self.blob])

    def delete_blob(self):
        """Remove the blob from the container.

        Raises:
            ResourceNotFoundError: if the blob is not in the container.
        """
        if self.blob not in self.container.blobs:
            raise ResourceNotFoundError("Blob not found")

        del self.container.blobs[self.blob]
def get_MockContainerClient():
    """Build a fresh ``MockContainerClient`` class with its own blob store.

    Every call returns a new class. All instances of one returned class share
    a single in-memory store keyed by container URL, so two clients created
    for the same URL see the same blobs, while classes coming from different
    calls are isolated from each other.
    """
    blobs = collections.defaultdict(dict)  # {container_url: {blob_id: blob}}

    class MockContainerClient:
        """In-memory mock of a container client (sync and async usage)."""

        def __init__(self, container_url):
            self.container_url = container_url
            # Shared with every other instance created for the same URL.
            self.blobs = blobs[self.container_url]

        @classmethod
        def from_container_url(cls, container_url):
            """Alternate constructor; equivalent to ``cls(container_url)``."""
            return cls(container_url)

        def get_container_properties(self):
            """Return dummy properties; the mock container always exists."""
            return {"exists": True}

        def get_blob_client(self, blob):
            """Return a ``MockBlobClient`` for the blob named ``blob``."""
            return MockBlobClient(self, blob)

        def list_blobs(self):
            """Yield one ``MockListedObject`` per stored blob, sorted by name."""
            for obj in sorted(self.blobs):
                yield MockListedObject(obj)

        def delete_blob(self, blob):
            """Delete a blob given a listed object (anything with ``.name``)."""
            self.get_blob_client(blob.name).delete_blob()

        async def __aenter__(self):
            """Enter ``async with``; the ``as`` target is the client itself."""
            return self

        def __await__(self):
            # Kept so that instances stay awaitable, as before. The future is
            # already resolved, so this never suspends; because the generator
            # has no ``return``, ``await client`` evaluates to None.
            future = asyncio.Future()
            future.set_result(self)
            yield from future

        async def __aexit__(self, *args):
            """Leave ``async with`` without suppressing exceptions."""
            # Returning a falsy value lets any exception raised inside the
            # ``async with`` body propagate to the caller.
            return None

    return MockContainerClient
class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Run the shared objstorage test fixture against the ``azure`` backend.

    The backend's ``ContainerClient`` and ``AsyncContainerClient`` names are
    patched with an in-memory mock. Subclasses override ``compression`` to
    run the same tests with another compression setting.
    """

    compression = "none"

    def setUp(self):
        """Patch the container clients and instantiate the storage under test."""
        super().setUp()

        # A single mock class is installed under both names, so both patched
        # names share the same in-memory blobs.
        ContainerClient = get_MockContainerClient()
        for target in (
            "swh.objstorage.backends.azure.ContainerClient",
            "swh.objstorage.backends.azure.AsyncContainerClient",
        ):
            patcher = patch(target, ContainerClient)
            patcher.start()
            self.addCleanup(patcher.stop)

        self.storage = get_objstorage(
            "azure",
            {
                "container_url": "https://bogus-container-url.example",
                "compression": self.compression,
            },
        )

    def test_compression(self):
        """The raw stored blob decompresses exactly to the added content."""
        content, obj_id = self.hash_content(b"test content is compressed")
        self.storage.add(content, obj_id=obj_id)

        internal_id = self.storage._internal_id(obj_id)
        blob_client = self.storage.get_blob_client(internal_id)
        raw_blob = blob_client.download_blob().content_as_bytes()

        d = decompressors[self.compression]()
        assert d.decompress(raw_blob) == content
        # Nothing may be left over after the compressed stream.
        assert d.unused_data == b""

    def test_trailing_data_on_stored_blob(self):
        """A stored blob followed by garbage is reported as an error."""
        content, obj_id = self.hash_content(b"test content without garbage")
        self.storage.add(content, obj_id=obj_id)

        internal_id = self.storage._internal_id(obj_id)
        blob_client = self.storage.get_blob_client(internal_id)
        raw_blob = blob_client.download_blob().content_as_bytes()
        new_data = raw_blob + b"trailing garbage"

        # The mock refuses to overwrite, so delete before re-uploading.
        blob_client.delete_blob()
        blob_client.upload_blob(data=new_data, length=len(new_data))

        # Without compression the error is expected from check(); with any
        # compression it is expected from get().
        if self.compression == "none":
            failing_call = self.storage.check
        else:
            failing_call = self.storage.get

        with self.assertRaises(Error) as e:
            failing_call(obj_id)

        assert "trailing data" in e.exception.args[0]
class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage):
    """Same tests as the parent class, with ``compression`` set to gzip."""

    compression = "gzip"
class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage):
    """Same tests as the parent class, with ``compression`` set to zlib."""

    compression = "zlib"
class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage):
    """Same tests as the parent class, with ``compression`` set to lzma."""

    compression = "lzma"
class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage):
    """Same tests as the parent class, with ``compression`` set to bz2."""

    compression = "bz2"
class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Run the shared objstorage test fixture on the ``azure-prefixed`` backend.

    One mock container URL is configured for each of the 16 hexadecimal
    one-character prefixes.
    """

    def setUp(self):
        """Patch the container clients and build the prefixed storage."""
        super().setUp()

        # Kept on the instance: the sharding test uses the mock class to look
        # into individual containers.
        self.ContainerClient = get_MockContainerClient()
        for target in (
            "swh.objstorage.backends.azure.ContainerClient",
            "swh.objstorage.backends.azure.AsyncContainerClient",
        ):
            patcher = patch(target, self.ContainerClient)
            patcher.start()
            self.addCleanup(patcher.stop)

        self.accounts = {
            prefix: "https://bogus-container-url.example/" + prefix
            for prefix in "0123456789abcdef"
        }

        self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})

    def test_prefixedazure_instantiation_missing_prefixes(self):
        """Instantiation fails when some prefixes have no account."""
        del self.accounts["d"]
        del self.accounts["e"]

        with self.assertRaisesRegex(ValueError, "Missing prefixes"):
            get_objstorage("azure-prefixed", {"accounts": self.accounts})

    def test_prefixedazure_instantiation_inconsistent_prefixes(self):
        """Instantiation fails when prefixes of different lengths are mixed."""
        self.accounts["00"] = self.accounts["0"]

        with self.assertRaisesRegex(ValueError, "Inconsistent prefixes"):
            get_objstorage("azure-prefixed", {"accounts": self.accounts})

    def test_prefixedazure_sharding_behavior(self):
        """Each object lands in the container matching its first hex digit."""
        for i in range(100):
            content, obj_id = self.hash_content(b"test_content_%02d" % i)
            self.storage.add(content, obj_id=obj_id)
            hex_obj_id = hash_to_hex(obj_id)
            prefix = hex_obj_id[0]
            # get_blob_properties() raises if the blob is not in that
            # specific container, so this checks where the object was stored.
            self.assertTrue(
                self.ContainerClient(self.storage.container_urls[prefix])
                .get_blob_client(hex_obj_id)
                .get_blob_properties()
            )
def test_get_container_url():
    """Check the URL built by ``get_container_url`` for each access policy.

    For every policy, the URL must point at the expected host and container,
    and its query string must carry the expected permissions, the container
    resource type, and a start time ordered before the expiry time.
    """
    # r=read, l=list, w=write, d=delete
    policy_map = {
        "read_only": "rl",
        "append_only": "rwl",
        "full": "rwdl",
    }

    for policy, expected in policy_map.items():
        ret = swh.objstorage.backends.azure.get_container_url(
            account_name="account_name",
            account_key=base64.b64encode(b"account_key"),
            container_name="container_name",
            access_policy=policy,
        )

        p = urlparse(ret)
        assert p.scheme == "https"
        assert p.netloc == "account_name.blob.core.windows.net"
        assert p.path == "/container_name"

        qs = parse_qs(p.query)
        # sp: permissions
        assert qs["sp"] == [expected]
        # sr: resource (c=container)
        assert qs["sr"] == ["c"]
        # st: start; se: expiry (compared as strings)
        assert qs["st"][0] < qs["se"][0]
def test_bwcompat_args(monkeypatch):
    """The legacy account/key/container arguments still work, with a warning.

    Instantiating the ``azure`` backend from ``account_name``,
    ``api_secret_key`` and ``container_name`` must emit a deprecation warning
    and still return a storage object.
    """
    monkeypatch.setattr(
        swh.objstorage.backends.azure,
        "ContainerClient",
        get_MockContainerClient(),
    )

    with pytest.deprecated_call():
        objs = get_objstorage(
            "azure",
            {
                "account_name": "account_name",
                "api_secret_key": base64.b64encode(b"account_key"),
                "container_name": "container_name",
            },
        )

    assert objs is not None
def test_bwcompat_args_prefixed(monkeypatch):
    """Legacy per-prefix account dicts still work, with a warning.

    Instantiating the ``azure-prefixed`` backend with one
    ``account_name``/``api_secret_key``/``container_name`` dict per hex prefix
    must emit a deprecation warning and still return a storage object.
    """
    monkeypatch.setattr(
        swh.objstorage.backends.azure,
        "ContainerClient",
        get_MockContainerClient(),
    )

    accounts = {
        prefix: {
            "account_name": f"account_name{prefix}",
            "api_secret_key": base64.b64encode(b"account_key"),
            "container_name": "container_name",
        }
        for prefix in "0123456789abcdef"
    }

    with pytest.deprecated_call():
        objs = get_objstorage("azure-prefixed", {"accounts": accounts})

    assert objs is not None

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 11:47 AM (3 d, 9 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3277943

Event Timeline