diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py index f532af8..f8e51f2 100644 --- a/swh/objstorage/checker.py +++ b/swh/objstorage/checker.py @@ -1,172 +1,200 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import abc import click import logging -from swh.core import config, hashutil -from swh.storage import get_storage +from swh.core import config -from .objstorage_pathslicing import PathSlicingObjStorage +from . import get_objstorage from .exc import ObjNotFoundError, Error -DEFAULT_CONFIG = { - 'storage_path': ('str', '/srv/softwareheritage/objects'), - 'storage_depth': ('int', 3), - 'backup_url': ('str', 'http://uffizi:5002/'), - 'batch_size': ('int', 1000), -} +class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta): + """ Abstract class of the content integrity checker. - -class ContentChecker(): - """ Content integrity checker that will check local objstorage content. - - The checker will check the data of an object storage in order to verify - that no file have been corrupted. - - Attributes: - config: dictionary that contains this - checker configuration - objstorage (ObjStorage): Local object storage that will be checked. - master_storage (RemoteStorage): A distant storage that will be used to - restore corrupted content. + This checker's purpose is to iterate over the contents of a storage and + check the integrity of each file. + Behavior of the checker to deal with corrupted status will be specified + by subclasses. """ - def __init__(self, config, root, slicing, backup_urls): - """ Create a checker that ensure the objstorage have no corrupted file. - - Args: - config (dict): Dictionary that contains the following keys : - batch_size: Number of content that should be tested each - time the content checker runs. - root (string): Path to the objstorage directory - depth (int): Depth of the object storage. - backup_urls: List of url that can be contacted in order to - get a content. - """ - self.config = config - self.objstorage = PathSlicingObjStorage(root, slicing) - self.backup_storages = [get_storage('remote_storage', [backup_url]) - for backup_url in backup_urls] + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + } + CONFIG_BASE_FILENAME = 'objstorage_checker' - def run(self): - """ Start the check routine + def __init__(self): + """ Create a checker that ensure the objstorage have no corrupted file """ - corrupted_contents = [] - batch_size = self.config['batch_size'] - - for content_id in self.get_content_to_check(batch_size): - if not self.check_content(content_id): - corrupted_contents.append(content_id) - logging.error('The content', content_id, 'have been corrupted') - - self.repair_contents(corrupted_contents) + self.config = self.parse_config_file() + self.objstorage = get_objstorage(**self.config['storage']) + self.batch_size = self.config['batch_size'] def run_as_daemon(self): """ Start the check routine and perform it forever. - Use this method to run the checker when it's done as a daemon that - will iterate over the content forever in background. + Use this method to run the checker as a daemon that will iterate over + the content forever in background. """ while True: try: self.run() - except Exception as e: - logging.error('An error occured while verifing the content: %s' - % e) + except: + pass - def get_content_to_check(self, batch_size): + def run(self): + """ Check a batch of content. + """ + for obj_id in self._get_content_to_check(self.batch_size): + cstatus = self._check_content(obj_id) + if cstatus == 'corrupted': + self.corrupted_content(obj_id) + elif cstatus == 'missing': + self.missing_content(obj_id) + + def _get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ - contents = self.objstorage.get_random_contents(batch_size) - yield from contents + yield from self.objstorage.get_random(batch_size) - def check_content(self, content_id): + def _check_content(self, obj_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: - self.objstorage.check(content_id) - except (ObjNotFoundError, Error) as e: - logging.warning(e) - return False - else: - return True - - def repair_contents(self, content_ids): - """ Try to restore the given contents. - - Ask the backup storages for the contents that are corrupted on - the local object storage. - If the first storage does not contain the missing contents, send - a request to the second one with only the content that couldn't be - retrieved, and so on until there is no remaining content or servers. - - If a content couldn't be retrieved on all the servers, then log it as - an error. + self.objstorage.check(obj_id) + except ObjNotFoundError: + return 'missing' + except Error: + return 'corrupted' + + @abc.abstractmethod + def corrupted_content(self, obj_id): + """ Perform an action to treat with a corrupted content. + """ + raise NotImplementedError("%s must implement " + "'corrupted_content' method" % type(self)) + + @abc.abstractmethod + def missing_content(self, obj_id): + """ Perform an action to treat with a missing content. + """ + raise NotImplementedError("%s must implement " + "'missing_content' method" % type(self)) + + +class LogContentChecker(BaseContentChecker): + """ Content integrity checker that just log detected errors. + """ + + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker') + } + + def __init__(self): + super().__init__() + self.logger = logging.getLogger(self.config['log_tag']) + + def corrupted_content(self, obj_id): + """ Perform an action to treat with a corrupted content. + """ + self.logger.error('Content %s is corrupted' % obj_id) + + def missing_content(self, obj_id): + """ Perform an action to treat with a missing content. + """ + self.logger.error('Content %s is detected missing' % obj_id) + + +class RepairContentChecker(LogContentChecker): + """ Content integrity checker that will try to restore contents. + """ + + DEFAULT_CONFIG = { + 'storage': ('dict', + {'cls': 'pathslicing', + 'args': {'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}), + 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker'), + 'backup_storages': ('dict', + {'banco': { + 'cls': 'remote', + 'args': {'base_url': 'http://banco:5003/'} + }}) + } + + def __init__(self): + super().__init__() + self.backups = [get_objstorage(**storage) + for name, storage in self.config['backup_storages']] + + def corrupted_content(self, obj_id): + """ Perform an action to treat with a corrupted content. + """ + self._restore(obj_id) + + def missing_content(self, obj_id): + """ Perform an action to treat with a missing content. """ - contents_to_get = set(content_ids) - # Iterates over the backup storages. - for backup_storage in self.backup_storages: - # Try to get all the contents that still need to be retrieved. - contents = backup_storage.content_get(list(contents_to_get)) - for content in contents: - if content: - hash = content['sha1'] - data = content['data'] - # When a content is retrieved, remove it from the set - # of needed contents. - contents_to_get.discard(hash) - self.objstorage.restore(data) - - # Contents still in contents_to_get couldn't be retrieved. - if contents_to_get: - logging.error( - "Some corrupted contents could not be retrieved : %s" - % [hashutil.hash_to_hex(id) for id in contents_to_get] + self._restore(obj_id) + + def _restore(self, obj_id): + if not self._perform_restore(obj_id): + # Object could not be restored + self.logger.critical( + 'Object %s is corrupted and could not be repaired' % obj_id ) + def _perform_restore(self, obj_id): + """ Try to restore the object in the current storage using the backups + """ + for backup in self.backups: + try: + content = backup.get(obj_id) + self.objstorage.restore(content, obj_id) + except ObjNotFoundError as e: + continue + else: + # Return True direclty when a backup contains the object + return True + # No backup contains the object + return False + @click.command() -@click.argument('config-path', required=1) -@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], - help='Path to the storage to verify') -@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], - type=click.INT, help='Depth of the object storage') -@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], - help='Url of a remote storage to retrieve corrupted content') +@click.argument('--checker-type', required=1, default='log') @click.option('--daemon/--nodaemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') -def launch(config_path, storage_path, depth, backup_url, is_daemon): - # The configuration have following priority : - # command line > file config > default config - cl_config = { - 'storage_path': storage_path, - 'storage_depth': depth, - 'backup_url': backup_url +def launch(checker_type, is_daemon): + types = { + 'log': LogContentChecker, + 'repair': RepairContentChecker } - conf = config.read(config_path, DEFAULT_CONFIG) - conf.update(cl_config) - # Create the checker and run - checker = ContentChecker( - {'batch_size': conf['batch_size']}, - conf['storage_path'], - conf['storage_depth'], - map(lambda x: x.strip(), conf['backup_url'].split(',')) - ) + checker = types[checker_type]() if is_daemon: checker.run_as_daemon() else: checker.run() if __name__ == '__main__': launch()