diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
index bea5275..437e4a5 100644
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -1,72 +1,72 @@
 # Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import pickle

 import requests

 from requests.exceptions import ConnectionError

 from ..objstorage import ObjStorage
 from ..exc import ObjStorageAPIError
 from .common import (decode_response,
                      encode_data_client as encode_data)


 class RemoteObjStorage(ObjStorage):
     """ Proxy to a remote object storage.

     This class allows to connect to an object storage server via
     http protocol.

     Attributes:
-        base_url (string): The url of the server to connect. Must end
+        url (string): The url of the server to connect. Must end
             with a '/'
         session: The session to send requests.
     """
-    def __init__(self, base_url):
-        self.base_url = base_url
+    def __init__(self, url):
+        self.url = url
         self.session = requests.Session()

-    def url(self, endpoint):
-        return '%s%s' % (self.base_url, endpoint)
+    def _url(self, endpoint):
+        return '%s%s' % (self.url, endpoint)

     def post(self, endpoint, data):
         try:
             response = self.session.post(
-                self.url(endpoint),
+                self._url(endpoint),
                 data=encode_data(data),
                 headers={'content-type': 'application/x-msgpack'},
             )
         except ConnectionError as e:
             raise ObjStorageAPIError(e)

         # XXX: this breaks language-independence and should be
         # replaced by proper unserialization
         if response.status_code == 400:
             raise pickle.loads(decode_response(response))
         return decode_response(response)

     def check_config(self, *, check_write):
         return self.post('check_config', {'check_write': check_write})

     def __contains__(self, obj_id):
         return self.post('content/contains', {'obj_id': obj_id})

     def add(self, content, obj_id=None, check_presence=True):
         return self.post('content/add', {'content': content, 'obj_id': obj_id,
                                          'check_presence': check_presence})

     def get(self, obj_id):
         return self.post('content/get', {'obj_id': obj_id})

     def get_batch(self, obj_ids):
         return self.post('content/get/batch', {'obj_ids': obj_ids})

     def check(self, obj_id):
         self.post('content/check', {'obj_id': obj_id})

     def get_random(self, batch_size):
         return self.post('content/get/random', {'batch_size': batch_size})
diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py
index e37259d..4027a56 100644
--- a/swh/objstorage/checker.py
+++ b/swh/objstorage/checker.py
@@ -1,257 +1,257 @@
 # Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import click
 import logging

 from swh.core import config
 from swh.storage.archiver.storage import ArchiverStorage

 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError, Error


 class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta):
     """Abstract class of the content integrity checker.

     This checker's purpose is to iterate over the contents of a storage and
     check the integrity of each file.
     Behavior of the checker to deal with corrupted status will be specified
     by subclasses.

     You should override the DEFAULT_CONFIG and CONFIG_BASE_FILENAME
     variables if you need it.
""" DEFAULT_CONFIG = { 'storage': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), 'batch_size': ('int', 1000), } CONFIG_BASE_FILENAME = 'objstorage/objstorage_checker' def __init__(self): """ Create a checker that ensure the objstorage have no corrupted file """ self.config = self.parse_config_file() self.objstorage = get_objstorage(**self.config['storage']) self.batch_size = self.config['batch_size'] def run_as_daemon(self): """ Start the check routine and perform it forever. Use this method to run the checker as a daemon that will iterate over the content forever in background. """ while True: try: self.run() except: pass def run(self): """ Check a batch of content. """ for obj_id in self._get_content_to_check(self.batch_size): cstatus = self._check_content(obj_id) if cstatus == 'corrupted': self.corrupted_content(obj_id) elif cstatus == 'missing': self.missing_content(obj_id) def _get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ yield from self.objstorage.get_random(batch_size) def _check_content(self, obj_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: self.objstorage.check(obj_id) except ObjNotFoundError: return 'missing' except Error: return 'corrupted' @abc.abstractmethod def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ raise NotImplementedError("%s must implement " "'corrupted_content' method" % type(self)) @abc.abstractmethod def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ raise NotImplementedError("%s must implement " "'missing_content' method" % type(self)) class LogContentChecker(BaseContentChecker): """ Content integrity checker that just log detected errors. """ DEFAULT_CONFIG = { 'storage': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker') } CONFIG_BASE_FILENAME = 'objstorage/log_checker' def __init__(self): super().__init__() self.logger = logging.getLogger(self.config['log_tag']) def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ self.logger.error('Content %s is corrupted' % obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ self.logger.error('Content %s is detected missing' % obj_id) class RepairContentChecker(LogContentChecker): """ Content integrity checker that will try to restore contents. """ DEFAULT_CONFIG = { 'storage': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'backup_storages': ('dict', {'banco': { 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'} + 'args': {'url': 'http://banco:5003/'} }}) } CONFIG_BASE_FILENAME = 'objstorage/repair_checker' def __init__(self): super().__init__() self.backups = [ get_objstorage(**storage) for name, storage in self.config['backup_storages'].items() ] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._restore(obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. 
""" super().missing_content(obj_id) self._restore(obj_id) def _restore(self, obj_id): if not self._perform_restore(obj_id): # Object could not be restored self.logger.critical( 'Object %s is corrupted and could not be repaired' % obj_id ) def _perform_restore(self, obj_id): """ Try to restore the object in the current storage using the backups """ for backup in self.backups: try: content = backup.get(obj_id) self.objstorage.restore(content, obj_id) except ObjNotFoundError as e: continue else: # Return True direclty when a backup contains the object return True # No backup contains the object return False class ArchiveNotifierContentChecker(LogContentChecker): """ Implementation of the checker that will update the archiver database Once the database is updated the archiver may restore the content on it's next scheduling as it won't be present anymore, and this status change will probably make the retention policy invalid. """ DEFAULT_CONFIG = { 'storage': ('dict', {'cls': 'pathslicing', 'args': {'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'storage_name': ('str', 'banco'), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev') } CONFIG_BASE_FILENAME = 'objstorage/archive_notifier_checker' def __init__(self): super().__init__() self.archiver_db = ArchiverStorage(self.config['dbconn']) self.storage_name = self.config['storage_name'] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._update_status(obj_id, 'corrupted') def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ super().missing_content(obj_id) self._update_status(obj_id, 'missing') def _update_status(self, obj_id, status): self.archiver_db.content_archive_update(obj_id, self.storage_name, new_status=status) @click.command() @click.argument('checker-type', required=1, default='log') @click.option('--daemon/--nodaemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') def launch(checker_type, daemon): types = { 'log': LogContentChecker, 'repair': RepairContentChecker, 'archiver_notifier': ArchiveNotifierContentChecker } checker = types[checker_type]() if daemon: checker.run_as_daemon() else: checker.run() if __name__ == '__main__': launch() diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py index 1d53b91..8aa8838 100644 --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -1,36 +1,36 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest from nose.plugins.attrib import attr from swh.objstorage import get_objstorage from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture from swh.objstorage.tests.server_testing import ServerTestFixture from swh.objstorage.api.server import app @attr('db') class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase): """ Test the remote archive API. 
""" def setUp(self): self.config = { 'cls': 'pathslicing', 'args': { 'root': tempfile.mkdtemp(), 'slicing': '0:1/0:5', } } self.app = app super().setUp() self.storage = get_objstorage('remote', { - 'base_url': self.url() + 'url': self.url() }) diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py index f512393..beda01f 100644 --- a/swh/objstorage/tests/test_objstorage_instantiation.py +++ b/swh/objstorage/tests/test_objstorage_instantiation.py @@ -1,47 +1,49 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest from nose.tools import istest from swh.objstorage.tests.server_testing import ServerTestFixture from swh.objstorage import get_objstorage from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.api.server import app class TestObjStorageInitialization(ServerTestFixture, unittest.TestCase): """ Test that the methods for ObjStorage initializations with `get_objstorage` works properly. """ def setUp(self): self.path = tempfile.mkdtemp() # Server is launched at self.url() self.app = app self.config = {'storage_base': tempfile.mkdtemp(), 'storage_slicing': '0:1/0:5'} super().setUp() @istest def pathslicing_objstorage(self): conf = { 'cls': 'pathslicing', 'args': {'root': self.path, 'slicing': '0:2/0:5'} } st = get_objstorage(**conf) self.assertTrue(isinstance(st, PathSlicingObjStorage)) @istest def remote_objstorage(self): conf = { 'cls': 'remote', - 'args': {'base_url': self.url()} + 'args': { + 'url': self.url() + } } st = get_objstorage(**conf) self.assertTrue(isinstance(st, RemoteObjStorage))