
diff --git a/bin/swh-objstorage-azure b/bin/swh-objstorage-azure
index 9cfed40..b059cc3 100755
--- a/bin/swh-objstorage-azure
+++ b/bin/swh-objstorage-azure
@@ -1,113 +1,112 @@
#!/usr/bin/env python3
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# NOT FOR PRODUCTION
import click
from swh.objstorage import get_objstorage
-from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage
from swh.core import config, hashutil
class AzureAccess(config.SWHConfig):
"""This is an orchestration class to try and check objstorage_azure
implementation."""
DEFAULT_CONFIG = {
# Output storage
'storage_azure': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
# Input storage
'storage_local': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
}
CONFIG_BASE_FILENAME = 'objstorage/azure'
def __init__(self):
super().__init__()
self.config = self.parse_config_file()
- container_name = 'contents'
self.azure_cloud_storage = get_objstorage(
**self.config['storage_azure'])
self.read_objstorage = get_objstorage(
**self.config['storage_local'])
def list_contents(self, limit=10):
count = 0
for c in self.azure_cloud_storage:
count += 1
yield c
if count >= limit:
return
def send_one_content(self, obj_id):
obj_content = self.read_objstorage.get(obj_id)
self.azure_cloud_storage.add(content=obj_content,
obj_id=obj_id)
def check_integrity(self, obj_id):
self.azure_cloud_storage.check(obj_id) # will raise if problem
def check_presence(self, obj_id):
return obj_id in self.azure_cloud_storage
def download(self, obj_id):
return self.azure_cloud_storage.get(obj_id)
@click.command()
def tryout():
obj_azure = AzureAccess()
hex_sample_id = '00000085c856b32f0709a4f5d669bb4faa3a0ce9'
sample_id = hashutil.hex_to_hash(hex_sample_id)
check_presence = obj_azure.check_presence(sample_id)
print('presence first time should be False:', check_presence)
obj_azure.send_one_content(sample_id)
check_presence = obj_azure.check_presence(sample_id)
print('presence True:', check_presence)
hex_sample_2 = 'dfeffffeffff17b439f3e582813bd875e7141a0e'
sample_2 = hashutil.hex_to_hash(hex_sample_2)
check_presence = obj_azure.check_presence(sample_2)
print('presence False:', check_presence)
print()
print('Download a blob')
blob_content = obj_azure.download(sample_id)
print(blob_content)
print()
try:
not_found_hex_id = hex_sample_id.replace('0', 'f')
not_found_id = hashutil.hash_to_hex(not_found_hex_id)
obj_azure.download(not_found_id)
except:
print('Expected `blob does not exist`!')
# print()
# print('blobs:')
# print(list(obj_azure.list_contents()))
# print()
# print('content of %s' % hex_sample_id)
# print(obj_azure.download(hex_sample_id))
obj_azure.check_integrity(sample_id)
+
if __name__ == '__main__':
- tryout()
+ tryout()
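For context, the helper above resolves both storages through swh.objstorage.get_objstorage from {'cls', 'args'} configuration blocks. Below is a minimal sketch of the equivalent direct calls; the 'azure-storage' argument names are illustrative assumptions only and must be adapted to AzureCloudObjStorage's actual constructor.

from swh.core import hashutil
from swh.objstorage import get_objstorage

# Local pathslicing storage, mirroring the 'storage_local' default above.
local = get_objstorage(cls='pathslicing',
                       args={'root': '/srv/softwareheritage/objects',
                             'slicing': '0:2/2:4/4:6'})

# Azure-backed storage registered under the 'azure-storage' key; the
# argument names below are assumptions for illustration only.
azure = get_objstorage(cls='azure-storage',
                       args={'account_name': '<account>',
                             'api_secret_key': '<key>',
                             'container_name': 'contents'})

# Copy one object from the local storage to Azure, as send_one_content does.
obj_id = hashutil.hex_to_hash('00000085c856b32f0709a4f5d669bb4faa3a0ce9')
azure.add(content=local.get(obj_id), obj_id=obj_id)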
diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
index 086e66f..594bd07 100644
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,61 +1,65 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from .objstorage import ObjStorage
from .objstorage_pathslicing import PathSlicingObjStorage
from .api.client import RemoteObjStorage
from .multiplexer import MultiplexerObjStorage
from .multiplexer.filter import add_filters
__all__ = ['get_objstorage', 'ObjStorage']
_STORAGE_CLASSES = {
'pathslicing': PathSlicingObjStorage,
'remote': RemoteObjStorage,
}
try:
from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage
_STORAGE_CLASSES['azure-storage'] = AzureCloudObjStorage
except ImportError:
pass
def get_objstorage(cls, args):
""" Create an ObjStorage using the given implementation class.
Args:
cls (str): objstorage class unique key contained in the
_STORAGE_CLASSES dict.
args (dict): arguments for the required objstorage class; they
must match exactly the signature of the class's `__init__` method.
Returns:
an instance of the ObjStorage subclass that matches the given
`cls` argument.
Raises:
ValueError: if the given storage class is not a valid objstorage
key.
"""
try:
return _STORAGE_CLASSES[cls](**args)
except KeyError:
raise ValueError('Storage class %s does not exist' % cls)
def _construct_filtered_objstorage(storage_conf, filters_conf):
return add_filters(
get_objstorage(**storage_conf),
filters_conf
)
+
+
_STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
def _construct_multiplexer_objstorage(objstorages):
storages = [get_objstorage(**conf)
for conf in objstorages]
return MultiplexerObjStorage(storages)
+
+
_STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
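The registry above accepts either a class or a factory callable ('filtered' and 'multiplexer' are factories), and the Azure backend is only registered when its optional dependency imports. A minimal sketch of how a multiplexer configuration resolves through get_objstorage, reusing backend arguments that appear elsewhere in this diff (the 'remote' entry assumes such a server is reachable):

from swh.objstorage import get_objstorage

# Each entry is itself a {'cls', 'args'} block; get_objstorage is called
# recursively for each one by _construct_multiplexer_objstorage.
multiplexed = get_objstorage(cls='multiplexer', args={
    'objstorages': [
        {'cls': 'pathslicing',
         'args': {'root': '/srv/softwareheritage/objects',
                  'slicing': '0:2/2:4/4:6'}},
        {'cls': 'remote',
         'args': {'base_url': 'http://banco:5003/'}},
    ]})

# Unknown keys are reported as ValueError rather than KeyError.
try:
    get_objstorage(cls='unknown-backend', args={})
except ValueError as exc:
    print(exc)  # Storage class unknown-backend does not exist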
diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py
index 3777644..e37259d 100644
--- a/swh/objstorage/checker.py
+++ b/swh/objstorage/checker.py
@@ -1,256 +1,257 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import click
import logging
from swh.core import config
from swh.storage.archiver.storage import ArchiverStorage
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError, Error
class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta):
"""Abstract class of the content integrity checker.
This checker's purpose is to iterate over the contents of a storage and
check the integrity of each file.
How the checker deals with corrupted or missing content is specified
by subclasses.
Override the DEFAULT_CONFIG and CONFIG_BASE_FILENAME
variables if needed.
"""
DEFAULT_CONFIG = {
'storage': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
'batch_size': ('int', 1000),
}
CONFIG_BASE_FILENAME = 'objstorage/objstorage_checker'
def __init__(self):
""" Create a checker that ensure the objstorage have no corrupted file
"""
self.config = self.parse_config_file()
self.objstorage = get_objstorage(**self.config['storage'])
self.batch_size = self.config['batch_size']
def run_as_daemon(self):
""" Start the check routine and perform it forever.
Use this method to run the checker as a daemon that will iterate over
the content forever in the background.
"""
while True:
try:
self.run()
except:
pass
def run(self):
""" Check a batch of content.
"""
for obj_id in self._get_content_to_check(self.batch_size):
cstatus = self._check_content(obj_id)
if cstatus == 'corrupted':
self.corrupted_content(obj_id)
elif cstatus == 'missing':
self.missing_content(obj_id)
def _get_content_to_check(self, batch_size):
""" Get the content that should be verified.
Returns:
An iterable of the content ids that need to be checked.
"""
yield from self.objstorage.get_random(batch_size)
def _check_content(self, obj_id):
""" Check the validity of the given content.
Returns:
'missing' if the content is absent, 'corrupted' if the check fails,
None if the content is valid.
"""
try:
self.objstorage.check(obj_id)
except ObjNotFoundError:
return 'missing'
except Error:
return 'corrupted'
@abc.abstractmethod
def corrupted_content(self, obj_id):
""" Perform an action to treat with a corrupted content.
"""
raise NotImplementedError("%s must implement "
"'corrupted_content' method" % type(self))
@abc.abstractmethod
def missing_content(self, obj_id):
""" Perform an action to treat with a missing content.
"""
raise NotImplementedError("%s must implement "
"'missing_content' method" % type(self))
class LogContentChecker(BaseContentChecker):
""" Content integrity checker that just log detected errors.
"""
DEFAULT_CONFIG = {
'storage': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
'batch_size': ('int', 1000),
'log_tag': ('str', 'objstorage.checker')
}
CONFIG_BASE_FILENAME = 'objstorage/log_checker'
def __init__(self):
super().__init__()
self.logger = logging.getLogger(self.config['log_tag'])
def corrupted_content(self, obj_id):
""" Perform an action to treat with a corrupted content.
"""
self.logger.error('Content %s is corrupted' % obj_id)
def missing_content(self, obj_id):
""" Perform an action to treat with a missing content.
"""
self.logger.error('Content %s is detected missing' % obj_id)
class RepairContentChecker(LogContentChecker):
""" Content integrity checker that will try to restore contents.
"""
DEFAULT_CONFIG = {
'storage': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
'batch_size': ('int', 1000),
'log_tag': ('str', 'objstorage.checker'),
'backup_storages': ('dict',
{'banco': {
'cls': 'remote',
'args': {'base_url': 'http://banco:5003/'}
}})
}
CONFIG_BASE_FILENAME = 'objstorage/repair_checker'
def __init__(self):
super().__init__()
self.backups = [
get_objstorage(**storage)
for name, storage in self.config['backup_storages'].items()
]
def corrupted_content(self, obj_id):
""" Perform an action to treat with a corrupted content.
"""
super().corrupted_content(obj_id)
self._restore(obj_id)
def missing_content(self, obj_id):
""" Perform an action to treat with a missing content.
"""
super().missing_content(obj_id)
self._restore(obj_id)
def _restore(self, obj_id):
if not self._perform_restore(obj_id):
# Object could not be restored
self.logger.critical(
'Object %s is corrupted and could not be repaired' % obj_id
)
def _perform_restore(self, obj_id):
""" Try to restore the object in the current storage using the backups
"""
for backup in self.backups:
try:
content = backup.get(obj_id)
self.objstorage.restore(content, obj_id)
except ObjNotFoundError as e:
continue
else:
# Return True directly when a backup contains the object
return True
# No backup contains the object
return False
class ArchiveNotifierContentChecker(LogContentChecker):
""" Implementation of the checker that will update the archiver database
Once the database is updated the archiver may restore the content on it's
next scheduling as it won't be present anymore, and this status change
will probably make the retention policy invalid.
"""
DEFAULT_CONFIG = {
'storage': ('dict',
{'cls': 'pathslicing',
'args': {'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}}),
'batch_size': ('int', 1000),
'log_tag': ('str', 'objstorage.checker'),
'storage_name': ('str', 'banco'),
'dbconn': ('str', 'dbname=softwareheritage-archiver-dev')
}
CONFIG_BASE_FILENAME = 'objstorage/archive_notifier_checker'
def __init__(self):
super().__init__()
self.archiver_db = ArchiverStorage(self.config['dbconn'])
self.storage_name = self.config['storage_name']
def corrupted_content(self, obj_id):
""" Perform an action to treat with a corrupted content.
"""
super().corrupted_content(obj_id)
self._update_status(obj_id, 'corrupted')
def missing_content(self, obj_id):
""" Perform an action to treat with a missing content.
"""
super().missing_content(obj_id)
self._update_status(obj_id, 'missing')
def _update_status(self, obj_id, status):
self.archiver_db.content_archive_update(obj_id, self.storage_name,
new_status=status)
@click.command()
@click.argument('checker-type', required=1, default='log')
@click.option('--daemon/--nodaemon', default=True,
help='Indicates if the checker should run forever '
'or on a single batch of content')
def launch(checker_type, daemon):
types = {
'log': LogContentChecker,
'repair': RepairContentChecker,
'archiver_notifier': ArchiveNotifierContentChecker
}
checker = types[checker_type]()
if daemon:
checker.run_as_daemon()
else:
checker.run()
+
if __name__ == '__main__':
launch()
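The click entry point exposes the three checkers from the command line (checker type argument plus --daemon/--nodaemon); the same behaviour is available programmatically. A minimal sketch, assuming the relevant configuration files under objstorage/ are already in place, since the checkers read their settings via parse_config_file():

from swh.objstorage.checker import LogContentChecker, RepairContentChecker

# One pass over a random batch: missing and corrupted object ids are logged.
LogContentChecker().run()

# The repairing variant additionally re-fetches bad objects from its
# configured backup storages and restores them in place.
RepairContentChecker().run()

# run_as_daemon() would loop over run() forever instead of stopping after
# one batch.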
