diff --git a/bin/swh-objstorage-add-dir b/bin/swh-objstorage-add-dir --- a/bin/swh-objstorage-add-dir +++ b/bin/swh-objstorage-add-dir @@ -29,7 +29,7 @@ path = os.path.join(root, name) with open(path, 'rb') as f: try: - objs.add_file(f, length=os.path.getsize(path)) + objs.add(f.read()) except objstorage.DuplicateObjError: dups += 1 diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py --- a/swh/storage/checker/checker.py +++ b/swh/storage/checker/checker.py @@ -8,7 +8,7 @@ from swh.core import config, hashutil from .. import get_storage -from ..objstorage import ObjStorage +from ..objstorage import PathSlicingObjStorage from ..exc import ObjNotFoundError, Error DEFAULT_CONFIG = { @@ -47,7 +47,7 @@ get a content. """ self.config = config - self.objstorage = ObjStorage(root, depth) + self.objstorage = PathSlicingObjStorage(root, depth, slicing=2) self.backup_storages = [get_storage('remote_storage', [backup_url]) for backup_url in backup_urls] @@ -124,7 +124,7 @@ # When a content is retrieved, remove it from the set # of needed contents. contents_to_get.discard(hash) - self.objstorage.restore_bytes(data) + self.objstorage.restore(data) # Contents still in contents_to_get couldn't be retrieved. 
if contents_to_get:
diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py
--- a/swh/storage/objstorage/__init__.py
+++ b/swh/storage/objstorage/__init__.py
@@ -1 +1,4 @@
-from .objstorage import ObjStorage, DIR_MODE, FILE_MODE # NOQA
+from .objstorage import ObjStorage
+from .objstorage_pathslicing import PathSlicingObjStorage
+
+__all__ = ['ObjStorage', 'PathSlicingObjStorage']
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
--- a/swh/storage/objstorage/api/server.py
+++ b/swh/storage/objstorage/api/server.py
@@ -9,7 +9,7 @@
 from flask import Flask, g, request
 
 from swh.core import config
-from swh.storage.objstorage import ObjStorage
+from swh.storage.objstorage import PathSlicingObjStorage
 from swh.storage.api.common import (BytesRequest, decode_request,
                                     error_handler,
                                     encode_data_server as encode_data)
@@ -30,8 +30,9 @@
 
 @app.before_request
 def before_request():
-    g.objstorage = ObjStorage(app.config['storage_base'],
-                              app.config['storage_depth'])
+    g.objstorage = PathSlicingObjStorage(app.config['storage_base'],
+                                         app.config['storage_depth'],
+                                         slicing=2)
 
 
 @app.route('/')
@@ -46,18 +47,18 @@
 
 @app.route('/content/add', methods=['POST'])
 def add_bytes():
-    return encode_data(g.objstorage.add_bytes(**decode_request(request)))
+    return encode_data(g.objstorage.add(**decode_request(request)))
 
 
 @app.route('/content/get', methods=['POST'])
 def get_bytes():
-    return encode_data(g.objstorage.get_bytes(**decode_request(request)))
+    return encode_data(g.objstorage.get(**decode_request(request)))
 
 
 @app.route('/content/get/random', methods=['POST'])
 def get_random_contents():
     return encode_data(
-        g.objstorage.get_random_contents(**decode_request(request))
+        g.objstorage.get_random(**decode_request(request))
     )
 
 
diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -1,307 +1,102 @@
-# 
Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import gzip -import os -import shutil -import tempfile -import random -from contextlib import contextmanager +class ObjStorage(): + """ High-level API to manipulate the Software Heritage object storage. -from ..exc import ObjNotFoundError, Error -from swh.core import hashutil + Conceptually, the object storage offers 5 methods: - -ID_HASH_ALGO = 'sha1' -# ID_HASH_ALGO = 'sha1_git' - -GZIP_BUFSIZ = 1048576 - -DIR_MODE = 0o755 -FILE_MODE = 0o644 - - -def _obj_dir(hex_obj_id, root_dir, depth): - """compute the storage directory of an object - - Args: - hex_obj_id: object id as hexlified string - root_dir: object storage root directory - depth: slicing depth of object IDs in the storage - - see also: `_obj_path` - - """ - if len(hex_obj_id) < depth * 2: - raise ValueError('object id "%s" is too short for slicing at depth %d' - % (hex_obj_id, depth)) - - # compute [depth] substrings of [obj_id], each of length 2, starting from - # the beginning - id_steps = [hex_obj_id[i * 2:i * 2 + 2] for i in range(0, depth)] - steps = [root_dir] + id_steps - - return os.path.join(*steps) - - -def _obj_path(hex_obj_id, root_dir, depth): - """similar to `obj_dir`, but also include the actual object file name in the - returned path - - """ - return os.path.join(_obj_dir(hex_obj_id, root_dir, depth), hex_obj_id) - - -@contextmanager -def _write_obj_file(hex_obj_id, root_dir, depth): - """context manager for writing object files to the object storage - - During writing data are written to a temporary file, which is atomically - renamed to the right file name after closing. This context manager also - takes care of (gzip) compressing the data on the fly. 
- - Yields: - a file-like object open for writing bytes - - Sample usage: - - with _write_obj_file(hex_obj_id, root_dir, depth) as f: - f.write(obj_data) - - """ - dir = _obj_dir(hex_obj_id, root_dir, depth) - if not os.path.isdir(dir): - os.makedirs(dir, DIR_MODE, exist_ok=True) - - path = os.path.join(dir, hex_obj_id) - (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', - dir=dir) - tmp_f = os.fdopen(tmp, 'wb') - with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: - yield f - tmp_f.close() - os.chmod(tmp_path, FILE_MODE) - os.rename(tmp_path, path) - - -class ObjStorage: - """high-level API to manipulate the Software Heritage object storage - - Conceptually, the object storage offers 4 methods: - - - add() add a new object, returning an object id - __contains__() check if an object is present, by object id + - add() add a new object, returning an object id + - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id - Variants of the above methods are implemented by this class, depending on - how the content of an object is specified (bytes, file-like object, etc.). - - On disk, an object storage is a directory tree containing files named after - their object IDs. An object ID is a checksum of its content, depending on - the value of the ID_HASH_ALGO constant (see hashutil for its meaning). - - To avoid directories that contain too many files, the object storage has a - given depth (default: 3). Each depth level consumes two characters of the - object id. So for instance a file with (git) SHA1 of - 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in an object - storage configured at depth 3 at - 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689. + And some management methods: - The actual files in the storage are stored in gzipped compressed format. 
- - Each file can hence be self-verified (on the shell) with something like: - - actual_id=34973274ccef6ab4dfaaf86599792fa9c3fe4689 - expected_id=$(zcat $filename | sha1sum | cut -f 1 -d' ') - if [ $actual_id != $expected_id ] ; then - echo "AYEEE, invalid object $actual_id /o\" - fi + - get_random() get random object id of existing contents (used for the + content integrity checker). + Each implementation of this interface can have a different behavior and + its own way to store the contents. """ - def __init__(self, root, depth=3): - """create a proxy object to the object storage - - Args: - root: object storage root directory - depth: slicing depth of object IDs in the storage - - """ - if not os.path.isdir(root): - raise ValueError('obj storage root "%s" is not a directory' - % root) - - self._root_dir = root - self._depth = depth - - self._temp_dir = os.path.join(root, 'tmp') - if not os.path.isdir(self._temp_dir): - os.makedirs(self._temp_dir, DIR_MODE, exist_ok=True) - - def __obj_dir(self, hex_obj_id): - """_obj_dir wrapper using this storage configuration""" - return _obj_dir(hex_obj_id, self._root_dir, self._depth) - - def __obj_path(self, hex_obj_id): - """_obj_path wrapper using this storage configuration""" - return _obj_path(hex_obj_id, self._root_dir, self._depth) + def __contains__(self, *args, **kwargs): + raise NotImplementedError( + "Implementations of ObjStorage must have a '__contains__' method" + ) - def __contains__(self, obj_id): - """check whether a given object id is present in the storage or not - - Return: - True iff the object id is present in the storage - - """ - hex_obj_id = hashutil.hash_to_hex(obj_id) - - return os.path.exists(_obj_path(hex_obj_id, self._root_dir, - self._depth)) - - def add_bytes(self, bytes, obj_id=None, check_presence=True): - """add a new object to the object storage + def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + """ Add a new object to the object storage. 
Args: - bytes: content of the object to be added to the storage - obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When - given, obj_id will be trusted to match bytes. If missing, + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly. check_presence: indicate if the presence of the content should be verified before adding the file. - """ - if obj_id is None: - # missing checksum, compute it in memory and write to file - h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) - h.update(bytes) - obj_id = h.digest() - - if check_presence and obj_id in self: - return obj_id - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - # object is either absent, or present but overwrite is requested - with _write_obj_file(hex_obj_id, - root_dir=self._root_dir, - depth=self._depth) as f: - f.write(bytes) - return obj_id + Returns: + the id of the object into the storage. + """ + raise NotImplementedError( + "Implementations of ObjStorage must have a 'add' method" + ) - def restore_bytes(self, bytes, obj_id=None): + def restore(self, content, obj_id, *args, **kwargs): """ Restore a content that have been corrupted. This function is identical to add_bytes but does not check if the object id is already in the file system. Args: - bytes: content of the object to be added to the storage + content: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. 
""" - return self.add_bytes(bytes, obj_id, check_presence=False) - - def add_file(self, f, length, obj_id=None): - """similar to `add_bytes`, but add the content of file-like object f to the - object storage - - add_file will read the file content only once, and avoid storing all of - it in memory - - """ - if obj_id is None: - # unknkown object id: work on temp file, compute checksum as we go, - # mv temp file into place - (tmp, tmp_path) = tempfile.mkstemp(dir=self._temp_dir) - try: - t = os.fdopen(tmp, 'wb') - tz = gzip.GzipFile(fileobj=t) - sums = hashutil._hash_file_obj(f, length, - algorithms=[ID_HASH_ALGO], - chunk_cb=lambda b: tz.write(b)) - tz.close() - t.close() - - obj_id = sums[ID_HASH_ALGO] - if obj_id in self: - return obj_id - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - dir = self.__obj_dir(hex_obj_id) - if not os.path.isdir(dir): - os.makedirs(dir, DIR_MODE, exist_ok=True) - path = os.path.join(dir, hex_obj_id) - - os.chmod(tmp_path, FILE_MODE) - os.rename(tmp_path, path) - finally: - if os.path.exists(tmp_path): - os.unlink(tmp_path) - else: - # known object id: write to .new file, rename - if obj_id in self: - return obj_id + raise NotImplemented( + "Implementations of ObjStorage must have a 'restore' method" + ) - hex_obj_id = hashutil.hash_to_hex(obj_id) - - with _write_obj_file(hex_obj_id, - root_dir=self._root_dir, - depth=self._depth) as obj: - shutil.copyfileobj(f, obj) - - return obj_id - - @contextmanager - def get_file_obj(self, obj_id): - """context manager to read the content of an object + def get(self, obj_id, *args, **kwargs): + """ Retrieve the content of a given object. Args: - obj_id: object id + obj_id: object id. - Yields: - a file-like object open for reading (bytes) + Returns: + the content of the requested object as bytes. 
Raises: - ObjNotFoundError: if the requested object is missing - - Sample usage: - - with objstorage.get_file_obj(obj_id) as f: - do_something(f.read()) - + ObjNotFoundError: if the requested object is missing. """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) + raise NotImplementedError( + "Implementations of ObjStorage must have a 'get' method" + ) - path = self.__obj_path(hex_obj_id) - with gzip.GzipFile(path, 'rb') as f: - yield f + def check(self, obj_id, *args, **kwargs): + """ Perform an integrity check for a given object. - def get_bytes(self, obj_id): - """retrieve the content of a given object + Verify that the file object is in place and that the gziped content + matches the object id. Args: - obj_id: object id - - Returns: - the content of the requested objects as bytes + obj_id: object id. Raises: - ObjNotFoundError: if the requested object is missing - + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. """ - with self.get_file_obj(obj_id) as f: - return f.read() + raise NotImplementedError( + "Implementations of ObjStorage must have a 'check' method" + ) - def get_random_contents(self, batch_size): + def get_random(self, batch_size, *args, **kwargs): """ Get random ids of existing contents This method is used in order to get random ids to perform @@ -314,127 +109,7 @@ An iterable of ids of contents that are in the current object storage. """ - def get_random_content(self, batch_size): - """ Get a batch of content inside a single directory. - - Returns: - a tuple (batch size, batch). 
- """ - dirs = [] - for level in range(self._depth): - path = os.path.join(self._root_dir, *dirs) - dir_list = next(os.walk(path))[1] - if 'tmp' in dir_list: - dir_list.remove('tmp') - dirs.append(random.choice(dir_list)) - - path = os.path.join(self._root_dir, *dirs) - content_list = next(os.walk(path))[2] - length = min(batch_size, len(content_list)) - return length, map(hashutil.hex_to_hash, - random.sample(content_list, length)) - - while batch_size: - length, it = get_random_content(self, batch_size) - batch_size = batch_size - length - yield from it - - def _get_file_path(self, obj_id): - """retrieve the path of a given object in the objects storage - - Note that the path point to a gzip-compressed file, so you need - gzip.open() or equivalent to get the actual object content. - - Args: - obj_id: object id - - Returns: - a file path pointing into the object storage - - Raises: - ObjNotFoundError: if the requested object is missing - - """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - return self.__obj_path(hex_obj_id) - - def check(self, obj_id): - """integrity check for a given object - - verify that the file object is in place, and that the gzipped content - matches the object id - - Args: - obj_id: object id - - Raises: - ObjNotFoundError: if the requested object is missing - Error: if the requested object is corrupt - - """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - try: - with gzip.open(self.__obj_path(hex_obj_id)) as f: - length = None - if ID_HASH_ALGO.endswith('_git'): - # if the hashing algorithm is git-like, we need to know the - # content size to hash on the fly. 
Do a first pass here to - # compute the size - length = 0 - while True: - chunk = f.read(GZIP_BUFSIZ) - length += len(chunk) - if not chunk: - break - f.rewind() - - checksums = hashutil._hash_file_obj(f, length, - algorithms=[ID_HASH_ALGO]) - actual_obj_id = checksums[ID_HASH_ALGO] - if obj_id != actual_obj_id: - raise Error('corrupt object %s should have id %s' % - (obj_id, actual_obj_id)) - except (OSError, IOError): - # IOError is for compatibility with older python versions - raise Error('corrupt object %s is not a gzip file' % obj_id) - - def __iter__(self): - """iterate over the object identifiers currently available in the storage - - Warning: with the current implementation of the object storage, this - method will walk the filesystem to list objects, meaning that listing - all objects will be very slow for large storages. You almost certainly - don't want to use this method in production. - - Return: - iterator over object IDs - - """ - def obj_iterator(): - # XXX hackish: it does not verify that the depth of found files - # matches the slicing depth of the storage - for root, _dirs, files in os.walk(self._root_dir): - for f in files: - yield bytes.fromhex(f) - - return obj_iterator() - - def __len__(self): - """compute the number of objects available in the storage - - Warning: this currently uses `__iter__`, its warning about bad - performances applies - - Return: - number of objects contained in the storage - - """ - return sum(1 for i in self) + raise NotImplementedError( + "The current implementation of ObjStorage does not support " + "'get_random' operation" + ) diff --git a/swh/storage/objstorage/objstorage_pathslicing.py b/swh/storage/objstorage/objstorage_pathslicing.py new file mode 100644 --- /dev/null +++ b/swh/storage/objstorage/objstorage_pathslicing.py @@ -0,0 +1,350 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License 
version 3, or any later version +# See top-level LICENSE file for more information + +import os +import gzip +import tempfile +import random + +from contextlib import contextmanager + +from swh.core import hashutil + +from .objstorage import ObjStorage +from ..exc import ObjNotFoundError, Error + + +ID_HASH_ALGO = 'sha1' + +GZIP_BUFSIZ = 1048576 + +DIR_MODE = 0o755 +FILE_MODE = 0o644 + + +@contextmanager +def _write_obj_file(hex_obj_id, objstorage): + """ Context manager for writing object files to the object storage. + + During writing, data are written to a temporary file, which is atomically + renamed to the right file name after closing. This context manager also + takes care of (gzip) compressing the data on the fly. + + Usage sample: + with _write_obj_file(hex_obj_id, objstorage): + f.write(obj_data) + + Yields: + a file-like object open for writing bytes. + """ + # Get the final paths and create the directory if absent. + dir = objstorage._obj_dir(hex_obj_id) + if not os.path.isdir(dir): + os.makedirs(dir, DIR_MODE, exist_ok=True) + path = os.path.join(dir, hex_obj_id) + + # Create a temporary file. + (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', + dir=dir) + + # Open the file and yield it for writing. + tmp_f = os.fdopen(tmp, 'wb') + with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: + yield f + + # Then close the temporary file and move it to the right directory. + tmp_f.close() + os.chmod(tmp_path, FILE_MODE) + os.rename(tmp_path, path) + + +@contextmanager +def _read_obj_file(hex_obj_id, objstorage): + """ Context manager for reading object file in the object storage. + + Usage sample: + with _read_obj_file(hex_obj_id, objstorage) as f: + b = f.read() + + Yields: + a file-like object open for reading bytes. 
+ """ + path = objstorage._obj_path(hex_obj_id) + with gzip.GzipFile(path, 'rb') as f: + yield f + + +class PathSlicingObjStorage(ObjStorage): + """ Implementation of the ObjStorage API based on the hash of the content. + + On disk, an object storage is a directory tree containing files named after + their object IDs. An object ID is a checksum of its content, depending on + the value of the ID_HASH_ALGO constant (see hashutil for its meaning). + + To avoid directories that contain too many files, the object storage has a + given depth. Each depth level consumes a given amount of characters of + the object id. + + So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 + will be stored in the given object storages : + + - depth=3, slicing=2 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + - depth=1, slicing=5 : 34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + + The files in the storage are stored in gzipped compressed format. + + Attributes: + root (string): path to the root directory of the storage on the disk. + depth (int): number of subdirectories created to store a file. + slicing (int): number of hash character consumed for each + subdirectories. + """ + + def __init__(self, root, depth, slicing): + """ Create an object to access a hash-slicing based object storage. + + Args: + root (string): path to the root directory of the storage on + the disk. + depth (int): number of subdirectories created to store a file. + slicing (int): number of hash character consumed for each + subdirectories. + """ + if not os.path.isdir(root): + raise ValueError( + 'PathSlicingObjStorage root "%s" is not a directory' % root + ) + + self.root = root + self.depth = depth + self.slicing = slicing + + def __contains__(self, obj_id): + """ Check whether the given object is present in the storage or not. + + Returns: + True iff the object is present in the storage. 
+ """ + hex_obj_id = hashutil.hash_to_hex(obj_id) + return os.path.exists(self._obj_path(hex_obj_id)) + + def __iter__(self): + """iterate over the object identifiers currently available in the storage + + Warning: with the current implementation of the object storage, this + method will walk the filesystem to list objects, meaning that listing + all objects will be very slow for large storages. You almost certainly + don't want to use this method in production. + + Return: + iterator over object IDs + """ + def obj_iterator(): + # XXX hackish: it does not verify that the depth of found files + # matches the slicing depth of the storage + for root, _dirs, files in os.walk(self.root): + for f in files: + yield bytes.fromhex(f) + + return obj_iterator() + + def __len__(self): + """compute the number of objects available in the storage + + Warning: this currently uses `__iter__`, its warning about bad + performances applies + + Return: + number of objects contained in the storage + + """ + return sum(1 for i in self) + + def _obj_dir(self, hex_obj_id): + """ Compute the storage directory of an object. + + See also: PathSlicingObjStorage::_obj_path + + Args: + hex_obj_id: object id as hexlified string. + + Returns: + Path to the directory that contains the required object. + """ + if len(hex_obj_id) < self.depth * self.slicing: + raise ValueError( + 'Object id "%s" is to short for %d-slicing at depth %d' + % (hex_obj_id, self.slicing, self.depth) + ) + + # Compute [depth] substrings of [hex_obj_id], each of length [slicing], + # starting from the beginning. + id_steps = [hex_obj_id[i * self.slicing: + i * self.slicing + self.slicing] + for i in range(self.depth)] + steps = [self.root] + id_steps + + return os.path.join(*steps) + + def _obj_path(self, hex_obj_id): + """ Compute the full path to an object into the current storage. + + See also: PathSlicingObjStorage::_obj_dir + + Args: + hex_obj_id: object id as hexlified string. 
+ + Returns: + Path to the actual object corresponding to the given id. + """ + return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id) + + def add(self, bytes, obj_id=None, check_presence=True): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + the id of the object into the storage. + """ + if obj_id is None: + # Checksum is missing, compute it on the fly. + h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) + h.update(bytes) + obj_id = h.digest() + + if check_presence and obj_id in self: + # If the object is already present, return immediatly. + return obj_id + + hex_obj_id = hashutil.hash_to_hex(obj_id) + with _write_obj_file(hex_obj_id, self) as f: + f.write(bytes) + + return obj_id + + def restore(self, bytes, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + bytes: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + """ + return self.add(bytes, obj_id, check_presence=False) + + def get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: object id. + + Returns: + the content of the requested object as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing. 
+ """ + if obj_id not in self: + raise ObjNotFoundError(obj_id) + + # Open the file and return its content as bytes + hex_obj_id = hashutil.hash_to_hex(obj_id) + with _read_obj_file(hex_obj_id, self) as f: + return f.read() + + def check(self, obj_id): + """ Perform an integrity check for a given object. + + Verify that the file object is in place and that the gziped content + matches the object id. + + Args: + obj_id: object id. + + Raises: + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. + """ + if obj_id not in self: + raise ObjNotFoundError(obj_id) + + hex_obj_id = hashutil.hash_to_hex(obj_id) + + try: + with gzip.open(self._obj_path(hex_obj_id)) as f: + length = None + if ID_HASH_ALGO.endswith('_git'): + # if the hashing algorithm is git-like, we need to know the + # content size to hash on the fly. Do a first pass here to + # compute the size + length = 0 + while True: + chunk = f.read(GZIP_BUFSIZ) + length += len(chunk) + if not chunk: + break + f.rewind() + + checksums = hashutil._hash_file_obj(f, length, + algorithms=[ID_HASH_ALGO]) + actual_obj_id = checksums[ID_HASH_ALGO] + if obj_id != actual_obj_id: + raise Error( + 'Corrupt object %s should have id %s' + % (hashutil.hash_to_hex(obj_id), + hashutil.hash_to_hex(actual_obj_id)) + ) + except (OSError, IOError): + # IOError is for compatibility with older python versions + raise Error('Corrupt object %s is not a gzip file' % obj_id) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + def get_random_content(self, batch_size): + """ Get a batch of content inside a single directory. + + Returns: + a tuple (batch size, batch). 
+ """ + dirs = [] + for level in range(self.depth): + path = os.path.join(self.root, *dirs) + dir_list = next(os.walk(path))[1] + if 'tmp' in dir_list: + dir_list.remove('tmp') + dirs.append(random.choice(dir_list)) + + path = os.path.join(self.root, *dirs) + content_list = next(os.walk(path))[2] + length = min(batch_size, len(content_list)) + return length, map(hashutil.hex_to_hash, + random.sample(content_list, length)) + + while batch_size: + length, it = get_random_content(self, batch_size) + batch_size = batch_size - length + yield from it diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -13,7 +13,7 @@ from . import converters from .db import Db -from .objstorage import ObjStorage +from .objstorage import PathSlicingObjStorage from .exc import ObjNotFoundError, StorageDBError from swh.core.hashutil import ALGORITHMS @@ -68,7 +68,7 @@ except psycopg2.OperationalError as e: raise StorageDBError(e) - self.objstorage = ObjStorage(obj_root) + self.objstorage = PathSlicingObjStorage(obj_root, depth=3, slicing=2) def content_add(self, content): """Add content blobs to the storage @@ -115,8 +115,8 @@ db.mktemp('content', cur) def add_to_objstorage(cont): - self.objstorage.add_bytes(cont['data'], - obj_id=cont['sha1']) + self.objstorage.add(cont['data'], + obj_id=cont['sha1']) content_filtered = (cont for cont in content_with_data if cont['sha1'] in missing_content) @@ -162,7 +162,7 @@ for obj_id in content: try: - data = self.objstorage.get_bytes(obj_id) + data = self.objstorage.get(obj_id) except ObjNotFoundError: yield None continue diff --git a/swh/storage/tests/objstorage_testing.py b/swh/storage/tests/objstorage_testing.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/objstorage_testing.py @@ -0,0 +1,68 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License 
version 3, or any later version
+# See top-level LICENSE file for more information
+
+from nose.tools import istest
+
+from swh.core import hashutil
+from swh.storage import exc
+
+
+class ObjStorageTestFixture():
+
+    def setUp(self):
+        super().setUp()
+
+    def hash_content(self, content):
+        obj_id = hashutil.hashdata(content)['sha1']
+        return content, obj_id
+
+    def assertContentMatch(self, obj_id, expected_content):
+        content = self.storage.get(obj_id)
+        self.assertEqual(content, expected_content)
+
+    @istest
+    def add_get_w_id(self):
+        content, obj_id = self.hash_content(b'add_get_w_id')
+        r = self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(obj_id, r)
+        self.assertContentMatch(obj_id, content)
+
+    @istest
+    def add_get_wo_id(self):
+        content, obj_id = self.hash_content(b'add_get_wo_id')
+        r = self.storage.add(content)
+        self.assertEqual(obj_id, r)
+        self.assertContentMatch(obj_id, content)
+
+    @istest
+    def restore_content(self):
+        content1, obj_id = self.hash_content(b'restore_content')
+        content2 = b'unexpected content'
+        ra = self.storage.add(content2, obj_id)
+        rr = self.storage.restore(content1)
+        self.assertEqual(ra, obj_id)
+        self.assertEqual(rr, obj_id)
+        self.assertContentMatch(obj_id, content1)
+
+    @istest
+    def get_missing(self):
+        content, obj_id = self.hash_content(b'get_missing')
+        with self.assertRaises(exc.Error):
+            self.storage.get(obj_id)
+
+    @istest
+    def check_missing(self):
+        content, obj_id = self.hash_content(b'check_missing')
+        with self.assertRaises(exc.Error):
+            self.storage.check(obj_id)
+
+    @istest
+    def check_present(self):
+        content, obj_id = self.hash_content(b'check_present')
+        self.storage.add(content)
+        try:
+            self.storage.check(obj_id)
+        except:
+            self.fail('Integrity check failed')
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
--- a/swh/storage/tests/test_checker.py
+++ b/swh/storage/tests/test_checker.py
@@ -11,7 +11,6 @@
 from nose.plugins.attrib import attr
 
 from swh.core import hashutil
-from swh.storage.objstorage.objstorage import _obj_path
 from swh.storage.checker.checker import ContentChecker
@@ -53,7 +52,7 @@
         """ Make the given content invalid.
         """
         hex_id = hashutil.hash_to_hex(id)
-        file_path = _obj_path(hex_id, self.checker.objstorage._root_dir, 3)
+        file_path = self.checker.objstorage._obj_path(hex_id)
         with gzip.open(file_path, 'wb') as f:
             f.write(b'Unexpected content')
@@ -61,14 +60,14 @@ def check_valid_content(self):
         # Check that a valid content is valid.
         content = b'check_valid_content'
-        id = self.checker.objstorage.add_bytes(content)
+        id = self.checker.objstorage.add(content)
         self.assertTrue(self.checker.check_content(id))
 
     @istest
     def check_invalid_content(self):
         # Check that an invalid content is noticed.
         content = b'check_invalid_content'
-        id = self.checker.objstorage.add_bytes(content)
+        id = self.checker.objstorage.add(content)
         self.corrupt_content(id)
         self.assertFalse(self.checker.check_content(id))
@@ -76,7 +75,7 @@ def repair_content_present_first(self):
         # Try to repair a content that is in the backup storage.
         content = b'repair_content_present_first'
-        id = self.checker.objstorage.add_bytes(content)
+        id = self.checker.objstorage.add(content)
         # Add a content to the mock
         self.checker.backup_storages[0].content_add(id, content)
         # Corrupt and repair it.
@@ -89,7 +88,7 @@ def repair_content_present_second(self):
         # Try to repair a content that is not in the first backup storage.
         content = b'repair_content_present_second'
-        id = self.checker.objstorage.add_bytes(content)
+        id = self.checker.objstorage.add(content)
         # Add a content to the mock
         self.checker.backup_storages[1].content_add(id, content)
         # Corrupt and repair it.
@@ -103,8 +102,8 @@
         # Try to repair two contents that are in separate backup storages.
         content1 = b'repair_content_present_distributed_2'
         content2 = b'repair_content_present_distributed_1'
-        id1 = self.checker.objstorage.add_bytes(content1)
-        id2 = self.checker.objstorage.add_bytes(content2)
+        id1 = self.checker.objstorage.add(content1)
+        id2 = self.checker.objstorage.add(content2)
         # Add content to the mock.
         self.checker.backup_storages[0].content_add(id1, content1)
         self.checker.backup_storages[0].content_add(id2, content2)
@@ -121,7 +120,7 @@ def repair_content_missing(self):
         # Try to repair a content that is NOT in the backup storage.
         content = b'repair_content_present'
-        id = self.checker.objstorage.add_bytes(content)
+        id = self.checker.objstorage.add(content)
         # Corrupt and repair it.
         self.corrupt_content(id)
         self.assertFalse(self.checker.check_content(id))
diff --git a/swh/storage/tests/test_objstorage.py b/swh/storage/tests/test_objstorage.py
deleted file mode 100644
--- a/swh/storage/tests/test_objstorage.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import gzip
-import os
-import shutil
-import stat
-import tempfile
-import unittest
-
-from io import BytesIO
-from nose.tools import istest
-
-from swh.core import hashutil
-from swh.storage import objstorage
-from swh.storage import exc
-
-
-class TestObjStorage(unittest.TestCase):
-
-    def setUp(self):
-        self.content = b'42\n'
-
-        # sha1
-        self.hex_obj_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
-
-        # sha1_git
-        # self.hex_obj_id = 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'
-
-        self.obj_id = hashutil.hex_to_hash(self.hex_obj_id)
-        self.obj_steps = [self.hex_obj_id[0:2], self.hex_obj_id[2:4],
-                          self.hex_obj_id[4:6]]
-        self.obj_relpath = os.path.join(*(self.obj_steps + [self.hex_obj_id]))
-
-        self.tmpdir = tempfile.mkdtemp()
-        self.obj_dirs = [
-            os.path.join(self.tmpdir, *self.obj_steps[:i])
-            for i in range(1, len(self.obj_steps))
-        ]
-        self.obj_path = os.path.join(self.tmpdir, self.obj_relpath)
-
-        self.storage = objstorage.ObjStorage(root=self.tmpdir, depth=3)
-
-        self.missing_obj_id = hashutil.hex_to_hash(
-            'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15')
-
-    def tearDown(self):
-        shutil.rmtree(self.tmpdir)
-
-    def assertGzipContains(self, gzip_path, content):  # noqa
-        self.assertEqual(gzip.open(gzip_path, 'rb').read(), content)
-
-    @istest
-    def add_bytes_w_id(self):
-        r = self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        self.assertEqual(r, self.obj_id)
-        self.assertGzipContains(self.obj_path, self.content)
-
-    @istest
-    def add_bytes_wo_id(self):
-        r = self.storage.add_bytes(self.content)
-        self.assertEqual(r, self.obj_id)
-        self.assertGzipContains(self.obj_path, self.content)
-
-    @istest
-    def add_file_w_id(self):
-        r = self.storage.add_file(BytesIO(self.content),
-                                  len(self.content),
-                                  obj_id=self.obj_id)
-        self.assertEqual(r, self.obj_id)
-        self.assertGzipContains(self.obj_path, self.content)
-
-    @istest
-    def add_file_wo_id(self):
-        r = self.storage.add_file(BytesIO(self.content), len(self.content))
-        self.assertEqual(r, self.obj_id)
-        self.assertGzipContains(self.obj_path, self.content)
-
-    @istest
-    def contains(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        self.assertIn(self.obj_id, self.storage)
-        self.assertNotIn(self.missing_obj_id, self.storage)
-
-    @istest
-    def check_ok(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        try:
-            self.storage.check(self.obj_id)
-        except:
-            self.fail('integrity check failed')
-
-    @istest
-    def check_missing(self):
-        with self.assertRaises(exc.Error):
-            self.storage.check(self.obj_id)
-
-    @istest
-    def check_file_and_dirs_mode(self):
-        old_umask = os.umask(0)
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        for dir in self.obj_dirs:
-            stat_dir = os.stat(dir)
-            self.assertEquals(stat.S_IMODE(stat_dir.st_mode),
-                              objstorage.DIR_MODE)
-        stat_res = os.stat(self.obj_path)
-        self.assertEquals(stat.S_IMODE(stat_res.st_mode), objstorage.FILE_MODE)
-        os.umask(old_umask)
-
-    @istest
-    def check_not_gzip(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        with open(self.obj_path, 'ab') as f:  # add trailing garbage
-            f.write(b'garbage')
-        with self.assertRaises(exc.Error):
-            self.storage.check(self.obj_id)
-
-    @istest
-    def check_id_mismatch(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        with gzip.open(self.obj_path, 'wb') as f:  # replace gzipped content
-            f.write(b'unexpected content')
-        with self.assertRaises(exc.Error):
-            self.storage.check(self.obj_id)
-
-    @istest
-    def get_bytes(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        self.assertEqual(self.storage.get_bytes(self.obj_id),
-                         self.content)
-
-    @istest
-    def get_random_contents(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        for id in self.storage.get_random_contents(1):
-            self.assertIn(id, [self.obj_id])
-
-    @istest
-    def get_file_path(self):
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        path = self.storage._get_file_path(self.obj_id)
-        self.assertEqual(os.path.basename(path), self.hex_obj_id)
-        self.assertEqual(gzip.open(path, 'rb').read(), self.content)
-
-    @istest
-    def get_missing(self):
-        with self.assertRaises(exc.Error):
-            with self.storage.get_file_obj(self.missing_obj_id) as f:
-                f.read()
-
-    @istest
-    def iter(self):
-        self.assertEqual(list(iter(self.storage)), [])
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        self.assertEqual(list(iter(self.storage)), [self.obj_id])
-
-    @istest
-    def len(self):
-        self.assertEqual(len(self.storage), 0)
-        self.storage.add_bytes(self.content, obj_id=self.obj_id)
-        self.assertEqual(len(self.storage), 1)
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
--- a/swh/storage/tests/test_objstorage_api.py
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
 import tempfile
 import unittest
@@ -12,7 +13,6 @@
 from swh.core import hashutil
 from swh.storage.exc import ObjNotFoundError, Error
 from swh.storage.tests.server_testing import ServerTestFixture
-from swh.storage.objstorage.objstorage import _obj_path
 from swh.storage.objstorage.api.client import RemoteObjStorage
 from swh.storage.objstorage.api.server import app
@@ -66,9 +66,13 @@ def content_check_invalid(self):
         content = bytes('content_check_invalid', 'utf8')
         id = self.objstorage.content_add(content)
-        path = _obj_path(hashutil.hash_to_hex(id),
-                         self.app.config['storage_base'],
-                         self.app.config['storage_depth'])
+        hex_obj_id = hashutil.hash_to_hex(id)
+        path = os.path.join(
+            self.config['storage_base'],
+            *[hex_obj_id[i*2:i*2+2]
+              for i in range(int(self.config['storage_depth']))],
+            hex_obj_id
+        )
         content = list(content)
         with open(path, 'bw') as f:
             content[0] = (content[0] + 1) % 128
diff --git a/swh/storage/tests/test_objstorage_pathslicing.py b/swh/storage/tests/test_objstorage_pathslicing.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_pathslicing.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.core import hashutil
+from swh.storage import exc
+from swh.storage.objstorage import PathSlicingObjStorage
+
+from objstorage_testing import ObjStorageTestFixture
+
+
+class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.depth = 3
+        self.slicing = 2
+        self.tmpdir = tempfile.mkdtemp()
+        self.storage = PathSlicingObjStorage(self.tmpdir, self.depth,
+                                             self.slicing)
+
+    def content_path(self, obj_id):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        return self.storage._obj_path(hex_obj_id)
+
+    @istest
+    def contains(self):
+        content_p, obj_id_p = self.hash_content(b'contains_present')
+        content_m, obj_id_m = self.hash_content(b'contains_missing')
+        self.storage.add(content_p, obj_id=obj_id_p)
+        self.assertIn(obj_id_p, self.storage)
+        self.assertNotIn(obj_id_m, self.storage)
+
+    @istest
+    def iter(self):
+        content, obj_id = self.hash_content(b'iter')
+        self.assertEqual(list(iter(self.storage)), [])
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(list(iter(self.storage)), [obj_id])
+
+    @istest
+    def len(self):
+        content, obj_id = self.hash_content(b'check_not_gzip')
+        self.assertEqual(len(self.storage), 0)
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(len(self.storage), 1)
+
+    @istest
+    def check_not_gzip(self):
+        content, obj_id = self.hash_content(b'check_not_gzip')
+        self.storage.add(content, obj_id=obj_id)
+        with open(self.content_path(obj_id), 'ab') as f:  # Add garbage.
+            f.write(b'garbage')
+        with self.assertRaises(exc.Error):
+            self.storage.check(obj_id)
+
+    @istest
+    def check_id_mismatch(self):
+        content, obj_id = self.hash_content(b'check_id_mismatch')
+        self.storage.add(content, obj_id=obj_id)
+        with open(self.content_path(obj_id), 'wb') as f:
+            f.write(b'unexpected content')
+        with self.assertRaises(exc.Error):
+            self.storage.check(obj_id)
+
+    @istest
+    def get_random_contents(self):
+        content, obj_id = self.hash_content(b'get_random_content')
+        self.storage.add(content, obj_id=obj_id)
+        random_contents = list(self.storage.get_random(1))
+        self.assertEqual(1, len(random_contents))
+        self.assertIn(obj_id, random_contents)