diff --git a/PKG-INFO b/PKG-INFO index 5ab929225..9b08d190e 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.40 +Version: 0.0.41 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 5ab929225..9b08d190e 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.40 +Version: 0.0.41 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py index 110d72ea5..69afe4f62 100644 --- a/swh/storage/checker/checker.py +++ b/swh/storage/checker/checker.py @@ -1,171 +1,171 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from swh.core import config, hashutil from .. import get_storage from ..objstorage import PathSlicingObjStorage from ..exc import ObjNotFoundError, Error DEFAULT_CONFIG = { 'storage_path': ('str', '/srv/softwareheritage/objects'), 'storage_depth': ('int', 3), 'backup_url': ('str', 'http://uffizi:5002/'), 'batch_size': ('int', 1000), } class ContentChecker(): """ Content integrity checker that will check local objstorage content. The checker will check the data of an object storage in order to verify that no file have been corrupted. Attributes: config: dictionary that contains this checker configuration objstorage (ObjStorage): Local object storage that will be checked. master_storage (RemoteStorage): A distant storage that will be used to restore corrupted content. """ - def __init__(self, config, root, depth, backup_urls): + def __init__(self, config, root, slicing, backup_urls): """ Create a checker that ensure the objstorage have no corrupted file. Args: config (dict): Dictionary that contains the following keys : batch_size: Number of content that should be tested each time the content checker runs. root (string): Path to the objstorage directory depth (int): Depth of the object storage. backup_urls: List of url that can be contacted in order to get a content. """ self.config = config - self.objstorage = PathSlicingObjStorage(root, depth, slicing=2) + self.objstorage = PathSlicingObjStorage(root, slicing) self.backup_storages = [get_storage('remote_storage', [backup_url]) for backup_url in backup_urls] def run(self): """ Start the check routine """ corrupted_contents = [] batch_size = self.config['batch_size'] for content_id in self.get_content_to_check(batch_size): if not self.check_content(content_id): corrupted_contents.append(content_id) logging.error('The content', content_id, 'have been corrupted') self.repair_contents(corrupted_contents) def run_as_daemon(self): """ Start the check routine and perform it forever. Use this method to run the checker when it's done as a daemon that will iterate over the content forever in background. 
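# Reviewer note (not part of the patch): ContentChecker now takes a slicing
# specification string where it previously took a depth. A minimal sketch of
# the new call site, using the defaults from DEFAULT_CONFIG above; note that
# DEFAULT_CONFIG and the --depth CLI option below still carry an integer
# 'storage_depth', which presumably needs the same treatment.
from swh.storage.checker.checker import ContentChecker

checker = ContentChecker(
    {'batch_size': 1000},               # contents checked per run
    '/srv/softwareheritage/objects',    # objstorage root (must already exist)
    '0:2/2:4/4:6',                      # slicing spec, replacing depth=3 / slicing=2
    ['http://uffizi:5002/'],            # backup storages queried for repairs
)
checker.run()                           # one batch; run_as_daemon() loops forever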
""" while True: try: self.run() except Exception as e: logging.error('An error occured while verifing the content: %s' % e) def get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ contents = self.objstorage.get_random_contents(batch_size) yield from contents def check_content(self, content_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: self.objstorage.check(content_id) except (ObjNotFoundError, Error) as e: logging.warning(e) return False else: return True def repair_contents(self, content_ids): """ Try to restore the given contents. Ask the backup storages for the contents that are corrupted on the local object storage. If the first storage does not contain the missing contents, send a request to the second one with only the content that couldn't be retrieved, and so on until there is no remaining content or servers. If a content couldn't be retrieved on all the servers, then log it as an error. """ contents_to_get = set(content_ids) # Iterates over the backup storages. for backup_storage in self.backup_storages: # Try to get all the contents that still need to be retrieved. contents = backup_storage.content_get(list(contents_to_get)) for content in contents: if content: hash = content['sha1'] data = content['data'] # When a content is retrieved, remove it from the set # of needed contents. contents_to_get.discard(hash) self.objstorage.restore(data) # Contents still in contents_to_get couldn't be retrieved. if contents_to_get: logging.error( "Some corrupted contents could not be retrieved : %s" % [hashutil.hash_to_hex(id) for id in contents_to_get] ) @click.command() @click.argument('config-path', required=1) @click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], help='Path to the storage to verify') @click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], type=click.INT, help='Depth of the object storage') @click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], help='Url of a remote storage to retrieve corrupted content') @click.option('--daemon/--nodaemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') def launch(config_path, storage_path, depth, backup_url, is_daemon): # The configuration have following priority : # command line > file config > default config cl_config = { 'storage_path': storage_path, 'storage_depth': depth, 'backup_url': backup_url } conf = config.read(config_path, DEFAULT_CONFIG) conf.update(cl_config) # Create the checker and run checker = ContentChecker( {'batch_size': conf['batch_size']}, conf['storage_path'], conf['storage_depth'], map(lambda x: x.strip(), conf['backup_url'].split(',')) ) if is_daemon: checker.run_as_daemon() else: checker.run() if __name__ == '__main__': launch() diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py index 968ef4620..6cb8885f1 100644 --- a/swh/storage/objstorage/api/server.py +++ b/swh/storage/objstorage/api/server.py @@ -1,97 +1,96 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from flask import Flask, g, request from swh.core import config from swh.storage.objstorage import 
PathSlicingObjStorage from swh.storage.api.common import (BytesRequest, decode_request, error_handler, encode_data_server as encode_data) DEFAULT_CONFIG = { 'storage_base': ('str', '/tmp/swh-storage/objects/'), - 'storage_depth': ('int', 3) + 'storage_slicing': ('str', '0:2/2:4/4:6') } app = Flask(__name__) app.request_class = BytesRequest @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.before_request def before_request(): g.objstorage = PathSlicingObjStorage(app.config['storage_base'], - app.config['storage_depth'], - slicing=2) + app.config['storage_slicing']) @app.route('/') def index(): return "SWH Objstorage API server" @app.route('/content') def content(): return str(list(g.storage)) @app.route('/content/add', methods=['POST']) def add_bytes(): return encode_data(g.objstorage.add(**decode_request(request))) @app.route('/content/get', methods=['POST']) def get_bytes(): return encode_data(g.objstorage.get(**decode_request(request))) @app.route('/content/get/random', methods=['POST']) def get_random_contents(): return encode_data( g.objstorage.get_random(**decode_request(request)) ) @app.route('/content/check', methods=['POST']) def check(): return encode_data(g.objstorage.check(**decode_request(request))) def run_from_webserver(environ, start_response): """Run the WSGI app from the webserver, loading the configuration. """ config_path = '/etc/softwareheritage/storage/objstorage.ini' app.config.update(config.read(config_path, DEFAULT_CONFIG)) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5000, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app.config.update(config.read(config_path, DEFAULT_CONFIG)) app.run(host, port=int(port), debug=bool(debug)) if __name__ == '__main__': launch() diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py index c651b37c1..9e4291766 100644 --- a/swh/storage/objstorage/objstorage.py +++ b/swh/storage/objstorage/objstorage.py @@ -1,115 +1,119 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +ID_HASH_ALGO = 'sha1' +ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. + + class ObjStorage(): """ High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers 5 methods: - __contains__() check if an object is present, by object id - add() add a new object, returning an object id - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id And some management methods: - get_random() get random object id of existing contents (used for the content integrity checker). Each implementation of this interface can have a different behavior and its own way to store the contents. 
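# Reviewer note (not part of the patch): the objstorage API server now reads a
# 'storage_slicing' string instead of 'storage_depth'. A minimal sketch of
# running it with the new key; the values are the defaults from DEFAULT_CONFIG
# in server.py, and in production they still come from objstorage.ini.
from swh.storage.objstorage.api.server import app

app.config.update({
    'storage_base': '/tmp/swh-storage/objects/',
    'storage_slicing': '0:2/2:4/4:6',   # was: 'storage_depth': 3
})
app.run('127.0.0.1', port=5000, debug=False)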
""" def __contains__(self, *args, **kwargs): raise NotImplementedError( "Implementations of ObjStorage must have a '__contains__' method" ) def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): """ Add a new object to the object storage. Args: content: content of the object to be added to the storage. obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly. check_presence: indicate if the presence of the content should be verified before adding the file. Returns: the id of the object into the storage. """ raise NotImplementedError( "Implementations of ObjStorage must have a 'add' method" ) - def restore(self, content, obj_id, *args, **kwargs): + def restore(self, content, obj_id=None, *args, **kwargs): """ Restore a content that have been corrupted. This function is identical to add_bytes but does not check if the object id is already in the file system. Args: content: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. """ raise NotImplemented( "Implementations of ObjStorage must have a 'restore' method" ) def get(self, obj_id, *args, **kwargs): """ Retrieve the content of a given object. Args: obj_id: object id. Returns: the content of the requested object as bytes. Raises: ObjNotFoundError: if the requested object is missing. """ raise NotImplementedError( "Implementations of ObjStorage must have a 'get' method" ) def check(self, obj_id, *args, **kwargs): """ Perform an integrity check for a given object. Verify that the file object is in place and that the gziped content matches the object id. Args: obj_id: object id. Raises: ObjNotFoundError: if the requested object is missing. Error: if the request object is corrupted. """ raise NotImplementedError( "Implementations of ObjStorage must have a 'check' method" ) def get_random(self, batch_size, *args, **kwargs): """ Get random ids of existing contents This method is used in order to get random ids to perform content integrity verifications on random contents. Attributes: batch_size (int): Number of ids that will be given Yields: An iterable of ids of contents that are in the current object storage. """ raise NotImplementedError( "The current implementation of ObjStorage does not support " "'get_random' operation" ) diff --git a/swh/storage/objstorage/objstorage_pathslicing.py b/swh/storage/objstorage/objstorage_pathslicing.py index 875fd753a..7da58b450 100644 --- a/swh/storage/objstorage/objstorage_pathslicing.py +++ b/swh/storage/objstorage/objstorage_pathslicing.py @@ -1,350 +1,347 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import gzip import tempfile import random from contextlib import contextmanager from swh.core import hashutil -from .objstorage import ObjStorage +from .objstorage import ObjStorage, ID_HASH_ALGO, ID_HASH_LENGTH from ..exc import ObjNotFoundError, Error -ID_HASH_ALGO = 'sha1' - GZIP_BUFSIZ = 1048576 DIR_MODE = 0o755 FILE_MODE = 0o644 @contextmanager def _write_obj_file(hex_obj_id, objstorage): """ Context manager for writing object files to the object storage. 
During writing, data are written to a temporary file, which is atomically renamed to the right file name after closing. This context manager also takes care of (gzip) compressing the data on the fly. Usage sample: with _write_obj_file(hex_obj_id, objstorage): f.write(obj_data) Yields: a file-like object open for writing bytes. """ # Get the final paths and create the directory if absent. dir = objstorage._obj_dir(hex_obj_id) if not os.path.isdir(dir): os.makedirs(dir, DIR_MODE, exist_ok=True) path = os.path.join(dir, hex_obj_id) # Create a temporary file. (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', dir=dir) # Open the file and yield it for writing. tmp_f = os.fdopen(tmp, 'wb') with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: yield f # Then close the temporary file and move it to the right directory. tmp_f.close() os.chmod(tmp_path, FILE_MODE) os.rename(tmp_path, path) @contextmanager def _read_obj_file(hex_obj_id, objstorage): """ Context manager for reading object file in the object storage. Usage sample: with _read_obj_file(hex_obj_id, objstorage) as f: b = f.read() Yields: a file-like object open for reading bytes. """ path = objstorage._obj_path(hex_obj_id) with gzip.GzipFile(path, 'rb') as f: yield f class PathSlicingObjStorage(ObjStorage): """ Implementation of the ObjStorage API based on the hash of the content. On disk, an object storage is a directory tree containing files named after their object IDs. An object ID is a checksum of its content, depending on the value of the ID_HASH_ALGO constant (see hashutil for its meaning). To avoid directories that contain too many files, the object storage has a - given depth. Each depth level consumes a given amount of characters of - the object id. + given slicing. Each slicing correspond to a directory that is named + according to the hash of its content. So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in the given object storages : - - depth=3, slicing=2 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 - - depth=1, slicing=5 : 34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 The files in the storage are stored in gzipped compressed format. Attributes: root (string): path to the root directory of the storage on the disk. - depth (int): number of subdirectories created to store a file. - slicing (int): number of hash character consumed for each - subdirectories. + bounds: list of tuples that indicates the beginning and the end of + each subdirectory for a content. """ - def __init__(self, root, depth, slicing): + def __init__(self, root, slicing): """ Create an object to access a hash-slicing based object storage. Args: root (string): path to the root directory of the storage on the disk. - depth (int): number of subdirectories created to store a file. - slicing (int): number of hash character consumed for each - subdirectories. + slicing (string): string that indicates the slicing to perform + on the hash of the content to know the path where it should + be stored. """ if not os.path.isdir(root): raise ValueError( 'PathSlicingObjStorage root "%s" is not a directory' % root ) self.root = root - self.depth = depth - self.slicing = slicing + # Make a list of tuples where each tuple contains the beginning + # and the end of each slicing. 
+ self.bounds = [ + slice(*map(int, sbounds.split(':'))) + for sbounds in slicing.split('/') + if sbounds + ] + + max_endchar = max(map(lambda bound: bound.stop, self.bounds)) + if ID_HASH_LENGTH < max_endchar: + raise ValueError( + 'Algorithm %s has too short hash for slicing to char %d' + % (ID_HASH_ALGO, max_endchar) + ) def __contains__(self, obj_id): """ Check whether the given object is present in the storage or not. Returns: True iff the object is present in the storage. """ hex_obj_id = hashutil.hash_to_hex(obj_id) return os.path.exists(self._obj_path(hex_obj_id)) def __iter__(self): """iterate over the object identifiers currently available in the storage Warning: with the current implementation of the object storage, this method will walk the filesystem to list objects, meaning that listing all objects will be very slow for large storages. You almost certainly don't want to use this method in production. Return: iterator over object IDs """ def obj_iterator(): # XXX hackish: it does not verify that the depth of found files # matches the slicing depth of the storage for root, _dirs, files in os.walk(self.root): for f in files: yield bytes.fromhex(f) return obj_iterator() def __len__(self): """compute the number of objects available in the storage Warning: this currently uses `__iter__`, its warning about bad performances applies Return: number of objects contained in the storage """ return sum(1 for i in self) def _obj_dir(self, hex_obj_id): """ Compute the storage directory of an object. See also: PathSlicingObjStorage::_obj_path Args: hex_obj_id: object id as hexlified string. Returns: Path to the directory that contains the required object. """ - if len(hex_obj_id) < self.depth * self.slicing: - raise ValueError( - 'Object id "%s" is to short for %d-slicing at depth %d' - % (hex_obj_id, self.slicing, self.depth) - ) - - # Compute [depth] substrings of [hex_obj_id], each of length [slicing], - # starting from the beginning. - id_steps = [hex_obj_id[i * self.slicing: - i * self.slicing + self.slicing] - for i in range(self.depth)] - steps = [self.root] + id_steps - - return os.path.join(*steps) + slices = [hex_obj_id[bound] for bound in self.bounds] + return os.path.join(self.root, *slices) def _obj_path(self, hex_obj_id): """ Compute the full path to an object into the current storage. See also: PathSlicingObjStorage::_obj_dir Args: hex_obj_id: object id as hexlified string. Returns: Path to the actual object corresponding to the given id. """ return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id) def add(self, bytes, obj_id=None, check_presence=True): """ Add a new object to the object storage. Args: bytes: content of the object to be added to the storage. obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly. check_presence: indicate if the presence of the content should be verified before adding the file. Returns: the id of the object into the storage. """ if obj_id is None: # Checksum is missing, compute it on the fly. h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) h.update(bytes) obj_id = h.digest() if check_presence and obj_id in self: # If the object is already present, return immediatly. return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) with _write_obj_file(hex_obj_id, self) as f: f.write(bytes) return obj_id def restore(self, bytes, obj_id=None): """ Restore a content that have been corrupted. 
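# Reviewer note (not part of the patch): how a slicing specification is turned
# into slice() bounds and then into a storage path, mirroring __init__ and
# _obj_dir above. Standalone sketch with an illustrative root directory.
import os

hex_obj_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
slicing = '0:2/2:4/4:6'

bounds = [slice(*map(int, s.split(':'))) for s in slicing.split('/') if s]
slices = [hex_obj_id[b] for b in bounds]      # ['34', '97', '32']
obj_dir = os.path.join('/srv/objects', *slices)
path = os.path.join(obj_dir, hex_obj_id)
# -> /srv/objects/34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689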
This function is identical to add_bytes but does not check if the object id is already in the file system. Args: bytes: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. """ return self.add(bytes, obj_id, check_presence=False) def get(self, obj_id): """ Retrieve the content of a given object. Args: obj_id: object id. Returns: the content of the requested object as bytes. Raises: ObjNotFoundError: if the requested object is missing. """ if obj_id not in self: raise ObjNotFoundError(obj_id) # Open the file and return its content as bytes hex_obj_id = hashutil.hash_to_hex(obj_id) with _read_obj_file(hex_obj_id, self) as f: return f.read() def check(self, obj_id): """ Perform an integrity check for a given object. Verify that the file object is in place and that the gziped content matches the object id. Args: obj_id: object id. Raises: ObjNotFoundError: if the requested object is missing. Error: if the request object is corrupted. """ if obj_id not in self: raise ObjNotFoundError(obj_id) hex_obj_id = hashutil.hash_to_hex(obj_id) try: with gzip.open(self._obj_path(hex_obj_id)) as f: length = None if ID_HASH_ALGO.endswith('_git'): # if the hashing algorithm is git-like, we need to know the # content size to hash on the fly. Do a first pass here to # compute the size length = 0 while True: chunk = f.read(GZIP_BUFSIZ) length += len(chunk) if not chunk: break f.rewind() checksums = hashutil._hash_file_obj(f, length, algorithms=[ID_HASH_ALGO]) actual_obj_id = checksums[ID_HASH_ALGO] if obj_id != actual_obj_id: raise Error( 'Corrupt object %s should have id %s' % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id)) ) except (OSError, IOError): # IOError is for compatibility with older python versions raise Error('Corrupt object %s is not a gzip file' % obj_id) def get_random(self, batch_size): """ Get random ids of existing contents This method is used in order to get random ids to perform content integrity verifications on random contents. Attributes: batch_size (int): Number of ids that will be given Yields: An iterable of ids of contents that are in the current object storage. """ def get_random_content(self, batch_size): """ Get a batch of content inside a single directory. Returns: a tuple (batch size, batch). """ dirs = [] - for level in range(self.depth): + for level in range(len(self.bounds)): path = os.path.join(self.root, *dirs) dir_list = next(os.walk(path))[1] if 'tmp' in dir_list: dir_list.remove('tmp') dirs.append(random.choice(dir_list)) path = os.path.join(self.root, *dirs) content_list = next(os.walk(path))[2] length = min(batch_size, len(content_list)) return length, map(hashutil.hex_to_hash, random.sample(content_list, length)) while batch_size: length, it = get_random_content(self, batch_size) batch_size = batch_size - length yield from it diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 3fb8cc6a7..c345539d1 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,1078 +1,1080 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import functools import itertools import dateutil.parser import psycopg2 from . 
import converters from .db import Db from .objstorage import PathSlicingObjStorage from .exc import ObjNotFoundError, StorageDBError from swh.core.hashutil import ALGORITHMS # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 def db_transaction(meth): """decorator to execute Storage methods within DB transactions The decorated method must accept a `cur` keyword argument """ @functools.wraps(meth) def _meth(self, *args, **kwargs): with self.db.transaction() as cur: return meth(self, *args, cur=cur, **kwargs) return _meth def db_transaction_generator(meth): """decorator to execute Storage methods within DB transactions, while returning a generator The decorated method must accept a `cur` keyword argument """ @functools.wraps(meth) def _meth(self, *args, **kwargs): with self.db.transaction() as cur: yield from meth(self, *args, cur=cur, **kwargs) return _meth class Storage(): """SWH storage proxy, encompassing DB and object storage """ def __init__(self, db_conn, obj_root): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection obj_root: path to the root of the object storage """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = Db(db_conn) else: self.db = Db.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) - self.objstorage = PathSlicingObjStorage(obj_root, depth=3, slicing=2) + # TODO this needs to be configured + self.objstorage = PathSlicingObjStorage(obj_root, + slicing='0:2/2:4/4:6') def content_add(self, content): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to the object storage and will not be removed. Since addition to the object storage is idempotent, that should not be a problem. Args: content: iterable of dictionaries representing individual pieces of content to add. 
Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum - status (str): one of visible, hidden, absent - reason (str): if status = absent, the reason why - origin (int): if status = absent, the origin we saw the content in """ db = self.db content_by_status = defaultdict(list) for d in content: if 'status' not in d: d['status'] = 'visible' if 'length' not in d: d['length'] = -1 content_by_status[d['status']].append(d) content_with_data = content_by_status['visible'] content_without_data = content_by_status['absent'] missing_content = set(self.content_missing(content_with_data)) missing_skipped = set( sha1_git for sha1, sha1_git, sha256 in self.skipped_content_missing(content_without_data)) with db.transaction() as cur: if missing_content: # create temporary table for metadata injection db.mktemp('content', cur) def add_to_objstorage(cont): self.objstorage.add(cont['data'], obj_id=cont['sha1']) content_filtered = (cont for cont in content_with_data if cont['sha1'] in missing_content) db.copy_to(content_filtered, 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'length', 'status'], cur, item_cb=add_to_objstorage) # move metadata in place db.content_add_from_temp(cur) if missing_skipped: missing_filtered = (cont for cont in content_without_data if cont['sha1_git'] in missing_skipped) db.mktemp('skipped_content', cur) db.copy_to(missing_filtered, 'tmp_skipped_content', ['sha1', 'sha1_git', 'sha256', 'length', 'reason', 'status', 'origin'], cur) # move metadata in place db.skipped_content_add_from_temp(cur) def content_get(self, content): """Retrieve in bulk contents and their data. Args: content: iterables of sha1 Returns: Generates streams of contents as dict with their raw data: - sha1: sha1's content - data: bytes data of the content Raises: ValueError in case of too much contents are required. cf. BULK_BLOCK_CONTENT_LEN_MAX """ # FIXME: Improve on server module to slice the result if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise ValueError( "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) for obj_id in content: try: data = self.objstorage.get(obj_id) except ObjNotFoundError: yield None continue yield {'sha1': obj_id, 'data': data} @db_transaction_generator def content_missing(self, content, key_hash='sha1', cur=None): """List content missing from storage Args: content: iterable of dictionaries containing one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum, and a length key mapped to the content length. key_hash: the name of the hash used as key (default: 'sha1') Returns: an iterable of `key_hash`es missing from the storage Raises: TODO: an exception when we get a hash collision. """ db = self.db keys = ['sha1', 'sha1_git', 'sha256'] if key_hash not in keys: raise ValueError("key_hash should be one of %s" % keys) key_hash_idx = keys.index(key_hash) # Create temporary table for metadata injection db.mktemp('content', cur) db.copy_to(content, 'tmp_content', keys + ['length'], cur) for obj in db.content_missing_from_temp(cur): yield obj[key_hash_idx] @db_transaction_generator def content_missing_per_sha1(self, contents, cur=None): """List content missing from storage based only on sha1. Args: contents: Iterable of sha1 to check for absence. Returns: an iterable of `sha1`s missing from the storage. Raises: TODO: an exception when we get a hash collision. 
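# Reviewer note (not part of the patch): a minimal content_add / content_get
# round trip through the Storage proxy above. The connection string and the
# object root are placeholders; the object root must already exist.
from swh.core import hashutil
from swh.storage import Storage

storage = Storage('dbname=softwareheritage-dev', '/srv/softwareheritage/objects')

data = b'hello world'
content = hashutil.hashdata(data)                    # sha1 / sha1_git / sha256
content.update({'data': data, 'length': len(data)})
storage.content_add([content])

for found in storage.content_get([content['sha1']]):
    assert found['data'] == data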
""" db = self.db db.store_tmp_bytea(contents, cur) for obj in db.content_missing_per_sha1_from_temp(cur): yield obj[0] @db_transaction_generator def skipped_content_missing(self, content, cur=None): """List skipped_content missing from storage Args: content: iterable of dictionaries containing the data for each checksum algorithm. Returns: an iterable of signatures missing from the storage """ keys = ['sha1', 'sha1_git', 'sha256'] db = self.db db.mktemp('skipped_content', cur) db.copy_to(content, 'tmp_skipped_content', keys + ['length', 'reason'], cur) yield from db.skipped_content_missing_from_temp(cur) @db_transaction def content_find(self, content, cur=None): """Find a content hash in db. Args: content: a dictionary representing one content hash, mapping checksum algorithm names (see swh.core.hashutil.ALGORITHMS) to checksum values Returns: a triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db if not set(content).intersection(ALGORITHMS): raise ValueError('content keys must contain at least one of: ' 'sha1, sha1_git, sha256') c = db.content_find(sha1=content.get('sha1'), sha1_git=content.get('sha1_git'), sha256=content.get('sha256'), cur=cur) if c: keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status'] return dict(zip(keys, c)) return None @db_transaction def content_find_occurrence(self, content, cur=None): """Find the content's occurrence. Args: content: a dictionary entry representing one content hash. The dictionary key is one of swh.core.hashutil.ALGORITHMS. The value mapped to the corresponding checksum. Returns: The occurrence of the content. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db c = self.content_find(content) if not c: return None sha1 = c['sha1'] found_occ = db.content_find_occurrence(sha1, cur=cur) if found_occ: keys = ['origin_type', 'origin_url', 'branch', 'target', 'target_type', 'path'] return dict(zip(keys, found_occ)) return None def directory_add(self, directories): """Add directories to the storage Args: directories: iterable of dictionaries representing the individual directories to add. Each dict has the following keys: - id (sha1_git): the id of the directory to add - entries (list): list of dicts for each entry in the directory. 
Each dict has the following keys: - name (bytes) - type (one of 'file', 'dir', 'rev'): type of the directory entry (file, directory, revision) - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions """ dirs = set() dir_entries = { 'file': defaultdict(list), 'dir': defaultdict(list), 'rev': defaultdict(list), } for cur_dir in directories: dir_id = cur_dir['id'] dirs.add(dir_id) for src_entry in cur_dir['entries']: entry = src_entry.copy() entry['dir_id'] = dir_id dir_entries[entry['type']][dir_id].append(entry) dirs_missing = set(self.directory_missing(dirs)) if not dirs_missing: return db = self.db with db.transaction() as cur: # Copy directory ids dirs_missing_dict = ({'id': dir} for dir in dirs_missing) db.mktemp('directory', cur) db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) # Copy entries for entry_type, entry_list in dir_entries.items(): entries = itertools.chain.from_iterable( entries_for_dir for dir_id, entries_for_dir in entry_list.items() if dir_id in dirs_missing) db.mktemp_dir_entry(entry_type) db.copy_to( entries, 'tmp_directory_entry_%s' % entry_type, ['target', 'name', 'perms', 'dir_id'], cur, ) # Do the final copy db.directory_add_from_temp(cur) @db_transaction_generator def directory_missing(self, directories, cur): """List directories missing from storage Args: an iterable of directory ids Returns: a list of missing directory ids """ db = self.db # Create temporary table for metadata injection db.mktemp('directory', cur) directories_dicts = ({'id': dir} for dir in directories) db.copy_to(directories_dicts, 'tmp_directory', ['id'], cur) for obj in db.directory_missing_from_temp(cur): yield obj[0] @db_transaction_generator def directory_get(self, directories, cur=None): """Get information on directories. Args: - directories: an iterable of directory ids Returns: List of directories as dict with keys and associated values. """ db = self.db keys = ('id', 'dir_entries', 'file_entries', 'rev_entries') db.mktemp('directory', cur) db.copy_to(({'id': dir_id} for dir_id in directories), 'tmp_directory', ['id'], cur) dirs = db.directory_get_from_temp(cur) for line in dirs: yield dict(zip(keys, line)) @db_transaction_generator def directory_ls(self, directory, recursive=False, cur=None): """Get entries for one directory. Args: - directory: the directory to list entries from. - recursive: if flag on, this list recursively from this directory. Returns: List of entries for such directory. """ db = self.db keys = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256'] if recursive: res_gen = db.directory_walk(directory) else: res_gen = db.directory_walk_one(directory) for line in res_gen: yield dict(zip(keys, line)) @db_transaction def directory_entry_get_by_path(self, directory, paths, cur=None): """Get the directory entry (either file or dir) from directory with path. Args: - directory: sha1 of the top level directory - paths: path to lookup from the top level directory. From left (top) to right (bottom). Returns: The corresponding directory entry if found, None otherwise. """ db = self.db keys = ('dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256') res = db.directory_entry_get_by_path(directory, paths, cur) if res: return dict(zip(keys, res)) def revision_add(self, revisions): """Add revisions to the storage Args: revisions: iterable of dictionaries representing the individual revisions to add. 
Each dict has the following keys: - id (sha1_git): id of the revision to add - date (datetime.DateTime): date the revision was written - date_offset (int): offset from UTC in minutes the revision was written - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - committer_date (datetime.DateTime): date the revision got added to the origin - committer_date_offset (int): offset from UTC in minutes the revision was added to the origin - committer_date_neg_utc_offset (boolean): whether a null committer_date_offset represents a negative UTC offset - type (one of 'git', 'tar'): type of the revision added - directory (sha1_git): the directory the revision points at - message (bytes): the message associated with the revision - author_name (bytes): the name of the revision author - author_email (bytes): the email of the revision author - committer_name (bytes): the name of the revision committer - committer_email (bytes): the email of the revision committer - metadata (jsonb): extra information as dictionary - synthetic (bool): revision's nature (tarball, directory creates synthetic revision) - parents (list of sha1_git): the parents of this revision """ db = self.db revisions_missing = set(self.revision_missing( set(revision['id'] for revision in revisions))) if not revisions_missing: return with db.transaction() as cur: db.mktemp_revision(cur) revisions_filtered = ( converters.revision_to_db(revision) for revision in revisions if revision['id'] in revisions_missing) parents_filtered = [] db.copy_to( revisions_filtered, 'tmp_revision', db.revision_add_cols, cur, lambda rev: parents_filtered.extend(rev['parents'])) db.revision_add_from_temp(cur) db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) @db_transaction_generator def revision_missing(self, revisions, cur=None): """List revisions missing from storage Args: an iterable of revision ids Returns: a list of missing revision ids """ db = self.db db.store_tmp_bytea(revisions, cur) for obj in db.revision_missing_from_temp(cur): yield obj[0] @db_transaction_generator def revision_get(self, revisions, cur): """Get all revisions from storage Args: an iterable of revision ids Returns: an iterable of revisions as dictionaries (or None if the revision doesn't exist) """ db = self.db db.store_tmp_bytea(revisions, cur) for line in self.db.revision_get_from_temp(cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_log(self, revisions, limit=None, cur=None): """Fetch revision entry from the given root revisions. Args: - revisions: array of root revision to lookup - limit: limitation on the output result. Default to null. Yields: List of revision log from such revisions root. """ db = self.db for line in db.revision_log(revisions, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_shortlog(self, revisions, limit=None, cur=None): """Fetch the shortlog for the given revisions Args: revisions: list of root revisions to lookup limit: depth limitation for the output Yields: a list of (id, parents) tuples. """ db = self.db yield from db.revision_shortlog(revisions, limit, cur) @db_transaction_generator def revision_log_by(self, origin_id, limit=None, cur=None): """Fetch revision entry from the actual origin_id's latest revision. 
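# Reviewer note (not part of the patch): the input shape expected by
# directory_add above; revision_add follows the same pattern with the keys its
# docstring lists. All identifiers and values below are made-up placeholders.
from swh.storage import Storage

storage = Storage('dbname=softwareheritage-dev', '/srv/softwareheritage/objects')

directory = {
    'id': b'\x01' * 20,              # sha1_git of the directory (placeholder)
    'entries': [{
        'name': b'README',
        'type': 'file',              # one of 'file', 'dir', 'rev'
        'target': b'\x02' * 20,      # sha1_git of the pointed-at object (placeholder)
        'perms': 0o100644,           # entry permissions (illustrative value)
    }],
}
storage.directory_add([directory])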
""" db = self.db for line in db.revision_log_by(origin_id, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data def release_add(self, releases): """Add releases to the storage Args: releases: iterable of dictionaries representing the individual releases to add. Each dict has the following keys: - id (sha1_git): id of the release to add - revision (sha1_git): id of the revision the release points to - date (datetime.DateTime): the date the release was made - date_offset (int): offset from UTC in minutes the release was made - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - name (bytes): the name of the release - comment (bytes): the comment associated with the release - author_name (bytes): the name of the release author - author_email (bytes): the email of the release author """ db = self.db release_ids = set(release['id'] for release in releases) releases_missing = set(self.release_missing(release_ids)) if not releases_missing: return with db.transaction() as cur: db.mktemp_release(cur) releases_filtered = ( converters.release_to_db(release) for release in releases if release['id'] in releases_missing ) db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, cur) db.release_add_from_temp(cur) @db_transaction_generator def release_missing(self, releases, cur=None): """List releases missing from storage Args: an iterable of release ids Returns: a list of missing release ids """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for obj in db.release_missing_from_temp(cur): yield obj[0] @db_transaction_generator def release_get(self, releases, cur=None): """Given a list of sha1, return the releases's information Args: releases: list of sha1s Returns: Generates the list of releases dict with the following keys: - id: origin's id - revision: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for release in db.release_get_from_temp(cur): yield converters.db_to_release( dict(zip(db.release_get_cols, release)) ) @db_transaction def occurrence_add(self, occurrences, cur=None): """Add occurrences to the storage Args: occurrences: iterable of dictionaries representing the individual occurrences to add. Each dict has the following keys: - origin (int): id of the origin corresponding to the occurrence - branch (str): the reference name of the occurrence - target (sha1_git): the id of the object pointed to by the occurrence - target_type (str): the type of object pointed to by the occurrence - date (datetime.DateTime): the validity date for the given occurrence """ db = self.db processed = [] for occurrence in occurrences: if isinstance(occurrence['date'], str): occurrence['date'] = dateutil.parser.parse(occurrence['date']) processed.append(occurrence) db.mktemp_occurrence_history(cur) db.copy_to(processed, 'tmp_occurrence_history', ['origin', 'branch', 'target', 'target_type', 'date'], cur) db.occurrence_history_add_from_temp(cur) @db_transaction_generator def occurrence_get(self, origin_id, cur=None): """Retrieve occurrence information per origin_id. Args: origin_id: The occurrence's origin. Yields: List of occurrences matching criterion. 
""" db = self.db for line in db.occurrence_get(origin_id, cur): yield { 'origin': line[0], 'branch': line[1], 'target': line[2], 'target_type': line[3], } @db_transaction_generator def revision_get_by(self, origin_id, branch_name=None, timestamp=None, limit=None, cur=None): """Given an origin_id, retrieve occurrences' list per given criterions. Args: origin_id: The origin to filter on. branch_name: optional branch name. timestamp: limit: Yields: List of occurrences matching the criterions or None if nothing is found. """ for line in self.db.revision_get_by(origin_id, branch_name, timestamp, limit=limit, cur=cur): data = converters.db_to_revision( dict(zip(self.db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data def release_get_by(self, origin_id, limit=None): """Given an origin id, return all the tag objects pointing to heads of origin_id. Args: origin_id: the origin to filter on. limit: None by default Yields: List of releases matching the criterions or None if nothing is found. """ for line in self.db.release_get_by(origin_id, limit=limit): data = converters.db_to_release( dict(zip(self.db.release_get_cols, line)) ) yield data @db_transaction def object_find_by_sha1_git(self, ids, cur=None): """Return the objects found with the given ids. Args: ids: a generator of sha1_gits Returns: a dict mapping the id to the list of objects found. Each object found is itself a dict with keys: sha1_git: the input id type: the type of object found id: the id of the object found object_id: the numeric id of the object found. """ db = self.db ret = {id: [] for id in ids} for retval in db.object_find_by_sha1_git(ids): if retval[1]: ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols, retval))) return ret @db_transaction def origin_get(self, origin, cur=None): """Return the origin either identified by its id or its tuple (type, url). Args: origin: dictionary representing the individual origin to find. This dict has either the keys type and url: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to either the id: - id: the origin id Returns: the origin dict with the keys: - id: origin's id - type: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db keys = ['id', 'type', 'url', 'lister', 'project'] origin_id = origin.get('id') if origin_id: # check lookup per id first ori = db.origin_get(origin_id, cur) elif 'type' in origin and 'url' in origin: # or lookup per type, url ori = db.origin_get_with(origin['type'], origin['url'], cur) else: # unsupported lookup raise ValueError('Origin must have either id or (type and url).') if ori: return dict(zip(keys, ori)) return None @db_transaction def _person_add(self, person, cur=None): """Add a person in storage. BEWARE: Internal function for now. Do not do anything fancy in case a person already exists. Please adapt code if more checks are needed. Args: person dictionary with keys name and email. Returns: Id of the new person. """ db = self.db return db.person_add(person) @db_transaction_generator def person_get(self, person, cur=None): """Return the persons identified by their ids. Args: person: array of ids. Returns: The array of persons corresponding of the ids. 
""" db = self.db for person in db.person_get(person): yield dict(zip(db.person_get_cols, person)) @db_transaction def origin_add_one(self, origin, cur=None): """Add origin to the storage Args: origin: dictionary representing the individual origin to add. This dict has the following keys: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to Returns: the id of the added origin, or of the identical one that already exists. """ db = self.db data = db.origin_get_with(origin['type'], origin['url'], cur) if data: return data[0] return db.origin_add(origin['type'], origin['url'], cur) @db_transaction def fetch_history_start(self, origin_id, cur=None): """Add an entry for origin origin_id in fetch_history. Returns the id of the added fetch_history entry """ fetch_history = { 'origin': origin_id, 'date': datetime.datetime.now(tz=datetime.timezone.utc), } return self.db.create_fetch_history(fetch_history, cur) @db_transaction def fetch_history_end(self, fetch_history_id, data, cur=None): """Close the fetch_history entry with id `fetch_history_id`, replacing its data with `data`. """ now = datetime.datetime.now(tz=datetime.timezone.utc) fetch_history = self.db.get_fetch_history(fetch_history_id, cur) if not fetch_history: raise ValueError('No fetch_history with id %d' % fetch_history_id) fetch_history['duration'] = now - fetch_history['date'] fetch_history.update(data) self.db.update_fetch_history(fetch_history, cur) @db_transaction def fetch_history_get(self, fetch_history_id, cur=None): """Get the fetch_history entry with id `fetch_history_id`. """ return self.db.get_fetch_history(fetch_history_id, cur) @db_transaction def entity_add(self, entities, cur=None): """Add the given entitites to the database (in entity_history). Args: - entities: iterable of dictionaries containing the following keys: - uuid (uuid): id of the entity - parent (uuid): id of the parent entity - name (str): name of the entity - type (str): type of entity (one of 'organization', 'group_of_entities', 'hosting', 'group_of_persons', 'person', 'project') - description (str, optional): description of the entity - homepage (str): url of the entity's homepage - active (bool): whether the entity is active - generated (bool): whether the entity was generated - lister_metadata (dict): lister-specific entity metadata - metadata (dict): other metadata for the entity - validity (datetime.DateTime array): timestamps at which we listed the entity. """ db = self.db cols = list(db.entity_history_cols) cols.remove('id') db.mktemp_entity_history() db.copy_to(entities, 'tmp_entity_history', cols, cur) db.entity_history_add_from_temp() @db_transaction_generator def entity_get_from_lister_metadata(self, entities, cur=None): """Fetch entities from the database, matching with the lister and associated metadata. Args: entities: iterable of dictionaries containing the lister metadata to look for. Useful keys are 'lister', 'type', 'id', ... Returns: A generator of fetched entities with all their attributes. If no match was found, the returned entity is None. 
""" db = self.db db.mktemp_entity_lister(cur) mapped_entities = [] for i, entity in enumerate(entities): mapped_entity = { 'id': i, 'lister_metadata': entity, } mapped_entities.append(mapped_entity) db.copy_to(mapped_entities, 'tmp_entity_lister', ['id', 'lister_metadata'], cur) cur.execute('''select id, %s from swh_entity_from_tmp_entity_lister() order by id''' % ','.join(db.entity_cols)) for id, *entity_vals in cur: fetched_entity = dict(zip(db.entity_cols, entity_vals)) if fetched_entity['uuid']: yield fetched_entity else: yield { 'uuid': None, 'lister_metadata': entities[i], } @db_transaction_generator def entity_get(self, uuid, cur=None): """Returns the list of entity per its uuid identifier and also its parent hierarchy. Args: uuid: entity's identifier Returns: List of entities starting with entity with uuid and the parent hierarchy from such entity. """ db = self.db for entity in db.entity_get(uuid, cur): yield dict(zip(db.entity_cols, entity)) @db_transaction def entity_get_one(self, uuid, cur=None): """Returns one entity using its uuid identifier. Args: uuid: entity's identifier Returns: the object corresponding to the given entity """ db = self.db entity = db.entity_get_one(uuid, cur) if entity: return dict(zip(db.entity_cols, entity)) else: return None @db_transaction def stat_counters(self, cur=None): """compute statistics about the number of tuples in various tables Returns: a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content) """ return {k: v for (k, v) in self.db.stat_counters()} diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index 0db83a2ee..df5f45e91 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,245 +1,245 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os from nose.tools import istest from nose.plugins.attrib import attr from datetime import datetime, timedelta from swh.core import hashutil from swh.core.tests.db_testing import DbTestFixture from server_testing import ServerTestFixture from swh.storage import Storage from swh.storage.exc import ObjNotFoundError from swh.storage.archiver import ArchiverDirector, ArchiverWorker from swh.storage.objstorage.api.client import RemoteObjStorage from swh.storage.objstorage.api.server import app TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestArchiver(DbTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. """ TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') def setUp(self): # Launch the backup server self.backup_objroot = tempfile.mkdtemp(prefix='remote') self.config = {'storage_base': self.backup_objroot, - 'storage_depth': 3} + 'storage_slicing': '0:2/2:4/4:6'} self.app = app super().setUp() # Launch a client to check objects presence self.remote_objstorage = RemoteObjStorage(self.url()) # Create the local storage. self.objroot = tempfile.mkdtemp(prefix='local') self.storage = Storage(self.conn, self.objroot) # Initializes and fill the tables. 
self.initialize_tables() # Create the archiver self.archiver = self.__create_director() self.storage_data = ('Local', 'http://localhost:%s/' % self.port) def tearDown(self): self.empty_tables() super().tearDown() def initialize_tables(self): """ Initializes the database with a sample of items. """ # Add an archive self.cursor.execute("""INSERT INTO archives(id, url) VALUES('Local', 'http://localhost:{}/') """.format(self.port)) self.conn.commit() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content_archive') self.cursor.execute('DELETE FROM archives') self.conn.commit() def __add_content(self, content_data, status='missing', date='now()'): # Add the content content = hashutil.hashdata(content_data) content.update({'data': content_data}) self.storage.content_add([content]) # Then update database content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) self.cursor.execute("""INSERT INTO content_archive VALUES('%s'::sha1, 'Local', '%s', %s) """ % (content_id, status, date)) return content['sha1'] def __get_missing(self): self.cursor.execute("""SELECT content_id FROM content_archive WHERE status='missing'""") return self.cursor.fetchall() def __create_director(self, batch_size=5000, archival_max_age=3600, retention_policy=1, asynchronous=False): config = { 'objstorage_path': self.objroot, 'batch_max_size': batch_size, 'archival_max_age': archival_max_age, 'retention_policy': retention_policy, 'asynchronous': asynchronous # Avoid depending on queue for tests. } director = ArchiverDirector(self.conn, config) return director def __create_worker(self, batch={}, config={}): mstorage_args = [self.archiver.master_storage.db.conn, self.objroot] slaves = [self.storage_data] if not config: config = self.archiver.config return ArchiverWorker(batch, mstorage_args, slaves, config) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ content_data = b'archive_missing_content' id = self.__add_content(content_data) # After the run, the content should be in the archive. self.archiver.run() remote_data = self.remote_objstorage.content_get(id) self.assertEquals(content_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ id = self.__add_content(b'archive_present_content', status='present') # After the run, the content should NOT be in the archive.* self.archiver.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. 
""" id = self.__add_content(b'archive_alread_enough') director = self.__create_director(retention_policy=0) director.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) # Unit test for ArchiverDirector def vstatus(self, status, mtime): return self.archiver.get_virtual_status(status, mtime) @istest def vstatus_present(self): self.assertEquals( self.vstatus('present', None), 'present' ) @istest def vstatus_missing(self): self.assertEquals( self.vstatus('missing', None), 'missing' ) @istest def vstatus_ongoing_remaining(self): current_time = datetime.now() self.assertEquals( self.vstatus('ongoing', current_time), 'present' ) @istest def vstatus_ongoing_elapsed(self): past_time = datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 ) self.assertEquals( self.vstatus('ongoing', past_time), 'missing' ) # Unit tests for archive worker @istest def need_archival_missing(self): """ A content should still need archival when it is missing. """ id = self.__add_content(b'need_archival_missing', status='missing') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def need_archival_present(self): """ A content should still need archival when it is missing """ id = self.__add_content(b'need_archival_missing', status='present') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_remaining(self): """ An ongoing archival with remaining time shouldnt need archival. """ id = self.__add_content(b'need_archival_ongoing_remaining', status='ongoing', date="'%s'" % datetime.now()) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_elasped(self): """ An ongoing archival with elapsed time should be scheduled again. """ id = self.__add_content( b'archive_ongoing_elapsed', status='ongoing', date="'%s'" % (datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 )) ) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def content_sorting_by_archiver(self): """ Check that the content is correctly sorted. 
""" batch = { 'id1': { 'present': [('slave1', 'slave1_url')], 'missing': [] }, 'id2': { 'present': [], 'missing': [('slave1', 'slave1_url')] } } worker = self.__create_worker(batch=batch) mapping = worker.sort_content_by_archive() self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) self.assertIn('id2', mapping[('slave1', 'slave1_url')]) diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py index 95e96a1f9..3069abe5c 100644 --- a/swh/storage/tests/test_checker.py +++ b/swh/storage/tests/test_checker.py @@ -1,128 +1,128 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.core import hashutil from swh.storage.checker.checker import ContentChecker class MockBackupStorage(): def __init__(self): self.values = {} def content_add(self, id, value): self.values[id] = value def content_get(self, ids): for id in ids: try: data = self.values[id] except KeyError: yield None continue yield {'sha1': id, 'data': data} @attr('fs') class TestChecker(unittest.TestCase): """ Test the content integrity checker """ def setUp(self): super().setUp() # Connect to an objstorage config = {'batch_size': 10} path = tempfile.mkdtemp() - depth = 3 - self.checker = ContentChecker(config, path, depth, 'http://None') + slicing = '0:2/2:4/4:6' + self.checker = ContentChecker(config, path, slicing, 'http://None') self.checker.backup_storages = [MockBackupStorage(), MockBackupStorage()] def corrupt_content(self, id): """ Make the given content invalid. """ hex_id = hashutil.hash_to_hex(id) file_path = self.checker.objstorage._obj_path(hex_id) with gzip.open(file_path, 'wb') as f: f.write(b'Unexpected content') @istest def check_valid_content(self): # Check that a valid content is valid. content = b'check_valid_content' id = self.checker.objstorage.add(content) self.assertTrue(self.checker.check_content(id)) @istest def check_invalid_content(self): # Check that an invalid content is noticed. content = b'check_invalid_content' id = self.checker.objstorage.add(content) self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) @istest def repair_content_present_first(self): # Try to repair a content that is in the backup storage. content = b'repair_content_present_first' id = self.checker.objstorage.add(content) # Add a content to the mock self.checker.backup_storages[0].content_add(id, content) # Corrupt and repair it. self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) self.checker.repair_contents([id]) self.assertTrue(self.checker.check_content(id)) @istest def repair_content_present_second(self): # Try to repair a content that is not in the first backup storage. content = b'repair_content_present_second' id = self.checker.objstorage.add(content) # Add a content to the mock self.checker.backup_storages[1].content_add(id, content) # Corrupt and repair it. self.corrupt_content(id) self.assertFalse(self.checker.check_content(id)) self.checker.repair_contents([id]) self.assertTrue(self.checker.check_content(id)) @istest def repair_content_present_distributed(self): # Try to repair two contents that are in separate backup storages. 
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
index 95e96a1f9..3069abe5c 100644
--- a/swh/storage/tests/test_checker.py
+++ b/swh/storage/tests/test_checker.py
@@ -1,128 +1,128 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import gzip
import tempfile
import unittest

from nose.tools import istest
from nose.plugins.attrib import attr

from swh.core import hashutil
from swh.storage.checker.checker import ContentChecker


class MockBackupStorage():

    def __init__(self):
        self.values = {}

    def content_add(self, id, value):
        self.values[id] = value

    def content_get(self, ids):
        for id in ids:
            try:
                data = self.values[id]
            except KeyError:
                yield None
                continue

            yield {'sha1': id, 'data': data}


@attr('fs')
class TestChecker(unittest.TestCase):
    """ Test the content integrity checker.
    """

    def setUp(self):
        super().setUp()
        # Connect to an objstorage
        config = {'batch_size': 10}
        path = tempfile.mkdtemp()
-        depth = 3
-        self.checker = ContentChecker(config, path, depth, 'http://None')
+        slicing = '0:2/2:4/4:6'
+        self.checker = ContentChecker(config, path, slicing, 'http://None')
        self.checker.backup_storages = [MockBackupStorage(),
                                        MockBackupStorage()]

    def corrupt_content(self, id):
        """ Make the given content invalid.
        """
        hex_id = hashutil.hash_to_hex(id)
        file_path = self.checker.objstorage._obj_path(hex_id)
        with gzip.open(file_path, 'wb') as f:
            f.write(b'Unexpected content')

    @istest
    def check_valid_content(self):
        # Check that a valid content is reported as valid.
        content = b'check_valid_content'
        id = self.checker.objstorage.add(content)
        self.assertTrue(self.checker.check_content(id))

    @istest
    def check_invalid_content(self):
        # Check that an invalid content is noticed.
        content = b'check_invalid_content'
        id = self.checker.objstorage.add(content)
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))

    @istest
    def repair_content_present_first(self):
        # Try to repair a content that is in the first backup storage.
        content = b'repair_content_present_first'
        id = self.checker.objstorage.add(content)
        # Add the content to the mock backup storage.
        self.checker.backup_storages[0].content_add(id, content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertTrue(self.checker.check_content(id))

    @istest
    def repair_content_present_second(self):
        # Try to repair a content that is not in the first backup storage.
        content = b'repair_content_present_second'
        id = self.checker.objstorage.add(content)
        # Add the content to the second mock backup storage.
        self.checker.backup_storages[1].content_add(id, content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertTrue(self.checker.check_content(id))

    @istest
    def repair_content_present_distributed(self):
        # Try to repair two contents that are in separate backup storages.
        content1 = b'repair_content_present_distributed_1'
        content2 = b'repair_content_present_distributed_2'
        id1 = self.checker.objstorage.add(content1)
        id2 = self.checker.objstorage.add(content2)
        # Add each content to a different mock backup storage.
        self.checker.backup_storages[0].content_add(id1, content1)
        self.checker.backup_storages[1].content_add(id2, content2)
        # Corrupt and repair them.
        self.corrupt_content(id1)
        self.corrupt_content(id2)
        self.assertFalse(self.checker.check_content(id1))
        self.assertFalse(self.checker.check_content(id2))
        self.checker.repair_contents([id1, id2])
        self.assertTrue(self.checker.check_content(id1))
        self.assertTrue(self.checker.check_content(id2))

    @istest
    def repair_content_missing(self):
        # Try to repair a content that is NOT in any backup storage.
        content = b'repair_content_missing'
        id = self.checker.objstorage.add(content)
        # Corrupt and repair it.
        self.corrupt_content(id)
        self.assertFalse(self.checker.check_content(id))
        self.checker.repair_contents([id])
        self.assertFalse(self.checker.check_content(id))
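
These tests drive the checker through its two public steps, check_content and repair_contents, against mocked backup storages. As a rough usage sketch of the same flow with the new slicing-based constructor, every concrete value below (root path, slicing spec and backup URL) being a placeholder rather than a project default:

    from swh.storage.checker.checker import ContentChecker

    checker = ContentChecker({'batch_size': 10},            # objects checked per run
                             '/tmp/objstorage',             # objstorage root (placeholder)
                             '0:2/2:4/4:6',                 # path slicing specification
                             ['http://backup-host:5002/'])  # backup storage urls (placeholder)

    # Check one batch and try to restore whatever failed the check.
    corrupted = [obj_id for obj_id in checker.get_content_to_check(10)
                 if not checker.check_content(obj_id)]
    checker.repair_contents(corrupted)
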
""" def setUp(self): self.config = {'storage_base': tempfile.mkdtemp(), - 'storage_depth': 3} + 'storage_slicing': '0:1/0:5'} self.app = app super().setUp() self.objstorage = RemoteObjStorage(self.url()) def tearDown(self): super().tearDown() @istest def content_add(self): content = bytes('Test content', 'utf8') id = self.objstorage.content_add(content) self.assertEquals(self.objstorage.content_get(id), content) @istest def content_get_present(self): content = bytes('content_get_present', 'utf8') content_hash = hashutil.hashdata(content) id = self.objstorage.content_add(content) self.assertEquals(content_hash['sha1'], id) @istest def content_get_missing(self): content = bytes('content_get_missing', 'utf8') content_hash = hashutil.hashdata(content) with self.assertRaises(ObjNotFoundError): self.objstorage.content_get(content_hash['sha1']) @istest def content_get_random(self): ids = [] for i in range(100): content = bytes('content_get_present', 'utf8') id = self.objstorage.content_add(content) ids.append(id) for id in self.objstorage.content_get_random(50): self.assertIn(id, ids) @istest def content_check_invalid(self): content = bytes('content_check_invalid', 'utf8') - id = self.objstorage.content_add(content) - hex_obj_id = hashutil.hash_to_hex(id) - dir_path = os.path.join( - self.config['storage_base'], - *[hex_obj_id[i*2:i*2+2] - for i in range(int(self.config['storage_depth']))] - ) - path = os.path.join(dir_path, hex_obj_id) - content = list(content) - with open(path, 'bw') as f: - content[0] = (content[0] + 1) % 128 - f.write(bytes(content)) + invalid_id = hashutil.hashdata(b'invalid content')['sha1'] + # Add the content with an invalid id. + self.objstorage.content_add(content, invalid_id) + # Then check it and expect an error. with self.assertRaises(Error): - self.objstorage.content_check(id) + self.objstorage.content_check(invalid_id) @istest def content_check_valid(self): content = bytes('content_check_valid', 'utf8') id = self.objstorage.content_add(content) try: self.objstorage.content_check(id) except: self.fail('Integrity check failed') @istest def content_check_missing(self): content = bytes('content_check_valid', 'utf8') content_hash = hashutil.hashdata(content) with self.assertRaises(ObjNotFoundError): self.objstorage.content_check(content_hash['sha1']) diff --git a/swh/storage/tests/test_objstorage_pathslicing.py b/swh/storage/tests/test_objstorage_pathslicing.py index cc17a52ee..aedd63ec7 100644 --- a/swh/storage/tests/test_objstorage_pathslicing.py +++ b/swh/storage/tests/test_objstorage_pathslicing.py @@ -1,78 +1,76 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest from nose.tools import istest from swh.core import hashutil from swh.storage import exc from swh.storage.objstorage import PathSlicingObjStorage from objstorage_testing import ObjStorageTestFixture class TestpathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() - self.depth = 3 - self.slicing = 2 + self.slicing = '0:2/2:4/4:6' self.tmpdir = tempfile.mkdtemp() - self.storage = PathSlicingObjStorage(self.tmpdir, self.depth, - self.slicing) + self.storage = PathSlicingObjStorage(self.tmpdir, self.slicing) def content_path(self, obj_id): hex_obj_id = hashutil.hash_to_hex(obj_id) return self.storage._obj_path(hex_obj_id) @istest def 
diff --git a/swh/storage/tests/test_objstorage_pathslicing.py b/swh/storage/tests/test_objstorage_pathslicing.py
index cc17a52ee..aedd63ec7 100644
--- a/swh/storage/tests/test_objstorage_pathslicing.py
+++ b/swh/storage/tests/test_objstorage_pathslicing.py
@@ -1,78 +1,76 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import tempfile
import unittest

from nose.tools import istest

from swh.core import hashutil
from swh.storage import exc
from swh.storage.objstorage import PathSlicingObjStorage

from objstorage_testing import ObjStorageTestFixture


class TestpathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):

    def setUp(self):
        super().setUp()
-        self.depth = 3
-        self.slicing = 2
+        self.slicing = '0:2/2:4/4:6'
        self.tmpdir = tempfile.mkdtemp()
-        self.storage = PathSlicingObjStorage(self.tmpdir, self.depth,
-                                             self.slicing)
+        self.storage = PathSlicingObjStorage(self.tmpdir, self.slicing)

    def content_path(self, obj_id):
        hex_obj_id = hashutil.hash_to_hex(obj_id)
        return self.storage._obj_path(hex_obj_id)

    @istest
    def contains(self):
        content_p, obj_id_p = self.hash_content(b'contains_present')
        content_m, obj_id_m = self.hash_content(b'contains_missing')
        self.storage.add(content_p, obj_id=obj_id_p)
        self.assertIn(obj_id_p, self.storage)
        self.assertNotIn(obj_id_m, self.storage)

    @istest
    def iter(self):
        content, obj_id = self.hash_content(b'iter')
        self.assertEqual(list(iter(self.storage)), [])
        self.storage.add(content, obj_id=obj_id)
        self.assertEqual(list(iter(self.storage)), [obj_id])

    @istest
    def len(self):
        content, obj_id = self.hash_content(b'check_not_gzip')
        self.assertEqual(len(self.storage), 0)
        self.storage.add(content, obj_id=obj_id)
        self.assertEqual(len(self.storage), 1)

    @istest
    def check_not_gzip(self):
        content, obj_id = self.hash_content(b'check_not_gzip')
        self.storage.add(content, obj_id=obj_id)
        with open(self.content_path(obj_id), 'ab') as f:
            # Add garbage.
            f.write(b'garbage')
        with self.assertRaises(exc.Error):
            self.storage.check(obj_id)

    @istest
    def check_id_mismatch(self):
        content, obj_id = self.hash_content(b'check_id_mismatch')
        self.storage.add(content, obj_id=obj_id)
        with open(self.content_path(obj_id), 'wb') as f:
            f.write(b'unexpected content')
        with self.assertRaises(exc.Error):
            self.storage.check(obj_id)

    @istest
    def get_random_contents(self):
        content, obj_id = self.hash_content(b'get_random_content')
        self.storage.add(content, obj_id=obj_id)
        random_contents = list(self.storage.get_random(1))
        self.assertEqual(1, len(random_contents))
        self.assertIn(obj_id, random_contents)
diff --git a/version.txt b/version.txt
index c4346227d..5bf531f1f 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.40-0-g8c6e8d7
\ No newline at end of file
+v0.0.41-0-g68abde3
\ No newline at end of file
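
The pathslicing tests in this diff configure the storage with the slicing string '0:2/2:4/4:6' and only reach the on-disk layout through _obj_path. As an illustration of what such a spec means, here is a rough, stand-alone approximation of the resulting directory scheme; the slice-per-directory layout and the full hex id as file name are assumptions based on the tests in this diff, not the PathSlicingObjStorage implementation itself:

    import os

    def sliced_path(root, hex_obj_id, slicing='0:2/2:4/4:6'):
        """Approximate the on-disk path of an object for a given slicing spec."""
        parts = []
        for bounds in slicing.split('/'):
            start, end = (int(b) if b else None for b in bounds.split(':'))
            parts.append(hex_obj_id[start:end])
        # Directories come from the slices; the file itself keeps the full hex id.
        return os.path.join(root, *(parts + [hex_obj_id]))

    # sliced_path('/tmp/objs', '34973274ccef6ab4dfaaf86599792fa9c3fe4689')
    # -> '/tmp/objs/34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689'
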