Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345849
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
45 KB
Subscribers
None
View Options
diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py
index 7e66e7b..c7d7eaa 100644
--- a/swh/objstorage/tests/test_multiplexer_filter.py
+++ b/swh/objstorage/tests/test_multiplexer_filter.py
@@ -1,330 +1,332 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
import shutil
from string import ascii_lowercase
import tempfile
import unittest
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.objstorage.factory import get_objstorage
from swh.objstorage.multiplexer.filter import id_prefix, id_regex, read_only
from swh.objstorage.objstorage import compute_hash
def get_random_content():
return bytes("".join(random.sample(ascii_lowercase, 10)), "utf8")
class MixinTestReadFilter(unittest.TestCase):
# Read only filter should not allow writing
def setUp(self):
super().setUp()
self.tmpdir = tempfile.mkdtemp()
pstorage = {
"cls": "pathslicing",
"root": self.tmpdir,
"slicing": "0:5",
}
base_storage = get_objstorage(**pstorage)
self.storage = get_objstorage(
"filtered", storage_conf=pstorage, filters_conf=[read_only()]
)
self.valid_content = b"pre-existing content"
self.invalid_content = b"invalid_content"
self.true_invalid_content = b"Anything that is not correct"
self.absent_content = b"non-existent content"
# Create a valid content.
self.valid_id = compute_hash(self.valid_content)
base_storage.add(self.valid_content, obj_id=self.valid_id)
# Create an invalid id and add a content with it.
self.invalid_id = compute_hash(self.true_invalid_content)
base_storage.add(self.invalid_content, obj_id=self.invalid_id)
# Compute an id for a non-existing content.
self.absent_id = compute_hash(self.absent_content)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def test_can_contains(self):
self.assertTrue(self.valid_id in self.storage)
self.assertTrue(self.invalid_id in self.storage)
self.assertFalse(self.absent_id in self.storage)
def test_can_iter(self):
self.assertIn(self.valid_id, iter(self.storage))
self.assertIn(self.invalid_id, iter(self.storage))
def test_can_len(self):
self.assertEqual(2, len(self.storage))
def test_can_get(self):
self.assertEqual(self.valid_content, self.storage.get(self.valid_id))
self.assertEqual(self.invalid_content, self.storage.get(self.invalid_id))
def test_can_check(self):
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.absent_id)
with self.assertRaises(Error):
self.storage.check(self.invalid_id)
self.storage.check(self.valid_id)
def test_can_get_random(self):
self.assertEqual(1, len(list(self.storage.get_random(1))))
self.assertEqual(
len(list(self.storage)), len(set(self.storage.get_random(1000)))
)
def test_cannot_add(self):
new_id = self.storage.add(b"New content")
result = self.storage.add(self.valid_content, self.valid_id)
self.assertIsNone(new_id, self.storage)
self.assertIsNone(result)
def test_cannot_restore(self):
result = self.storage.restore(self.valid_content, self.valid_id)
self.assertIsNone(result)
class MixinTestIdFilter:
"""Mixin class that tests the filters based on filter.IdFilter
Methods "make_valid", "make_invalid" and "filter_storage" must be
implemented by subclasses.
"""
def setUp(self):
super().setUp()
# Use a hack here : as the mock uses the content as id, it is easy to
# create contents that are filtered or not.
self.prefix = "71"
self.tmpdir = tempfile.mkdtemp()
# Make the storage filtered
self.sconf = {
"cls": "pathslicing",
- "args": {"root": self.tmpdir, "slicing": "0:5"},
+ "root": self.tmpdir,
+ "slicing": "0:5",
}
storage = get_objstorage(**self.sconf)
self.base_storage = storage
self.storage = self.filter_storage(self.sconf)
# Present content with valid id
self.present_valid_content = self.ensure_valid(b"yroqdtotji")
self.present_valid_id = compute_hash(self.present_valid_content)
# Present content with invalid id
self.present_invalid_content = self.ensure_invalid(b"glxddlmmzb")
self.present_invalid_id = compute_hash(self.present_invalid_content)
# Missing content with valid id
self.missing_valid_content = self.ensure_valid(b"rmzkdclkez")
self.missing_valid_id = compute_hash(self.missing_valid_content)
# Missing content with invalid id
self.missing_invalid_content = self.ensure_invalid(b"hlejfuginh")
self.missing_invalid_id = compute_hash(self.missing_invalid_content)
# Present corrupted content with valid id
self.present_corrupted_valid_content = self.ensure_valid(b"cdsjwnpaij")
self.true_present_corrupted_valid_content = self.ensure_valid(b"mgsdpawcrr")
self.present_corrupted_valid_id = compute_hash(
self.true_present_corrupted_valid_content
)
# Present corrupted content with invalid id
self.present_corrupted_invalid_content = self.ensure_invalid(b"pspjljnrco")
self.true_present_corrupted_invalid_content = self.ensure_invalid(b"rjocbnnbso")
self.present_corrupted_invalid_id = compute_hash(
self.true_present_corrupted_invalid_content
)
# Missing (potentially) corrupted content with valid id
self.missing_corrupted_valid_content = self.ensure_valid(b"zxkokfgtou")
self.true_missing_corrupted_valid_content = self.ensure_valid(b"royoncooqa")
self.missing_corrupted_valid_id = compute_hash(
self.true_missing_corrupted_valid_content
)
# Missing (potentially) corrupted content with invalid id
self.missing_corrupted_invalid_content = self.ensure_invalid(b"hxaxnrmnyk")
self.true_missing_corrupted_invalid_content = self.ensure_invalid(b"qhbolyuifr")
self.missing_corrupted_invalid_id = compute_hash(
self.true_missing_corrupted_invalid_content
)
# Add the content that are supposed to be present
self.storage.add(self.present_valid_content, obj_id=self.present_valid_id)
self.storage.add(self.present_invalid_content, obj_id=self.present_invalid_id)
self.storage.add(
self.present_corrupted_valid_content, obj_id=self.present_corrupted_valid_id
)
self.storage.add(
self.present_corrupted_invalid_content,
obj_id=self.present_corrupted_invalid_id,
)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def filter_storage(self, sconf):
raise NotImplementedError(
"Id_filter test class must have a filter_storage method"
)
def ensure_valid(self, content=None):
if content is None:
content = get_random_content()
while not self.storage.is_valid(compute_hash(content)):
content = get_random_content()
return content
def ensure_invalid(self, content=None):
if content is None:
content = get_random_content()
while self.storage.is_valid(compute_hash(content)):
content = get_random_content()
return content
def test_contains(self):
# Both contents are present, but the invalid one should be ignored.
self.assertTrue(self.present_valid_id in self.storage)
self.assertFalse(self.present_invalid_id in self.storage)
self.assertFalse(self.missing_valid_id in self.storage)
self.assertFalse(self.missing_invalid_id in self.storage)
self.assertTrue(self.present_corrupted_valid_id in self.storage)
self.assertFalse(self.present_corrupted_invalid_id in self.storage)
self.assertFalse(self.missing_corrupted_valid_id in self.storage)
self.assertFalse(self.missing_corrupted_invalid_id in self.storage)
def test_iter(self):
self.assertIn(self.present_valid_id, iter(self.storage))
self.assertNotIn(self.present_invalid_id, iter(self.storage))
self.assertNotIn(self.missing_valid_id, iter(self.storage))
self.assertNotIn(self.missing_invalid_id, iter(self.storage))
self.assertIn(self.present_corrupted_valid_id, iter(self.storage))
self.assertNotIn(self.present_corrupted_invalid_id, iter(self.storage))
self.assertNotIn(self.missing_corrupted_valid_id, iter(self.storage))
self.assertNotIn(self.missing_corrupted_invalid_id, iter(self.storage))
def test_len(self):
# Four contents are present, but only two should be valid.
self.assertEqual(2, len(self.storage))
def test_get(self):
self.assertEqual(
self.present_valid_content, self.storage.get(self.present_valid_id)
)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.present_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_invalid_id)
self.assertEqual(
self.present_corrupted_valid_content,
self.storage.get(self.present_corrupted_valid_id),
)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.present_corrupted_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_corrupted_invalid_id)
def test_check(self):
self.storage.check(self.present_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.present_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_invalid_id)
with self.assertRaises(Error):
self.storage.check(self.present_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.present_corrupted_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_corrupted_invalid_id)
def test_get_random(self):
self.assertEqual(0, len(list(self.storage.get_random(0))))
random_content = list(self.storage.get_random(1000))
self.assertIn(self.present_valid_id, random_content)
self.assertNotIn(self.present_invalid_id, random_content)
self.assertNotIn(self.missing_valid_id, random_content)
self.assertNotIn(self.missing_invalid_id, random_content)
self.assertIn(self.present_corrupted_valid_id, random_content)
self.assertNotIn(self.present_corrupted_invalid_id, random_content)
self.assertNotIn(self.missing_corrupted_valid_id, random_content)
self.assertNotIn(self.missing_corrupted_invalid_id, random_content)
def test_add(self):
# Add valid and invalid contents to the storage and check their
# presence with the unfiltered storage.
valid_content = self.ensure_valid(b"ulepsrjbgt")
valid_id = compute_hash(valid_content)
invalid_content = self.ensure_invalid(b"znvghkjked")
invalid_id = compute_hash(invalid_content)
self.storage.add(valid_content, obj_id=valid_id)
self.storage.add(invalid_content, obj_id=invalid_id)
self.assertTrue(valid_id in self.base_storage)
self.assertFalse(invalid_id in self.base_storage)
def test_restore(self):
# Add corrupted content to the storage and the try to restore it
valid_content = self.ensure_valid(b"ulepsrjbgt")
valid_id = compute_hash(valid_content)
corrupted_content = self.ensure_valid(b"ltjkjsloyb")
corrupted_id = compute_hash(corrupted_content)
self.storage.add(corrupted_content, obj_id=valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(corrupted_id)
with self.assertRaises(Error):
self.storage.check(valid_id)
self.storage.restore(valid_content, obj_id=valid_id)
self.storage.check(valid_id)
class TestPrefixFilter(MixinTestIdFilter, unittest.TestCase):
def setUp(self):
self.prefix = b"71"
super().setUp()
def ensure_valid(self, content):
obj_id = compute_hash(content)
hex_obj_id = hashutil.hash_to_hex(obj_id)
self.assertTrue(hex_obj_id.startswith(self.prefix))
return content
def ensure_invalid(self, content):
obj_id = compute_hash(content)
hex_obj_id = hashutil.hash_to_hex(obj_id)
self.assertFalse(hex_obj_id.startswith(self.prefix))
return content
def filter_storage(self, sconf):
return get_objstorage(
"filtered",
- {"storage_conf": sconf, "filters_conf": [id_prefix(self.prefix)]},
+ storage_conf=sconf,
+ filters_conf=[id_prefix(self.prefix)],
)
class TestRegexFilter(MixinTestIdFilter, unittest.TestCase):
def setUp(self):
self.regex = r"[a-f][0-9].*"
super().setUp()
def filter_storage(self, sconf):
return get_objstorage(
- "filtered", {"storage_conf": sconf, "filters_conf": [id_regex(self.regex)]}
+ "filtered", storage_conf=sconf, filters_conf=[id_regex(self.regex)]
)
diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py
index 2bea41b..ee852a5 100644
--- a/swh/objstorage/tests/test_objstorage_api.py
+++ b/swh/objstorage/tests/test_objstorage_api.py
@@ -1,47 +1,47 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
import pytest
from swh.core.api.tests.server_testing import ServerTestFixture
from swh.objstorage.api.server import app
from swh.objstorage.factory import get_objstorage
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase):
"""Test the remote archive API."""
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.config = {
"objstorage": {
"cls": "pathslicing",
"root": self.tmpdir,
"slicing": "0:1/0:5",
"allow_delete": True,
},
"client_max_size": 8 * 1024 * 1024,
}
self.app = app
super().setUp()
- self.storage = get_objstorage("remote", {"url": self.url()})
+ self.storage = get_objstorage("remote", url=self.url())
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
@pytest.mark.skip("makes no sense to test this for the remote api")
def test_delete_not_allowed(self):
pass
@pytest.mark.skip("makes no sense to test this for the remote api")
def test_delete_not_allowed_by_default(self):
pass
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
index dcc1f78..0ee6c71 100644
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -1,317 +1,313 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import base64
import collections
from dataclasses import dataclass
import unittest
from unittest.mock import patch
from urllib.parse import parse_qs, urlparse
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
import pytest
from swh.model.hashutil import hash_to_hex
import swh.objstorage.backends.azure
from swh.objstorage.exc import Error
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import decompressors
from .objstorage_testing import ObjStorageTestFixture
@dataclass
class MockListedObject:
name: str
class MockAsyncDownloadClient:
def __init__(self, blob_data):
self.blob_data = blob_data
def content_as_bytes(self):
future = asyncio.Future()
future.set_result(self.blob_data)
return future
class MockDownloadClient:
def __init__(self, blob_data):
self.blob_data = blob_data
def content_as_bytes(self):
return self.blob_data
def __await__(self):
yield from ()
return MockAsyncDownloadClient(self.blob_data)
class MockBlobClient:
def __init__(self, container, blob):
self.container = container
self.blob = blob
def get_blob_properties(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
return {"exists": True}
def upload_blob(self, data, length=None):
if self.blob in self.container.blobs:
raise ResourceExistsError("Blob already exists")
if length is not None and length != len(data):
raise ValueError("Wrong length for blob data!")
self.container.blobs[self.blob] = data
def download_blob(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
return MockDownloadClient(self.container.blobs[self.blob])
def delete_blob(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
del self.container.blobs[self.blob]
def get_MockContainerClient():
blobs = collections.defaultdict(dict) # {container_url: {blob_id: blob}}
class MockContainerClient:
def __init__(self, container_url):
self.container_url = container_url
self.blobs = blobs[self.container_url]
@classmethod
def from_container_url(cls, container_url):
return cls(container_url)
def get_container_properties(self):
return {"exists": True}
def get_blob_client(self, blob):
return MockBlobClient(self, blob)
def list_blobs(self):
for obj in sorted(self.blobs):
yield MockListedObject(obj)
def delete_blob(self, blob):
self.get_blob_client(blob.name).delete_blob()
def __aenter__(self):
return self
def __await__(self):
future = asyncio.Future()
future.set_result(self)
yield from future
def __aexit__(self, *args):
return self
return MockContainerClient
class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
def setUp(self):
super().setUp()
ContainerClient = get_MockContainerClient()
patcher = patch(
"swh.objstorage.backends.azure.ContainerClient", ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
patcher = patch(
"swh.objstorage.backends.azure.AsyncContainerClient", ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.storage = get_objstorage(
"azure",
- {
- "container_url": "https://bogus-container-url.example",
- "compression": self.compression,
- },
+ container_url="https://bogus-container-url.example",
+ compression=self.compression,
)
def test_compression(self):
content, obj_id = self.hash_content(b"test content is compressed")
self.storage.add(content, obj_id=obj_id)
internal_id = self.storage._internal_id(obj_id)
blob_client = self.storage.get_blob_client(internal_id)
raw_blob = blob_client.download_blob().content_as_bytes()
d = decompressors[self.compression]()
assert d.decompress(raw_blob) == content
assert d.unused_data == b""
def test_trailing_data_on_stored_blob(self):
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
internal_id = self.storage._internal_id(obj_id)
blob_client = self.storage.get_blob_client(internal_id)
raw_blob = blob_client.download_blob().content_as_bytes()
new_data = raw_blob + b"trailing garbage"
blob_client.delete_blob()
blob_client.upload_blob(data=new_data, length=len(new_data))
if self.compression == "none":
with self.assertRaises(Error) as e:
self.storage.check(obj_id)
else:
with self.assertRaises(Error) as e:
self.storage.get(obj_id)
assert "trailing data" in e.exception.args[0]
class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage):
compression = "gzip"
class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage):
compression = "zlib"
class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage):
compression = "lzma"
class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage):
compression = "bz2"
class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
self.ContainerClient = get_MockContainerClient()
patcher = patch(
"swh.objstorage.backends.azure.ContainerClient", self.ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
patcher = patch(
"swh.objstorage.backends.azure.AsyncContainerClient", self.ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.accounts = {}
for prefix in "0123456789abcdef":
self.accounts[prefix] = "https://bogus-container-url.example/" + prefix
- self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ self.storage = get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_instantiation_missing_prefixes(self):
del self.accounts["d"]
del self.accounts["e"]
with self.assertRaisesRegex(ValueError, "Missing prefixes"):
- get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_instantiation_inconsistent_prefixes(self):
self.accounts["00"] = self.accounts["0"]
with self.assertRaisesRegex(ValueError, "Inconsistent prefixes"):
- get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_sharding_behavior(self):
for i in range(100):
content, obj_id = self.hash_content(b"test_content_%02d" % i)
self.storage.add(content, obj_id=obj_id)
hex_obj_id = hash_to_hex(obj_id)
prefix = hex_obj_id[0]
self.assertTrue(
self.ContainerClient(self.storage.container_urls[prefix])
.get_blob_client(hex_obj_id)
.get_blob_properties()
)
def test_get_container_url():
# r=read, l=list, w=write, d=delete
policy_map = {
"read_only": "rl",
"append_only": "rwl",
"full": "rwdl",
}
for policy, expected in policy_map.items():
ret = swh.objstorage.backends.azure.get_container_url(
account_name="account_name",
account_key=base64.b64encode(b"account_key"),
container_name="container_name",
access_policy=policy,
)
p = urlparse(ret)
assert p.scheme == "https"
assert p.netloc == "account_name.blob.core.windows.net"
assert p.path == "/container_name"
qs = parse_qs(p.query)
# sp: permissions
assert qs["sp"] == [expected]
# sr: resource (c=container)
assert qs["sr"] == ["c"]
# st: start; se: expiry
assert qs["st"][0] < qs["se"][0]
def test_bwcompat_args(monkeypatch):
monkeypatch.setattr(
swh.objstorage.backends.azure,
"ContainerClient",
get_MockContainerClient(),
)
with pytest.deprecated_call():
objs = get_objstorage(
"azure",
- {
- "account_name": "account_name",
- "api_secret_key": base64.b64encode(b"account_key"),
- "container_name": "container_name",
- },
+ account_name="account_name",
+ api_secret_key=base64.b64encode(b"account_key"),
+ container_name="container_name",
)
assert objs is not None
def test_bwcompat_args_prefixed(monkeypatch):
monkeypatch.setattr(
swh.objstorage.backends.azure,
"ContainerClient",
get_MockContainerClient(),
)
accounts = {
prefix: {
"account_name": f"account_name{prefix}",
"api_secret_key": base64.b64encode(b"account_key"),
"container_name": "container_name",
}
for prefix in "0123456789abcdef"
}
with pytest.deprecated_call():
- objs = get_objstorage("azure-prefixed", {"accounts": accounts})
+ objs = get_objstorage("azure-prefixed", accounts=accounts)
assert objs is not None
diff --git a/swh/objstorage/tests/test_objstorage_in_memory.py b/swh/objstorage/tests/test_objstorage_in_memory.py
index d152cf5..a7ab4f5 100644
--- a/swh/objstorage/tests/test_objstorage_in_memory.py
+++ b/swh/objstorage/tests/test_objstorage_in_memory.py
@@ -1,16 +1,16 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
class TestInMemoryObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
- self.storage = get_objstorage(cls="memory", args={})
+ self.storage = get_objstorage(cls="memory")
diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py
index 6308718..370456a 100644
--- a/swh/objstorage/tests/test_objstorage_instantiation.py
+++ b/swh/objstorage/tests/test_objstorage_instantiation.py
@@ -1,40 +1,40 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
from swh.objstorage.api.client import RemoteObjStorage
from swh.objstorage.backends.pathslicing import PathSlicingObjStorage
from swh.objstorage.factory import get_objstorage
class TestObjStorageInitialization(unittest.TestCase):
"""Test that the methods for ObjStorage initializations with
`get_objstorage` works properly.
"""
def setUp(self):
self.path = tempfile.mkdtemp()
self.path2 = tempfile.mkdtemp()
# Server is launched at self.url()
self.config = {"storage_base": self.path2, "storage_slicing": "0:1/0:5"}
super().setUp()
def tearDown(self):
super().tearDown()
shutil.rmtree(self.path)
shutil.rmtree(self.path2)
def test_pathslicing_objstorage(self):
- conf = {"cls": "pathslicing", "args": {"root": self.path, "slicing": "0:2/0:5"}}
+ conf = {"cls": "pathslicing", "root": self.path, "slicing": "0:2/0:5"}
st = get_objstorage(**conf)
self.assertTrue(isinstance(st, PathSlicingObjStorage))
def test_remote_objstorage(self):
- conf = {"cls": "remote", "args": {"url": "http://127.0.0.1:4242/"}}
+ conf = {"cls": "remote", "url": "http://127.0.0.1:4242/"}
st = get_objstorage(**conf)
self.assertTrue(isinstance(st, RemoteObjStorage))
diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py
index 1ef6eaf..41a77cc 100644
--- a/swh/objstorage/tests/test_objstorage_pathslicing.py
+++ b/swh/objstorage/tests/test_objstorage_pathslicing.py
@@ -1,163 +1,161 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
from unittest.mock import DEFAULT, patch
from swh.model import hashutil
from swh.objstorage import exc
from swh.objstorage.constants import ID_DIGEST_LENGTH
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
def setUp(self):
super().setUp()
self.slicing = "0:2/2:4/4:6"
self.tmpdir = tempfile.mkdtemp()
self.storage = get_objstorage(
"pathslicing",
- {
- "root": self.tmpdir,
- "slicing": self.slicing,
- "compression": self.compression,
- },
+ root=self.tmpdir,
+ slicing=self.slicing,
+ compression=self.compression,
)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def content_path(self, obj_id):
hex_obj_id = hashutil.hash_to_hex(obj_id)
return self.storage.slicer.get_path(hex_obj_id)
def test_iter(self):
content, obj_id = self.hash_content(b"iter")
self.assertEqual(list(iter(self.storage)), [])
self.storage.add(content, obj_id=obj_id)
self.assertEqual(list(iter(self.storage)), [obj_id])
def test_len(self):
content, obj_id = self.hash_content(b"len")
self.assertEqual(len(self.storage), 0)
self.storage.add(content, obj_id=obj_id)
self.assertEqual(len(self.storage), 1)
def test_check_ok(self):
content, obj_id = self.hash_content(b"check_ok")
self.storage.add(content, obj_id=obj_id)
assert self.storage.check(obj_id) is None
assert self.storage.check(obj_id.hex()) is None
def test_check_id_mismatch(self):
content, obj_id = self.hash_content(b"check_id_mismatch")
self.storage.add(b"unexpected content", obj_id=obj_id)
with self.assertRaises(exc.Error) as error:
self.storage.check(obj_id)
self.assertEqual(
(
"Corrupt object %s should have id "
"12ebb2d6c81395bcc5cab965bdff640110cb67ff" % obj_id.hex(),
),
error.exception.args,
)
def test_get_random_contents(self):
content, obj_id = self.hash_content(b"get_random_content")
self.storage.add(content, obj_id=obj_id)
random_contents = list(self.storage.get_random(1))
self.assertEqual(1, len(random_contents))
self.assertIn(obj_id, random_contents)
def test_iterate_from(self):
all_ids = []
for i in range(100):
content, obj_id = self.hash_content(b"content %d" % i)
self.storage.add(content, obj_id=obj_id)
all_ids.append(obj_id)
all_ids.sort()
ids = list(self.storage.iter_from(b"\x00" * ID_DIGEST_LENGTH))
self.assertEqual(len(ids), len(all_ids))
self.assertEqual(ids, all_ids)
ids = list(self.storage.iter_from(all_ids[0]))
self.assertEqual(len(ids), len(all_ids) - 1)
self.assertEqual(ids, all_ids[1:])
ids = list(self.storage.iter_from(all_ids[-1], n_leaf=True))
n_leaf = ids[-1]
ids = ids[:-1]
self.assertEqual(n_leaf, 1)
self.assertEqual(len(ids), 0)
ids = list(self.storage.iter_from(all_ids[-2], n_leaf=True))
n_leaf = ids[-1]
ids = ids[:-1]
self.assertEqual(n_leaf, 2) # beware, this depends on the hash algo
self.assertEqual(len(ids), 1)
self.assertEqual(ids, all_ids[-1:])
def test_fdatasync_default(self):
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
if self.storage.use_fdatasync:
assert patched["fdatasync"].call_count == 1
assert patched["fsync"].call_count == 0
else:
assert patched["fdatasync"].call_count == 0
assert patched["fsync"].call_count == 1
def test_fdatasync_forced_on(self):
self.storage.use_fdatasync = True
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
assert patched["fdatasync"].call_count == 1
assert patched["fsync"].call_count == 0
def test_fdatasync_forced_off(self):
self.storage.use_fdatasync = False
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
assert patched["fdatasync"].call_count == 0
assert patched["fsync"].call_count == 1
def test_check_not_compressed(self):
content, obj_id = self.hash_content(b"check_not_compressed")
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), "ab") as f: # Add garbage.
f.write(b"garbage")
with self.assertRaises(exc.Error) as error:
self.storage.check(obj_id)
if self.compression == "none":
self.assertIn("Corrupt object", error.exception.args[0])
else:
self.assertIn("trailing data found", error.exception.args[0])
class TestPathSlicingObjStorageGzip(TestPathSlicingObjStorage):
compression = "gzip"
class TestPathSlicingObjStorageZlib(TestPathSlicingObjStorage):
compression = "zlib"
class TestPathSlicingObjStorageBz2(TestPathSlicingObjStorage):
compression = "bz2"
class TestPathSlicingObjStorageLzma(TestPathSlicingObjStorage):
compression = "lzma"
diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py
index 919440a..83f79b4 100644
--- a/swh/objstorage/tests/test_objstorage_random_generator.py
+++ b/swh/objstorage/tests/test_objstorage_random_generator.py
@@ -1,39 +1,39 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections.abc import Iterator
from swh.objstorage.factory import get_objstorage
def test_random_generator_objstorage():
- sto = get_objstorage("random", {})
+ sto = get_objstorage("random")
assert sto
blobs = [sto.get(None) for i in range(100)]
lengths = [len(x) for x in blobs]
assert max(lengths) <= 55056238
def test_random_generator_objstorage_list_content():
- sto = get_objstorage("random", {"total": 100})
+ sto = get_objstorage("random", total=100)
assert isinstance(sto.list_content(), Iterator)
assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)]
assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)]
assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [
b"%d" % i for i in range(11, 21)
]
def test_random_generator_objstorage_total():
- sto = get_objstorage("random", {"total": 5})
+ sto = get_objstorage("random", total=5)
assert len([x for x in sto]) == 5
def test_random_generator_objstorage_size():
- sto = get_objstorage("random", {"filesize": 10})
+ sto = get_objstorage("random", filesize=10)
for i in range(10):
assert len(sto.get(None)) == 10
diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py
index a96f35c..86e2472 100644
--- a/swh/objstorage/tests/test_objstorage_striping.py
+++ b/swh/objstorage/tests/test_objstorage_striping.py
@@ -1,77 +1,71 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
# Exercises the "striping" objstorage, which spreads objects across
# several backend storages.
class TestStripingObjStorage(ObjStorageTestFixture, unittest.TestCase):
    def setUp(self):
        """Build a striping objstorage over two pathslicing backends,
        each rooted in its own subdirectory of a fresh temp dir."""
        super().setUp()
        self.base_dir = tempfile.mkdtemp()
        os.mkdir(os.path.join(self.base_dir, "root1"))
        os.mkdir(os.path.join(self.base_dir, "root2"))
        # Config flattened: backend options are now top-level keys next to
        # "cls" instead of being nested under an "args" dict.
        storage_config = {
            "cls": "striping",
-            "args": {
-                "objstorages": [
-                    {
-                        "cls": "pathslicing",
-                        "args": {
-                            "root": os.path.join(self.base_dir, "root1"),
-                            "slicing": "0:2",
-                            "allow_delete": True,
-                        },
-                    },
-                    {
-                        "cls": "pathslicing",
-                        "args": {
-                            "root": os.path.join(self.base_dir, "root2"),
-                            "slicing": "0:2",
-                            "allow_delete": True,
-                        },
-                    },
-                ]
-            },
+            "objstorages": [
+                {
+                    "cls": "pathslicing",
+                    "root": os.path.join(self.base_dir, "root1"),
+                    "slicing": "0:2",
+                    "allow_delete": True,
+                },
+                {
+                    "cls": "pathslicing",
+                    "root": os.path.join(self.base_dir, "root2"),
+                    "slicing": "0:2",
+                    "allow_delete": True,
+                },
+            ],
        }
        self.storage = get_objstorage(**storage_config)
def tearDown(self):
    # Remove the temp dir (and both backend roots) created in setUp.
    shutil.rmtree(self.base_dir)
def test_add_striping_behavior(self):
    """Every added object must land in exactly one backend -- the one
    designated by get_storage_index() for its id."""
    expected = [0, 0]
    observed = [0, 0]
    for n in range(100):
        content, obj_id = self.hash_content(b"striping_behavior_test%02d" % n)
        self.storage.add(content, obj_id)
        expected[self.storage.get_storage_index(obj_id)] += 1
        # Which backends actually hold the object?
        holders = [
            idx
            for idx, backend in enumerate(self.storage.storages)
            if obj_id in backend
        ]
        self.assertEqual(len(holders), 1)
        for idx in holders:
            observed[idx] += 1
    self.assertEqual(observed, expected)
def test_get_striping_behavior(self):
    # Make sure we can read objects that are available in any backend
    # storage
    content, obj_id = self.hash_content(b"striping_behavior_test")
    for storage in self.storage.storages:
        # Present in a single backend: visible through the striping storage,
        # and gone again once deleted from that backend.
        storage.add(content, obj_id)
        self.assertIn(obj_id, self.storage)
        storage.delete(obj_id)
        # NOTE(review): indentation reconstructed from a flattened scrape --
        # this final check is assumed to sit inside the loop; confirm upstream.
        self.assertNotIn(obj_id, self.storage)
def test_list_content(self):
    """list_content() over a striping storage would need to merge the
    per-backend listings; not implemented, so the inherited test is skipped."""
    # Fix typo in the skip reason ("chellenge" -> "challenge").
    self.skipTest("Quite a challenge to make it work")
diff --git a/swh/objstorage/tests/test_server.py b/swh/objstorage/tests/test_server.py
index 5dbde1d..cec2a62 100644
--- a/swh/objstorage/tests/test_server.py
+++ b/swh/objstorage/tests/test_server.py
@@ -1,126 +1,125 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import pytest
import yaml
from swh.objstorage.api.server import load_and_check_config
def prepare_config_file(tmpdir, content, name="config.yml"):
    """Write a configuration file under ``tmpdir`` and return its path.

    Args:
        tmpdir (LocalPath): directory in which the file is created
        content (str/dict): file content; a dict is serialized to a YAML
            string first
        name (str): filename to create

    Returns
        path (str) of the configuration file prepared.
    """
    text = yaml.dump(content) if isinstance(content, dict) else content
    target = tmpdir / name
    target.write_text(text, encoding="utf-8")
    # Return a plain string rather than a path object: some callers cannot
    # handle LocalPath manipulation.
    return str(target)
def test_load_and_check_config_no_configuration():
    """A missing or nonexistent configuration file must raise."""
    # No path given at all
    with pytest.raises(EnvironmentError, match="Configuration file must be defined"):
        load_and_check_config(None)
    # A path that points nowhere
    missing = "/indexer/inexistent/config.yml"
    with pytest.raises(FileNotFoundError, match=f"{missing} does not exist"):
        load_and_check_config(missing)
def test_load_and_check_config_invalid_configuration_toplevel(tmpdir):
    """A config without an 'objstorage' section must be rejected."""
    path = prepare_config_file(tmpdir, content={"something": "useless"})
    with pytest.raises(KeyError, match="missing objstorage config entry"):
        load_and_check_config(path)
def test_load_and_check_config_invalid_configuration(tmpdir):
    """An 'objstorage' section without a 'cls' key must be rejected."""
    path = prepare_config_file(
        tmpdir, content={"objstorage": {"something": "useless"}}
    )
    with pytest.raises(KeyError, match="missing cls config entry"):
        load_and_check_config(path)
# Mandatory pathslicing keys ("root", "slicing") now live at the top level of
# the objstorage config (no nested "args" dict); dropping either must raise.
def test_load_and_check_config_invalid_configuration_level2(tmpdir):
    """Invalid configuration at 2nd level raises"""
    config = {
        "objstorage": {
            "cls": "pathslicing",
-            "args": {
-                "root": "root",
-                "slicing": "slicing",
-            },
+            "root": "root",
+            "slicing": "slicing",
            "client_max_size": "10",
        }
    }
    for key in ("root", "slicing"):
        # Drop one mandatory key at a time and check the loader complains.
        c = copy.deepcopy(config)
-        c["objstorage"]["args"].pop(key)
+        c["objstorage"].pop(key)
        config_path = prepare_config_file(tmpdir, c)
        with pytest.raises(KeyError, match=f"missing {key} config entry"):
            load_and_check_config(config_path)
# Valid configurations -- both the legacy "args"-nested style and the new
# flattened style -- must load and be returned unchanged.
@pytest.mark.parametrize(
    "config",
    [
        pytest.param(
            {
                "objstorage": {
                    "cls": "pathslicing",
-                    "args": {"root": "root", "slicing": "slicing"},
+                    "root": "root",
+                    "slicing": "slicing",
                }
            },
            id="pathslicing-bw-compat",
        ),
        pytest.param(
            {
                "objstorage": {
                    "cls": "pathslicing",
                    "root": "root",
                    "slicing": "slicing",
                }
            },
            id="pathslicing",
        ),
        pytest.param(
-            {"client_max_size": "10", "objstorage": {"cls": "memory", "args": {}}},
+            {"client_max_size": "10", "objstorage": {"cls": "memory"}},
            id="empty-args-bw-compat",
        ),
        pytest.param(
            {"client_max_size": "10", "objstorage": {"cls": "memory"}}, id="empty-args"
        ),
        pytest.param(
            {
                "objstorage": {
                    "cls": "noop",
                }
            },
            id="noop",
        ),
    ],
)
def test_load_and_check_config(tmpdir, config):
    """pathslicing configuration fine loads ok"""
    config_path = prepare_config_file(tmpdir, config)
    cfg = load_and_check_config(config_path)
    # The loader must return the configuration verbatim.
    assert cfg == config
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 3:33 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3275842
Attached To
rDOBJS Object storage
Event Timeline
Log In to Comment