Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345615
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
18 KB
Subscribers
None
View Options
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
index f59b6df52..90a579950 100644
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -1,158 +1,151 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
from swh.core import config
from swh.scheduler.celery_backend.config import app
from . import tasks # NOQA
from .storage import ArchiverStorage
task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
class ArchiverDirector(config.SWHConfig):
"""Process the files in order to know which one is needed as backup.
The archiver director processes the files in the local storage in order
to know which one needs archival and it delegates this task to
archiver workers.
"""
DEFAULT_CONFIG = {
'batch_max_size': ('int', 1500),
'retention_policy': ('int', 2),
'asynchronous': ('bool', True),
'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
}
CONFIG_BASE_FILENAME = 'archiver/director'
- def __init__(self, add_config):
+ def __init__(self):
""" Constructor of the archiver director.
Args:
db_conn_archiver: Either a libpq connection string,
or a psycopg2 connection for the archiver db.
config: optional additional configuration. Keys in the dict will
override the one parsed from the configuration file.
"""
- self.config = self.parse_config_file(additional_configs=[add_config])
+ self.config = self.parse_config_file()
self.archiver_storage = ArchiverStorage(self.config['dbconn'])
def run(self):
    """Dispatch every batch of contents needing archival to a worker.

    The ``asynchronous`` configuration entry selects the dispatch method:
    ``run_async_worker`` when it is truthy, ``run_sync_worker`` otherwise.
    Each batch yielded by ``get_unarchived_content_batch`` is then handed
    to the selected method, in order.
    """
    dispatch = (self.run_async_worker if self.config['asynchronous']
                else self.run_sync_worker)
    for content_batch in self.get_unarchived_content_batch():
        dispatch(content_batch)
- def _worker_args(self, batch):
- """ Generates a dict that contains the arguments for a worker.
- """
- return {
- 'batch': batch
- }
-
def run_async_worker(self, batch):
""" Produce a worker that will be added to the task queue.
"""
task = app.tasks[task_name]
- task.delay(**self._worker_args(batch))
+ task.delay(batch=batch)
def run_sync_worker(self, batch):
""" Run synchronously a worker on the given batch.
"""
task = app.tasks[task_name]
- task(**self._worker_args(batch))
+ task(batch=batch)
def get_unarchived_content_batch(self):
    """Group the contents that need archival into bounded batches.

    Yields:
        lists of content ids (sha1) as produced by
        ``_get_unarchived_content``. Every list holds at most
        ``batch_max_size`` (configuration entry) items; only the last
        one may be shorter, and an empty list is never yielded.
    """
    batch_max_size = self.config['batch_max_size']
    contents = []
    for content in self._get_unarchived_content():
        contents.append(content)
        # Flush as soon as the limit is reached (>=, not >) so a batch
        # never exceeds the configured maximum by one element.
        if len(contents) >= batch_max_size:
            yield contents
            contents = []
    # Emit the trailing, partially filled batch if there is one.
    if contents:
        yield contents
def _get_unarchived_content(self):
    """Keep only the archiver db entries that lack copies.

    Yields:
        the content id (sha1) of every entry returned by
        ``_get_all_contents`` whose number of present copies is lower
        than the ``retention_policy`` configuration entry.
    """
    for sha1, present_copies, _ in self._get_all_contents():
        required_copies = self.config['retention_policy']
        if len(present_copies) < required_copies:
            yield sha1
def _get_all_contents(self):
    """Read the archiver db page by page and yield a continuous stream.

    Pages are requested from
    ``archiver_storage.content_archive_get_copies``, each request being
    given the content id of the last entry already seen (``b''`` for the
    first one). Iteration stops at the first empty page.

    Yields:
        Data about a content as a tuple
        (content_id, present_copies, ongoing_copies) where ongoing_copies
        is a dict mapping copy to mtime.
    """
    resume_after = b''
    page = list(
        self.archiver_storage.content_archive_get_copies(resume_after)
    )
    while page:
        for entry in page:
            yield entry
        # The first field of an entry is its content id; the last one of
        # the page is where the next request resumes.
        resume_after = page[-1][0]
        page = list(
            self.archiver_storage.content_archive_get_copies(resume_after)
        )
@click.command()
@click.option('--batch-size', type=int,
              help="Maximal number of objects in a batch")
@click.option('--retention-policy', type=int,
              help="Minimal number of copies the archiver will create")
@click.option('--dbconn',
              help="Connection string for the archiver database")
@click.option('--async/--sync', 'asynchronous', default=None,
              help="Indicates if the archiver should run asynchronously")
def launch(batch_size, retention_policy, dbconn, asynchronous):
    """Run the archiver director from the command line.

    Args:
        batch_size: maximal number of objects in a batch, or None.
        retention_policy: minimal number of copies to create, or None.
        dbconn: connection string for the archiver database, or None.
        asynchronous: True for --async, False for --sync, None when
            neither flag is given.
    """
    # The flag is bound to the name 'asynchronous' because 'async' is a
    # reserved keyword since Python 3.7 and cannot be a parameter name.
    # Numeric options are declared as int so they are not passed as strings.
    #
    # The configuration has the following priority:
    # command line > file config > default config
    # Values are None if not provided (hence default=None on the flag).
    cl_config = create_conf(batch_size, retention_policy, dbconn,
                            asynchronous)
    # Run the archiver with the overriding conf.
    # NOTE(review): confirm that ArchiverDirector.__init__ accepts this
    # override dict; a constructor without parameters would reject it.
    archiver = ArchiverDirector(cl_config)
    archiver.run()
def create_conf(batch_size, retention_policy, dbconn, asynchronous):
    """Create a dict that contains only the given arguments.

    Args:
        batch_size: value for the 'batch_max_size' entry, or None.
        retention_policy: value for the 'retention_policy' entry, or None.
        dbconn: value for the 'dbconn' entry, or None.
        asynchronous: value for the 'asynchronous' entry, or None.

    Returns:
        a dict holding one entry per argument that is not None. False
        and 0 are kept: only None means "not provided".
    """
    # The last key is 'asynchronous' so it matches the configuration entry
    # of the same name; the parameter cannot be called 'async', which is a
    # reserved keyword since Python 3.7.
    candidates = (
        ('batch_max_size', batch_size),
        ('retention_policy', retention_policy),
        ('dbconn', dbconn),
        ('asynchronous', asynchronous),
    )
    # A comprehension unpacks each pair; filter() would pass the pair as a
    # single argument to a two-parameter lambda and raise TypeError.
    return {key: value for key, value in candidates if value is not None}
# Invoke the click command only when this module is executed as a script.
if __name__ == '__main__':
    launch()
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
index 9f54108d2..082d95bd6 100644
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -1,310 +1,294 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import tempfile
import unittest
import os
import time
import json
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core import hashutil
from swh.core.tests.db_testing import DbsTestFixture
from server_testing import ServerTestFixture
from swh.storage.archiver import ArchiverDirector, ArchiverWorker
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.api.server import app
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
@attr('db')
class TestArchiver(DbsTestFixture, ServerTestFixture,
unittest.TestCase):
""" Test the objstorage archiver.
"""
TEST_DB_NAMES = [
'softwareheritage-archiver-test',
]
TEST_DB_DUMPS = [
os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
]
TEST_DB_DUMP_TYPES = [
'pg_dump',
]
def setUp(self):
# Launch the backup server
dest_root = tempfile.mkdtemp(prefix='remote')
self.config = {
'storage_base': dest_root,
'storage_slicing': '0:2/2:4/4:6'
}
self.app = app
super().setUp()
# Retrieve connection (depends on the order in TEST_DB_NAMES)
self.conn = self.conns[0] # archiver db's connection
self.cursor = self.cursors[0]
# Create source storage
src_root = tempfile.mkdtemp()
src_config = {'cls': 'pathslicing',
'args': {'root': src_root,
'slicing': '0:2/2:4/4:6'}}
self.src_storage = get_objstorage(**src_config)
# Create destination storage
dest_config = {'cls': 'remote',
'args': {'base_url': self.url()}}
self.dest_storage = get_objstorage(**dest_config)
# Keep mapped the id to the storages
self.storages = {'uffizi': self.src_storage,
'banco': self.dest_storage}
- # Create the archiver itself
+ # Override configurations
src_archiver_conf = {'host': 'uffizi'}
dest_archiver_conf = {'host': 'banco'}
src_archiver_conf.update(src_config)
dest_archiver_conf.update(dest_config)
self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
- self._override_config() # Override the default config for db conn
- self.archiver = self._create_director(
- retention_policy=2,
- storages=self.archiver_storages
- )
+ self._override_director_config()
+ self._override_worker_config()
+ # Create the base archiver
+ self.archiver = self._create_director()
# Initializes and fill the tables.
self.initialize_tables()
def tearDown(self):
    """Empty the archiver tables, then run the parent fixtures' teardown."""
    # The rows are deleted before the parent classes are torn down.
    self.empty_tables()
    super().tearDown()
def initialize_tables(self):
    """Initialize the database: point the 'banco' archive at ``self.url()``.

    An existing row is updated instead of adding a new archive for a
    technical reason: altering an enum cannot run in a transaction.
    The change is committed right away.
    """
    statement_template = """UPDATE archive
SET url='{}'
WHERE id='banco'
"""
    statement = statement_template.format(self.url())
    self.cursor.execute(statement)
    self.conn.commit()
def empty_tables(self):
    """Delete every row of the content_archive table and commit."""
    # Remove all content
    self.cursor.execute('DELETE FROM content_archive')
    self.conn.commit()
- def _override_config(self):
- """ Override the default config of the Archiver director and worker
+ def _override_director_config(self, retention_policy=2):
+ """ Override the default config of the Archiver director
to allow the tests to use the *-test db instead of the default one as
there is no configuration file for now.
+ """
+ ArchiverDirector.parse_config_file = lambda obj: {
+ 'dbconn': self.conn,
+ 'batch_max_size': 5000,
+ 'archival_max_age': 3600,
+ 'retention_policy': retention_policy,
+ 'asynchronous': False
+ }
- Note that the default config file name is also overriden. If there is
- a file with this name in the configuration directories, the tests
- behavior may be altered.
+ def _override_worker_config(self):
+ """ Override the default config of the Archiver worker
+ to allow the tests to use the *-test db instead of the default one as
+ there is no configuration file for now.
"""
ArchiverWorker.parse_config_file = lambda obj: {
'retention_policy': 2,
'archival_max_age': 3600,
'dbconn': self.conn,
'storages': self.archiver_storages
}
- ArchiverDirector.pars_config_file = lambda obj: {
- 'dbconn': self.conn,
- 'batch_max_size': 5000,
- 'archival_max_age': 3600,
- 'retention_policy': 2,
- 'asynchronous': False
- }
- def _create_director(self, storages, batch_size=5000,
- archival_max_age=3600, retention_policy=2,
- asynchronous=False):
- config = {
- 'dbconn': ('str', self.conn),
- 'batch_max_size': ('int', batch_size),
- 'archival_max_age': ('int', archival_max_age),
- 'retention_policy': ('int', retention_policy),
- 'asynchronous': ('bool', asynchronous)
- }
- return ArchiverDirector(config)
-
- def _create_worker(self, batch={}, retention_policy=2,
- archival_max_age=3600):
- ArchiverWorker.DEFAULT_CONFIG = {
- 'retention_policy': ('int', retention_policy),
- 'archival_max_age': ('int', archival_max_age),
- 'dbconn': ('str', self.conn),
- 'storages': ('dict', self.archiver_storages)
- }
- aw = ArchiverWorker(batch)
- return aw
+ def _create_director(self):
+ return ArchiverDirector()
+
+ def _create_worker(self, batch={}):
+ return ArchiverWorker(batch)
def _add_content(self, storage_name, content_data):
    """Really add a content to the given objstorage.

    A ``content_archive`` row is also inserted for it, with an empty
    status ('{}').

    Args:
        storage_name: key of the target objstorage in ``self.storages``.
        content_data: data handed to the objstorage ``add`` method.

    Returns:
        the object id returned by the objstorage.
    """
    # Add the content to the storage
    target_storage = self.storages[storage_name]
    obj_id = target_storage.add(content_data)
    # Register it in the db under its hex id, prefixed with a literal \x
    hex_id = hashutil.hash_to_hex(obj_id)
    insert_row = """ INSERT INTO content_archive
VALUES('%s', '{}')
""" % (r'\x' + hex_id)
    self.cursor.execute(insert_row)
    return obj_id
def _update_status(self, obj_id, storage_name, status, date=None):
    """Update the db status for the given id/storage_name.

    This does not create the content in the storage. The update goes
    through the ``content_archive_update`` method of the director's
    archiver storage.

    Args:
        obj_id: object id of the content.
        storage_name: name of the storage whose status is changed.
        status: new status value.
        date: accepted but not used by this helper.
    """
    hex_id = hashutil.hash_to_hex(obj_id)
    archiver_db = self.archiver.archiver_storage
    archiver_db.content_archive_update(r'\x' + hex_id, storage_name, status)
def _add_dated_content(self, obj_id, copies=None):
    """Fully erase the previous copies field for the given content id.

    This does not alter the contents into the objstorages.

    Args:
        obj_id: object id of the content whose row is updated.
        copies: JSON-serializable value written to the ``copies``
            column; defaults to an empty dict.
    """
    # None replaces the former mutable ``{}`` default argument.
    if copies is None:
        copies = {}
    db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
    # PostgreSQL's UPDATE takes the table name directly; the former
    # "UPDATE TABLE content_archive" form is a syntax error.
    self.cursor.execute(""" UPDATE content_archive
SET copies='%s'
WHERE content_id='%s'
""" % (json.dumps(copies), db_obj_id))
# Integration test
@istest
def archive_missing_content(self):
    """ Run archiver on a missing content should archive it.

    The content is added to 'uffizi' and marked present there only.
    After the director run it must be retrievable from the destination
    storage with the same data.
    """
    obj_data = b'archive_missing_content'
    obj_id = self._add_content('uffizi', obj_data)
    self._update_status(obj_id, 'uffizi', 'present')
    # Content is missing on banco (entry not present in the db)
    with self.assertRaises(
            ObjNotFoundError,
            msg='Content should not be present before archival'):
        self.dest_storage.get(obj_id)
    self.archiver.run()
    # now the content should be present on remote objstorage
    remote_data = self.dest_storage.get(obj_id)
    # assertEqual: the assertEquals alias was removed in Python 3.12.
    self.assertEqual(obj_data, remote_data)
@istest
def archive_present_content(self):
    """ A content that is not 'missing' shouldn't be archived.

    The content only exists in 'uffizi', but the db marks it present on
    both 'uffizi' and 'banco'.
    """
    content_id = self._add_content('uffizi', b'archive_present_content')
    for host in ('uffizi', 'banco'):
        self._update_status(content_id, host, 'present')
    # The archiver believes the content is already archived, so after
    # the run it must still be absent from the destination storage.
    self.archiver.run()
    with self.assertRaises(ObjNotFoundError):
        self.dest_storage.get(content_id)
@istest
def archive_already_enough(self):
""" A content missing with enough copies shouldn't be archived.
"""
obj_id = self._add_content('uffizi', b'archive_alread_enough')
self._update_status(obj_id, 'uffizi', 'present')
- director = self._create_director(self.archiver_storages,
- retention_policy=1)
+ self._override_director_config(retention_policy=1)
+ director = self._create_director()
# Obj is present in only one archive but only one copy is required.
director.run()
with self.assertRaises(ObjNotFoundError):
self.dest_storage.get(obj_id)
# Unit tests for archive worker
def archival_elapsed(self, mtime):
    """Ask a freshly created worker about the archival delay for mtime.

    Returns:
        whatever the worker's ``_is_archival_delay_elapsed(mtime)``
        returns.
    """
    worker = self._create_worker()
    return worker._is_archival_delay_elapsed(mtime)
@istest
def vstatus_ongoing_remaining(self):
    """The archival delay is not elapsed for an mtime taken right now."""
    started_now = time.time()
    elapsed = self.archival_elapsed(started_now)
    self.assertFalse(elapsed)
@istest
def vstatus_ongoing_elapsed(self):
    """The archival delay is elapsed for an mtime ``archival_max_age``
    seconds in the past.
    """
    now = time.time()
    max_age = self._create_worker().archival_max_age
    self.assertTrue(self.archival_elapsed(now - max_age))
def _status(self, status, mtime=None):
    """Build a dict that matches the copies structure.

    Args:
        status: value stored under the 'status' key.
        mtime: value stored under the 'mtime' key; any falsy value
            (None, 0) is replaced by the current time.

    Returns:
        a dict with the 'status' and 'mtime' keys.
    """
    if not mtime:
        mtime = time.time()
    return {'status': status, 'mtime': mtime}
@istest
def need_archival_missing(self):
""" A content should need archival when it is missing.
"""
status_copies = {'present': ['uffizi'], 'missing': ['banco']}
- worker = self._create_worker({}, retention_policy=2)
+ worker = self._create_worker()
self.assertEqual(worker._need_archival(status_copies),
True)
@istest
def need_archival_present(self):
""" A content present everywhere shouldn't need archival
"""
status_copies = {'present': ['uffizi', 'banco']}
- worker = self._create_worker({}, retention_policy=2)
+ worker = self._create_worker()
self.assertEqual(worker._need_archival(status_copies),
False)
def _compute_copies_status(self, status):
    """Check that a content with a given status is detected correctly.

    A content is added to 'banco' and its db status set to ``status``;
    a worker's ``_compute_copies`` result must then list 'banco' under
    that status.
    """
    payload = b'compute_copies_' + status.encode('utf8')
    content_id = self._add_content('banco', payload)
    self._update_status(content_id, 'banco', status)
    copies = self._create_worker()._compute_copies(content_id)
    self.assertIn('banco', copies[status])
@istest
def compute_copies_present(self):
    """ A present content should be detected with correct status

    Delegates to ``_compute_copies_status`` with the 'present' status.
    """
    self._compute_copies_status('present')
@istest
def compute_copies_missing(self):
    """ A missing content should be detected with correct status

    Delegates to ``_compute_copies_status`` with the 'missing' status.
    """
    self._compute_copies_status('missing')
def _get_backups(self, present, missing):
    """Return the list of src/dest pairs chosen from present and missing.

    The pairs come from the ``_choose_backup_servers`` method of a
    freshly created worker; its result is materialized into a list.
    """
    chosen = self._create_worker()._choose_backup_servers(present, missing)
    return list(chosen)
@istest
def choose_backup_servers(self):
    """Check how many src/dest pairs are chosen for several copy layouts."""
    # Each scenario is (present, missing, expected number of pairs).
    scenarios = (
        (['uffizi', 'banco'], [], 0),
        (['uffizi'], ['banco'], 1),
        # Even with more possible destinations, do not take more than the
        # retention_policy require
        (['uffizi'], ['banco', 's3'], 1),
    )
    for present, missing, expected in scenarios:
        backups = self._get_backups(present, missing)
        self.assertEqual(len(backups), expected)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 3:27 PM (6 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3321530
Attached To
R65 Staging repository
Event Timeline
Log In to Comment