diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py --- a/swh/objstorage/__init__.py +++ b/swh/objstorage/__init__.py @@ -4,9 +4,7 @@ from .multiplexer import MultiplexerObjStorage from .multiplexer.filter import add_filters -# TODO remove PathSlicingObjStorage from this list once the config -# loading will be updated and no hardcoded objstorage types should -# remains. + __all__ = ['get_objstorage', 'ObjStorage'] _STORAGE_CLASSES = { diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py --- a/swh/objstorage/checker.py +++ b/swh/objstorage/checker.py @@ -3,166 +3,234 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import abc import click import logging -from swh.core import config, hashutil -from swh.storage import get_storage +from swh.core import config +from swh.storage.archiver.storage import ArchiverStorage -from .objstorage_pathslicing import PathSlicingObjStorage +from . import get_objstorage from .exc import ObjNotFoundError, Error -DEFAULT_CONFIG = { - 'storage_path': ('str', '/srv/softwareheritage/objects'), - 'storage_depth': ('int', 3), - 'backup_url': ('str', 'http://uffizi:5002/'), - 'batch_size': ('int', 1000), -} +class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta): + """ Abstract class of the content integrity checker. - -class ContentChecker(): - """ Content integrity checker that will check local objstorage content. - - The checker will check the data of an object storage in order to verify - that no file have been corrupted. - - Attributes: - config: dictionary that contains this - checker configuration - objstorage (ObjStorage): Local object storage that will be checked. - master_storage (RemoteStorage): A distant storage that will be used to - restore corrupted content. + This checker's purpose is to iterate over the contents of a storage and + check the integrity of each file.
+ Behavior of the checker to deal with corrupted status will be specified + by subclasses. """ - def __init__(self, config, root, slicing, backup_urls): - """ Create a checker that ensure the objstorage have no corrupted file. - - Args: - config (dict): Dictionary that contains the following keys : - batch_size: Number of content that should be tested each - time the content checker runs. - root (string): Path to the objstorage directory - depth (int): Depth of the object storage. - backup_urls: List of url that can be contacted in order to - get a content. - """ - self.config = config - self.objstorage = PathSlicingObjStorage(root, slicing) - self.backup_storages = [get_storage('remote_storage', [backup_url]) - for backup_url in backup_urls] + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + } + CONFIG_BASE_FILENAME = 'objstorage_checker' - def run(self): - """ Start the check routine + def __init__(self): + """ Create a checker that ensures the objstorage has no corrupted file """ - corrupted_contents = [] - batch_size = self.config['batch_size'] - - for content_id in self.get_content_to_check(batch_size): - if not self.check_content(content_id): - corrupted_contents.append(content_id) - logging.error('The content', content_id, 'have been corrupted') - - self.repair_contents(corrupted_contents) + self.config = self.parse_config_file() + self.objstorage = get_objstorage(**self.config['storage']) + self.batch_size = self.config['batch_size'] def run_as_daemon(self): """ Start the check routine and perform it forever. - Use this method to run the checker when it's done as a daemon that - will iterate over the content forever in background. + Use this method to run the checker as a daemon that will iterate over + the content forever in background.
""" while True: try: self.run() - except Exception as e: - logging.error('An error occured while verifing the content: %s' - % e) + except Exception: + logging.exception('Error while verifying the content') - def get_content_to_check(self, batch_size): + def run(self): + """ Check a batch of content. + """ + for obj_id in self._get_content_to_check(self.batch_size): + cstatus = self._check_content(obj_id) + if cstatus == 'corrupted': + self.corrupted_content(obj_id) + elif cstatus == 'missing': + self.missing_content(obj_id) + + def _get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ - contents = self.objstorage.get_random_contents(batch_size) - yield from contents + yield from self.objstorage.get_random(batch_size) - def check_content(self, content_id): + def _check_content(self, obj_id): """ Check the validity of the given content. Returns: - True if the content was valid, false if it was corrupted. + 'missing' or 'corrupted' for a bad content, None when it is valid. """ try: - self.objstorage.check(content_id) - except (ObjNotFoundError, Error) as e: - logging.warning(e) - return False - else: - return True - - def repair_contents(self, content_ids): - """ Try to restore the given contents. - - Ask the backup storages for the contents that are corrupted on - the local object storage. - If the first storage does not contain the missing contents, send - a request to the second one with only the content that couldn't be - retrieved, and so on until there is no remaining content or servers. - - If a content couldn't be retrieved on all the servers, then log it as - an error. + self.objstorage.check(obj_id) + except ObjNotFoundError: + return 'missing' + except Error: + return 'corrupted' + + @abc.abstractmethod + def corrupted_content(self, obj_id): + """ Perform an action to deal with a corrupted content.
+ """ + raise NotImplementedError("%s must implement " + "'corrupted_content' method" % type(self)) + + @abc.abstractmethod + def missing_content(self, obj_id): + """ Perform an action to deal with a missing content. + """ + raise NotImplementedError("%s must implement " + "'missing_content' method" % type(self)) + + +class LogContentChecker(BaseContentChecker): + """ Content integrity checker that just logs detected errors. + """ + + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker') + } + + def __init__(self): + super().__init__() + self.logger = logging.getLogger(self.config['log_tag']) + + def corrupted_content(self, obj_id): + """ Perform an action to deal with a corrupted content. + """ + self.logger.error('Content %s is corrupted', obj_id) + + def missing_content(self, obj_id): + """ Perform an action to deal with a missing content. """ - contents_to_get = set(content_ids) - # Iterates over the backup storages. - for backup_storage in self.backup_storages: - # Try to get all the contents that still need to be retrieved. - contents = backup_storage.content_get(list(contents_to_get)) - for content in contents: - if content: - hash = content['sha1'] - data = content['data'] - # When a content is retrieved, remove it from the set - # of needed contents. - contents_to_get.discard(hash) - self.objstorage.restore(data) - - # Contents still in contents_to_get couldn't be retrieved. - if contents_to_get: - logging.error( - "Some corrupted contents could not be retrieved : %s" - % [hashutil.hash_to_hex(id) for id in contents_to_get] + self.logger.error('Content %s is detected missing', obj_id) + + +class RepairContentChecker(LogContentChecker): + """ Content integrity checker that will try to restore contents.
+ """ + + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker'), + 'backup_storages': ('dict', + {'banco': { + 'cls': 'remote', + 'args': {'base_url': 'http://banco:5003/'} + }}) + } + + def __init__(self): + super().__init__() + self.backups = [get_objstorage(**storage) + for storage in self.config['backup_storages'].values()] + + def corrupted_content(self, obj_id): + """ Perform an action to deal with a corrupted content. + """ + self._restore(obj_id) + + def missing_content(self, obj_id): + """ Perform an action to deal with a missing content. + """ + self._restore(obj_id) + + def _restore(self, obj_id): + if not self._perform_restore(obj_id): + # Object could not be restored + self.logger.critical( + 'Object %s is corrupted and could not be repaired', obj_id ) + def _perform_restore(self, obj_id): + """ Try to restore the object in the current storage using the backups + """ + for backup in self.backups: + try: + content = backup.get(obj_id) + self.objstorage.restore(content, obj_id) + except ObjNotFoundError: + continue + else: + # Return True directly when a backup contains the object + return True + # No backup contains the object + return False + + +class ArchiveNotifierContentChecker(LogContentChecker): + """ Implementation of the checker that will update the archiver database + + Once the database is updated the archiver may restore the content on its + next scheduling as it won't be present anymore, and this status change + will probably make the retention policy invalid.
+ """ + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker'), + 'storage_name': ('str', 'banco'), + 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev') + } + + def __init__(self): + super().__init__() + self.archiver_db = ArchiverStorage(self.config['dbconn']) + self.storage_name = self.config['storage_name'] + + def corrupted_content(self, obj_id): + """ Perform an action to deal with a corrupted content. + """ + self._update_status(obj_id, 'corrupted') + + def missing_content(self, obj_id): + """ Perform an action to deal with a missing content. + """ + self._update_status(obj_id, 'missing') + + def _update_status(self, obj_id, status): + self.archiver_db.content_archive_update(obj_id, self.storage_name, + new_status=status) + @click.command() -@click.argument('config-path', required=1) -@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], - help='Path to the storage to verify') -@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], - type=click.INT, help='Depth of the object storage') -@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], - help='Url of a remote storage to retrieve corrupted content') -@click.option('--daemon/--nodaemon', default=True, +@click.argument('checker-type', default='log') +@click.option('--daemon/--nodaemon', 'is_daemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') -def launch(config_path, storage_path, depth, backup_url, is_daemon): - # The configuration have following priority : - # command line > file config > default config - cl_config = { - 'storage_path': storage_path, - 'storage_depth': depth, - 'backup_url': backup_url +def launch(checker_type, is_daemon): + types = { + 'log': LogContentChecker, + 'repair': RepairContentChecker, + 'archiver_notifier': ArchiveNotifierContentChecker } - conf =
config.read(config_path, DEFAULT_CONFIG) - conf.update(cl_config) - # Create the checker and run - checker = ContentChecker( - {'batch_size': conf['batch_size']}, - conf['storage_path'], - conf['storage_depth'], - map(lambda x: x.strip(), conf['backup_url'].split(',')) - ) + checker = types[checker_type]() if is_daemon: checker.run_as_daemon() else: diff --git a/swh/objstorage/tests/test_checker.py b/swh/objstorage/tests/test_checker.py --- a/swh/objstorage/tests/test_checker.py +++ b/swh/objstorage/tests/test_checker.py @@ -11,118 +11,145 @@ from nose.plugins.attrib import attr from swh.core import hashutil -from swh.objstorage.checker import ContentChecker +from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.checker import RepairContentChecker -class MockBackupStorage(): +class MockBackupObjStorage(): def __init__(self): self.values = {} - def content_add(self, id, value): - self.values[id] = value + def add(self, value, obj_id): + self.values[obj_id] = value - def content_get(self, ids): - for id in ids: - try: - data = self.values[id] - except KeyError: - yield None - continue - - yield {'sha1': id, 'data': data} + def get(self, obj_id): + try: + return self.values[obj_id] + except KeyError: + raise ObjNotFoundError(obj_id) @attr('fs') -class TestChecker(unittest.TestCase): +class TestRepairChecker(unittest.TestCase): """ Test the content integrity checker """ def setUp(self): super().setUp() - # Connect to an objstorage - config = {'batch_size': 10} - path = tempfile.mkdtemp() - slicing = '0:2/2:4/4:6' - self.checker = ContentChecker(config, path, slicing, 'http://None') - self.checker.backup_storages = [MockBackupStorage(), - MockBackupStorage()] - - def corrupt_content(self, id): + self._alter_config() + self.checker = RepairContentChecker() + self.checker.backups = [MockBackupObjStorage(), + MockBackupObjStorage()] + + def _alter_config(self): + RepairContentChecker.parse_config_file = ( + lambda cls: { + 'storage': {'cls': 'pathslicing',
+ 'args': {'root': tempfile.mkdtemp(), + 'slicing': '0:2/2:4/4:6'}}, + 'batch_size': 1000, + 'log_tag': 'objstorage_test', + 'backup_storages': {} + } + ) + + def _corrupt_content(self, obj_id): """ Make the given content invalid. """ - hex_id = hashutil.hash_to_hex(id) - file_path = self.checker.objstorage._obj_path(hex_id) + hex_obj_id = hashutil.hash_to_hex(obj_id) + file_path = self.checker.objstorage._obj_path(hex_obj_id) with gzip.open(file_path, 'wb') as f: f.write(b'Unexpected content') + def _is_corrupted(self, obj_id): + """ Tell whether the given object is corrupted + """ + return self.checker._check_content(obj_id) == 'corrupted' + + def _is_missing(self, obj_id): + """ Tell whether the given object is missing + """ + return self.checker._check_content(obj_id) == 'missing' + @istest def check_valid_content(self): # Check that a valid content is valid. content = b'check_valid_content' - id = self.checker.objstorage.add(content) - self.assertTrue(self.checker.check_content(id)) + obj_id = self.checker.objstorage.add(content) + self.assertFalse(self._is_corrupted(obj_id)) + self.assertFalse(self._is_missing(obj_id)) @istest - def check_invalid_content(self): + def check_corrupted_content(self): # Check that an invalid content is noticed. - content = b'check_invalid_content' - id = self.checker.objstorage.add(content) - self.corrupt_content(id) - self.assertFalse(self.checker.check_content(id)) + content = b'check_corrupted_content' + obj_id = self.checker.objstorage.add(content) + self._corrupt_content(obj_id) + self.assertTrue(self._is_corrupted(obj_id)) + self.assertFalse(self._is_missing(obj_id)) + + @istest + def check_missing_content(self): + obj_id = hashutil.hashdata(b'check_missing_content')['sha1'] + self.assertFalse(self._is_corrupted(obj_id)) + self.assertTrue(self._is_missing(obj_id)) @istest def repair_content_present_first(self): # Try to repair a content that is in the backup storage.
content = b'repair_content_present_first' - id = self.checker.objstorage.add(content) + obj_id = self.checker.objstorage.add(content) # Add a content to the mock - self.checker.backup_storages[0].content_add(id, content) + self.checker.backups[0].add(content, obj_id) # Corrupt and repair it. - self.corrupt_content(id) - self.assertFalse(self.checker.check_content(id)) - self.checker.repair_contents([id]) - self.assertTrue(self.checker.check_content(id)) + self._corrupt_content(obj_id) + self.assertTrue(self._is_corrupted(obj_id)) + self.checker.corrupted_content(obj_id) + self.assertFalse(self._is_corrupted(obj_id)) @istest def repair_content_present_second(self): - # Try to repair a content that is not in the first backup storage. - content = b'repair_content_present_second' - id = self.checker.objstorage.add(content) + # Try to repair a content that is only in the last backup storage. + content = b'repair_content_present_second' + obj_id = self.checker.objstorage.add(content) # Add a content to the mock - self.checker.backup_storages[1].content_add(id, content) + self.checker.backups[-1].add(content, obj_id) # Corrupt and repair it. - self.corrupt_content(id) - self.assertFalse(self.checker.check_content(id)) - self.checker.repair_contents([id]) - self.assertTrue(self.checker.check_content(id)) + self._corrupt_content(obj_id) + self.assertTrue(self._is_corrupted(obj_id)) + self.checker.corrupted_content(obj_id) + self.assertFalse(self._is_corrupted(obj_id)) @istest def repair_content_present_distributed(self): # Try to repair two contents that are in separate backup storages. content1 = b'repair_content_present_distributed_2' content2 = b'repair_content_present_distributed_1' - id1 = self.checker.objstorage.add(content1) - id2 = self.checker.objstorage.add(content2) + obj_id1 = self.checker.objstorage.add(content1) + obj_id2 = self.checker.objstorage.add(content2) # Add content to the mock.
- self.checker.backup_storages[0].content_add(id1, content1) - self.checker.backup_storages[0].content_add(id2, content2) - # Corrupt and repair it - self.corrupt_content(id1) - self.corrupt_content(id2) - self.assertFalse(self.checker.check_content(id1)) - self.assertFalse(self.checker.check_content(id2)) - self.checker.repair_contents([id1, id2]) - self.assertTrue(self.checker.check_content(id1)) - self.assertTrue(self.checker.check_content(id2)) + self.checker.backups[0].add(content1, obj_id1) + self.checker.backups[1].add(content2, obj_id2) + # Corrupt the contents + self._corrupt_content(obj_id1) + self._corrupt_content(obj_id2) + self.assertTrue(self._is_corrupted(obj_id1)) + self.assertTrue(self._is_corrupted(obj_id2)) + # Repair them + self.checker.corrupted_content(obj_id1) + self.checker.corrupted_content(obj_id2) + self.assertFalse(self._is_corrupted(obj_id1)) + self.assertFalse(self._is_corrupted(obj_id2)) @istest def repair_content_missing(self): # Try to repair a content that is NOT in the backup storage. - content = b'repair_content_present' - id = self.checker.objstorage.add(content) - # Corrupt and repair it. - self.corrupt_content(id) - self.assertFalse(self.checker.check_content(id)) - self.checker.repair_contents([id]) - self.assertFalse(self.checker.check_content(id)) + content = b'repair_content_missing' + obj_id = self.checker.objstorage.add(content) + # Corrupt the content + self._corrupt_content(obj_id) + self.assertTrue(self._is_corrupted(obj_id)) + # Try to repair it + self.checker.corrupted_content(obj_id) + self.assertTrue(self._is_corrupted(obj_id))