diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py index a3156c9d..71c291c2 100644 --- a/swh/storage/checker/checker.py +++ b/swh/storage/checker/checker.py @@ -1,136 +1,152 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from swh.core import config, hashutil from .. import get_storage from ..objstorage import ObjStorage from ..exc import ObjNotFoundError, Error DEFAULT_CONFIG = { 'storage_path': ('str', '/srv/softwareheritage/objects'), 'storage_depth': ('int', 3), 'backup_url': ('str', 'http://uffizi:5002/'), 'batch_size': ('int', 1000), } class ContentChecker(): """ Content integrity checker that will check local objstorage content. The checker will check the data of an object storage in order to verify that no file have been corrupted. Attributes: config: dictionary that contains this checker configuration objstorage (ObjStorage): Local object storage that will be checked. master_storage (RemoteStorage): A distant storage that will be used to restore corrupted content. """ - def __init__(self, config, root, depth, backup_url): + def __init__(self, config, root, depth, backup_urls): """ Create a checker that ensure the objstorage have no corrupted file. Args: config (dict): Dictionary that contains the following keys : batch_size: Number of content that should be tested each time the content checker runs. root (string): Path to the objstorage directory depth (int): Depth of the object storage. - master_url (string): Url of a storage that can be used to restore - content. + backup_urls: List of url that can be contacted in order to + get a content. """ self.config = config self.objstorage = ObjStorage(root, depth) - self.backup_storage = get_storage('remote_storage', [backup_url]) + self.backup_storages = [get_storage('remote_storage', [backup_url]) + for backup_url in backup_urls] def run(self): """ Start the check routine """ corrupted_contents = [] batch_size = self.config['batch_size'] for content_id in self.get_content_to_check(batch_size): if not self.check_content(content_id): corrupted_contents.append(content_id) logging.error('The content', content_id, 'have been corrupted') self.repair_contents(corrupted_contents) def get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ contents = self.objstorage.get_random_contents(batch_size) yield from contents def check_content(self, content_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: self.objstorage.check(content_id) except (ObjNotFoundError, Error) as e: - logging.error(e) + logging.warning(e) return False else: return True def repair_contents(self, content_ids): """ Try to restore the given contents. + + Ask the backup storages for the contents that are corrupted on + the local object storage. + If the first storage does not contain the missing contents, send + a request to the second one with only the content that couldn't be + retrieved, and so on until there is no remaining content or servers. + + If a content couldn't be retrieved on all the servers, then log it as + an error. """ - # Retrieve the data of the corrupted contents from the master storage. - contents = self.backup_storage.content_get(content_ids) - contents_set = set(content_ids) - # Erase corrupted version with new safe one. - for content in contents: - if not content: - continue - data = content['data'] - contents_set.discard(content['sha1']) - self.objstorage.restore_bytes(data) - - if contents_set: - logging.error("Some corrupted contents could not be retrieved : %s" - % [hashutil.hash_to_hex(id) for id in contents_set]) + contents_to_get = set(content_ids) + # Iterates over the backup storages. + for backup_storage in self.backup_storages: + # Try to get all the contents that still need to be retrieved. + contents = backup_storage.content_get(list(contents_to_get)) + for content in contents: + if content: + hash = content['sha1'] + data = content['data'] + # When a content is retrieved, remove it from the set + # of needed contents. + contents_to_get.discard(hash) + self.objstorage.restore_bytes(data) + + # Contents still in contents_to_get couldn't be retrieved. + if contents_to_get: + logging.error( + "Some corrupted contents could not be retrieved : %s" + % [hashutil.hash_to_hex(id) for id in contents_to_get] + ) @click.command() @click.argument('config-path', required=1) @click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], help='Path to the storage to verify') @click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], type=click.INT, help='Depth of the object storage') @click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], help='Url of a remote storage to retrieve corrupted content') def launch(config_path, storage_path, depth, backup_url): # The configuration have following priority : # command line > file config > default config cl_config = { 'storage_path': storage_path, 'storage_depth': depth, 'backup_url': backup_url } conf = config.read(config_path, DEFAULT_CONFIG) conf.update(cl_config) # Create the checker and run checker = ContentChecker( {'batch_size': conf['batch_size']}, conf['storage_path'], conf['storage_depth'], - conf['backup_url'] + map(lambda x: x.strip(), conf['backup_url'].split(',')) ) checker.run() if __name__ == '__main__': launch() diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py index af28e50b..6a5ff9ce 100644 --- a/swh/storage/tests/test_checker.py +++ b/swh/storage/tests/test_checker.py @@ -1,96 +1,110 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.core import hashutil from swh.storage.objstorage.objstorage import _obj_path from swh.storage.checker.checker import ContentChecker class MockBackupStorage(): def __init__(self): self.values = {} def content_add(self, id, value): self.values[id] = value def content_get(self, ids): for id in ids: try: data = self.values[id] except KeyError: yield None continue yield {'sha1': id, 'data': data} @attr('fs') class TestChecker(unittest.TestCase): """ Test the content integrity checker """ def setUp(self): super().setUp() # Connect to an objstorage config = {'batch_size': 10} path = tempfile.mkdtemp() depth = 3 self.checker = ContentChecker(config, path, depth, 'http://None') - self.checker.backup_storage = MockBackupStorage() + self.checker.backup_storages = [MockBackupStorage(), + MockBackupStorage()] def corrupt_content(self, id): """ Make the given content invalid. """ hex_id = hashutil.hash_to_hex(id) file_path = _obj_path(hex_id, self.checker.objstorage._root_dir, 3) with gzip.open(file_path, 'wb') as f: f.write(b'Unexpected content') @istest def check_valid_content(self): # Check that a valid content is valid. content = b'check_valid_content' id = self.checker.objstorage.add_bytes(content) self.assertTrue(self.checker.check_content(id)) @istest def check_invalid_content(self): # Check that an invalid content is noticed. content = b'check_invalid_content' id = self.checker.objstorage.add_bytes(content) self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) @istest - def repair_content_present(self): + def repair_content_present_first(self): # Try to repair a content that is in the backup storage. - content = b'repair_content_present' + content = b'repair_content_present_first' + id = self.checker.objstorage.add_bytes(content) + # Add a content to the mock + self.checker.backup_storages[0].content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_present_second(self): + # Try to repair a content that is not in the first backup storage. + content = b'repair_content_present_second' id = self.checker.objstorage.add_bytes(content) # Add a content to the mock - self.checker.backup_storage.content_add(id, content) + self.checker.backup_storages[1].content_add(id, content) # Corrupt and repair it. self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) self.checker.repair_contents([id]) self.assertTrue(self.checker.check_content(id)) @istest def repair_content_missing(self): # Try to repair a content that is NOT in the backup storage. content = b'repair_content_present' id = self.checker.objstorage.add_bytes(content) # Corrupt and repair it. self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) self.checker.repair_contents([id]) self.assertFalse(self.checker.check_content(id))