# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

import click

from swh.core import config, hashutil

from .. import get_storage
from ..objstorage import PathSlicingObjStorage
from ..exc import ObjNotFoundError, Error


DEFAULT_CONFIG = {
    'storage_path': ('str', '/srv/softwareheritage/objects'),
    'storage_depth': ('int', 3),
    'backup_url': ('str', 'http://uffizi:5002/'),
    'batch_size': ('int', 1000),
}


class ContentChecker:
    """Content integrity checker for a local object storage.

    The checker verifies the data of an object storage in order to detect
    corrupted files, and tries to restore any corrupted content from a
    list of backup (remote) storages.

    Attributes:
        config: dictionary that contains this checker configuration.
        objstorage (ObjStorage): local object storage that will be checked.
        backup_storages: remote storages queried, in order, to restore
            corrupted contents.
    """

    def __init__(self, config, root, slicing, backup_urls):
        """Create a checker that ensures the objstorage has no corrupted file.

        Args:
            config (dict): dictionary that contains the key ``batch_size``,
                the number of contents that should be tested on each run.
            root (str): path to the objstorage directory.
            slicing (str): slicing configuration of the object storage,
                e.g. ``'0:2/2:4/4:6'``.
            backup_urls: iterable of urls that can be contacted in order to
                retrieve a sane copy of a corrupted content.
        """
        self.config = config
        self.objstorage = PathSlicingObjStorage(root, slicing)
        self.backup_storages = [get_storage('remote_storage', [backup_url])
                                for backup_url in backup_urls]

    def run(self):
        """Check one batch of contents and repair those found corrupted."""
        batch_size = self.config['batch_size']
        corrupted_contents = []
        for content_id in self.get_content_to_check(batch_size):
            if not self.check_content(content_id):
                corrupted_contents.append(content_id)
                # Bug fix: logging takes a %-style format string plus lazy
                # arguments, not print-style positional fragments.
                logging.error('The content %s has been corrupted', content_id)
        self.repair_contents(corrupted_contents)

    def run_as_daemon(self):
        """Start the check routine and perform it forever.

        Use this method to run the checker when it's done as a daemon that
        will iterate over the content forever in background.
        """
        while True:
            try:
                self.run()
            except Exception as e:
                # Deliberate broad catch: a daemon must survive any failure
                # of a single run.
                logging.error(
                    'An error occurred while verifying the content: %s', e)

    def get_content_to_check(self, batch_size):
        """Get the contents that should be verified.

        Returns:
            An iterable of the ids of the contents that need to be checked.
        """
        yield from self.objstorage.get_random_contents(batch_size)

    def check_content(self, content_id):
        """Check the validity of the given content.

        Returns:
            True if the content was valid, False if it was corrupted or
            missing.
        """
        try:
            self.objstorage.check(content_id)
        except (ObjNotFoundError, Error) as e:
            logging.warning(e)
            return False
        else:
            return True

    def repair_contents(self, content_ids):
        """Try to restore the given contents.

        Ask the backup storages for the contents that are corrupted on
        the local object storage. If the first storage does not contain
        the missing contents, send a request to the second one with only
        the content that couldn't be retrieved, and so on until there is
        no remaining content or servers. Contents that no server could
        provide are logged as errors.
        """
        contents_to_get = set(content_ids)
        for backup_storage in self.backup_storages:
            if not contents_to_get:
                # Everything already restored; skip the remaining servers.
                break
            for content in backup_storage.content_get(list(contents_to_get)):
                if content:
                    # A retrieved content no longer needs to be requested
                    # from the remaining backup storages.  (Renamed the
                    # local from `hash`, which shadowed the builtin.)
                    content_hash = content['sha1']
                    contents_to_get.discard(content_hash)
                    self.objstorage.restore(content['data'])
        if contents_to_get:
            logging.error(
                "Some corrupted contents could not be retrieved : %s",
                [hashutil.hash_to_hex(missing_id)
                 for missing_id in contents_to_get])


@click.command()
@click.argument('config-path', required=True)
@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1],
              help='Path to the storage to verify')
@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1],
              type=click.INT,
              help='Depth of the object storage')
@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1],
              help='Url of a remote storage to retrieve corrupted content')
# Bug fix: click derives the parameter name `daemon` from
# `--daemon/--nodaemon`, but the function expects `is_daemon`; name the
# parameter explicitly so the command is callable at all.
@click.option('--daemon/--nodaemon', 'is_daemon', default=True,
              help='Indicates if the checker should run forever '
                   'or on a single batch of content')
def launch(config_path, storage_path, depth, backup_url, is_daemon):
    """Run the content checker from the given configuration."""
    # The configuration has the following priority :
    # command line > file config > default config
    cl_config = {
        'storage_path': storage_path,
        'storage_depth': depth,
        'backup_url': backup_url,
    }
    conf = config.read(config_path, DEFAULT_CONFIG)
    conf.update(cl_config)
    # Bug fix: ContentChecker now expects a slicing string such as
    # '0:2/2:4/4:6' (two hex characters per level), not an integer depth;
    # derive it from the configured depth to stay compatible with the
    # existing --depth option and config key.
    slicing = '/'.join('%d:%d' % (i, i + 2)
                       for i in range(0, conf['storage_depth'] * 2, 2))
    # Create the checker and run
    checker = ContentChecker(
        {'batch_size': conf['batch_size']},
        conf['storage_path'],
        slicing,
        [url.strip() for url in conf['backup_url'].split(',')],
    )
    if is_daemon:
        checker.run_as_daemon()
    else:
        checker.run()


if __name__ == '__main__':
    launch()
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import gzip
import tempfile
import unittest

from nose.tools import istest
from nose.plugins.attrib import attr

from swh.core import hashutil
from swh.storage.checker.checker import ContentChecker


class MockBackupStorage():
    """In-memory stand-in for a remote backup storage."""

    def __init__(self):
        self.values = {}

    def content_add(self, id, value):
        """Register `value` as the content stored under `id`."""
        self.values[id] = value

    def content_get(self, ids):
        """Yield, for each requested id, the content dict or None.

        Yielding None for unknown ids keeps the result stream aligned
        with the requested ids, matching the real storage API.
        """
        for id in ids:
            try:
                data = self.values[id]
            except KeyError:
                yield None
                continue
            yield {'sha1': id, 'data': data}


@attr('fs')
class TestChecker(unittest.TestCase):
    """ Test the content integrity checker
    """

    def setUp(self):
        super().setUp()
        # Connect to an objstorage
        config = {'batch_size': 10}
        path = tempfile.mkdtemp()
        slicing = '0:2/2:4/4:6'
        # Bug fix: backup_urls must be an iterable of urls; passing the
        # bare string 'http://None' made ContentChecker build one remote
        # storage per *character* of the url.
        self.checker = ContentChecker(config, path, slicing, ['http://None'])
        self.checker.backup_storages = [MockBackupStorage(),
                                        MockBackupStorage()]

    def corrupt_content(self, id):
        """ Make the given content invalid.
        """
        hex_id = hashutil.hash_to_hex(id)
        file_path = self.checker.objstorage._obj_path(hex_id)
        with gzip.open(file_path, 'wb') as f:
            f.write(b'Unexpected content')

    @istest
    def check_valid_content(self):
        # Check that a valid content is valid.
        content = b'check_valid_content'
        id = self.checker.objstorage.add(content)
        self.assertTrue(self.checker.check_content(id))

    @istest
    def check_invalid_content(self):
        # Check that an invalid content is noticed.
        content = b'check_invalid_content'
        id = self.checker.objstorage.add(content)
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))

    @istest
    def repair_content_present_first(self):
        # Try to repair a content that is in the backup storage.
        content = b'repair_content_present_first'
        id = self.checker.objstorage.add(content)
        # Add a content to the mock
        self.checker.backup_storages[0].content_add(id, content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertTrue(self.checker.check_content(id))

    @istest
    def repair_content_present_second(self):
        # Try to repair a content that is not in the first backup storage.
        content = b'repair_content_present_second'
        id = self.checker.objstorage.add(content)
        # Add a content to the mock
        self.checker.backup_storages[1].content_add(id, content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertTrue(self.checker.check_content(id))

    @istest
    def repair_content_present_distributed(self):
        # Try to repair two contents that are in separate backup storages.
        content1 = b'repair_content_present_distributed_2'
        content2 = b'repair_content_present_distributed_1'
        id1 = self.checker.objstorage.add(content1)
        id2 = self.checker.objstorage.add(content2)
        # Bug fix: spread the contents over *both* storages as the test
        # name claims; previously both went to storage 0, so the
        # multi-storage fallback path was never exercised here.
        self.checker.backup_storages[0].content_add(id1, content1)
        self.checker.backup_storages[1].content_add(id2, content2)
        # Corrupt and repair it
        self.corrupt_content(id1)
        self.corrupt_content(id2)
        self.assertFalse(self.checker.check_content(id1))
        self.assertFalse(self.checker.check_content(id2))
        self.checker.repair_contents([id1, id2])
        self.assertTrue(self.checker.check_content(id1))
        self.assertTrue(self.checker.check_content(id2))

    @istest
    def repair_content_missing(self):
        # Try to repair a content that is NOT in the backup storage.
        content = b'repair_content_present'
        id = self.checker.objstorage.add(content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertFalse(self.checker.check_content(id))