Page MenuHomeSoftware Heritage

D31.id102.diff
No OneTemporary

D31.id102.diff

diff --git a/swh/storage/checker/__init__.py b/swh/storage/checker/__init__.py
new file mode 100644
diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/checker/checker.py
@@ -0,0 +1,134 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+
+from swh.core import config, hashutil
+from .. import get_storage
+from ..objstorage import ObjStorage
+from ..exc import ObjNotFoundError, Error
+
DEFAULT_CONFIG = {
    # Local object storage that will be checked.
    'storage_path': ('str', '/srv/softwareheritage/objects'),
    'storage_depth': ('int', 3),
    # Remote storage used to restore corrupted contents.
    'backup_url': ('str', 'http://uffizi:5002'),

    # Number of contents checked on each run.
    'batch_size': ('int', 1000),
}
+
+
class ContentChecker():
    """ Content integrity checker for a local object storage.

    The checker verifies the data of an object storage in order to detect
    corrupted files, and tries to restore them from a backup storage.

    Attributes:
        config: dictionary that contains this checker's configuration
            (currently only `batch_size`).
        objstorage (ObjStorage): local object storage that will be checked.
        backup_storage (RemoteStorage): a distant storage used to restore
            corrupted contents.
    """

    def __init__(self, config, root, depth, backup_url):
        """ Create a checker that ensures the objstorage has no corrupted file.

        Args:
            config (dict): dictionary that contains the following keys:
                batch_size: number of contents that should be tested each
                    time the content checker runs.
            root (string): path to the objstorage directory.
            depth (int): depth of the object storage.
            backup_url (string): url of a storage that can be used to
                restore contents.
        """
        self.config = config
        self.objstorage = ObjStorage(root, depth)
        self.backup_storage = get_storage('remote_storage', [backup_url])

    def run(self):
        """ Check a batch of random contents and repair the corrupted ones.
        """
        corrupted_contents = []
        batch_size = self.config['batch_size']

        for content_id in self.get_content_to_check(batch_size):
            if not self.check_content(content_id):
                corrupted_contents.append(content_id)
                # Bug fix: logging.error is printf-style; the original
                # passed print()-like positional fragments, which are
                # interpreted as (missing) format arguments.
                # NOTE(review): the original also called
                # self.invalidate_content(content_id) here, a method that
                # is defined nowhere in this file and would raise
                # AttributeError on the first corrupted content; the
                # corrupted file is overwritten by repair_contents() below.
                logging.error('The content %s have been corrupted',
                              content_id)

        self.repair_contents(corrupted_contents)

    def get_content_to_check(self, batch_size):
        """ Get the contents that should be verified.

        Args:
            batch_size: number of content ids to retrieve.

        Returns:
            An iterable of the ids of the contents that need to be checked.
        """
        yield from self.objstorage.get_random_contents(batch_size)

    def check_content(self, content_id):
        """ Check the validity of the given content.

        Args:
            content_id: id of the content to verify.

        Returns:
            True if the content is valid, False if it is corrupted or
            missing.
        """
        try:
            self.objstorage.check(content_id)
        except (ObjNotFoundError, Error) as e:
            logging.error(e)
            return False
        else:
            return True

    def repair_contents(self, content_ids):
        """ Try to restore the given contents from the backup storage.

        Contents that cannot be retrieved from the backup are only logged.

        Args:
            content_ids: iterable of ids of the corrupted contents.
        """
        # Retrieve the data of the corrupted contents from the backup storage.
        contents = self.backup_storage.content_get(content_ids)
        contents_set = set(content_ids)
        # Erase each corrupted version with the new safe one.
        for content in contents:
            if not content:
                continue
            contents_set.discard(content['sha1'])
            self.objstorage.restore_bytes(content['data'])

        if contents_set:
            # Lazy %-formatting instead of eager interpolation.
            logging.error('Some corrupted contents could not be retrieved : %s',
                          [hashutil.hash_to_hex(id) for id in contents_set])
+
+
@click.command()
@click.argument('config-path', required=1)
@click.option('--storage-path', default=None,
              help='Path to the storage to verify')
@click.option('--depth', default=None, type=click.INT,
              help='Depth of the object storage')
@click.option('--backup-url', default=None,
              help='Url of a remote storage to retrieve corrupted content')
def launch(config_path, storage_path, depth, backup_url):
    """ Run the content checker with the given configuration.

    Configuration priority: command line > file config > default config.
    """
    # Only options explicitly given on the command line may override the
    # file configuration.  The original code used DEFAULT_CONFIG entries as
    # click defaults, which (a) passed the ('type', value) tuples verbatim
    # as option values and (b) always clobbered the file config with the
    # defaults, breaking the documented priority.
    cl_config = {key: value
                 for key, value in (('storage_path', storage_path),
                                    ('storage_depth', depth),
                                    ('backup_url', backup_url))
                 if value is not None}
    conf = config.read(config_path, DEFAULT_CONFIG)
    conf.update(cl_config)
    # Create the checker and run it.
    checker = ContentChecker(
        {'batch_size': conf['batch_size']},
        conf['storage_path'],
        # Bug fix: the configuration key is 'storage_depth';
        # conf['depth'] raised KeyError.
        conf['storage_depth'],
        conf['backup_url'],
    )
    checker.run()
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
--- a/swh/storage/objstorage/api/client.py
+++ b/swh/storage/objstorage/api/client.py
@@ -76,6 +76,17 @@
"""
return self.post('content/get', {'obj_id': obj_id})
def content_get_random(self, batch_size):
    """ Retrieve a random sample of existing content ids.

    Args:
        batch_size: number of content ids requested.

    Returns:
        A list of random ids of contents that exist in the storage.
    """
    payload = {'batch_size': batch_size}
    return self.post('content/get/random', payload)
+
def content_check(self, obj_id):
""" Integrity check for a given object
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
--- a/swh/storage/objstorage/api/server.py
+++ b/swh/storage/objstorage/api/server.py
@@ -54,6 +54,14 @@
return encode_data(g.objstorage.get_bytes(**decode_request(request)))
@app.route('/content/get/random', methods=['POST'])
def get_random_contents():
    """ Return a random sample of content ids from the object storage.

    The generator returned by the object storage is materialized as a
    list so that it can be serialized and sent in the response.
    """
    kwargs = decode_request(request)
    batch = list(g.objstorage.get_random_contents(**kwargs))
    return encode_data(batch)
+
+
@app.route('/content/check', methods=['POST'])
def check():
return encode_data(g.objstorage.check(**decode_request(request)))
diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -7,6 +7,7 @@
import os
import shutil
import tempfile
+import random
from contextlib import contextmanager
@@ -40,7 +41,7 @@
# compute [depth] substrings of [obj_id], each of length 2, starting from
# the beginning
- id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)]
+ id_steps = [hex_obj_id[i * 2:i * 2 + 2] for i in range(0, depth)]
steps = [root_dir] + id_steps
return os.path.join(*steps)
@@ -161,7 +162,7 @@
return os.path.exists(_obj_path(hex_obj_id, self._root_dir,
self._depth))
- def add_bytes(self, bytes, obj_id=None):
+ def add_bytes(self, bytes, obj_id=None, check_presence=True):
"""add a new object to the object storage
Args:
@@ -169,7 +170,8 @@
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
given, obj_id will be trusted to match bytes. If missing,
obj_id will be computed on the fly.
-
+ check_presence: indicate if the presence of the content should be
+ verified before adding the file.
"""
if obj_id is None:
# missing checksum, compute it in memory and write to file
@@ -177,7 +179,7 @@
h.update(bytes)
obj_id = h.digest()
- if obj_id in self:
+ if check_presence and obj_id in self:
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
@@ -190,6 +192,20 @@
return obj_id
def restore_bytes(self, bytes, obj_id=None):
    """ Restore the content of a corrupted object.

    Behaves like add_bytes, except that an already-present (corrupted)
    file for the same object id is overwritten instead of being kept.

    Args:
        bytes: content of the object to be written to the storage.
        obj_id: checksum of `bytes` as computed by ID_HASH_ALGO.  When
            given, obj_id is trusted to match bytes; if missing it is
            computed on the fly.

    Returns:
        the id of the restored object.
    """
    return self.add_bytes(bytes, obj_id=obj_id, check_presence=False)
+
def add_file(self, f, length, obj_id=None):
"""similar to `add_bytes`, but add the content of file-like object f to the
object storage
@@ -285,6 +301,43 @@
with self.get_file_obj(obj_id) as f:
return f.read()
def get_random_contents(self, batch_size):
    """ Get random ids of existing contents.

    This method is used in order to get random ids to perform
    content integrity verifications on random contents.

    Args:
        batch_size (int): maximum number of ids to yield.

    Yields:
        ids (file names) of contents that are in the current object
        storage.  May yield fewer than `batch_size` ids when the storage
        does not contain enough contents.
    """
    def _random_leaf_sample(remaining):
        """Walk a random directory chain down to a leaf and sample files.

        Returns a (length, filenames) tuple; length is 0 when the chosen
        chain holds no content.
        """
        dirs = []
        for _ in range(self._depth):
            path = os.path.join(self._root_dir, *dirs)
            subdirs = next(os.walk(path))[1]
            # 'tmp' holds in-flight writes; never sample from it.
            if 'tmp' in subdirs:
                subdirs.remove('tmp')
            if not subdirs:
                # Bug fix: random.choice([]) raised IndexError here.
                return 0, []
            dirs.append(random.choice(subdirs))

        leaf = os.path.join(self._root_dir, *dirs)
        filenames = next(os.walk(leaf))[2]
        length = min(remaining, len(filenames))
        return length, random.sample(filenames, length)

    # Bound the number of consecutive fruitless draws so that an (almost)
    # empty storage cannot make this generator loop forever — the original
    # `while batch_size` never terminated when every draw returned 0.
    empty_draws = 0
    while batch_size and empty_draws < 10:
        length, sample = _random_leaf_sample(batch_size)
        if length:
            empty_draws = 0
            batch_size -= length
            yield from sample
        else:
            empty_draws += 1
+
def _get_file_path(self, obj_id):
"""retrieve the path of a given object in the objects storage
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_checker.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import gzip
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.checker.checker import ContentChecker
+
+
class MockBackupStorage():
    """ In-memory stand-in for the remote backup storage. """

    def __init__(self):
        # Maps content id -> raw content data.
        self.values = {}

    def content_add(self, id, value):
        """ Store `value` under the content id `id`. """
        self.values[id] = value

    def content_get(self, ids):
        """ Yield a {'sha1', 'data'} dict per id, or None when missing. """
        for id in ids:
            if id in self.values:
                yield {'sha1': id, 'data': self.values[id]}
            else:
                yield None
+
+
@attr('fs')
class TestChecker(unittest.TestCase):
    """ Test the content integrity checker.
    """

    def setUp(self):
        super().setUp()
        # Connect the checker to a fresh on-disk objstorage.
        # NOTE(review): the temporary directory is never removed; a
        # tearDown with shutil.rmtree would avoid leaking it.
        config = {'batch_size': 10}
        path = tempfile.mkdtemp()
        depth = 3
        self.checker = ContentChecker(config, path, depth, 'http://None')
        # Replace the remote backup storage with an in-memory mock.
        self.checker.backup_storage = MockBackupStorage()

    def corrupt_content(self, id):
        """ Make the given content invalid by overwriting its file.
        """
        hex_id = hashutil.hash_to_hex(id)
        file_path = _obj_path(hex_id, self.checker.objstorage._root_dir, 3)
        with gzip.open(file_path, 'wb') as f:
            f.write(b'Unexpected content')

    @istest
    def check_valid_content(self):
        # Check that a valid content is reported as valid.
        content = b'check_valid_content'
        id = self.checker.objstorage.add_bytes(content)
        self.assertTrue(self.checker.check_content(id))

    @istest
    def check_invalid_content(self):
        # Check that an invalid content is noticed.
        content = b'check_invalid_content'
        id = self.checker.objstorage.add_bytes(content)
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))

    @istest
    def repair_content_present(self):
        # Try to repair a content that is in the backup storage.
        content = b'repair_content_present'
        id = self.checker.objstorage.add_bytes(content)
        # Add the content to the mocked backup storage.
        self.checker.backup_storage.content_add(id, content)
        # Corrupt it, then check the repair restores a valid copy.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertTrue(self.checker.check_content(id))

    @istest
    def repair_content_missing(self):
        # Try to repair a content that is NOT in the backup storage.
        # Bug fix: the fixture was copy-pasted from repair_content_present;
        # use distinct bytes so the two tests cannot share an object id.
        content = b'repair_content_missing'
        id = self.checker.objstorage.add_bytes(content)
        # Corrupt it; without a backup the repair must leave it corrupted.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertFalse(self.checker.check_content(id))
diff --git a/swh/storage/tests/test_objstorage.py b/swh/storage/tests/test_objstorage.py
--- a/swh/storage/tests/test_objstorage.py
+++ b/swh/storage/tests/test_objstorage.py
@@ -132,6 +132,12 @@
self.content)
@istest
def get_random_contents(self):
    """ Check that every id returned by get_random_contents is a stored id.
    """
    self.storage.add_bytes(self.content, obj_id=self.obj_id)
    returned = [hashutil.hex_to_hash(hex_id)
                for hex_id in self.storage.get_random_contents(1)]
    for returned_id in returned:
        self.assertIn(returned_id, [self.obj_id])
+
+ @istest
def get_file_path(self):
self.storage.add_bytes(self.content, obj_id=self.obj_id)
path = self.storage._get_file_path(self.obj_id)
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
--- a/swh/storage/tests/test_objstorage_api.py
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -53,6 +53,17 @@
self.objstorage.content_get(content_hash['sha1'])
@istest
def content_get_random(self):
    """ Check that content_get_random only returns ids of stored contents.
    """
    ids = []
    for i in range(100):
        # Bug fix: the fixture was copy-pasted from content_get_present and
        # added the *same* bytes 100 times, producing a single object id.
        # Use distinct contents so the sample is drawn from 100 real ids.
        content = bytes('content_get_random %d' % i, 'utf8')
        id = self.objstorage.content_add(content)
        ids.append(id)
    for hex_id in self.objstorage.content_get_random(50):
        id = hashutil.hex_to_hash(hex_id)
        self.assertIn(id, ids)
+
+ @istest
def content_check_invalid(self):
content = bytes('content_check_invalid', 'utf8')
id = self.objstorage.content_add(content)

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 4:50 PM (5 h, 38 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227255

Event Timeline