Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163678
D92.id308.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D92.id308.diff
View Options
diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -7,7 +7,7 @@
# TODO remove PathSlicingObjStorage from this list once the config
# loading will be updated and no hardcoded objstorage types should
# remains.
-__all__ = ['get_objstorage', 'ObjStorage', 'PathSlicingObjStorage']
+__all__ = ['get_objstorage', 'ObjStorage']
_STORAGE_CLASSES = {
'pathslicing': PathSlicingObjStorage,
diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py
--- a/swh/objstorage/checker.py
+++ b/swh/objstorage/checker.py
@@ -6,163 +6,227 @@
import click
import logging
-from swh.core import config, hashutil
-from swh.storage import get_storage
+from swh.core import config
+from swh.storage.archiver.storage import ArchiverStorage
-from .objstorage_pathslicing import PathSlicingObjStorage
+from . import get_objstorage
from .exc import ObjNotFoundError, Error
-DEFAULT_CONFIG = {
- 'storage_path': ('str', '/srv/softwareheritage/objects'),
- 'storage_depth': ('int', 3),
- 'backup_url': ('str', 'http://uffizi:5002/'),
- 'batch_size': ('int', 1000),
-}
+class BaseContentChecker(config.SWHConfig):
+ """ Abstract class of the content integrity checker.
-
-class ContentChecker():
- """ Content integrity checker that will check local objstorage content.
-
- The checker will check the data of an object storage in order to verify
- that no file have been corrupted.
-
- Attributes:
- config: dictionary that contains this
- checker configuration
- objstorage (ObjStorage): Local object storage that will be checked.
- master_storage (RemoteStorage): A distant storage that will be used to
- restore corrupted content.
+ This checker's purpose is to iterate over the contents of a storage and
+ check the integrity of each file.
+ Behavior of the checker to deal with corrupted status will be specified
+ by subclasses.
"""
- def __init__(self, config, root, slicing, backup_urls):
- """ Create a checker that ensure the objstorage have no corrupted file.
-
- Args:
- config (dict): Dictionary that contains the following keys :
- batch_size: Number of content that should be tested each
- time the content checker runs.
- root (string): Path to the objstorage directory
- depth (int): Depth of the object storage.
- backup_urls: List of url that can be contacted in order to
- get a content.
- """
- self.config = config
- self.objstorage = PathSlicingObjStorage(root, slicing)
- self.backup_storages = [get_storage('remote_storage', [backup_url])
- for backup_url in backup_urls]
+ DEFAULT_CONFIG = {
+ 'storage': ('dict',
+ {'cls': 'pathslicing',
+ 'args': {'root': '/srv/softwareheritage/objects',
+ 'slicing': '0:2/2:4/4:6'}}),
+ 'batch_size': ('int', 1000),
+ }
+ CONFIG_BASE_FILENAME = 'objstorage_checker'
- def run(self):
- """ Start the check routine
+ def __init__(self):
+ """ Create a checker that ensure the objstorage have no corrupted file
"""
- corrupted_contents = []
- batch_size = self.config['batch_size']
-
- for content_id in self.get_content_to_check(batch_size):
- if not self.check_content(content_id):
- corrupted_contents.append(content_id)
- logging.error('The content', content_id, 'have been corrupted')
-
- self.repair_contents(corrupted_contents)
+ self.config = self.parse_config_file()
+ self.objstorage = get_objstorage(**self.config['storage'])
+ self.batch_size = self.config['batch_size']
def run_as_daemon(self):
""" Start the check routine and perform it forever.
- Use this method to run the checker when it's done as a daemon that
- will iterate over the content forever in background.
+ Use this method to run the checker as a daemon that will iterate over
+ the content forever in background.
"""
while True:
try:
self.run()
- except Exception as e:
- logging.error('An error occured while verifing the content: %s'
- % e)
+ except Exception:
+     # Keep the daemon alive when one batch fails, but record the failure
+     # instead of hiding it. Catching Exception (not a bare 'except:')
+     # lets KeyboardInterrupt and SystemExit still stop the loop.
+     logging.exception('An error occurred while checking the content')
- def get_content_to_check(self, batch_size):
+ def run(self):
+ """ Check a batch of content.
+ """
+ for obj_id in self._get_content_to_check(self.batch_size):
+ cstatus = self._check_content(obj_id)
+ if cstatus == 'corrupted':
+ self.corrupted_content(obj_id)
+ elif cstatus == 'missing':
+ self.missing_content(obj_id)
+
+ def _get_content_to_check(self, batch_size):
""" Get the content that should be verified.
Returns:
An iterable of the content's id that need to be checked.
"""
- contents = self.objstorage.get_random_contents(batch_size)
- yield from contents
+ yield from self.objstorage.get_random(batch_size)
- def check_content(self, content_id):
+ def _check_content(self, obj_id):
""" Check the validity of the given content.
Returns:
True if the content was valid, false if it was corrupted.
"""
try:
- self.objstorage.check(content_id)
- except (ObjNotFoundError, Error) as e:
- logging.warning(e)
- return False
- else:
- return True
-
- def repair_contents(self, content_ids):
- """ Try to restore the given contents.
-
- Ask the backup storages for the contents that are corrupted on
- the local object storage.
- If the first storage does not contain the missing contents, send
- a request to the second one with only the content that couldn't be
- retrieved, and so on until there is no remaining content or servers.
-
- If a content couldn't be retrieved on all the servers, then log it as
- an error.
+ self.objstorage.check(obj_id)
+ except ObjNotFoundError:
+ return 'missing'
+ except Error:
+ return 'corrupted'
+
+ def corrupted_content(self, obj_id):
+ """ Perform an action to treat with a corrupted content.
+ """
+ raise NotImplementedError("%s must implement "
+ "'corrupted_content' method" % type(self))
+
+ def missing_content(self, obj_id):
+ """ Perform an action to treat with a missing content.
+ """
+ raise NotImplementedError("%s must implement "
+ "'missing_content' method" % type(self))
+
+
+class LogContentChecker(BaseContentChecker):
+ """ Content integrity checker that just log detected errors.
+ """
+
+ DEFAULT_CONFIG = {
+ 'storage': ('dict',
+ {'cls': 'pathslicing',
+ 'args': {'root': '/srv/softwareheritage/objects',
+ 'slicing': '0:2/2:4/4:6'}}),
+ 'batch_size': ('int', 1000),
+ 'log_tag': ('str', 'objstorage.checker')
+ }
+
+ def __init__(self):
+ super().__init__()
+ self.logger = logging.getLogger(self.config['log_tag'])
+
+ def corrupted_content(self, obj_id):
+ """ Perform an action to treat with a corrupted content.
+ """
+ self.logger.error('Content %s have been corrupted' % obj_id)
+
+ def missing_content(self, obj_id):
+ """ Perform an action to treat with a missing content.
"""
- contents_to_get = set(content_ids)
- # Iterates over the backup storages.
- for backup_storage in self.backup_storages:
- # Try to get all the contents that still need to be retrieved.
- contents = backup_storage.content_get(list(contents_to_get))
- for content in contents:
- if content:
- hash = content['sha1']
- data = content['data']
- # When a content is retrieved, remove it from the set
- # of needed contents.
- contents_to_get.discard(hash)
- self.objstorage.restore(data)
-
- # Contents still in contents_to_get couldn't be retrieved.
- if contents_to_get:
- logging.error(
- "Some corrupted contents could not be retrieved : %s"
- % [hashutil.hash_to_hex(id) for id in contents_to_get]
+ self.logger.error('Content %s is detected missing' % obj_id)
+
+
+class RepairContentChecker(LogContentChecker):
+ """ Content integrity checker that will try to restore contents.
+ """
+
+ DEFAULT_CONFIG = {
+ 'storage': ('dict',
+ {'cls': 'pathslicing',
+ 'args': {'root': '/srv/softwareheritage/objects',
+ 'slicing': '0:2/2:4/4:6'}}),
+ 'batch_size': ('int', 1000),
+ 'log_tag': ('str', 'objstorage.checker'),
+ 'backup_storages': ('dict',
+ {'banco': {
+ 'cls': 'remote',
+ 'args': {'base_url': 'http://banco:5003/'}
+ }})
+ }
+
+ def __init__(self):
+     """ Create the checker and instantiate every configured backup storage.
+     """
+     super().__init__()
+     # 'backup_storages' maps a storage name to its {'cls': ..., 'args': ...}
+     # definition. Iterating the mapping itself only yields the names, so
+     # the definitions are taken from .values().
+     self.backups = [get_objstorage(**storage)
+                     for storage in self.config['backup_storages'].values()]
+
+ def corrupted_content(self, obj_id):
+ """ Perform an action to treat with a corrupted content.
+ """
+ self._restore(obj_id)
+
+ def missing_content(self, obj_id):
+ """ Perform an action to treat with a missing content.
+ """
+ self._restore(obj_id)
+
+ def _restore(self, obj_id):
+ if not self._perform_restore(obj_id):
+ # Object could not be restored
+ self.logger.critical(
+ 'Object %s is corrupted and could not be repaired' % obj_id
)
+ def _perform_restore(self, obj_id):
+ """ Try to restore the object in the current storage using the backups
+ """
+ for backup in self.backups:
+ try:
+ content = backup.get(obj_id)
+ self.objstorage.restore(content, obj_id)
+ except ObjNotFoundError as e:
+ continue
+ else:
+ # Return True directly when a backup contains the object
+ return True
+ # No backup contains the object
+ return False
+
+
+class ArchiveNotifierContentChecker(LogContentChecker):
+ """ Implementation of the checker that will update the archiver database
+
+ Once the database is updated the archiver may restore the content on its
+ next scheduling as it won't be present anymore, and this status change
+ will probably make the retention policy invalid.
+ """
+ DEFAULT_CONFIG = {
+ 'storage': ('dict',
+ {'cls': 'pathslicing',
+ 'args': {'root': '/srv/softwareheritage/objects',
+ 'slicing': '0:2/2:4/4:6'}}),
+ 'batch_size': ('int', 1000),
+ 'log_tag': ('str', 'objstorage.checker'),
+ 'storage_name': ('str', 'banco'),
+ 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev')
+ }
+
+ def __init__(self):
+ super().__init__()
+ self.archiver_db = ArchiverStorage(self.config['dbconn'])
+ self.storage_name = self.config['storage_name']
+
+ def corrupted_content(self, obj_id):
+ """ Perform an action to treat with a corrupted content.
+ """
+ self._update_status(obj_id, 'corrupted')
+
+ def missing_content(self, obj_id):
+ """ Perform an action to treat with a missing content.
+ """
+ self._update_status(obj_id, 'missing')
+
+ def _update_status(self, obj_id, status):
+ self.archiver_db.content_archive_update(obj_id, self.storage_name,
+ new_status=status)
+
@click.command()
-@click.argument('config-path', required=1)
-@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1],
- help='Path to the storage to verify')
-@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1],
- type=click.INT, help='Depth of the object storage')
-@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1],
- help='Url of a remote storage to retrieve corrupted content')
+# A dashed, defaulted parameter is an option, not a positional argument;
+# declared as an option it is passed to launch() as 'checker_type'.
+@click.option('--checker-type', default='log',
+              help='Type of checker to run (log or repair)')
@click.option('--daemon/--nodaemon', default=True,
help='Indicates if the checker should run forever '
'or on a single batch of content')
-def launch(config_path, storage_path, depth, backup_url, is_daemon):
- # The configuration have following priority :
- # command line > file config > default config
- cl_config = {
- 'storage_path': storage_path,
- 'storage_depth': depth,
- 'backup_url': backup_url
+def launch(checker_type, is_daemon):
+ types = {
+ 'log': LogContentChecker,
+ 'repair': RepairContentChecker
}
- conf = config.read(config_path, DEFAULT_CONFIG)
- conf.update(cl_config)
- # Create the checker and run
- checker = ContentChecker(
- {'batch_size': conf['batch_size']},
- conf['storage_path'],
- conf['storage_depth'],
- map(lambda x: x.strip(), conf['backup_url'].split(','))
- )
+ # Instantiate the checker class selected by the checker_type parameter
+ # (index with the variable, not the literal string 'checker_type').
+ checker = types[checker_type]()
if is_daemon:
checker.run_as_daemon()
else:
diff --git a/swh/objstorage/tests/test_checker.py b/swh/objstorage/tests/test_checker.py
--- a/swh/objstorage/tests/test_checker.py
+++ b/swh/objstorage/tests/test_checker.py
@@ -11,118 +11,145 @@
from nose.plugins.attrib import attr
from swh.core import hashutil
-from swh.objstorage.checker import ContentChecker
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.checker import RepairContentChecker
-class MockBackupStorage():
+class MockBackupObjStorage():
def __init__(self):
self.values = {}
- def content_add(self, id, value):
- self.values[id] = value
+ def add(self, value, obj_id):
+ self.values[obj_id] = value
- def content_get(self, ids):
- for id in ids:
- try:
- data = self.values[id]
- except KeyError:
- yield None
- continue
-
- yield {'sha1': id, 'data': data}
+ def get(self, obj_id):
+ try:
+ return self.values[obj_id]
+ except KeyError:
+ raise ObjNotFoundError(obj_id)
@attr('fs')
-class TestChecker(unittest.TestCase):
+class TestRepairChecker(unittest.TestCase):
""" Test the content integrity checker
"""
def setUp(self):
super().setUp()
- # Connect to an objstorage
- config = {'batch_size': 10}
- path = tempfile.mkdtemp()
- slicing = '0:2/2:4/4:6'
- self.checker = ContentChecker(config, path, slicing, 'http://None')
- self.checker.backup_storages = [MockBackupStorage(),
- MockBackupStorage()]
-
- def corrupt_content(self, id):
+ self._alter_config()
+ self.checker = RepairContentChecker()
+ self.checker.backups = [MockBackupObjStorage(),
+ MockBackupObjStorage()]
+
+ def _alter_config(self):
+ RepairContentChecker.parse_config_file = (
+ lambda cls: {
+ 'storage': {'cls': 'pathslicing',
+ 'args': {'root': tempfile.mkdtemp(),
+ 'slicing': '0:2/2:4/4:6'}},
+ 'batch_size': 1000,
+ 'log_tag': 'objstorage_test',
+ 'backup_storages': {}
+ }
+ )
+
+ def _corrupt_content(self, obj_id):
""" Make the given content invalid.
"""
- hex_id = hashutil.hash_to_hex(id)
- file_path = self.checker.objstorage._obj_path(hex_id)
+ hex_obj_id = hashutil.hash_to_hex(obj_id)
+ file_path = self.checker.objstorage._obj_path(hex_obj_id)
with gzip.open(file_path, 'wb') as f:
f.write(b'Unexpected content')
+ def _is_corrupted(self, obj_id):
+ """ Ensure the given object is corrupted
+ """
+ return self.checker._check_content(obj_id) == 'corrupted'
+
+ def _is_missing(self, obj_id):
+ """ Ensure the given object is missing
+ """
+ return self.checker._check_content(obj_id) == 'missing'
+
@istest
def check_valid_content(self):
# Check that a valid content is valid.
content = b'check_valid_content'
- id = self.checker.objstorage.add(content)
- self.assertTrue(self.checker.check_content(id))
+ obj_id = self.checker.objstorage.add(content)
+ self.assertFalse(self._is_corrupted(obj_id))
+ self.assertFalse(self._is_missing(obj_id))
@istest
- def check_invalid_content(self):
+ def check_corrupted_content(self):
# Check that an invalid content is noticed.
- content = b'check_invalid_content'
- id = self.checker.objstorage.add(content)
- self.corrupt_content(id)
- self.assertFalse(self.checker.check_content(id))
+ content = b'check_corrupted_content'
+ obj_id = self.checker.objstorage.add(content)
+ self._corrupt_content(obj_id)
+ self.assertTrue(self._is_corrupted(obj_id))
+ self.assertFalse(self._is_missing(obj_id))
+
+ @istest
+ def check_missing_content(self):
+ obj_id = hashutil.hashdata(b'check_missing_content')['sha1']
+ self.assertFalse(self._is_corrupted(obj_id))
+ self.assertTrue(self._is_missing(obj_id))
@istest
def repair_content_present_first(self):
# Try to repair a content that is in the backup storage.
content = b'repair_content_present_first'
- id = self.checker.objstorage.add(content)
+ obj_id = self.checker.objstorage.add(content)
# Add a content to the mock
- self.checker.backup_storages[0].content_add(id, content)
+ self.checker.backups[0].add(content, obj_id)
# Corrupt and repair it.
- self.corrupt_content(id)
- self.assertFalse(self.checker.check_content(id))
- self.checker.repair_contents([id])
- self.assertTrue(self.checker.check_content(id))
+ self._corrupt_content(obj_id)
+ self.assertTrue(self._is_corrupted(obj_id))
+ self.checker.corrupted_content(obj_id)
+ self.assertFalse(self._is_corrupted(obj_id))
@istest
def repair_content_present_second(self):
- # Try to repair a content that is not in the first backup storage.
- content = b'repair_content_present_second'
- id = self.checker.objstorage.add(content)
+ # Try to repair a content that is only in the last backup storage.
+ content = b'repair_content_present_first'
+ obj_id = self.checker.objstorage.add(content)
# Add a content to the mock
- self.checker.backup_storages[1].content_add(id, content)
+ self.checker.backups[-1].add(content, obj_id)
# Corrupt and repair it.
- self.corrupt_content(id)
- self.assertFalse(self.checker.check_content(id))
- self.checker.repair_contents([id])
- self.assertTrue(self.checker.check_content(id))
+ self._corrupt_content(obj_id)
+ self.assertTrue(self._is_corrupted(obj_id))
+ self.checker.corrupted_content(obj_id)
+ self.assertFalse(self._is_corrupted(obj_id))
@istest
def repair_content_present_distributed(self):
# Try to repair two contents that are in separate backup storages.
content1 = b'repair_content_present_distributed_2'
content2 = b'repair_content_present_distributed_1'
- id1 = self.checker.objstorage.add(content1)
- id2 = self.checker.objstorage.add(content2)
+ obj_id1 = self.checker.objstorage.add(content1)
+ obj_id2 = self.checker.objstorage.add(content2)
# Add content to the mock.
- self.checker.backup_storages[0].content_add(id1, content1)
- self.checker.backup_storages[0].content_add(id2, content2)
- # Corrupt and repair it
- self.corrupt_content(id1)
- self.corrupt_content(id2)
- self.assertFalse(self.checker.check_content(id1))
- self.assertFalse(self.checker.check_content(id2))
- self.checker.repair_contents([id1, id2])
- self.assertTrue(self.checker.check_content(id1))
- self.assertTrue(self.checker.check_content(id2))
+ self.checker.backups[0].add(content1, obj_id1)
+ self.checker.backups[1].add(content2, obj_id2)
+ # Corrupt the contents
+ self._corrupt_content(obj_id1)
+ self._corrupt_content(obj_id2)
+ self.assertTrue(self._is_corrupted(obj_id1))
+ self.assertTrue(self._is_corrupted(obj_id2))
+ # Repair them
+ self.checker.corrupted_content(obj_id1)
+ self.checker.corrupted_content(obj_id2)
+ self.assertFalse(self._is_corrupted(obj_id1))
+ self.assertFalse(self._is_corrupted(obj_id2))
@istest
def repair_content_missing(self):
# Try to repair a content that is NOT in the backup storage.
- content = b'repair_content_present'
- id = self.checker.objstorage.add(content)
- # Corrupt and repair it.
- self.corrupt_content(id)
- self.assertFalse(self.checker.check_content(id))
- self.checker.repair_contents([id])
- self.assertFalse(self.checker.check_content(id))
+ content = b'repair_content_missing'
+ obj_id = self.checker.objstorage.add(content)
+ # Corrupt the content
+ self._corrupt_content(obj_id)
+ self.assertTrue(self._is_corrupted(obj_id))
+ # Try to repair it
+ self.checker.corrupted_content(obj_id)
+ self.assertTrue(self._is_corrupted(obj_id))
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 1:01 PM (9 h, 23 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227244
Attached To
D92: Make the content checker easy to extend and add an implementation
Event Timeline
Log In to Comment