Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_pathslicing.py
# Copyright (C) 2015-2017 The Software Heritage developers | # Copyright (C) 2015-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
import unittest | import unittest | ||||
from unittest.mock import patch, DEFAULT | from unittest.mock import patch, DEFAULT | ||||
from typing import Optional | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage import exc, get_objstorage, ID_HASH_LENGTH | from swh.objstorage import exc, get_objstorage, ID_HASH_LENGTH | ||||
from .objstorage_testing import ObjStorageTestFixture | from .objstorage_testing import ObjStorageTestFixture | ||||
class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): | class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): | ||||
compression = None # type: Optional[str] | compression = 'none' | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.slicing = '0:2/2:4/4:6' | self.slicing = '0:2/2:4/4:6' | ||||
self.tmpdir = tempfile.mkdtemp() | self.tmpdir = tempfile.mkdtemp() | ||||
self.storage = get_objstorage( | self.storage = get_objstorage( | ||||
'pathslicing', { | 'pathslicing', { | ||||
'root': self.tmpdir, | 'root': self.tmpdir, | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): | ||||
def test_check_not_compressed(self): | def test_check_not_compressed(self): | ||||
content, obj_id = self.hash_content(b'check_not_compressed') | content, obj_id = self.hash_content(b'check_not_compressed') | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
with open(self.content_path(obj_id), 'ab') as f: # Add garbage. | with open(self.content_path(obj_id), 'ab') as f: # Add garbage. | ||||
f.write(b'garbage') | f.write(b'garbage') | ||||
with self.assertRaises(exc.Error) as error: | with self.assertRaises(exc.Error) as error: | ||||
self.storage.check(obj_id) | self.storage.check(obj_id) | ||||
if self.compression is None: | if self.compression == 'none': | ||||
self.assertIn('Corrupt object', error.exception.args[0]) | self.assertIn('Corrupt object', error.exception.args[0]) | ||||
else: | else: | ||||
self.assertIn('trailing data found', error.exception.args[0]) | self.assertIn('trailing data found', error.exception.args[0]) | ||||
class TestPathSlicingObjStorageGzip(TestPathSlicingObjStorage): | class TestPathSlicingObjStorageGzip(TestPathSlicingObjStorage): | ||||
compression = 'gzip' | compression = 'gzip' | ||||
Show All 11 Lines |