diff --git a/swh/storage/checker/__init__.py b/swh/storage/checker/__init__.py new file mode 100644 diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py new file mode 100644 --- /dev/null +++ b/swh/storage/checker/checker.py @@ -0,0 +1,134 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging + +from swh.core import config, hashutil +from .. import get_storage +from ..objstorage import ObjStorage +from ..exc import ObjNotFoundError, Error + +DEFAULT_CONFIG = { + 'storage_path': ('str', '/srv/softwareheritage/objects'), + 'storage_depth': ('int', 3), + 'backup_url': ('str', 'http://uffizi:5002'), + + 'batch_size': ('int', 1000), +} + + +class ContentChecker(): + """ Content integrity checker that will check local objstorage content. + + The checker will check the data of an object storage in order to verify + that no file have been corrupted. + + Attributes: + config: dictionary that contains this + checker configuration + objstorage (ObjStorage): Local object storage that will be checked. + master_storage (RemoteStorage): A distant storage that will be used to + restore corrupted content. + """ + + def __init__(self, config, root, depth, backup_url): + """ Create a checker that ensure the objstorage have no corrupted file. + + Args: + config (dict): Dictionary that contains the following keys : + batch_size: Number of content that should be tested each + time the content checker runs. + root (string): Path to the objstorage directory + depth (int): Depth of the object storage. + master_url (string): Url of a storage that can be used to restore + content. + """ + self.config = config + self.objstorage = ObjStorage(root, depth) + self.backup_storage = get_storage('remote_storage', [backup_url]) + + def run(self): + """ Start the check routine + """ + corrupted_contents = [] + batch_size = self.config['batch_size'] + + for content_id in self.get_content_to_check(batch_size): + if not self.check_content(content_id): + self.invalidate_content(content_id) + corrupted_contents.append(content_id) + logging.error('The content', content_id, 'have been corrupted') + + self.repair_contents(corrupted_contents) + + def get_content_to_check(self, batch_size): + """ Get the content that should be verified. + + Returns: + An iterable of the content's id that need to be checked. + """ + contents = self.objstorage.get_random_contents(batch_size) + yield from contents + + def check_content(self, content_id): + """ Check the validity of the given content. + + Returns: + True if the content was valid, false if it was corrupted. + """ + try: + self.objstorage.check(content_id) + except (ObjNotFoundError, Error) as e: + logging.error(e) + return False + else: + return True + + def repair_contents(self, content_ids): + """ Try to restore the given contents. + """ + # Retrieve the data of the corrupted contents from the master storage. + contents = self.backup_storage.content_get(content_ids) + contents_set = set(content_ids) + # Erase corrupted version with new safe one. + for content in contents: + if not content: + continue + data = content['data'] + contents_set.discard(content['sha1']) + self.objstorage.restore_bytes(data) + + if contents_set: + logging.error("Some corrupted contents could not be retrieved : %s" + % [hashutil.hash_to_hex(id) for id in contents_set]) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'], + help='Path to the storage to verify') +@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'], + type=click.INT, help='Depth of the object storage') +@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'], + help='Url of a remote storage to retrieve corrupted content') +def launch(config_path, storage_path, depth, backup_url): + # The configuration have following priority : + # command line > file config > default config + cl_config = { + 'storage_path': storage_path, + 'storage_depth': depth, + 'backup_url': backup_url + } + conf = config.read(config_path, DEFAULT_CONFIG) + conf.update(cl_config) + # Create the checker and run + checker = ContentChecker( + {'batch_size': conf['batch_size']}, + conf['storage_path'], + conf['depth'], + conf['backup_url'] + ) + checker.run() diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py --- a/swh/storage/objstorage/api/client.py +++ b/swh/storage/objstorage/api/client.py @@ -76,6 +76,17 @@ """ return self.post('content/get', {'obj_id': obj_id}) + def content_get_random(self, batch_size): + """ Retrieve a random sample of existing content. + + Args: + batch_size: Number of content requested. + + Returns: + A list of random ids that represents existing contents. + """ + return self.post('content/get/random', {'batch_size': batch_size}) + def content_check(self, obj_id): """ Integrity check for a given object diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py --- a/swh/storage/objstorage/api/server.py +++ b/swh/storage/objstorage/api/server.py @@ -54,6 +54,14 @@ return encode_data(g.objstorage.get_bytes(**decode_request(request))) +@app.route('/content/get/random', methods=['POST']) +def get_random_contents(): + return encode_data( + # Transform the iterator into a list in order to sent it. + list(g.objstorage.get_random_contents(**decode_request(request))) + ) + + @app.route('/content/check', methods=['POST']) def check(): return encode_data(g.objstorage.check(**decode_request(request))) diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py --- a/swh/storage/objstorage/objstorage.py +++ b/swh/storage/objstorage/objstorage.py @@ -7,6 +7,7 @@ import os import shutil import tempfile +import random from contextlib import contextmanager @@ -40,7 +41,7 @@ # compute [depth] substrings of [obj_id], each of length 2, starting from # the beginning - id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)] + id_steps = [hex_obj_id[i * 2:i * 2 + 2] for i in range(0, depth)] steps = [root_dir] + id_steps return os.path.join(*steps) @@ -161,7 +162,7 @@ return os.path.exists(_obj_path(hex_obj_id, self._root_dir, self._depth)) - def add_bytes(self, bytes, obj_id=None): + def add_bytes(self, bytes, obj_id=None, check_presence=True): """add a new object to the object storage Args: @@ -169,7 +170,8 @@ obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. - + check_presence: indicate if the presence of the content should be + verified before adding the file. """ if obj_id is None: # missing checksum, compute it in memory and write to file @@ -177,7 +179,7 @@ h.update(bytes) obj_id = h.digest() - if obj_id in self: + if check_presence and obj_id in self: return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) @@ -190,6 +192,20 @@ return obj_id + def restore_bytes(self, bytes, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + bytes: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + """ + return self.add_bytes(bytes, obj_id, check_presence=False) + def add_file(self, f, length, obj_id=None): """similar to `add_bytes`, but add the content of file-like object f to the object storage @@ -285,6 +301,43 @@ with self.get_file_obj(obj_id) as f: return f.read() + def get_random_contents(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + def get_random_content(self, batch_size): + """ Get a batch of content inside a single directory. + + Returns: + a tuple (batch size, batch). + """ + dirs = [] + for level in range(self._depth): + path = os.path.join(self._root_dir, *dirs) + dir_list = next(os.walk(path))[1] + if 'tmp' in dir_list: + dir_list.remove('tmp') + dirs.append(random.choice(dir_list)) + + path = os.path.join(self._root_dir, *dirs) + content_list = next(os.walk(path))[2] + length = min(batch_size, len(content_list)) + return length, random.sample(content_list, length) + + while batch_size: + length, it = get_random_content(self, batch_size) + batch_size = batch_size - length + yield from it + def _get_file_path(self, obj_id): """retrieve the path of a given object in the objects storage diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_checker.py @@ -0,0 +1,96 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import gzip +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.storage.objstorage.objstorage import _obj_path +from swh.storage.checker.checker import ContentChecker + + +class MockBackupStorage(): + + def __init__(self): + self.values = {} + + def content_add(self, id, value): + self.values[id] = value + + def content_get(self, ids): + for id in ids: + try: + data = self.values[id] + except KeyError: + yield None + continue + + yield {'sha1': id, 'data': data} + + +@attr('fs') +class TestChecker(unittest.TestCase): + """ Test the content integrity checker + """ + + def setUp(self): + super().setUp() + # Connect to an objstorage + config = {'batch_size': 10} + path = tempfile.mkdtemp() + depth = 3 + self.checker = ContentChecker(config, path, depth, 'http://None') + self.checker.backup_storage = MockBackupStorage() + + def corrupt_content(self, id): + """ Make the given content invalid. + """ + hex_id = hashutil.hash_to_hex(id) + file_path = _obj_path(hex_id, self.checker.objstorage._root_dir, 3) + with gzip.open(file_path, 'wb') as f: + f.write(b'Unexpected content') + + @istest + def check_valid_content(self): + # Check that a valid content is valid. + content = b'check_valid_content' + id = self.checker.objstorage.add_bytes(content) + self.assertTrue(self.checker.check_content(id)) + + @istest + def check_invalid_content(self): + # Check that an invalid content is noticed. + content = b'check_invalid_content' + id = self.checker.objstorage.add_bytes(content) + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + + @istest + def repair_content_present(self): + # Try to repair a content that is in the backup storage. + content = b'repair_content_present' + id = self.checker.objstorage.add_bytes(content) + # Add a content to the mock + self.checker.backup_storage.content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_missing(self): + # Try to repair a content that is NOT in the backup storage. + content = b'repair_content_present' + id = self.checker.objstorage.add_bytes(content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertFalse(self.checker.check_content(id)) diff --git a/swh/storage/tests/test_objstorage.py b/swh/storage/tests/test_objstorage.py --- a/swh/storage/tests/test_objstorage.py +++ b/swh/storage/tests/test_objstorage.py @@ -132,6 +132,12 @@ self.content) @istest + def get_random_contents(self): + self.storage.add_bytes(self.content, obj_id=self.obj_id) + for id in self.storage.get_random_contents(1): + self.assertIn(hashutil.hex_to_hash(id), [self.obj_id]) + + @istest def get_file_path(self): self.storage.add_bytes(self.content, obj_id=self.obj_id) path = self.storage._get_file_path(self.obj_id) diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py --- a/swh/storage/tests/test_objstorage_api.py +++ b/swh/storage/tests/test_objstorage_api.py @@ -53,6 +53,17 @@ self.objstorage.content_get(content_hash['sha1']) @istest + def content_get_random(self): + ids = [] + for i in range(100): + content = bytes('content_get_present', 'utf8') + id = self.objstorage.content_add(content) + ids.append(id) + for id in self.objstorage.content_get_random(50): + id = hashutil.hex_to_hash(id) + self.assertIn(id, ids) + + @istest def content_check_invalid(self): content = bytes('content_check_invalid', 'utf8') id = self.objstorage.content_add(content)