diff --git a/swh/storage/checker/__init__.py b/swh/storage/checker/__init__.py
new file mode 100644
diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/checker/checker.py
@@ -0,0 +1,150 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import itertools
+import logging
+
+from swh.core import config
+from .. import get_storage
+from ..objstorage import ObjStorage
+from ..exc import ObjNotFoundError, Error
+
+DEFAULT_CONFIG = {
+    'storage_path': ('str', '/srv/softwareheritage/objects'),
+    'storage_depth': ('int', 3),
+    'backup_url': ('str', 'http://uffizi:5002'),
+
+    'batch_size': ('int', 1000),
+}
+
+
+class ContentChecker():
+    """ Content integrity checker that will check local objstorage content.
+
+    The checker will check the data of an object storage in order to verify
+    that no file has been corrupted.
+
+    Attributes:
+        config: dictionary that contains this
+            checker configuration
+        objstorage (ObjStorage): Local object storage that will be checked.
+        backup_storage (RemoteStorage): A distant storage that will be used to
+            restore corrupted content.
+    """
+
+    def __init__(self, config, root, depth, backup_url):
+        """ Create a checker that ensures the objstorage has no corrupted file.
+
+        Args:
+            config (dict): Dictionary that contains the following keys :
+                batch_size: Number of content that should be tested each
+                    time the content checker runs.
+            root (string): Path to the objstorage directory
+            depth (int): Depth of the object storage.
+            backup_url (string): Url of a storage that can be used to restore
+                content.
+        """
+        self.config = config
+        self.objstorage = ObjStorage(root, depth)
+        self.backup_storage = get_storage('remote_storage', [backup_url])
+
+    def run(self):
+        """ Start the check routine
+
+        Check one batch of contents, log every corrupted one, then try to
+        restore all of them from the backup storage.
+        """
+        corrupted_contents = []
+        batch_size = self.config['batch_size']
+
+        for content_id in self.get_content_to_check(batch_size):
+            if not self.check_content(content_id):
+                # The corrupted file is overwritten by repair_contents() below.
+                corrupted_contents.append(content_id)
+                logging.error('The content %s has been corrupted', content_id)
+
+        self.repair_contents(corrupted_contents)
+
+    def get_content_to_check(self, batch_size):
+        """ Get the content that should be verified.
+
+        Args:
+            batch_size (int): Maximum number of content ids to return.
+
+        Returns:
+            A list of at most batch_size ids of contents to check.
+        """
+        # FIXME find a stateless way to choose what content should be checked
+        # islice stops cleanly when the storage holds fewer than batch_size
+        # objects instead of letting StopIteration escape.
+        return list(itertools.islice(self.objstorage, batch_size))
+
+    def check_content(self, content_id):
+        """ Check the validity of the given content.
+
+        Returns:
+            True if the content was valid, False if it was corrupted or
+            missing.
+        """
+        try:
+            self.objstorage.check(content_id)
+        except (ObjNotFoundError, Error) as e:
+            logging.warning('Check of content %s failed: %s', content_id, e)
+            return False
+        else:
+            return True
+
+    def repair_contents(self, content_ids):
+        """ Try to restore the given contents.
+
+        Args:
+            content_ids: ids of the corrupted contents to fetch from the
+                backup storage and write again in the local objstorage.
+        """
+        if not content_ids:
+            # Nothing is corrupted, do not query the backup storage.
+            return
+        # Retrieve the data of the corrupted contents from the backup storage.
+        contents = self.backup_storage.content_get(content_ids)
+        # Erase corrupted version with new safe one.
+        for content in contents:
+            if not content:
+                # The backup storage does not hold this content.
+                continue
+            data = content['data']
+            self.objstorage.restore_bytes(data)
+
+
+@click.command()
+@click.argument('config-path', required=True)
+@click.option('--storage-path', default=None,
+              help='Path to the storage to verify')
+@click.option('--depth', default=None,
+              type=click.INT, help='Depth of the object storage')
+@click.option('--backup-url', default=None,
+              help='Url of a remote storage to retrieve corrupted content')
+def launch(config_path, storage_path, depth, backup_url):
+    """ Check a batch of contents of an objstorage and repair corrupted ones.
+    """
+    # The configuration have following priority :
+    # command line > file config > default config
+    cl_config = {
+        'storage_path': storage_path,
+        'storage_depth': depth,
+        'backup_url': backup_url
+    }
+    conf = config.read(config_path, DEFAULT_CONFIG)
+    # Options left unset on the command line are None and must not shadow
+    # the values coming from the configuration file or the defaults.
+    conf.update({k: v for k, v in cl_config.items() if v is not None})
+    # Create the checker and run
+    checker = ContentChecker(
+        {'batch_size': conf['batch_size']},
+        conf['storage_path'],
+        conf['storage_depth'],
+        conf['backup_url']
+    )
+    checker.run()
diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -40,7 +40,7 @@
 
     # compute [depth] substrings of [obj_id], each of length 2, starting from
     # the beginning
-    id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)]
+    id_steps = [hex_obj_id[i * 2:i * 2 + 2] for i in range(0, depth)]
     steps = [root_dir] + id_steps
 
     return os.path.join(*steps)
@@ -161,7 +161,7 @@
         return os.path.exists(_obj_path(hex_obj_id,
                                         self._root_dir, self._depth))
 
-    def add_bytes(self, bytes, obj_id=None):
+    def add_bytes(self, bytes, obj_id=None, check_presence=True):
         """add a new object to the object storage
 
         Args:
@@ -169,7 +169,8 @@
             obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                 given, obj_id will be trusted to match bytes. If missing,
                 obj_id will be computed on the fly.
-
+            check_presence: indicate if the presence of the content should be
+                verified before adding the file.
         """
         if obj_id is None:
             # missing checksum, compute it in memory and write to file
@@ -177,7 +178,7 @@
             h.update(bytes)
             obj_id = h.digest()
 
-        if obj_id in self:
+        if check_presence and obj_id in self:
             return obj_id
 
         hex_obj_id = hashutil.hash_to_hex(obj_id)
@@ -190,6 +191,20 @@
 
         return obj_id
 
+    def restore_bytes(self, bytes, obj_id=None):
+        """ Restore a content that has been corrupted.
+
+        This function is identical to add_bytes but does not check if
+        the object id is already in the file system.
+
+        Args:
+            bytes: content of the object to be added to the storage
+            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+        """
+        return self.add_bytes(bytes, obj_id, check_presence=False)
+
     def add_file(self, f, length, obj_id=None):
         """similar to `add_bytes`, but add the content of file-like object f to
         the object storage
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_checker.py
@@ -0,0 +1,98 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import gzip
+import shutil
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.checker.checker import ContentChecker
+
+
+class MockBackupStorage():
+
+    def __init__(self):
+        self.values = {}
+
+    def content_add(self, id, value):
+        self.values[id] = value
+
+    def content_get(self, ids):
+        for id in ids:
+            try:
+                data = self.values[id]
+            except KeyError:
+                yield None
+                continue
+
+            yield {'sha1': id, 'data': data}
+
+
+@attr('fs')
+class TestChecker(unittest.TestCase):
+    """ Test the content integrity checker
+    """
+
+    def setUp(self):
+        super().setUp()
+        # Connect to an objstorage
+        config = {'batch_size': 10}
+        path = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, path)
+        depth = 3
+        self.checker = ContentChecker(config, path, depth, 'http://None')
+        self.checker.backup_storage = MockBackupStorage()
+
+    def corrupt_content(self, id):
+        """ Make the given content invalid.
+        """
+        hex_id = hashutil.hash_to_hex(id)
+        file_path = _obj_path(hex_id, self.checker.objstorage._root_dir, 3)
+        with gzip.open(file_path, 'wb') as f:
+            f.write(b'Unexpected content')
+
+    @istest
+    def check_valid_content(self):
+        # Check that a valid content is valid.
+        content = b'check_valid_content'
+        id = self.checker.objstorage.add_bytes(content)
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def check_invalid_content(self):
+        # Check that an invalid content is noticed.
+        content = b'check_invalid_content'
+        id = self.checker.objstorage.add_bytes(content)
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+
+    @istest
+    def repair_content_present(self):
+        # Try to repair a content that is in the backup storage.
+        content = b'repair_content_present'
+        id = self.checker.objstorage.add_bytes(content)
+        # Add a content to the mock
+        self.checker.backup_storage.content_add(id, content)
+        # Corrupt and repair it.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def repair_content_missing(self):
+        # Try to repair a content that is NOT in the backup storage.
+        content = b'repair_content_missing'
+        id = self.checker.objstorage.add_bytes(content)
+        # Corrupt and repair it.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertFalse(self.checker.check_content(id))