
diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py
index 7e66e7b..c7d7eaa 100644
--- a/swh/objstorage/tests/test_multiplexer_filter.py
+++ b/swh/objstorage/tests/test_multiplexer_filter.py
@@ -1,330 +1,332 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
import shutil
from string import ascii_lowercase
import tempfile
import unittest
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.objstorage.factory import get_objstorage
from swh.objstorage.multiplexer.filter import id_prefix, id_regex, read_only
from swh.objstorage.objstorage import compute_hash
def get_random_content():
return bytes("".join(random.sample(ascii_lowercase, 10)), "utf8")
class MixinTestReadFilter(unittest.TestCase):
# Read only filter should not allow writing
def setUp(self):
super().setUp()
self.tmpdir = tempfile.mkdtemp()
pstorage = {
"cls": "pathslicing",
"root": self.tmpdir,
"slicing": "0:5",
}
base_storage = get_objstorage(**pstorage)
self.storage = get_objstorage(
"filtered", storage_conf=pstorage, filters_conf=[read_only()]
)
self.valid_content = b"pre-existing content"
self.invalid_content = b"invalid_content"
self.true_invalid_content = b"Anything that is not correct"
self.absent_content = b"non-existent content"
# Create a valid content.
self.valid_id = compute_hash(self.valid_content)
base_storage.add(self.valid_content, obj_id=self.valid_id)
# Create an invalid id and add a content with it.
self.invalid_id = compute_hash(self.true_invalid_content)
base_storage.add(self.invalid_content, obj_id=self.invalid_id)
# Compute an id for a non-existing content.
self.absent_id = compute_hash(self.absent_content)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def test_can_contains(self):
self.assertTrue(self.valid_id in self.storage)
self.assertTrue(self.invalid_id in self.storage)
self.assertFalse(self.absent_id in self.storage)
def test_can_iter(self):
self.assertIn(self.valid_id, iter(self.storage))
self.assertIn(self.invalid_id, iter(self.storage))
def test_can_len(self):
self.assertEqual(2, len(self.storage))
def test_can_get(self):
self.assertEqual(self.valid_content, self.storage.get(self.valid_id))
self.assertEqual(self.invalid_content, self.storage.get(self.invalid_id))
def test_can_check(self):
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.absent_id)
with self.assertRaises(Error):
self.storage.check(self.invalid_id)
self.storage.check(self.valid_id)
def test_can_get_random(self):
self.assertEqual(1, len(list(self.storage.get_random(1))))
self.assertEqual(
len(list(self.storage)), len(set(self.storage.get_random(1000)))
)
def test_cannot_add(self):
new_id = self.storage.add(b"New content")
result = self.storage.add(self.valid_content, self.valid_id)
self.assertIsNone(new_id, self.storage)
self.assertIsNone(result)
def test_cannot_restore(self):
result = self.storage.restore(self.valid_content, self.valid_id)
self.assertIsNone(result)
class MixinTestIdFilter:
"""Mixin class that tests the filters based on filter.IdFilter
Methods "make_valid", "make_invalid" and "filter_storage" must be
implemented by subclasses.
"""
def setUp(self):
super().setUp()
# Use a hack here: as the mock uses the content as id, it is easy to
# create contents that are filtered or not.
self.prefix = "71"
self.tmpdir = tempfile.mkdtemp()
# Make the storage filtered
self.sconf = {
"cls": "pathslicing",
- "args": {"root": self.tmpdir, "slicing": "0:5"},
+ "root": self.tmpdir,
+ "slicing": "0:5",
}
storage = get_objstorage(**self.sconf)
self.base_storage = storage
self.storage = self.filter_storage(self.sconf)
# Present content with valid id
self.present_valid_content = self.ensure_valid(b"yroqdtotji")
self.present_valid_id = compute_hash(self.present_valid_content)
# Present content with invalid id
self.present_invalid_content = self.ensure_invalid(b"glxddlmmzb")
self.present_invalid_id = compute_hash(self.present_invalid_content)
# Missing content with valid id
self.missing_valid_content = self.ensure_valid(b"rmzkdclkez")
self.missing_valid_id = compute_hash(self.missing_valid_content)
# Missing content with invalid id
self.missing_invalid_content = self.ensure_invalid(b"hlejfuginh")
self.missing_invalid_id = compute_hash(self.missing_invalid_content)
# Present corrupted content with valid id
self.present_corrupted_valid_content = self.ensure_valid(b"cdsjwnpaij")
self.true_present_corrupted_valid_content = self.ensure_valid(b"mgsdpawcrr")
self.present_corrupted_valid_id = compute_hash(
self.true_present_corrupted_valid_content
)
# Present corrupted content with invalid id
self.present_corrupted_invalid_content = self.ensure_invalid(b"pspjljnrco")
self.true_present_corrupted_invalid_content = self.ensure_invalid(b"rjocbnnbso")
self.present_corrupted_invalid_id = compute_hash(
self.true_present_corrupted_invalid_content
)
# Missing (potentially) corrupted content with valid id
self.missing_corrupted_valid_content = self.ensure_valid(b"zxkokfgtou")
self.true_missing_corrupted_valid_content = self.ensure_valid(b"royoncooqa")
self.missing_corrupted_valid_id = compute_hash(
self.true_missing_corrupted_valid_content
)
# Missing (potentially) corrupted content with invalid id
self.missing_corrupted_invalid_content = self.ensure_invalid(b"hxaxnrmnyk")
self.true_missing_corrupted_invalid_content = self.ensure_invalid(b"qhbolyuifr")
self.missing_corrupted_invalid_id = compute_hash(
self.true_missing_corrupted_invalid_content
)
# Add the contents that are supposed to be present
self.storage.add(self.present_valid_content, obj_id=self.present_valid_id)
self.storage.add(self.present_invalid_content, obj_id=self.present_invalid_id)
self.storage.add(
self.present_corrupted_valid_content, obj_id=self.present_corrupted_valid_id
)
self.storage.add(
self.present_corrupted_invalid_content,
obj_id=self.present_corrupted_invalid_id,
)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def filter_storage(self, sconf):
raise NotImplementedError(
"Id_filter test class must have a filter_storage method"
)
def ensure_valid(self, content=None):
if content is None:
content = get_random_content()
while not self.storage.is_valid(compute_hash(content)):
content = get_random_content()
return content
def ensure_invalid(self, content=None):
if content is None:
content = get_random_content()
while self.storage.is_valid(compute_hash(content)):
content = get_random_content()
return content
def test_contains(self):
# Both contents are present, but the invalid one should be ignored.
self.assertTrue(self.present_valid_id in self.storage)
self.assertFalse(self.present_invalid_id in self.storage)
self.assertFalse(self.missing_valid_id in self.storage)
self.assertFalse(self.missing_invalid_id in self.storage)
self.assertTrue(self.present_corrupted_valid_id in self.storage)
self.assertFalse(self.present_corrupted_invalid_id in self.storage)
self.assertFalse(self.missing_corrupted_valid_id in self.storage)
self.assertFalse(self.missing_corrupted_invalid_id in self.storage)
def test_iter(self):
self.assertIn(self.present_valid_id, iter(self.storage))
self.assertNotIn(self.present_invalid_id, iter(self.storage))
self.assertNotIn(self.missing_valid_id, iter(self.storage))
self.assertNotIn(self.missing_invalid_id, iter(self.storage))
self.assertIn(self.present_corrupted_valid_id, iter(self.storage))
self.assertNotIn(self.present_corrupted_invalid_id, iter(self.storage))
self.assertNotIn(self.missing_corrupted_valid_id, iter(self.storage))
self.assertNotIn(self.missing_corrupted_invalid_id, iter(self.storage))
def test_len(self):
# Four contents are present, but only two should be valid.
self.assertEqual(2, len(self.storage))
def test_get(self):
self.assertEqual(
self.present_valid_content, self.storage.get(self.present_valid_id)
)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.present_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_invalid_id)
self.assertEqual(
self.present_corrupted_valid_content,
self.storage.get(self.present_corrupted_valid_id),
)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.present_corrupted_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_corrupted_invalid_id)
def test_check(self):
self.storage.check(self.present_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.present_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_invalid_id)
with self.assertRaises(Error):
self.storage.check(self.present_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.present_corrupted_invalid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_corrupted_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_corrupted_invalid_id)
def test_get_random(self):
self.assertEqual(0, len(list(self.storage.get_random(0))))
random_content = list(self.storage.get_random(1000))
self.assertIn(self.present_valid_id, random_content)
self.assertNotIn(self.present_invalid_id, random_content)
self.assertNotIn(self.missing_valid_id, random_content)
self.assertNotIn(self.missing_invalid_id, random_content)
self.assertIn(self.present_corrupted_valid_id, random_content)
self.assertNotIn(self.present_corrupted_invalid_id, random_content)
self.assertNotIn(self.missing_corrupted_valid_id, random_content)
self.assertNotIn(self.missing_corrupted_invalid_id, random_content)
def test_add(self):
# Add valid and invalid contents to the storage and check their
# presence with the unfiltered storage.
valid_content = self.ensure_valid(b"ulepsrjbgt")
valid_id = compute_hash(valid_content)
invalid_content = self.ensure_invalid(b"znvghkjked")
invalid_id = compute_hash(invalid_content)
self.storage.add(valid_content, obj_id=valid_id)
self.storage.add(invalid_content, obj_id=invalid_id)
self.assertTrue(valid_id in self.base_storage)
self.assertFalse(invalid_id in self.base_storage)
def test_restore(self):
# Add corrupted content to the storage and then try to restore it
valid_content = self.ensure_valid(b"ulepsrjbgt")
valid_id = compute_hash(valid_content)
corrupted_content = self.ensure_valid(b"ltjkjsloyb")
corrupted_id = compute_hash(corrupted_content)
self.storage.add(corrupted_content, obj_id=valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(corrupted_id)
with self.assertRaises(Error):
self.storage.check(valid_id)
self.storage.restore(valid_content, obj_id=valid_id)
self.storage.check(valid_id)
class TestPrefixFilter(MixinTestIdFilter, unittest.TestCase):
def setUp(self):
self.prefix = b"71"
super().setUp()
def ensure_valid(self, content):
obj_id = compute_hash(content)
hex_obj_id = hashutil.hash_to_hex(obj_id)
self.assertTrue(hex_obj_id.startswith(self.prefix))
return content
def ensure_invalid(self, content):
obj_id = compute_hash(content)
hex_obj_id = hashutil.hash_to_hex(obj_id)
self.assertFalse(hex_obj_id.startswith(self.prefix))
return content
def filter_storage(self, sconf):
return get_objstorage(
"filtered",
- {"storage_conf": sconf, "filters_conf": [id_prefix(self.prefix)]},
+ storage_conf=sconf,
+ filters_conf=[id_prefix(self.prefix)],
)
class TestRegexFilter(MixinTestIdFilter, unittest.TestCase):
def setUp(self):
self.regex = r"[a-f][0-9].*"
super().setUp()
def filter_storage(self, sconf):
return get_objstorage(
- "filtered", {"storage_conf": sconf, "filters_conf": [id_regex(self.regex)]}
+ "filtered", storage_conf=sconf, filters_conf=[id_regex(self.regex)]
)
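
The hunks above all apply the same migration: backend options that were previously nested under an "args" dict are now passed directly, either as flat keys of the configuration dict or as keyword arguments to get_objstorage. A minimal sketch of the two calling conventions, with a placeholder root directory and the factory signature assumed from the updated tests:

# Illustrative sketch only; values are placeholders.
import tempfile
from swh.objstorage.factory import get_objstorage

root = tempfile.mkdtemp()  # placeholder root, as in the test setUp above

# Old convention (removed by this diff): options nested under an "args" dict.
# storage = get_objstorage(cls="pathslicing", args={"root": root, "slicing": "0:5"})

# New convention (used throughout this diff): options as keyword arguments.
storage = get_objstorage(cls="pathslicing", root=root, slicing="0:5")
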
diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py
index 2bea41b..ee852a5 100644
--- a/swh/objstorage/tests/test_objstorage_api.py
+++ b/swh/objstorage/tests/test_objstorage_api.py
@@ -1,47 +1,47 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
import pytest
from swh.core.api.tests.server_testing import ServerTestFixture
from swh.objstorage.api.server import app
from swh.objstorage.factory import get_objstorage
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase):
"""Test the remote archive API."""
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.config = {
"objstorage": {
"cls": "pathslicing",
"root": self.tmpdir,
"slicing": "0:1/0:5",
"allow_delete": True,
},
"client_max_size": 8 * 1024 * 1024,
}
self.app = app
super().setUp()
- self.storage = get_objstorage("remote", {"url": self.url()})
+ self.storage = get_objstorage("remote", url=self.url())
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
@pytest.mark.skip("makes no sense to test this for the remote api")
def test_delete_not_allowed(self):
pass
@pytest.mark.skip("makes no sense to test this for the remote api")
def test_delete_not_allowed_by_default(self):
pass
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
index dcc1f78..0ee6c71 100644
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -1,317 +1,313 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import base64
import collections
from dataclasses import dataclass
import unittest
from unittest.mock import patch
from urllib.parse import parse_qs, urlparse
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
import pytest
from swh.model.hashutil import hash_to_hex
import swh.objstorage.backends.azure
from swh.objstorage.exc import Error
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import decompressors
from .objstorage_testing import ObjStorageTestFixture
@dataclass
class MockListedObject:
name: str
class MockAsyncDownloadClient:
def __init__(self, blob_data):
self.blob_data = blob_data
def content_as_bytes(self):
future = asyncio.Future()
future.set_result(self.blob_data)
return future
class MockDownloadClient:
def __init__(self, blob_data):
self.blob_data = blob_data
def content_as_bytes(self):
return self.blob_data
def __await__(self):
yield from ()
return MockAsyncDownloadClient(self.blob_data)
class MockBlobClient:
def __init__(self, container, blob):
self.container = container
self.blob = blob
def get_blob_properties(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
return {"exists": True}
def upload_blob(self, data, length=None):
if self.blob in self.container.blobs:
raise ResourceExistsError("Blob already exists")
if length is not None and length != len(data):
raise ValueError("Wrong length for blob data!")
self.container.blobs[self.blob] = data
def download_blob(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
return MockDownloadClient(self.container.blobs[self.blob])
def delete_blob(self):
if self.blob not in self.container.blobs:
raise ResourceNotFoundError("Blob not found")
del self.container.blobs[self.blob]
def get_MockContainerClient():
blobs = collections.defaultdict(dict) # {container_url: {blob_id: blob}}
class MockContainerClient:
def __init__(self, container_url):
self.container_url = container_url
self.blobs = blobs[self.container_url]
@classmethod
def from_container_url(cls, container_url):
return cls(container_url)
def get_container_properties(self):
return {"exists": True}
def get_blob_client(self, blob):
return MockBlobClient(self, blob)
def list_blobs(self):
for obj in sorted(self.blobs):
yield MockListedObject(obj)
def delete_blob(self, blob):
self.get_blob_client(blob.name).delete_blob()
def __aenter__(self):
return self
def __await__(self):
future = asyncio.Future()
future.set_result(self)
yield from future
def __aexit__(self, *args):
return self
return MockContainerClient
class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
def setUp(self):
super().setUp()
ContainerClient = get_MockContainerClient()
patcher = patch(
"swh.objstorage.backends.azure.ContainerClient", ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
patcher = patch(
"swh.objstorage.backends.azure.AsyncContainerClient", ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.storage = get_objstorage(
"azure",
- {
- "container_url": "https://bogus-container-url.example",
- "compression": self.compression,
- },
+ container_url="https://bogus-container-url.example",
+ compression=self.compression,
)
def test_compression(self):
content, obj_id = self.hash_content(b"test content is compressed")
self.storage.add(content, obj_id=obj_id)
internal_id = self.storage._internal_id(obj_id)
blob_client = self.storage.get_blob_client(internal_id)
raw_blob = blob_client.download_blob().content_as_bytes()
d = decompressors[self.compression]()
assert d.decompress(raw_blob) == content
assert d.unused_data == b""
def test_trailing_data_on_stored_blob(self):
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
internal_id = self.storage._internal_id(obj_id)
blob_client = self.storage.get_blob_client(internal_id)
raw_blob = blob_client.download_blob().content_as_bytes()
new_data = raw_blob + b"trailing garbage"
blob_client.delete_blob()
blob_client.upload_blob(data=new_data, length=len(new_data))
if self.compression == "none":
with self.assertRaises(Error) as e:
self.storage.check(obj_id)
else:
with self.assertRaises(Error) as e:
self.storage.get(obj_id)
assert "trailing data" in e.exception.args[0]
class TestAzureCloudObjStorageGzip(TestAzureCloudObjStorage):
compression = "gzip"
class TestAzureCloudObjStorageZlib(TestAzureCloudObjStorage):
compression = "zlib"
class TestAzureCloudObjStorageLzma(TestAzureCloudObjStorage):
compression = "lzma"
class TestAzureCloudObjStorageBz2(TestAzureCloudObjStorage):
compression = "bz2"
class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
self.ContainerClient = get_MockContainerClient()
patcher = patch(
"swh.objstorage.backends.azure.ContainerClient", self.ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
patcher = patch(
"swh.objstorage.backends.azure.AsyncContainerClient", self.ContainerClient
)
patcher.start()
self.addCleanup(patcher.stop)
self.accounts = {}
for prefix in "0123456789abcdef":
self.accounts[prefix] = "https://bogus-container-url.example/" + prefix
- self.storage = get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ self.storage = get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_instantiation_missing_prefixes(self):
del self.accounts["d"]
del self.accounts["e"]
with self.assertRaisesRegex(ValueError, "Missing prefixes"):
- get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_instantiation_inconsistent_prefixes(self):
self.accounts["00"] = self.accounts["0"]
with self.assertRaisesRegex(ValueError, "Inconsistent prefixes"):
- get_objstorage("azure-prefixed", {"accounts": self.accounts})
+ get_objstorage("azure-prefixed", accounts=self.accounts)
def test_prefixedazure_sharding_behavior(self):
for i in range(100):
content, obj_id = self.hash_content(b"test_content_%02d" % i)
self.storage.add(content, obj_id=obj_id)
hex_obj_id = hash_to_hex(obj_id)
prefix = hex_obj_id[0]
self.assertTrue(
self.ContainerClient(self.storage.container_urls[prefix])
.get_blob_client(hex_obj_id)
.get_blob_properties()
)
def test_get_container_url():
# r=read, l=list, w=write, d=delete
policy_map = {
"read_only": "rl",
"append_only": "rwl",
"full": "rwdl",
}
for policy, expected in policy_map.items():
ret = swh.objstorage.backends.azure.get_container_url(
account_name="account_name",
account_key=base64.b64encode(b"account_key"),
container_name="container_name",
access_policy=policy,
)
p = urlparse(ret)
assert p.scheme == "https"
assert p.netloc == "account_name.blob.core.windows.net"
assert p.path == "/container_name"
qs = parse_qs(p.query)
# sp: permissions
assert qs["sp"] == [expected]
# sr: resource (c=container)
assert qs["sr"] == ["c"]
# st: start; se: expiry
assert qs["st"][0] < qs["se"][0]
def test_bwcompat_args(monkeypatch):
monkeypatch.setattr(
swh.objstorage.backends.azure,
"ContainerClient",
get_MockContainerClient(),
)
with pytest.deprecated_call():
objs = get_objstorage(
"azure",
- {
- "account_name": "account_name",
- "api_secret_key": base64.b64encode(b"account_key"),
- "container_name": "container_name",
- },
+ account_name="account_name",
+ api_secret_key=base64.b64encode(b"account_key"),
+ container_name="container_name",
)
assert objs is not None
def test_bwcompat_args_prefixed(monkeypatch):
monkeypatch.setattr(
swh.objstorage.backends.azure,
"ContainerClient",
get_MockContainerClient(),
)
accounts = {
prefix: {
"account_name": f"account_name{prefix}",
"api_secret_key": base64.b64encode(b"account_key"),
"container_name": "container_name",
}
for prefix in "0123456789abcdef"
}
with pytest.deprecated_call():
- objs = get_objstorage("azure-prefixed", {"accounts": accounts})
+ objs = get_objstorage("azure-prefixed", accounts=accounts)
assert objs is not None
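
test_get_container_url above calls the SAS URL helper with a throwaway key and no mock, so it can be tried standalone; it only assembles and signs the container URL. A short usage sketch with placeholder credentials, the signature taken from that test:

import base64
from swh.objstorage.backends.azure import get_container_url

url = get_container_url(
    account_name="myaccount",                         # placeholder account
    account_key=base64.b64encode(b"not-a-real-key"),  # placeholder key
    container_name="contents",
    access_policy="read_only",                        # signed as permissions "rl"
)
# url is an https URL on myaccount.blob.core.windows.net/contents whose query
# string carries the sp/sr/st/se parameters asserted in the test above.
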
diff --git a/swh/objstorage/tests/test_objstorage_in_memory.py b/swh/objstorage/tests/test_objstorage_in_memory.py
index d152cf5..a7ab4f5 100644
--- a/swh/objstorage/tests/test_objstorage_in_memory.py
+++ b/swh/objstorage/tests/test_objstorage_in_memory.py
@@ -1,16 +1,16 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
class TestInMemoryObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
- self.storage = get_objstorage(cls="memory", args={})
+ self.storage = get_objstorage(cls="memory")
diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py
index 6308718..370456a 100644
--- a/swh/objstorage/tests/test_objstorage_instantiation.py
+++ b/swh/objstorage/tests/test_objstorage_instantiation.py
@@ -1,40 +1,40 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
from swh.objstorage.api.client import RemoteObjStorage
from swh.objstorage.backends.pathslicing import PathSlicingObjStorage
from swh.objstorage.factory import get_objstorage
class TestObjStorageInitialization(unittest.TestCase):
"""Test that the methods for ObjStorage initializations with
`get_objstorage` works properly.
"""
def setUp(self):
self.path = tempfile.mkdtemp()
self.path2 = tempfile.mkdtemp()
# Server is launched at self.url()
self.config = {"storage_base": self.path2, "storage_slicing": "0:1/0:5"}
super().setUp()
def tearDown(self):
super().tearDown()
shutil.rmtree(self.path)
shutil.rmtree(self.path2)
def test_pathslicing_objstorage(self):
- conf = {"cls": "pathslicing", "args": {"root": self.path, "slicing": "0:2/0:5"}}
+ conf = {"cls": "pathslicing", "root": self.path, "slicing": "0:2/0:5"}
st = get_objstorage(**conf)
self.assertTrue(isinstance(st, PathSlicingObjStorage))
def test_remote_objstorage(self):
- conf = {"cls": "remote", "args": {"url": "http://127.0.0.1:4242/"}}
+ conf = {"cls": "remote", "url": "http://127.0.0.1:4242/"}
st = get_objstorage(**conf)
self.assertTrue(isinstance(st, RemoteObjStorage))
diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py
index 1ef6eaf..41a77cc 100644
--- a/swh/objstorage/tests/test_objstorage_pathslicing.py
+++ b/swh/objstorage/tests/test_objstorage_pathslicing.py
@@ -1,163 +1,161 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import shutil
import tempfile
import unittest
from unittest.mock import DEFAULT, patch
from swh.model import hashutil
from swh.objstorage import exc
from swh.objstorage.constants import ID_DIGEST_LENGTH
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
def setUp(self):
super().setUp()
self.slicing = "0:2/2:4/4:6"
self.tmpdir = tempfile.mkdtemp()
self.storage = get_objstorage(
"pathslicing",
- {
- "root": self.tmpdir,
- "slicing": self.slicing,
- "compression": self.compression,
- },
+ root=self.tmpdir,
+ slicing=self.slicing,
+ compression=self.compression,
)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmpdir)
def content_path(self, obj_id):
hex_obj_id = hashutil.hash_to_hex(obj_id)
return self.storage.slicer.get_path(hex_obj_id)
def test_iter(self):
content, obj_id = self.hash_content(b"iter")
self.assertEqual(list(iter(self.storage)), [])
self.storage.add(content, obj_id=obj_id)
self.assertEqual(list(iter(self.storage)), [obj_id])
def test_len(self):
content, obj_id = self.hash_content(b"len")
self.assertEqual(len(self.storage), 0)
self.storage.add(content, obj_id=obj_id)
self.assertEqual(len(self.storage), 1)
def test_check_ok(self):
content, obj_id = self.hash_content(b"check_ok")
self.storage.add(content, obj_id=obj_id)
assert self.storage.check(obj_id) is None
assert self.storage.check(obj_id.hex()) is None
def test_check_id_mismatch(self):
content, obj_id = self.hash_content(b"check_id_mismatch")
self.storage.add(b"unexpected content", obj_id=obj_id)
with self.assertRaises(exc.Error) as error:
self.storage.check(obj_id)
self.assertEqual(
(
"Corrupt object %s should have id "
"12ebb2d6c81395bcc5cab965bdff640110cb67ff" % obj_id.hex(),
),
error.exception.args,
)
def test_get_random_contents(self):
content, obj_id = self.hash_content(b"get_random_content")
self.storage.add(content, obj_id=obj_id)
random_contents = list(self.storage.get_random(1))
self.assertEqual(1, len(random_contents))
self.assertIn(obj_id, random_contents)
def test_iterate_from(self):
all_ids = []
for i in range(100):
content, obj_id = self.hash_content(b"content %d" % i)
self.storage.add(content, obj_id=obj_id)
all_ids.append(obj_id)
all_ids.sort()
ids = list(self.storage.iter_from(b"\x00" * ID_DIGEST_LENGTH))
self.assertEqual(len(ids), len(all_ids))
self.assertEqual(ids, all_ids)
ids = list(self.storage.iter_from(all_ids[0]))
self.assertEqual(len(ids), len(all_ids) - 1)
self.assertEqual(ids, all_ids[1:])
ids = list(self.storage.iter_from(all_ids[-1], n_leaf=True))
n_leaf = ids[-1]
ids = ids[:-1]
self.assertEqual(n_leaf, 1)
self.assertEqual(len(ids), 0)
ids = list(self.storage.iter_from(all_ids[-2], n_leaf=True))
n_leaf = ids[-1]
ids = ids[:-1]
self.assertEqual(n_leaf, 2) # beware, this depends on the hash algo
self.assertEqual(len(ids), 1)
self.assertEqual(ids, all_ids[-1:])
def test_fdatasync_default(self):
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
if self.storage.use_fdatasync:
assert patched["fdatasync"].call_count == 1
assert patched["fsync"].call_count == 0
else:
assert patched["fdatasync"].call_count == 0
assert patched["fsync"].call_count == 1
def test_fdatasync_forced_on(self):
self.storage.use_fdatasync = True
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
assert patched["fdatasync"].call_count == 1
assert patched["fsync"].call_count == 0
def test_fdatasync_forced_off(self):
self.storage.use_fdatasync = False
content, obj_id = self.hash_content(b"check_fdatasync")
with patch.multiple("os", fsync=DEFAULT, fdatasync=DEFAULT) as patched:
self.storage.add(content, obj_id=obj_id)
assert patched["fdatasync"].call_count == 0
assert patched["fsync"].call_count == 1
def test_check_not_compressed(self):
content, obj_id = self.hash_content(b"check_not_compressed")
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), "ab") as f: # Add garbage.
f.write(b"garbage")
with self.assertRaises(exc.Error) as error:
self.storage.check(obj_id)
if self.compression == "none":
self.assertIn("Corrupt object", error.exception.args[0])
else:
self.assertIn("trailing data found", error.exception.args[0])
class TestPathSlicingObjStorageGzip(TestPathSlicingObjStorage):
compression = "gzip"
class TestPathSlicingObjStorageZlib(TestPathSlicingObjStorage):
compression = "zlib"
class TestPathSlicingObjStorageBz2(TestPathSlicingObjStorage):
compression = "bz2"
class TestPathSlicingObjStorageLzma(TestPathSlicingObjStorage):
compression = "lzma"
diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py
index 919440a..83f79b4 100644
--- a/swh/objstorage/tests/test_objstorage_random_generator.py
+++ b/swh/objstorage/tests/test_objstorage_random_generator.py
@@ -1,39 +1,39 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections.abc import Iterator
from swh.objstorage.factory import get_objstorage
def test_random_generator_objstorage():
- sto = get_objstorage("random", {})
+ sto = get_objstorage("random")
assert sto
blobs = [sto.get(None) for i in range(100)]
lengths = [len(x) for x in blobs]
assert max(lengths) <= 55056238
def test_random_generator_objstorage_list_content():
- sto = get_objstorage("random", {"total": 100})
+ sto = get_objstorage("random", total=100)
assert isinstance(sto.list_content(), Iterator)
assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)]
assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)]
assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [
b"%d" % i for i in range(11, 21)
]
def test_random_generator_objstorage_total():
- sto = get_objstorage("random", {"total": 5})
+ sto = get_objstorage("random", total=5)
assert len([x for x in sto]) == 5
def test_random_generator_objstorage_size():
- sto = get_objstorage("random", {"filesize": 10})
+ sto = get_objstorage("random", filesize=10)
for i in range(10):
assert len(sto.get(None)) == 10
diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py
index a96f35c..86e2472 100644
--- a/swh/objstorage/tests/test_objstorage_striping.py
+++ b/swh/objstorage/tests/test_objstorage_striping.py
@@ -1,77 +1,71 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
from swh.objstorage.factory import get_objstorage
from .objstorage_testing import ObjStorageTestFixture
class TestStripingObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
self.base_dir = tempfile.mkdtemp()
os.mkdir(os.path.join(self.base_dir, "root1"))
os.mkdir(os.path.join(self.base_dir, "root2"))
storage_config = {
"cls": "striping",
- "args": {
- "objstorages": [
- {
- "cls": "pathslicing",
- "args": {
- "root": os.path.join(self.base_dir, "root1"),
- "slicing": "0:2",
- "allow_delete": True,
- },
- },
- {
- "cls": "pathslicing",
- "args": {
- "root": os.path.join(self.base_dir, "root2"),
- "slicing": "0:2",
- "allow_delete": True,
- },
- },
- ]
- },
+ "objstorages": [
+ {
+ "cls": "pathslicing",
+ "root": os.path.join(self.base_dir, "root1"),
+ "slicing": "0:2",
+ "allow_delete": True,
+ },
+ {
+ "cls": "pathslicing",
+ "root": os.path.join(self.base_dir, "root2"),
+ "slicing": "0:2",
+ "allow_delete": True,
+ },
+ ],
}
self.storage = get_objstorage(**storage_config)
def tearDown(self):
shutil.rmtree(self.base_dir)
def test_add_striping_behavior(self):
exp_storage_counts = [0, 0]
storage_counts = [0, 0]
for i in range(100):
content, obj_id = self.hash_content(b"striping_behavior_test%02d" % i)
self.storage.add(content, obj_id)
exp_storage_counts[self.storage.get_storage_index(obj_id)] += 1
count = 0
for i, storage in enumerate(self.storage.storages):
if obj_id not in storage:
continue
count += 1
storage_counts[i] += 1
self.assertEqual(count, 1)
self.assertEqual(storage_counts, exp_storage_counts)
def test_get_striping_behavior(self):
# Make sure we can read objects that are available in any backend
# storage
content, obj_id = self.hash_content(b"striping_behavior_test")
for storage in self.storage.storages:
storage.add(content, obj_id)
self.assertIn(obj_id, self.storage)
storage.delete(obj_id)
self.assertNotIn(obj_id, self.storage)
def test_list_content(self):
self.skipTest("Quite a challenge to make it work")
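
The striping hunk above shows how the flattening applies to nested multi-backend configurations: each entry of "objstorages" drops its own "args" wrapper and keeps the flat keys. A sketch of the same configuration built programmatically, with temporary directories standing in for real roots:

import os
import tempfile
from swh.objstorage.factory import get_objstorage

base_dir = tempfile.mkdtemp()
roots = []
for name in ("root1", "root2"):
    root = os.path.join(base_dir, name)
    os.mkdir(root)
    roots.append(root)

storage = get_objstorage(
    cls="striping",
    objstorages=[
        {"cls": "pathslicing", "root": root, "slicing": "0:2", "allow_delete": True}
        for root in roots
    ],
)
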
diff --git a/swh/objstorage/tests/test_server.py b/swh/objstorage/tests/test_server.py
index 5dbde1d..cec2a62 100644
--- a/swh/objstorage/tests/test_server.py
+++ b/swh/objstorage/tests/test_server.py
@@ -1,126 +1,125 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import pytest
import yaml
from swh.objstorage.api.server import load_and_check_config
def prepare_config_file(tmpdir, content, name="config.yml"):
"""Prepare configuration file in `$tmpdir/name` with content `content`.
Args:
tmpdir (LocalPath): root directory
content (str/dict): Content of the file either as string or as a dict.
If a dict, converts the dict into a yaml string.
name (str): configuration filename
Returns
path (str) of the configuration file prepared.
"""
config_path = tmpdir / name
if isinstance(content, dict): # convert if needed
content = yaml.dump(content)
config_path.write_text(content, encoding="utf-8")
# pytest on python3.5 does not support LocalPath manipulation, so
# convert path to string
return str(config_path)
def test_load_and_check_config_no_configuration():
"""Inexistent configuration files raises"""
with pytest.raises(EnvironmentError, match="Configuration file must be defined"):
load_and_check_config(None)
config_path = "/indexer/inexistent/config.yml"
with pytest.raises(FileNotFoundError, match=f"{config_path} does not exist"):
load_and_check_config(config_path)
def test_load_and_check_config_invalid_configuration_toplevel(tmpdir):
"""Invalid configuration raises"""
config = {"something": "useless"}
config_path = prepare_config_file(tmpdir, content=config)
with pytest.raises(KeyError, match="missing objstorage config entry"):
load_and_check_config(config_path)
def test_load_and_check_config_invalid_configuration(tmpdir):
"""Invalid configuration raises"""
config_path = prepare_config_file(
tmpdir, content={"objstorage": {"something": "useless"}}
)
with pytest.raises(KeyError, match="missing cls config entry"):
load_and_check_config(config_path)
def test_load_and_check_config_invalid_configuration_level2(tmpdir):
"""Invalid configuration at 2nd level raises"""
config = {
"objstorage": {
"cls": "pathslicing",
- "args": {
- "root": "root",
- "slicing": "slicing",
- },
+ "root": "root",
+ "slicing": "slicing",
"client_max_size": "10",
}
}
for key in ("root", "slicing"):
c = copy.deepcopy(config)
- c["objstorage"]["args"].pop(key)
+ c["objstorage"].pop(key)
config_path = prepare_config_file(tmpdir, c)
with pytest.raises(KeyError, match=f"missing {key} config entry"):
load_and_check_config(config_path)
@pytest.mark.parametrize(
"config",
[
pytest.param(
{
"objstorage": {
"cls": "pathslicing",
- "args": {"root": "root", "slicing": "slicing"},
+ "root": "root",
+ "slicing": "slicing",
}
},
id="pathslicing-bw-compat",
),
pytest.param(
{
"objstorage": {
"cls": "pathslicing",
"root": "root",
"slicing": "slicing",
}
},
id="pathslicing",
),
pytest.param(
- {"client_max_size": "10", "objstorage": {"cls": "memory", "args": {}}},
+ {"client_max_size": "10", "objstorage": {"cls": "memory"}},
id="empty-args-bw-compat",
),
pytest.param(
{"client_max_size": "10", "objstorage": {"cls": "memory"}}, id="empty-args"
),
pytest.param(
{
"objstorage": {
"cls": "noop",
}
},
id="noop",
),
],
)
def test_load_and_check_config(tmpdir, config):
"""pathslicing configuration fine loads ok"""
config_path = prepare_config_file(tmpdir, config)
cfg = load_and_check_config(config_path)
assert cfg == config
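
The parametrized cases above accept both the legacy nested layout and the new flat one; a configuration file written in the flat style round-trips through load_and_check_config unchanged. A sketch with placeholder values:

import tempfile
import yaml
from swh.objstorage.api.server import load_and_check_config

config = {
    "client_max_size": 8 * 1024 * 1024,
    "objstorage": {
        "cls": "pathslicing",
        "root": tempfile.mkdtemp(),  # placeholder object root
        "slicing": "0:2/2:4/4:6",
    },
}
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
    yaml.dump(config, f)

assert load_and_check_config(f.name) == config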
