diff --git a/test_objstorage.py b/test_objstorage.py index 021ce4d..7adb361 100644 --- a/test_objstorage.py +++ b/test_objstorage.py @@ -1,85 +1,105 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import os import shutil import tempfile import unittest from io import BytesIO from nose.tools import istest -from swh.storage.objstorage import ObjStorage, DuplicateObjError +from swh.storage import objstorage -class Hashlib(unittest.TestCase): +class TestObjStorage(unittest.TestCase): def setUp(self): self.content = b'42\n' self.obj_id = 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd' self.obj_steps = ['d8', '1c', 'c0'] self.obj_relpath = os.path.join(*(self.obj_steps + [self.obj_id])) self.tmpdir = tempfile.mkdtemp() - self.storage = ObjStorage(root=self.tmpdir, depth=3) + self.obj_path = os.path.join(self.tmpdir, self.obj_relpath) + self.storage = objstorage.ObjStorage(root=self.tmpdir, depth=3) def tearDown(self): shutil.rmtree(self.tmpdir) def assertGzipContains(self, gzip_path, content): self.assertEqual(gzip.open(gzip_path, 'rb').read(), content) @istest def add_bytes_w_id(self): - obj_path = os.path.join(self.tmpdir, self.obj_relpath) self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertTrue(os.path.isfile(obj_path)) - self.assertEqual(gzip.open(obj_path, 'rb').read(), self.content) - self.assertGzipContains(obj_path, self.content) + self.assertGzipContains(self.obj_path, self.content) @istest def add_bytes_wo_id(self): - obj_path = os.path.join(self.tmpdir, self.obj_relpath) self.storage.add_bytes(self.content) - self.assertTrue(os.path.isfile(obj_path)) - self.assertGzipContains(obj_path, self.content) + self.assertGzipContains(self.obj_path, self.content) @istest def add_file_w_id(self): - obj_path = os.path.join(self.tmpdir, self.obj_relpath) self.storage.add_file(BytesIO(self.content), len(self.content), obj_id=self.obj_id) - self.assertTrue(os.path.isfile(obj_path)) - self.assertGzipContains(obj_path, self.content) + self.assertGzipContains(self.obj_path, self.content) @istest def add_file_wo_id(self): - obj_path = os.path.join(self.tmpdir, self.obj_relpath) - self.storage.add_file(BytesIO(self.content), - len(self.content)) - self.assertTrue(os.path.isfile(obj_path)) - self.assertGzipContains(obj_path, self.content) + self.storage.add_file(BytesIO(self.content), len(self.content)) + self.assertGzipContains(self.obj_path, self.content) @istest def add_noclobber(self): self.storage.add_bytes(self.content) - with self.assertRaises(DuplicateObjError): + with self.assertRaises(objstorage.DuplicateObjError): self.storage.add_bytes(self.content) @istest def add_clobber(self): self.storage.add_bytes(self.content) try: self.storage.add_bytes(self.content, clobber=True) except: - self.fail('unexpected exception when clobbering') + self.fail('clobbering failed') @istest def has(self): self.storage.add_bytes(self.content) self.assertTrue(self.storage.has(self.obj_id)) self.assertFalse(self.storage.has( 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15')) + + @istest + def check_ok(self): + self.storage.add_bytes(self.content) + try: + self.storage.check(self.obj_id) + except: + self.fail('integrity check failed') + + @istest + def check_missing(self): + with self.assertRaises(objstorage.ObjNotFoundError): + self.storage.check(self.obj_id) + + @istest + def check_not_gzip(self): + self.storage.add_bytes(self.content) + with open(self.obj_path, 'ab') as f: # add trailing garbage + f.write(b'garbage') + with self.assertRaises(objstorage.ObjIntegrityError): + self.storage.check(self.obj_id) + + @istest + def check_id_mismatch(self): + self.storage.add_bytes(self.content) + with gzip.open(self.obj_path, 'wb') as f: # replace gzipped content + f.write(b'unexpected content') + with self.assertRaises(objstorage.ObjIntegrityError): + self.storage.check(self.obj_id)