diff --git a/PKG-INFO b/PKG-INFO index d9f121f..7156357 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.objstorage -Version: 0.0.14 +Version: 0.0.15 Summary: Software Heritage Object Storage Home-page: https://forge.softwareheritage.org/diffusion/DOBJS Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/bin/swh-objstorage-azure b/bin/swh-objstorage-azure index 9cfed40..b059cc3 100755 --- a/bin/swh-objstorage-azure +++ b/bin/swh-objstorage-azure @@ -1,113 +1,112 @@ #!/usr/bin/env python3 # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # NOT FOR PRODUCTION import click from swh.objstorage import get_objstorage -from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage from swh.core import config, hashutil class AzureAccess(config.SWHConfig): """This is an orchestration class to try and check objstorage_azure implementation.""" DEFAULT_CONFIG = { # Output storage 'storage_azure': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), # Input storage 'storage_local': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), } CONFIG_BASE_FILENAME = 'objstorage/azure' def __init__(self): super().__init__() self.config = self.parse_config_file() - container_name = 'contents' self.azure_cloud_storage = get_objstorage( **self.config['storage_azure']) self.read_objstorage = get_objstorage( **self.config['storage_local']) def list_contents(self, limit=10): count = 0 for c in self.azure_cloud_storage: count += 1 yield c if count >= limit: return def send_one_content(self, obj_id): obj_content = self.read_objstorage.get(obj_id) self.azure_cloud_storage.add(content=obj_content, obj_id=obj_id) def check_integrity(self, obj_id): self.azure_cloud_storage.check(obj_id) # will raise if problem def check_presence(self, obj_id): return obj_id in self.azure_cloud_storage def download(self, obj_id): return self.azure_cloud_storage.get(obj_id) @click.command() def tryout(): obj_azure = AzureAccess() hex_sample_id = '00000085c856b32f0709a4f5d669bb4faa3a0ce9' sample_id = hashutil.hex_to_hash(hex_sample_id) check_presence = obj_azure.check_presence(sample_id) print('presence first time should be False:', check_presence) obj_azure.send_one_content(sample_id) check_presence = obj_azure.check_presence(sample_id) print('presence True:', check_presence) hex_sample_2 = 'dfeffffeffff17b439f3e582813bd875e7141a0e' sample_2 = hashutil.hex_to_hash(hex_sample_2) check_presence = obj_azure.check_presence(sample_2) print('presence False:', check_presence) print() print('Download a blob') blob_content = obj_azure.download(sample_id) print(blob_content) print() try: not_found_hex_id = hex_sample_id.replace('0', 'f') not_found_id = hashutil.hash_to_hex(not_found_hex_id) obj_azure.download(not_found_id) except: print('Expected `blob does not exist`!') # print() # print('blobs:') # print(list(obj_azure.list_contents())) # print() # print('content of %s' % hex_sample_id) # print(obj_azure.download(hex_sample_id)) obj_azure.check_integrity(sample_id) + if __name__ == '__main__': - tryout() + tryout() diff --git a/swh.objstorage.egg-info/PKG-INFO b/swh.objstorage.egg-info/PKG-INFO index d9f121f..7156357 100644 --- a/swh.objstorage.egg-info/PKG-INFO +++ b/swh.objstorage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.objstorage -Version: 0.0.14 +Version: 0.0.15 Summary: Software Heritage Object Storage Home-page: https://forge.softwareheritage.org/diffusion/DOBJS Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py index dc3b765..594bd07 100644 --- a/swh/objstorage/__init__.py +++ b/swh/objstorage/__init__.py @@ -1,56 +1,65 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + from .objstorage import ObjStorage from .objstorage_pathslicing import PathSlicingObjStorage from .api.client import RemoteObjStorage from .multiplexer import MultiplexerObjStorage from .multiplexer.filter import add_filters __all__ = ['get_objstorage', 'ObjStorage'] _STORAGE_CLASSES = { 'pathslicing': PathSlicingObjStorage, 'remote': RemoteObjStorage, } try: from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage _STORAGE_CLASSES['azure-storage'] = AzureCloudObjStorage except ImportError: pass def get_objstorage(cls, args): """ Create an ObjStorage using the given implementation class. Args: cls (str): objstorage class unique key contained in the _STORAGE_CLASSES dict. args (dict): arguments for the required class of objstorage that must match exactly the one in the `__init__` method of the class. Returns: subclass of ObjStorage that match the given `storage_class` argument. Raises: ValueError: if the given storage class is not a valid objstorage key. """ try: return _STORAGE_CLASSES[cls](**args) except KeyError: raise ValueError('Storage class %s does not exist' % cls) def _construct_filtered_objstorage(storage_conf, filters_conf): return add_filters( get_objstorage(**storage_conf), filters_conf ) + + _STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage def _construct_multiplexer_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return MultiplexerObjStorage(storages) + + _STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py index bea5275..437e4a5 100644 --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,72 +1,72 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pickle import requests from requests.exceptions import ConnectionError from ..objstorage import ObjStorage from ..exc import ObjStorageAPIError from .common import (decode_response, encode_data_client as encode_data) class RemoteObjStorage(ObjStorage): """ Proxy to a remote object storage. This class allows to connect to an object storage server via http protocol. Attributes: - base_url (string): The url of the server to connect. Must end + url (string): The url of the server to connect. Must end with a '/' session: The session to send requests. """ - def __init__(self, base_url): - self.base_url = base_url + def __init__(self, url): + self.url = url self.session = requests.Session() - def url(self, endpoint): - return '%s%s' % (self.base_url, endpoint) + def _url(self, endpoint): + return '%s%s' % (self.url, endpoint) def post(self, endpoint, data): try: response = self.session.post( - self.url(endpoint), + self._url(endpoint), data=encode_data(data), headers={'content-type': 'application/x-msgpack'}, ) except ConnectionError as e: raise ObjStorageAPIError(e) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) return decode_response(response) def check_config(self, *, check_write): return self.post('check_config', {'check_write': check_write}) def __contains__(self, obj_id): return self.post('content/contains', {'obj_id': obj_id}) def add(self, content, obj_id=None, check_presence=True): return self.post('content/add', {'content': content, 'obj_id': obj_id, 'check_presence': check_presence}) def get(self, obj_id): return self.post('content/get', {'obj_id': obj_id}) def get_batch(self, obj_ids): return self.post('content/get/batch', {'obj_ids': obj_ids}) def check(self, obj_id): self.post('content/check', {'obj_id': obj_id}) def get_random(self, batch_size): return self.post('content/get/random', {'batch_size': batch_size}) diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py index 3777644..08dd81c 100644 --- a/swh/objstorage/checker.py +++ b/swh/objstorage/checker.py @@ -1,256 +1,270 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import logging from swh.core import config from swh.storage.archiver.storage import ArchiverStorage from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError, Error class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract class of the content integrity checker. This checker's purpose is to iterate over the contents of a storage and check the integrity of each file. Behavior of the checker to deal with corrupted status will be specified by subclasses. You should override the DEFAULT_CONFIG and CONFIG_BASE_FILENAME variables if you need it. """ DEFAULT_CONFIG = { - 'storage': ('dict', - {'cls': 'pathslicing', - 'args': {'root': '/srv/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}), + 'storage': ('dict', { + 'cls': 'pathslicing', + 'args': { + 'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6' + } + }), 'batch_size': ('int', 1000), } CONFIG_BASE_FILENAME = 'objstorage/objstorage_checker' def __init__(self): """ Create a checker that ensure the objstorage have no corrupted file """ self.config = self.parse_config_file() self.objstorage = get_objstorage(**self.config['storage']) self.batch_size = self.config['batch_size'] def run_as_daemon(self): """ Start the check routine and perform it forever. Use this method to run the checker as a daemon that will iterate over the content forever in background. """ while True: try: self.run() except: pass def run(self): """ Check a batch of content. """ for obj_id in self._get_content_to_check(self.batch_size): cstatus = self._check_content(obj_id) if cstatus == 'corrupted': self.corrupted_content(obj_id) elif cstatus == 'missing': self.missing_content(obj_id) def _get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ yield from self.objstorage.get_random(batch_size) def _check_content(self, obj_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: self.objstorage.check(obj_id) except ObjNotFoundError: return 'missing' except Error: return 'corrupted' @abc.abstractmethod def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ raise NotImplementedError("%s must implement " "'corrupted_content' method" % type(self)) @abc.abstractmethod def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ raise NotImplementedError("%s must implement " "'missing_content' method" % type(self)) class LogContentChecker(BaseContentChecker): """ Content integrity checker that just log detected errors. """ DEFAULT_CONFIG = { - 'storage': ('dict', - {'cls': 'pathslicing', - 'args': {'root': '/srv/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}), + 'storage': ('dict', { + 'cls': 'pathslicing', + 'args': { + 'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6' + } + }), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker') } CONFIG_BASE_FILENAME = 'objstorage/log_checker' def __init__(self): super().__init__() self.logger = logging.getLogger(self.config['log_tag']) def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ self.logger.error('Content %s is corrupted' % obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ self.logger.error('Content %s is detected missing' % obj_id) class RepairContentChecker(LogContentChecker): """ Content integrity checker that will try to restore contents. """ DEFAULT_CONFIG = { - 'storage': ('dict', - {'cls': 'pathslicing', - 'args': {'root': '/srv/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}), + 'storage': ('dict', { + 'cls': 'pathslicing', + 'args': { + 'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6' + } + }), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), - 'backup_storages': ('dict', - {'banco': { - 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'} - }}) + 'backup_storages': ('dict', { + 'banco': { + 'cls': 'remote', + 'args': {'url': 'http://banco:5003/'} + } + }) } CONFIG_BASE_FILENAME = 'objstorage/repair_checker' def __init__(self): super().__init__() self.backups = [ get_objstorage(**storage) for name, storage in self.config['backup_storages'].items() ] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._restore(obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ super().missing_content(obj_id) self._restore(obj_id) def _restore(self, obj_id): if not self._perform_restore(obj_id): # Object could not be restored self.logger.critical( 'Object %s is corrupted and could not be repaired' % obj_id ) def _perform_restore(self, obj_id): """ Try to restore the object in the current storage using the backups """ for backup in self.backups: try: content = backup.get(obj_id) self.objstorage.restore(content, obj_id) except ObjNotFoundError as e: continue else: # Return True direclty when a backup contains the object return True # No backup contains the object return False class ArchiveNotifierContentChecker(LogContentChecker): """ Implementation of the checker that will update the archiver database Once the database is updated the archiver may restore the content on it's next scheduling as it won't be present anymore, and this status change will probably make the retention policy invalid. """ DEFAULT_CONFIG = { - 'storage': ('dict', - {'cls': 'pathslicing', - 'args': {'root': '/srv/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}), + 'storage': ('dict', { + 'cls': 'pathslicing', + 'args': { + 'root': '/srv/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6' + } + }), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'storage_name': ('str', 'banco'), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev') } CONFIG_BASE_FILENAME = 'objstorage/archive_notifier_checker' def __init__(self): super().__init__() self.archiver_db = ArchiverStorage(self.config['dbconn']) self.storage_name = self.config['storage_name'] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._update_status(obj_id, 'corrupted') def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ super().missing_content(obj_id) self._update_status(obj_id, 'missing') def _update_status(self, obj_id, status): self.archiver_db.content_archive_update(obj_id, self.storage_name, new_status=status) @click.command() @click.argument('checker-type', required=1, default='log') @click.option('--daemon/--nodaemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') def launch(checker_type, daemon): types = { 'log': LogContentChecker, 'repair': RepairContentChecker, 'archiver_notifier': ArchiveNotifierContentChecker } checker = types[checker_type]() if daemon: checker.run_as_daemon() else: checker.run() + if __name__ == '__main__': launch() diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py index 1d53b91..8aa8838 100644 --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -1,36 +1,36 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest from nose.plugins.attrib import attr from swh.objstorage import get_objstorage from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture from swh.objstorage.tests.server_testing import ServerTestFixture from swh.objstorage.api.server import app @attr('db') class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase): """ Test the remote archive API. """ def setUp(self): self.config = { 'cls': 'pathslicing', 'args': { 'root': tempfile.mkdtemp(), 'slicing': '0:1/0:5', } } self.app = app super().setUp() self.storage = get_objstorage('remote', { - 'base_url': self.url() + 'url': self.url() }) diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py index f512393..beda01f 100644 --- a/swh/objstorage/tests/test_objstorage_instantiation.py +++ b/swh/objstorage/tests/test_objstorage_instantiation.py @@ -1,47 +1,49 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest from nose.tools import istest from swh.objstorage.tests.server_testing import ServerTestFixture from swh.objstorage import get_objstorage from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.api.server import app class TestObjStorageInitialization(ServerTestFixture, unittest.TestCase): """ Test that the methods for ObjStorage initializations with `get_objstorage` works properly. """ def setUp(self): self.path = tempfile.mkdtemp() # Server is launched at self.url() self.app = app self.config = {'storage_base': tempfile.mkdtemp(), 'storage_slicing': '0:1/0:5'} super().setUp() @istest def pathslicing_objstorage(self): conf = { 'cls': 'pathslicing', 'args': {'root': self.path, 'slicing': '0:2/0:5'} } st = get_objstorage(**conf) self.assertTrue(isinstance(st, PathSlicingObjStorage)) @istest def remote_objstorage(self): conf = { 'cls': 'remote', - 'args': {'base_url': self.url()} + 'args': { + 'url': self.url() + } } st = get_objstorage(**conf) self.assertTrue(isinstance(st, RemoteObjStorage)) diff --git a/version.txt b/version.txt index 93494f3..45f4157 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.14-0-g53df791 \ No newline at end of file +v0.0.15-0-g4cf0ef5 \ No newline at end of file