Page Menu · Home · Software Heritage

D92.id308.diff
No One · Temporary

D92.id308.diff

diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -7,7 +7,4 @@
-# TODO remove PathSlicingObjStorage from this list once the config
-# loading will be updated and no hardcoded objstorage types should
-# remains.
-__all__ = ['get_objstorage', 'ObjStorage', 'PathSlicingObjStorage']
+__all__ = ['get_objstorage', 'ObjStorage']
_STORAGE_CLASSES = {
'pathslicing': PathSlicingObjStorage,
diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py
--- a/swh/objstorage/checker.py
+++ b/swh/objstorage/checker.py
@@ -6,163 +6,227 @@
import click
import logging
-from swh.core import config, hashutil
-from swh.storage import get_storage
+from swh.core import config
+from swh.storage.archiver.storage import ArchiverStorage
-from .objstorage_pathslicing import PathSlicingObjStorage
+from . import get_objstorage
from .exc import ObjNotFoundError, Error
-DEFAULT_CONFIG = {
-    'storage_path': ('str', '/srv/softwareheritage/objects'),
-    'storage_depth': ('int', 3),
-    'backup_url': ('str', 'http://uffizi:5002/'),
-    'batch_size': ('int', 1000),
-}
+class BaseContentChecker(config.SWHConfig):
+    """ Abstract class of the content integrity checker.
-
-class ContentChecker():
-    """ Content integrity checker that will check local objstorage content.
-
-    The checker will check the data of an object storage in order to verify
-    that no file have been corrupted.
-
-    Attributes:
-        config: dictionary that contains this
-            checker configuration
-        objstorage (ObjStorage): Local object storage that will be checked.
-        master_storage (RemoteStorage): A distant storage that will be used to
-            restore corrupted content.
+    This checker's purpose is to iterate over the contents of a storage and
+    check the integrity of each file.
+    Subclasses decide how corrupted and missing contents are handled by
+    implementing the `corrupted_content` and `missing_content` methods.
"""
-    def __init__(self, config, root, slicing, backup_urls):
-        """ Create a checker that ensure the objstorage have no corrupted file.
-
-        Args:
-            config (dict): Dictionary that contains the following keys :
-                batch_size: Number of content that should be tested each
-                    time the content checker runs.
-            root (string): Path to the objstorage directory
-            depth (int): Depth of the object storage.
-            backup_urls: List of url that can be contacted in order to
-                get a content.
-        """
-        self.config = config
-        self.objstorage = PathSlicingObjStorage(root, slicing)
-        self.backup_storages = [get_storage('remote_storage', [backup_url])
-                                for backup_url in backup_urls]
+    DEFAULT_CONFIG = {
+        'storage': ('dict',
+                    {'cls': 'pathslicing',
+                     'args': {'root': '/srv/softwareheritage/objects',
+                              'slicing': '0:2/2:4/4:6'}}),
+        'batch_size': ('int', 1000),
+    }
+    CONFIG_BASE_FILENAME = 'objstorage_checker'
-    def run(self):
-        """ Start the check routine
+    def __init__(self):
+        """ Create a checker that ensures the objstorage has no corrupted file.
+
+        The configuration comes from `parse_config_file`: its `storage` entry
+        gives the arguments of `get_objstorage` for the storage to check, and
+        `batch_size` the number of contents checked by each call to `run`.
"""
-        corrupted_contents = []
-        batch_size = self.config['batch_size']
-
-        for content_id in self.get_content_to_check(batch_size):
-            if not self.check_content(content_id):
-                corrupted_contents.append(content_id)
-                logging.error('The content', content_id, 'have been corrupted')
-
-        self.repair_contents(corrupted_contents)
+        self.config = self.parse_config_file()
+        self.objstorage = get_objstorage(**self.config['storage'])
+        self.batch_size = self.config['batch_size']
def run_as_daemon(self):
""" Start the check routine and perform it forever.
-        Use this method to run the checker when it's done as a daemon that
-        will iterate over the content forever in background.
+        Use this method to run the checker as a daemon that will iterate over
+        the content forever in background.
"""
while True:
try:
self.run()
-            except Exception as e:
-                logging.error('An error occured while verifing the content: %s'
-                              % e)
+            except Exception:
+                # Keep the daemon alive, but never hide the failure; unlike a
+                # bare except, this still lets KeyboardInterrupt stop it.
+                logging.exception('An error occurred while checking contents')
-    def get_content_to_check(self, batch_size):
+    def run(self):
+        """ Check a batch of `batch_size` contents.
+
+        Each content reported as corrupted or missing is passed to the
+        matching handler method.
+        """
+        for obj_id in self._get_content_to_check(self.batch_size):
+            cstatus = self._check_content(obj_id)
+            if cstatus == 'corrupted':
+                self.corrupted_content(obj_id)
+            elif cstatus == 'missing':
+                self.missing_content(obj_id)
+
+    def _get_content_to_check(self, batch_size):
""" Get the content that should be verified.
Returns:
An iterable of the content's id that need to be checked.
"""
-        contents = self.objstorage.get_random_contents(batch_size)
-        yield from contents
+        yield from self.objstorage.get_random(batch_size)
-    def check_content(self, content_id):
-        """ Check the validity of the given content.
-        Returns:
-            True if the content was valid, false if it was corrupted.
-        """
+    def _check_content(self, obj_id):
+        """ Check the validity of the given content.
+
+        Returns:
+            'missing' if the storage raises ObjNotFoundError for the object,
+            'corrupted' if its check raises any other storage Error, and
+            None if the check succeeds.
+        """
try:
-            self.objstorage.check(content_id)
-        except (ObjNotFoundError, Error) as e:
-            logging.warning(e)
-            return False
-        else:
-            return True
-
-    def repair_contents(self, content_ids):
-        """ Try to restore the given contents.
-
-        Ask the backup storages for the contents that are corrupted on
-        the local object storage.
-        If the first storage does not contain the missing contents, send
-        a request to the second one with only the content that couldn't be
-        retrieved, and so on until there is no remaining content or servers.
-
-        If a content couldn't be retrieved on all the servers, then log it as
-        an error.
+            self.objstorage.check(obj_id)
+        except ObjNotFoundError:
+            return 'missing'
+        except Error:
+            # Any other storage error is reported as a corruption.
+            return 'corrupted'
+        return None
+
+    def corrupted_content(self, obj_id):
+        """ Handle a content that failed its integrity check.
+
+        Must be implemented by subclasses.
+        """
+        raise NotImplementedError("%s must implement "
+                                  "'corrupted_content' method" % type(self))
+
+    def missing_content(self, obj_id):
+        """ Handle a content that is absent from the storage.
+
+        Must be implemented by subclasses.
+        """
+        raise NotImplementedError("%s must implement "
+                                  "'missing_content' method" % type(self))
+
+
+class LogContentChecker(BaseContentChecker):
+    """ Content integrity checker that just logs detected errors.
+    """
+
+    # Same defaults as the base checker, plus the name of the logger used
+    # to report problems; derived so the shared defaults cannot drift.
+    DEFAULT_CONFIG = dict(BaseContentChecker.DEFAULT_CONFIG,
+                          log_tag=('str', 'objstorage.checker'))
+
+    def __init__(self):
+        super().__init__()
+        self.logger = logging.getLogger(self.config['log_tag'])
+
+    def corrupted_content(self, obj_id):
+        """ Log, at error level, that the content is corrupted.
+        """
+        self.logger.error('Content %s have been corrupted', obj_id)
+
+    def missing_content(self, obj_id):
+        """ Log, at error level, that the content is missing.
"""
-        contents_to_get = set(content_ids)
-        # Iterates over the backup storages.
-        for backup_storage in self.backup_storages:
-            # Try to get all the contents that still need to be retrieved.
-            contents = backup_storage.content_get(list(contents_to_get))
-            for content in contents:
-                if content:
-                    hash = content['sha1']
-                    data = content['data']
-                    # When a content is retrieved, remove it from the set
-                    # of needed contents.
-                    contents_to_get.discard(hash)
-                    self.objstorage.restore(data)
-
-        # Contents still in contents_to_get couldn't be retrieved.
-        if contents_to_get:
-            logging.error(
-                "Some corrupted contents could not be retrieved : %s"
-                % [hashutil.hash_to_hex(id) for id in contents_to_get]
+        self.logger.error('Content %s is detected missing', obj_id)
+
+
+class RepairContentChecker(LogContentChecker):
+    """ Content integrity checker that will try to restore contents.
+
+    Corrupted or missing contents are fetched from the backup object
+    storages described by the `backup_storages` configuration entry and
+    restored in the checked storage.
+    """
+
+    DEFAULT_CONFIG = dict(
+        LogContentChecker.DEFAULT_CONFIG,
+        backup_storages=('dict',
+                         {'banco': {
+                             'cls': 'remote',
+                             'args': {'base_url': 'http://banco:5003/'}
+                         }}),
+    )
+
+    def __init__(self):
+        super().__init__()
+        # `backup_storages` maps a backup name to its get_objstorage()
+        # arguments; iterating the dict itself would only yield the names.
+        self.backups = [get_objstorage(**storage)
+                        for storage in self.config['backup_storages'].values()]
+
+    def corrupted_content(self, obj_id):
+        """ Try to restore the corrupted content from the backups.
+        """
+        self._restore(obj_id)
+
+    def missing_content(self, obj_id):
+        """ Try to restore the missing content from the backups.
+        """
+        self._restore(obj_id)
+
+    def _restore(self, obj_id):
+        """ Restore the object, logging a critical error if no backup has it.
+        """
+        if not self._perform_restore(obj_id):
+            # Object could not be restored
+            self.logger.critical(
+                'Object %s is corrupted and could not be repaired', obj_id
)
+    def _perform_restore(self, obj_id):
+        """ Try to restore the object in the current storage using the backups
+
+        Returns:
+            True if some backup contained the object and it was restored,
+            False if no backup contains it.
+        """
+        for backup in self.backups:
+            try:
+                content = backup.get(obj_id)
+            except ObjNotFoundError:
+                # Not in this backup, try the next one.
+                continue
+            self.objstorage.restore(content, obj_id)
+            # Return True directly when a backup contains the object
+            return True
+        # No backup contains the object
+        return False
+
+
+class ArchiveNotifierContentChecker(LogContentChecker):
+    """ Implementation of the checker that will update the archiver database
+
+    Once the database is updated the archiver may restore the content on its
+    next scheduling as it won't be present anymore, and this status change
+    will probably make the retention policy invalid.
+    """
+    DEFAULT_CONFIG = dict(
+        LogContentChecker.DEFAULT_CONFIG,
+        storage_name=('str', 'banco'),
+        dbconn=('str', 'dbname=softwareheritage-archiver-dev'),
+    )
+
+    def __init__(self):
+        super().__init__()
+        self.archiver_db = ArchiverStorage(self.config['dbconn'])
+        self.storage_name = self.config['storage_name']
+
+    def corrupted_content(self, obj_id):
+        """ Record the content as 'corrupted' in the archiver database.
+        """
+        self._update_status(obj_id, 'corrupted')
+
+    def missing_content(self, obj_id):
+        """ Record the content as 'missing' in the archiver database.
+        """
+        self._update_status(obj_id, 'missing')
+
+    def _update_status(self, obj_id, status):
+        """ Ask the archiver database to record `status` for `obj_id` on
+        the storage named by the `storage_name` configuration entry.
+        """
+        self.archiver_db.content_archive_update(obj_id, self.storage_name,
+                                                new_status=status)
+
@click.command()
-@click.argument('config-path', required=1)
-@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1],
-              help='Path to the storage to verify')
-@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1],
-              type=click.INT, help='Depth of the object storage')
-@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1],
-              help='Url of a remote storage to retrieve corrupted content')
-@click.option('--daemon/--nodaemon', default=True,
+@click.option('--checker-type', default='log',
+              type=click.Choice(['log', 'repair', 'archive']),
+              help='Kind of checker to run')
+@click.option('--daemon/--nodaemon', 'is_daemon', default=True,
help='Indicates if the checker should run forever '
'or on a single batch of content')
-def launch(config_path, storage_path, depth, backup_url, is_daemon):
-    # The configuration have following priority :
-    # command line > file config > default config
-    cl_config = {
-        'storage_path': storage_path,
-        'storage_depth': depth,
-        'backup_url': backup_url
+def launch(checker_type, is_daemon):
+    """ Run a content checker of the given type, forever or on one batch.
+    """
+    # Each checker loads its own configuration when instantiated.
+    types = {
+        'log': LogContentChecker,
+        'repair': RepairContentChecker,
+        'archive': ArchiveNotifierContentChecker,
}
-    conf = config.read(config_path, DEFAULT_CONFIG)
-    conf.update(cl_config)
-    # Create the checker and run
-    checker = ContentChecker(
-        {'batch_size': conf['batch_size']},
-        conf['storage_path'],
-        conf['storage_depth'],
-        map(lambda x: x.strip(), conf['backup_url'].split(','))
-    )
+    checker = types[checker_type]()
if is_daemon:
checker.run_as_daemon()
else:
diff --git a/swh/objstorage/tests/test_checker.py b/swh/objstorage/tests/test_checker.py
--- a/swh/objstorage/tests/test_checker.py
+++ b/swh/objstorage/tests/test_checker.py
@@ -11,118 +11,145 @@
from nose.plugins.attrib import attr
from swh.core import hashutil
-from swh.objstorage.checker import ContentChecker
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.checker import RepairContentChecker
-class MockBackupStorage():
+class MockBackupObjStorage():
+    """ In-memory stand-in for a backup object storage.
+
+    Stored values live in the `values` dict, keyed by object id.
+    """
def __init__(self):
self.values = {}
-    def content_add(self, id, value):
-        self.values[id] = value
+    def add(self, value, obj_id):
+        """ Store `value` under `obj_id`. """
+        self.values[obj_id] = value
-    def content_get(self, ids):
-        for id in ids:
-            try:
-                data = self.values[id]
-            except KeyError:
-                yield None
-                continue
-
-            yield {'sha1': id, 'data': data}
+    def get(self, obj_id):
+        """ Return the value stored under `obj_id`.
+
+        Raises:
+            ObjNotFoundError: if nothing was stored under `obj_id`.
+        """
+        if obj_id not in self.values:
+            raise ObjNotFoundError(obj_id)
+        return self.values[obj_id]
@attr('fs')
-class TestChecker(unittest.TestCase):
+class TestRepairChecker(unittest.TestCase):
""" Test the content integrity checker
"""
def setUp(self):
super().setUp()
-        # Connect to an objstorage
-        config = {'batch_size': 10}
-        path = tempfile.mkdtemp()
-        slicing = '0:2/2:4/4:6'
-        self.checker = ContentChecker(config, path, slicing, 'http://None')
-        self.checker.backup_storages = [MockBackupStorage(),
-                                        MockBackupStorage()]
-
-    def corrupt_content(self, id):
+        self._alter_config()
+        self.checker = RepairContentChecker()
+        self.checker.backups = [MockBackupObjStorage(),
+                                MockBackupObjStorage()]
+
+    def _alter_config(self):
+        """ Make RepairContentChecker use a throwaway test configuration.
+
+        Both the patched config loader and the temporary storage directory
+        are removed when the test ends, so nothing leaks between tests.
+        """
+        tmpdir = tempfile.TemporaryDirectory()
+        self.addCleanup(tmpdir.cleanup)
+        test_config = {
+            'storage': {'cls': 'pathslicing',
+                        'args': {'root': tmpdir.name,
+                                 'slicing': '0:2/2:4/4:6'}},
+            'batch_size': 1000,
+            'log_tag': 'objstorage_test',
+            'backup_storages': {}
+        }
+        RepairContentChecker.parse_config_file = lambda checker: test_config
+        # Drop the class-level override so the inherited loader is used again.
+        self.addCleanup(delattr, RepairContentChecker, 'parse_config_file')
+
+    def _corrupt_content(self, obj_id):
""" Make the given content invalid.
"""
-        hex_id = hashutil.hash_to_hex(id)
-        file_path = self.checker.objstorage._obj_path(hex_id)
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        file_path = self.checker.objstorage._obj_path(hex_obj_id)
with gzip.open(file_path, 'wb') as f:
f.write(b'Unexpected content')
+    def _is_corrupted(self, obj_id):
+        """ Tell whether the checker reports the object as corrupted.
+        """
+        return self.checker._check_content(obj_id) == 'corrupted'
+
+    def _is_missing(self, obj_id):
+        """ Tell whether the checker reports the object as missing.
+        """
+        return self.checker._check_content(obj_id) == 'missing'
+
@istest
def check_valid_content(self):
# Check that a valid content is valid.
content = b'check_valid_content'
-        id = self.checker.objstorage.add(content)
-        self.assertTrue(self.checker.check_content(id))
+        obj_id = self.checker.objstorage.add(content)
+        self.assertFalse(self._is_corrupted(obj_id))
+        self.assertFalse(self._is_missing(obj_id))
@istest
-    def check_invalid_content(self):
+    def check_corrupted_content(self):
# Check that an invalid content is noticed.
-        content = b'check_invalid_content'
-        id = self.checker.objstorage.add(content)
-        self.corrupt_content(id)
-        self.assertFalse(self.checker.check_content(id))
+        content = b'check_corrupted_content'
+        obj_id = self.checker.objstorage.add(content)
+        self._corrupt_content(obj_id)
+        self.assertTrue(self._is_corrupted(obj_id))
+        self.assertFalse(self._is_missing(obj_id))
+
+    @istest
+    def check_missing_content(self):
+        # Check that a content never added is reported as missing.
+        obj_id = hashutil.hashdata(b'check_missing_content')['sha1']
+        self.assertFalse(self._is_corrupted(obj_id))
+        self.assertTrue(self._is_missing(obj_id))
@istest
def repair_content_present_first(self):
# Try to repair a content that is in the backup storage.
content = b'repair_content_present_first'
-        id = self.checker.objstorage.add(content)
+        obj_id = self.checker.objstorage.add(content)
# Add a content to the mock
-        self.checker.backup_storages[0].content_add(id, content)
+        self.checker.backups[0].add(content, obj_id)
# Corrupt and repair it.
-        self.corrupt_content(id)
-        self.assertFalse(self.checker.check_content(id))
-        self.checker.repair_contents([id])
-        self.assertTrue(self.checker.check_content(id))
+        self._corrupt_content(obj_id)
+        self.assertTrue(self._is_corrupted(obj_id))
+        self.checker.corrupted_content(obj_id)
+        self.assertFalse(self._is_corrupted(obj_id))
@istest
def repair_content_present_second(self):
-        # Try to repair a content that is not in the first backup storage.
-        content = b'repair_content_present_second'
-        id = self.checker.objstorage.add(content)
+        # Try to repair a content that is not in the first backup storage.
+        content = b'repair_content_present_second'
+        obj_id = self.checker.objstorage.add(content)
# Add a content to the mock
-        self.checker.backup_storages[1].content_add(id, content)
+        self.checker.backups[-1].add(content, obj_id)
# Corrupt and repair it.
-        self.corrupt_content(id)
-        self.assertFalse(self.checker.check_content(id))
-        self.checker.repair_contents([id])
-        self.assertTrue(self.checker.check_content(id))
+        self._corrupt_content(obj_id)
+        self.assertTrue(self._is_corrupted(obj_id))
+        self.checker.corrupted_content(obj_id)
+        self.assertFalse(self._is_corrupted(obj_id))
@istest
def repair_content_present_distributed(self):
# Try to repair two contents that are in separate backup storages.
content1 = b'repair_content_present_distributed_2'
content2 = b'repair_content_present_distributed_1'
-        id1 = self.checker.objstorage.add(content1)
-        id2 = self.checker.objstorage.add(content2)
+        obj_id1 = self.checker.objstorage.add(content1)
+        obj_id2 = self.checker.objstorage.add(content2)
# Add content to the mock.
-        self.checker.backup_storages[0].content_add(id1, content1)
-        self.checker.backup_storages[0].content_add(id2, content2)
-        # Corrupt and repair it
-        self.corrupt_content(id1)
-        self.corrupt_content(id2)
-        self.assertFalse(self.checker.check_content(id1))
-        self.assertFalse(self.checker.check_content(id2))
-        self.checker.repair_contents([id1, id2])
-        self.assertTrue(self.checker.check_content(id1))
-        self.assertTrue(self.checker.check_content(id2))
+        self.checker.backups[0].add(content1, obj_id1)
+        self.checker.backups[1].add(content2, obj_id2)
+        # Corrupt the contents
+        self._corrupt_content(obj_id1)
+        self._corrupt_content(obj_id2)
+        self.assertTrue(self._is_corrupted(obj_id1))
+        self.assertTrue(self._is_corrupted(obj_id2))
+        # Repair them
+        self.checker.corrupted_content(obj_id1)
+        self.checker.corrupted_content(obj_id2)
+        self.assertFalse(self._is_corrupted(obj_id1))
+        self.assertFalse(self._is_corrupted(obj_id2))
@istest
def repair_content_missing(self):
# Try to repair a content that is NOT in the backup storage.
-        content = b'repair_content_present'
-        id = self.checker.objstorage.add(content)
-        # Corrupt and repair it.
-        self.corrupt_content(id)
-        self.assertFalse(self.checker.check_content(id))
-        self.checker.repair_contents([id])
-        self.assertFalse(self.checker.check_content(id))
+        content = b'repair_content_missing'
+        obj_id = self.checker.objstorage.add(content)
+        # Corrupt the content
+        self._corrupt_content(obj_id)
+        self.assertTrue(self._is_corrupted(obj_id))
+        # Try to repair it
+        self.checker.corrupted_content(obj_id)
+        self.assertTrue(self._is_corrupted(obj_id))
+
+    @istest
+    def restore_missing_content_present(self):
+        # Try to restore a content absent from the storage but in a backup.
+        content = b'restore_missing_content_present'
+        obj_id = hashutil.hashdata(content)['sha1']
+        self.checker.backups[0].add(content, obj_id)
+        self.assertTrue(self._is_missing(obj_id))
+        self.checker.missing_content(obj_id)
+        self.assertFalse(self._is_missing(obj_id))
+        self.assertFalse(self._is_corrupted(obj_id))

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 1:01 PM (9 h, 23 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227244

Event Timeline