diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py
new file mode 100644
index 0000000..f532af8
--- /dev/null
+++ b/swh/objstorage/checker.py
@@ -0,0 +1,172 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+import click
+
+from swh.core import config, hashutil
+from swh.storage import get_storage
+
+from .objstorage_pathslicing import PathSlicingObjStorage
+from .exc import ObjNotFoundError, Error
+
+DEFAULT_CONFIG = {
+    'storage_path': ('str', '/srv/softwareheritage/objects'),
+    'storage_depth': ('int', 3),
+    'backup_url': ('str', 'http://uffizi:5002/'),
+
+    'batch_size': ('int', 1000),
+}
+
+
+class ContentChecker():
+    """ Content integrity checker that checks a local objstorage's content.
+
+    The checker verifies the data of an object storage in order to ensure
+    that no file has been corrupted.
+
+    Attributes:
+        config: dictionary that contains this checker's configuration.
+        objstorage (ObjStorage): Local object storage that will be checked.
+        backup_storages: Remote storages that will be used to restore
+            corrupted contents.
+    """
+
+    def __init__(self, config, root, slicing, backup_urls):
+        """ Create a checker that ensures the objstorage has no corrupted
+        file.
+
+        Args:
+            config (dict): Dictionary that contains the following keys:
+                batch_size: Number of contents that should be checked each
+                    time the content checker runs.
+            root (string): Path to the objstorage directory.
+            slicing (string): Slicing of the object storage's directories.
+            backup_urls: List of URLs that can be contacted in order to
+                retrieve a content.
+        """
+        self.config = config
+        self.objstorage = PathSlicingObjStorage(root, slicing)
+        self.backup_storages = [get_storage('remote_storage', [backup_url])
+                                for backup_url in backup_urls]
+
+    def run(self):
+        """ Start the check routine.
+        """
+        corrupted_contents = []
+        batch_size = self.config['batch_size']
+
+        for content_id in self.get_content_to_check(batch_size):
+            if not self.check_content(content_id):
+                corrupted_contents.append(content_id)
+                logging.error('The content %s has been corrupted',
+                              hashutil.hash_to_hex(content_id))
+
+        self.repair_contents(corrupted_contents)
+
+    def run_as_daemon(self):
+        """ Start the check routine and perform it forever.
+
+        Use this method to run the checker as a daemon that will iterate
+        over the content forever in the background.
+        """
+        while True:
+            try:
+                self.run()
+            except Exception as e:
+                logging.error('An error occurred while verifying '
+                              'the content: %s', e)
+
+    def get_content_to_check(self, batch_size):
+        """ Get the contents that should be verified.
+
+        Returns:
+            An iterable of the ids of the contents that need to be checked.
+        """
+        contents = self.objstorage.get_random_contents(batch_size)
+        yield from contents
+
+    def check_content(self, content_id):
+        """ Check the validity of the given content.
+
+        Returns:
+            True if the content is valid, False if it is corrupted.
+        """
+        try:
+            self.objstorage.check(content_id)
+        except (ObjNotFoundError, Error) as e:
+            logging.warning(e)
+            return False
+        else:
+            return True
+
+    def repair_contents(self, content_ids):
+        """ Try to restore the given contents.
+
+        Ask the backup storages for the contents that are corrupted on
+        the local object storage.
+
+        If the first storage does not contain the missing contents, send
+        a request to the next one with only the contents that couldn't be
+        retrieved, and so on until there are no remaining contents or
+        servers.
+
+        If a content couldn't be retrieved from any of the servers, log
+        it as an error.
+        """
+        contents_to_get = set(content_ids)
+        # Iterate over the backup storages.
+        for backup_storage in self.backup_storages:
+            # Try to get all the contents that still need to be retrieved.
+            contents = backup_storage.content_get(list(contents_to_get))
+            for content in contents:
+                if content:
+                    content_id = content['sha1']
+                    data = content['data']
+                    # When a content is retrieved, remove it from the set
+                    # of needed contents.
+                    contents_to_get.discard(content_id)
+                    self.objstorage.restore(data)
+
+        # Contents still in contents_to_get couldn't be retrieved.
+        if contents_to_get:
+            logging.error(
+                'Some corrupted contents could not be retrieved: %s',
+                [hashutil.hash_to_hex(id) for id in contents_to_get]
+            )
+
+
+@click.command()
+@click.argument('config-path', required=True)
+@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1],
+              help='Path to the storage to verify')
+@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1],
+              type=click.INT, help='Depth of the object storage')
+@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1],
+              help='URL of a remote storage to retrieve corrupted contents')
+@click.option('--daemon/--nodaemon', 'is_daemon', default=True,
+              help='Indicates if the checker should run forever '
+                   'or on a single batch of contents')
+def launch(config_path, storage_path, depth, backup_url, is_daemon):
+    # The configuration has the following priority:
+    # command line > config file > default config
+    cl_config = {
+        'storage_path': storage_path,
+        'storage_depth': depth,
+        'backup_url': backup_url
+    }
+    conf = config.read(config_path, DEFAULT_CONFIG)
+    conf.update(cl_config)
+    # Create the checker and run it.
+    checker = ContentChecker(
+        {'batch_size': conf['batch_size']},
+        conf['storage_path'],
+        conf['storage_depth'],
+        [url.strip() for url in conf['backup_url'].split(',')]
+    )
+    if is_daemon:
+        checker.run_as_daemon()
+    else:
+        checker.run()
+
+
+if __name__ == '__main__':
+    launch()
diff --git a/swh/objstorage/tests/test_checker.py b/swh/objstorage/tests/test_checker.py
new file mode 100644
index 0000000..925ad99
--- /dev/null
+++ b/swh/objstorage/tests/test_checker.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import gzip
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.objstorage.checker import ContentChecker
+
+
+class MockBackupStorage():
+
+    def __init__(self):
+        self.values = {}
+
+    def content_add(self, id, value):
+        self.values[id] = value
+
+    def content_get(self, ids):
+        for id in ids:
+            try:
+                data = self.values[id]
+            except KeyError:
+                yield None
+                continue
+
+            yield {'sha1': id, 'data': data}
+
+
+@attr('fs')
+class TestChecker(unittest.TestCase):
+    """ Test the content integrity checker.
+    """
+
+    def setUp(self):
+        super().setUp()
+        # Connect to an objstorage.
+        config = {'batch_size': 10}
+        path = tempfile.mkdtemp()
+        slicing = '0:2/2:4/4:6'
+        # The backup URL is never contacted: the storages are replaced
+        # with mocks below.
+        self.checker = ContentChecker(config, path, slicing, ['http://None'])
+        self.checker.backup_storages = [MockBackupStorage(),
+                                        MockBackupStorage()]
+
+    def corrupt_content(self, id):
+        """ Make the given content invalid.
+        """
+        hex_id = hashutil.hash_to_hex(id)
+        file_path = self.checker.objstorage._obj_path(hex_id)
+        with gzip.open(file_path, 'wb') as f:
+            f.write(b'Unexpected content')
+
+    @istest
+    def check_valid_content(self):
+        # Check that a valid content is reported as valid.
+        content = b'check_valid_content'
+        id = self.checker.objstorage.add(content)
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def check_invalid_content(self):
+        # Check that an invalid content is noticed.
+        content = b'check_invalid_content'
+        id = self.checker.objstorage.add(content)
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+
+    @istest
+    def repair_content_present_first(self):
+        # Try to repair a content that is in the first backup storage.
+        content = b'repair_content_present_first'
+        id = self.checker.objstorage.add(content)
+        # Add the content to the first mock.
+        self.checker.backup_storages[0].content_add(id, content)
+        # Corrupt and repair it.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def repair_content_present_second(self):
+        # Try to repair a content that is not in the first backup storage.
+        content = b'repair_content_present_second'
+        id = self.checker.objstorage.add(content)
+        # Add the content to the second mock only.
+        self.checker.backup_storages[1].content_add(id, content)
+        # Corrupt and repair it.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def repair_content_present_distributed(self):
+        # Try to repair two contents that are in separate backup storages.
+        content1 = b'repair_content_present_distributed_1'
+        content2 = b'repair_content_present_distributed_2'
+        id1 = self.checker.objstorage.add(content1)
+        id2 = self.checker.objstorage.add(content2)
+        # Add each content to a different mock.
+        self.checker.backup_storages[0].content_add(id1, content1)
+        self.checker.backup_storages[1].content_add(id2, content2)
+        # Corrupt and repair them.
+        self.corrupt_content(id1)
+        self.corrupt_content(id2)
+        self.assertFalse(self.checker.check_content(id1))
+        self.assertFalse(self.checker.check_content(id2))
+        self.checker.repair_contents([id1, id2])
+        self.assertTrue(self.checker.check_content(id1))
+        self.assertTrue(self.checker.check_content(id2))
+
+    @istest
+    def repair_content_missing(self):
+        # Try to repair a content that is NOT in any backup storage.
+        content = b'repair_content_missing'
+        id = self.checker.objstorage.add(content)
+        # Corrupt it, then check that the repair attempt fails.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertFalse(self.checker.check_content(id))
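
Usage sketch (not part of the patch): the snippet below wires the checker up by hand, mirroring what launch() does. The batch size, storage path, slicing and backup URL are illustrative values taken from DEFAULT_CONFIG and the test suite, not requirements.

    from swh.objstorage.checker import ContentChecker

    checker = ContentChecker(
        {'batch_size': 1000},             # contents checked per run
        '/srv/softwareheritage/objects',  # objstorage root
        '0:2/2:4/4:6',                    # path slicing, as in the tests
        ['http://uffizi:5002/'],          # backup storage URLs
    )
    checker.run()              # check and repair a single batch
    # checker.run_as_daemon()  # or loop over batches forever

From the command line, assuming the package is installed and a configuration file exists at the (hypothetical) path below:

    python3 -m swh.objstorage.checker /etc/softwareheritage/checker.ini --nodaemon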