diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,7 @@
         'swh.storage',
         'swh.storage.archiver',
         'swh.storage.api',
+        'swh.storage.checker',
         'swh.storage.objstorage',
         'swh.storage.objstorage.api',
         'swh.storage.tests',
diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py
--- a/swh/storage/checker/checker.py
+++ b/swh/storage/checker/checker.py
@@ -34,7 +34,7 @@
             restore corrupted content.
     """
 
-    def __init__(self, config, root, depth, backup_url):
+    def __init__(self, config, root, depth, backup_urls):
         """ Create a checker that ensure the objstorage have no corrupted file.
 
         Args:
@@ -43,12 +43,13 @@
                     time the content checker runs.
             root (string): Path to the objstorage directory
             depth (int): Depth of the object storage.
-            master_url (string): Url of a storage that can be used to restore
-                content.
+            backup_urls (list of string): URLs of the storages that can be
+                contacted in order to get a content.
         """
         self.config = config
         self.objstorage = ObjStorage(root, depth)
-        self.backup_storage = get_storage('remote_storage', [backup_url])
+        self.backup_storages = [get_storage('remote_storage', [backup_url])
+                                for backup_url in backup_urls]
 
     def run(self):
         """ Start the check routine
@@ -81,28 +82,47 @@
         try:
             self.objstorage.check(content_id)
         except (ObjNotFoundError, Error) as e:
-            logging.error(e)
+            logging.warning(e)
             return False
         else:
             return True
 
     def repair_contents(self, content_ids):
         """ Try to restore the given contents.
+
+        Ask the backup storages for the contents that are corrupted on
+        the local object storage.
+        If the first storage does not contain the missing contents, send
+        a request to the second one with only the content that couldn't be
+        retrieved, and so on until there is no remaining content or servers.
+
+        If a content couldn't be retrieved on all the servers, then log it as
+        an error.
         """
-        # Retrieve the data of the corrupted contents from the master storage.
-        contents = self.backup_storage.content_get(content_ids)
-        contents_set = set(content_ids)
-        # Erase corrupted version with new safe one.
-        for content in contents:
-            if not content:
-                continue
-            data = content['data']
-            contents_set.discard(content['sha1'])
-            self.objstorage.restore_bytes(data)
-
-        if contents_set:
-            logging.error("Some corrupted contents could not be retrieved : %s"
-                          % [hashutil.hash_to_hex(id) for id in contents_set])
+        contents_to_get = set(content_ids)
+        # Iterates over the backup storages.
+        for backup_storage in self.backup_storages:
+            # Stop as soon as everything has been restored, so the
+            # remaining servers are not queried for nothing.
+            if not contents_to_get:
+                break
+            # Try to get all the contents that still need to be retrieved.
+            contents = backup_storage.content_get(list(contents_to_get))
+            for content in contents:
+                if content:
+                    sha1 = content['sha1']
+                    data = content['data']
+                    # When a content is retrieved, remove it from the set
+                    # of needed contents.
+                    contents_to_get.discard(sha1)
+                    self.objstorage.restore_bytes(data)
+
+        # Contents still in contents_to_get couldn't be retrieved.
+        if contents_to_get:
+            logging.error(
+                "Some corrupted contents could not be retrieved : %s"
+                % [hashutil.hash_to_hex(id) for id in contents_to_get]
+            )
 
 
 @click.command()
@@ -128,7 +148,7 @@
         {'batch_size': conf['batch_size']},
         conf['storage_path'],
         conf['storage_depth'],
-        conf['backup_url']
+        [url.strip() for url in conf['backup_url'].split(',')]
     )
     checker.run()
 
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
--- a/swh/storage/tests/test_checker.py
+++ b/swh/storage/tests/test_checker.py
@@ -46,7 +46,8 @@
         path = tempfile.mkdtemp()
         depth = 3
-        self.checker = ContentChecker(config, path, depth, 'http://None')
-        self.checker.backup_storage = MockBackupStorage()
+        self.checker = ContentChecker(config, path, depth, ['http://None'])
+        self.checker.backup_storages = [MockBackupStorage(),
+                                        MockBackupStorage()]
 
     def corrupt_content(self, id):
         """ Make the given content invalid.
@@ -72,12 +73,25 @@
         self.assertFalse(self.checker.check_content(id))
 
     @istest
-    def repair_content_present(self):
+    def repair_content_present_first(self):
         # Try to repair a content that is in the backup storage.
-        content = b'repair_content_present'
+        content = b'repair_content_present_first'
+        id = self.checker.objstorage.add_bytes(content)
+        # Add a content to the mock
+        self.checker.backup_storages[0].content_add(id, content)
+        # Corrupt and repair it.
+        self.corrupt_content(id)
+        self.assertFalse(self.checker.check_content(id))
+        self.checker.repair_contents([id])
+        self.assertTrue(self.checker.check_content(id))
+
+    @istest
+    def repair_content_present_second(self):
+        # Try to repair a content that is not in the first backup storage.
+        content = b'repair_content_present_second'
         id = self.checker.objstorage.add_bytes(content)
         # Add a content to the mock
-        self.checker.backup_storage.content_add(id, content)
+        self.checker.backup_storages[1].content_add(id, content)
         # Corrupt and repair it.
         self.corrupt_content(id)
         self.assertFalse(self.checker.check_content(id))
@@ -85,6 +99,25 @@
         self.assertTrue(self.checker.check_content(id))
 
     @istest
+    def repair_content_present_distributed(self):
+        # Try to repair two contents that are in separate backup storages.
+        content1 = b'repair_content_present_distributed_2'
+        content2 = b'repair_content_present_distributed_1'
+        id1 = self.checker.objstorage.add_bytes(content1)
+        id2 = self.checker.objstorage.add_bytes(content2)
+        # Add each content to a different mock.
+        self.checker.backup_storages[0].content_add(id1, content1)
+        self.checker.backup_storages[1].content_add(id2, content2)
+        # Corrupt and repair it
+        self.corrupt_content(id1)
+        self.corrupt_content(id2)
+        self.assertFalse(self.checker.check_content(id1))
+        self.assertFalse(self.checker.check_content(id2))
+        self.checker.repair_contents([id1, id2])
+        self.assertTrue(self.checker.check_content(id1))
+        self.assertTrue(self.checker.check_content(id2))
+
+    @istest
     def repair_content_missing(self):
         # Try to repair a content that is NOT in the backup storage.
         content = b'repair_content_present'