Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_seaweedfs.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from itertools import dropwhile, islice | from itertools import dropwhile, islice | ||||
import json | import json | ||||
import os | import os | ||||
import unittest | import unittest | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
from requests.utils import get_encoding_from_headers | from requests.utils import get_encoding_from_headers | ||||
import requests_mock | import requests_mock | ||||
from requests_mock.contrib import fixture | from requests_mock.contrib import fixture | ||||
from swh.objstorage.backends.seaweed import WeedObjStorage | |||||
from swh.objstorage.exc import Error | from swh.objstorage.exc import Error | ||||
from swh.objstorage.factory import get_objstorage | |||||
from swh.objstorage.objstorage import decompressors | from swh.objstorage.objstorage import decompressors | ||||
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | ||||
class FilerRequestsMock: | class FilerRequestsMock: | ||||
"""This is a requests_mock based mock for the seaweedfs Filer API | """This is a requests_mock based mock for the seaweedfs Filer API | ||||
It does not implement the whole API, only the parts required to make the | It does not implement the whole API, only the parts required to make the | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | |||||
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase): | class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase): | ||||
compression = "none" | compression = "none" | ||||
url = "http://127.0.0.1/test/" | url = "http://127.0.0.1/test/" | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.storage = WeedObjStorage(url=self.url, compression=self.compression) | self.storage = get_objstorage( | ||||
cls="seaweedfs", url=self.url, compression=self.compression | |||||
) | |||||
self.mock = FilerRequestsMock(baseurl=self.url) | self.mock = FilerRequestsMock(baseurl=self.url) | ||||
def test_compression(self): | def test_compression(self): | ||||
content, obj_id = self.hash_content(b"test compression") | content, obj_id = self.hash_content(b"test compression") | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
raw_content = self.storage.wf.get(self.storage._path(obj_id)) | raw_content = self.storage.wf.get(self.storage._path(obj_id)) | ||||
Show All 31 Lines |