Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163857
D31.id102.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Subscribers
None
D31.id102.diff
View Options
diff --git a/swh/storage/checker/__init__.py b/swh/storage/checker/__init__.py
new file mode 100644
diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/checker/checker.py
@@ -0,0 +1,134 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+
+from swh.core import config, hashutil
+from .. import get_storage
+from ..objstorage import ObjStorage
+from ..exc import ObjNotFoundError, Error
+
# Default configuration of the checker, given to `config.read`: every entry
# maps a configuration key to a (type name, default value) pair.
DEFAULT_CONFIG = dict(
    # Local object storage to check.
    storage_path=('str', '/srv/softwareheritage/objects'),
    storage_depth=('int', 3),
    # Remote storage from which safe copies of corrupted contents are fetched.
    backup_url=('str', 'http://uffizi:5002'),
    # Number of contents checked on each run.
    batch_size=('int', 1000),
)
+
+
class ContentChecker():
    """ Content integrity checker that will check local objstorage content.

    The checker will check the data of an object storage in order to verify
    that no file has been corrupted, and tries to restore the corrupted ones
    from a backup storage.

    Attributes:
        config: dictionary that contains this checker configuration
        objstorage (ObjStorage): Local object storage that will be checked.
        backup_storage (RemoteStorage): A distant storage that will be used
            to restore corrupted content.
    """

    def __init__(self, config, root, depth, backup_url):
        """ Create a checker that ensures the objstorage has no corrupted file.

        Args:
            config (dict): Dictionary that contains the following keys:
                batch_size: Number of contents that should be tested each
                    time the content checker runs.
            root (string): Path to the objstorage directory
            depth (int): Depth of the object storage.
            backup_url (string): Url of a storage that can be used to restore
                content.
        """
        self.config = config
        self.objstorage = ObjStorage(root, depth)
        self.backup_storage = get_storage('remote_storage', [backup_url])

    def run(self):
        """ Start the check routine.

        Check `config['batch_size']` contents, then try to restore every
        content found corrupted from the backup storage.
        """
        corrupted_contents = []
        batch_size = self.config['batch_size']

        for content_id in self.get_content_to_check(batch_size):
            if not self.check_content(content_id):
                # No separate invalidation step: repair_contents overwrites
                # the corrupted file through restore_bytes, which skips the
                # presence check.
                corrupted_contents.append(content_id)
                logging.error('The content %s has been corrupted', content_id)

        self.repair_contents(corrupted_contents)

    def get_content_to_check(self, batch_size):
        """ Get the content that should be verified.

        Args:
            batch_size (int): Number of contents requested.

        Returns:
            An iterable of the content's id that need to be checked.
        """
        contents = self.objstorage.get_random_contents(batch_size)
        yield from contents

    def check_content(self, content_id):
        """ Check the validity of the given content.

        Returns:
            True if the content was valid, False if it was missing or
            corrupted.
        """
        try:
            self.objstorage.check(content_id)
        except (ObjNotFoundError, Error) as e:
            logging.error(e)
            return False
        else:
            return True

    def repair_contents(self, content_ids):
        """ Try to restore the given contents from the backup storage.

        Args:
            content_ids: ids of the corrupted contents.

        Contents that the backup storage cannot provide are logged as errors.
        """
        if not content_ids:
            # Nothing to repair: avoid a useless request to the backup.
            return

        # Retrieve the data of the corrupted contents from the backup storage.
        # NOTE(review): ids coming from ObjStorage.get_random_contents are
        # file names (hex strings); confirm the backup storage expects and
        # returns 'sha1' values in the same format, otherwise every content
        # will be reported as missing below.
        contents = self.backup_storage.content_get(content_ids)
        missing = set(content_ids)
        # Erase corrupted versions with the new safe ones.
        for content in contents:
            if not content:
                # The backup storage does not know this content.
                continue
            missing.discard(content['sha1'])
            self.objstorage.restore_bytes(content['data'])

        if missing:
            logging.error('Some corrupted contents could not be retrieved : %s',
                          [hashutil.hash_to_hex(content_id)
                           for content_id in missing])
+
+
@click.command()
@click.argument('config-path', required=True)
@click.option('--storage-path', default=None,
              help='Path to the storage to verify')
@click.option('--depth', default=None, type=click.INT,
              help='Depth of the object storage')
@click.option('--backup-url', default=None,
              help='Url of a remote storage to retrieve corrupted content')
def launch(config_path, storage_path, depth, backup_url):
    """Check a batch of contents of a local object storage and repair them.

    The configuration has the following priority:
    command line > file config > default config
    """
    # DEFAULT_CONFIG is handed to config.read, presumably to fill in the keys
    # missing from the file.
    conf = config.read(config_path, DEFAULT_CONFIG)
    # Options default to None so that only the ones actually given on the
    # command line override the file configuration.
    cl_config = {
        'storage_path': storage_path,
        'storage_depth': depth,
        'backup_url': backup_url,
    }
    conf.update({key: value for key, value in cl_config.items()
                 if value is not None})
    # Create the checker and run
    checker = ContentChecker(
        {'batch_size': conf['batch_size']},
        conf['storage_path'],
        conf['storage_depth'],
        conf['backup_url'],
    )
    checker.run()
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
--- a/swh/storage/objstorage/api/client.py
+++ b/swh/storage/objstorage/api/client.py
@@ -76,6 +76,17 @@
"""
return self.post('content/get', {'obj_id': obj_id})
def content_get_random(self, batch_size):
    """Request a random sample of existing content ids.

    Args:
        batch_size: How many content ids to ask for.

    Returns:
        Whatever `self.post` returns for the 'content/get/random' endpoint;
        the server answers with a list of random ids of existing contents.
    """
    payload = {'batch_size': batch_size}
    return self.post('content/get/random', payload)
+
def content_check(self, obj_id):
""" Integrity check for a given object
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
--- a/swh/storage/objstorage/api/server.py
+++ b/swh/storage/objstorage/api/server.py
@@ -54,6 +54,14 @@
return encode_data(g.objstorage.get_bytes(**decode_request(request)))
@app.route('/content/get/random', methods=['POST'])
def get_random_contents():
    """Return random ids of existing contents of the object storage.

    The decoded request is forwarded as keyword arguments to the storage's
    `get_random_contents` (the client sends `batch_size`).
    """
    kwargs = decode_request(request)
    # The storage yields ids lazily; build a list so it can be encoded.
    random_ids = list(g.objstorage.get_random_contents(**kwargs))
    return encode_data(random_ids)
+
+
@app.route('/content/check', methods=['POST'])
def check():
    """Run `g.objstorage.check` with the decoded request arguments."""
    kwargs = decode_request(request)
    return encode_data(g.objstorage.check(**kwargs))
diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -7,6 +7,7 @@
import os
import shutil
import tempfile
+import random
from contextlib import contextmanager
@@ -40,7 +41,7 @@
# compute [depth] substrings of [obj_id], each of length 2, starting from
# the beginning
- id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)]
+ id_steps = [hex_obj_id[i * 2:i * 2 + 2] for i in range(0, depth)]
steps = [root_dir] + id_steps
return os.path.join(*steps)
@@ -161,7 +162,7 @@
return os.path.exists(_obj_path(hex_obj_id, self._root_dir,
self._depth))
- def add_bytes(self, bytes, obj_id=None):
+ def add_bytes(self, bytes, obj_id=None, check_presence=True):
"""add a new object to the object storage
Args:
@@ -169,7 +170,8 @@
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
given, obj_id will be trusted to match bytes. If missing,
obj_id will be computed on the fly.
-
+ check_presence: indicate if the presence of the content should be
+ verified before adding the file.
"""
if obj_id is None:
# missing checksum, compute it in memory and write to file
@@ -177,7 +179,7 @@
h.update(bytes)
obj_id = h.digest()
- if obj_id in self:
+ if check_presence and obj_id in self:
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
@@ -190,6 +192,20 @@
return obj_id
def restore_bytes(self, bytes, obj_id=None):
    """Overwrite a possibly corrupted object with the given content.

    Unlike a plain `add_bytes`, the object is written even when `obj_id`
    is already present in the storage, so a corrupted file gets replaced.

    Args:
        bytes: content of the object to be (re)written to the storage
        obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
            given, obj_id will be trusted to match bytes. If missing,
            obj_id will be computed on the fly.

    Returns:
        The object id, as returned by `add_bytes`.
    """
    return self.add_bytes(bytes, obj_id=obj_id, check_presence=False)
+
def add_file(self, f, length, obj_id=None):
"""similar to `add_bytes`, but add the content of file-like object f to the
object storage
@@ -285,6 +301,43 @@
with self.get_file_obj(obj_id) as f:
return f.read()
def get_random_contents(self, batch_size):
    """ Get random ids of existing contents

    This method is used in order to get random ids to perform
    content integrity verifications on random contents.

    Ids are sampled one leaf directory at a time: a random path of
    `self._depth` directories is walked down from the root (ignoring any
    `tmp` directory), then up to the remaining number of ids is sampled
    among the files of the reached directory. The same directory may be
    picked several times, so the same id may be yielded more than once.

    Args:
        batch_size (int): Number of ids that will be given

    Yields:
        The ids (file names) of contents that are in the current object
        storage. Fewer than `batch_size` ids are yielded when no content
        can be found, e.g. when the storage is empty.
    """
    # Consecutive dead ends (empty directories) tolerated before giving up.
    # Without this bound an empty storage crashes (random.choice on an
    # empty list) and empty leaf directories could loop forever.
    max_misses = 100

    def pick_leaf_files():
        """Return the file names of a random leaf directory ([] if none)."""
        path = self._root_dir
        for _ in range(self._depth):
            subdirs = [name for name in next(os.walk(path))[1]
                       if name != 'tmp']
            if not subdirs:
                return []
            path = os.path.join(path, random.choice(subdirs))
        return next(os.walk(path))[2]

    remaining = batch_size
    misses = 0
    while remaining > 0:
        files = pick_leaf_files()
        if not files:
            misses += 1
            if misses >= max_misses:
                return
            continue
        misses = 0
        sample = random.sample(files, min(remaining, len(files)))
        remaining -= len(sample)
        yield from sample
+
def _get_file_path(self, obj_id):
"""retrieve the path of a given object in the objects storage
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_checker.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import gzip
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.checker.checker import ContentChecker
+
+
class MockBackupStorage():
    """In-memory stand-in for the backup storage used by the checker."""

    def __init__(self):
        # Maps content ids to their raw data.
        self.values = {}

    def content_add(self, id, value):
        """Register `value` as the data of the content `id`."""
        self.values[id] = value

    def content_get(self, ids):
        """Yield a {'sha1', 'data'} dict per known id, None per unknown id."""
        for content_id in ids:
            if content_id not in self.values:
                yield None
            else:
                yield {'sha1': content_id, 'data': self.values[content_id]}
+
+
@attr('fs')
class TestChecker(unittest.TestCase):
    """ Test the content integrity checker
    """

    def setUp(self):
        super().setUp()
        # Connect to an objstorage living in a fresh temporary directory.
        config = {'batch_size': 10}
        self.tmp_dir = tempfile.mkdtemp()
        depth = 3
        self.checker = ContentChecker(config, self.tmp_dir, depth,
                                      'http://None')
        # Never reach a real remote storage during the tests.
        self.checker.backup_storage = MockBackupStorage()

    def tearDown(self):
        import shutil
        # Remove the temporary object storage created by setUp.
        shutil.rmtree(self.tmp_dir, ignore_errors=True)
        super().tearDown()

    def corrupt_content(self, content_id):
        """ Make the given content invalid.

        Overwrite the object file with unrelated gzip-compressed data, so
        that it no longer matches its id.
        """
        objstorage = self.checker.objstorage
        hex_id = hashutil.hash_to_hex(content_id)
        file_path = _obj_path(hex_id, objstorage._root_dir, objstorage._depth)
        with gzip.open(file_path, 'wb') as f:
            f.write(b'Unexpected content')

    @istest
    def check_valid_content(self):
        # Check that a valid content is valid.
        content = b'check_valid_content'
        content_id = self.checker.objstorage.add_bytes(content)
        self.assertTrue(self.checker.check_content(content_id))

    @istest
    def check_invalid_content(self):
        # Check that an invalid content is noticed.
        content = b'check_invalid_content'
        content_id = self.checker.objstorage.add_bytes(content)
        self.corrupt_content(content_id)
        self.assertFalse(self.checker.check_content(content_id))

    @istest
    def repair_content_present(self):
        # Try to repair a content that is in the backup storage.
        content = b'repair_content_present'
        content_id = self.checker.objstorage.add_bytes(content)
        # Add a content to the mock
        self.checker.backup_storage.content_add(content_id, content)
        # Corrupt and repair it.
        self.corrupt_content(content_id)
        self.assertFalse(self.checker.check_content(content_id))
        self.checker.repair_contents([content_id])
        self.assertTrue(self.checker.check_content(content_id))

    @istest
    def repair_content_missing(self):
        # Try to repair a content that is NOT in the backup storage.
        content = b'repair_content_missing'
        content_id = self.checker.objstorage.add_bytes(content)
        # Corrupt and repair it.
        self.corrupt_content(content_id)
        self.assertFalse(self.checker.check_content(content_id))
        self.checker.repair_contents([content_id])
        self.assertFalse(self.checker.check_content(content_id))
diff --git a/swh/storage/tests/test_objstorage.py b/swh/storage/tests/test_objstorage.py
--- a/swh/storage/tests/test_objstorage.py
+++ b/swh/storage/tests/test_objstorage.py
@@ -132,6 +132,12 @@
self.content)
@istest
def get_random_contents(self):
    self.storage.add_bytes(self.content, obj_id=self.obj_id)
    # Materialize the generator: an empty result must fail the test
    # instead of making the assertion loop pass vacuously.
    ids = list(self.storage.get_random_contents(1))
    self.assertEqual([hashutil.hex_to_hash(hex_id) for hex_id in ids],
                     [self.obj_id])
+
+ @istest
def get_file_path(self):
self.storage.add_bytes(self.content, obj_id=self.obj_id)
path = self.storage._get_file_path(self.obj_id)
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
--- a/swh/storage/tests/test_objstorage_api.py
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -53,6 +53,17 @@
self.objstorage.content_get(content_hash['sha1'])
@istest
def content_get_random(self):
    # Store distinct contents so that several objects can be sampled.
    ids = set()
    for i in range(100):
        content = bytes('content_get_random %d' % i, 'utf8')
        ids.add(self.objstorage.content_add(content))
    random_ids = list(self.objstorage.content_get_random(50))
    # Guard against a vacuous pass on an empty answer.
    self.assertEqual(len(random_ids), 50)
    for hex_id in random_ids:
        self.assertIn(hashutil.hex_to_hash(hex_id), ids)
+
+ @istest
def content_check_invalid(self):
content = bytes('content_check_invalid', 'utf8')
id = self.objstorage.content_add(content)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 4:50 PM (5 h, 38 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227255
Attached To
D31: Create a content integrity checker
Event Timeline
Log In to Comment