diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py index 67d1406..2b1fe9f 100644 --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -1,270 +1,238 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import logging import time from collections import defaultdict from swh.core import hashutil from swh.core import config from swh.objstorage import get_objstorage from swh.objstorage.exc import Error, ObjNotFoundError from .storage import ArchiverStorage from .copier import ArchiverCopier logger = logging.getLogger('archiver.worker') class ArchiverWorker(config.SWHConfig): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on the slaves servers. """ DEFAULT_CONFIG = { 'retention_policy': ('int', 2), 'archival_max_age': ('int', 3600), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev'), 'storages': ('dict', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } CONFIG_BASE_FILENAME = 'archiver-worker' def __init__(self, batch, add_config={}): """ Constructor of the ArchiverWorker class. """ self.batch = batch config = self.parse_config_file(additional_configs=[add_config]) self.retention_policy = config['retention_policy'] self.archival_max_age = config['archival_max_age'] self.archiver_db = ArchiverStorage(config['dbconn']) self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in config.get('storages', []) } if len(self.objstorages) < self.retention_policy: raise ValueError('Retention policy is too high for the number of ' 'provided servers') def run(self): """ Do the task expected from the archiver worker. Process the content in the batch, ensure that the elements still need an archival, and spawn copiers to copy files in each destinations. """ - # Defaultdict so the d[key] with non-existant key automatically - # create the given type (here list). - transferts = defaultdict(list) + transfers = defaultdict(list) for obj_id in self.batch: # Get dict {'missing': [servers], 'present': [servers]} # for contents ignoring those who don't need archival. copies = self._compute_copies(obj_id) if not self._need_archival(copies): continue present = copies.get('present', []) missing = copies.get('missing', []) if len(present) == 0: logger.critical('Content have been lost %s' % obj_id) continue # Choose randomly some servers to be used as srcs and dests. for src_dest in self._choose_backup_servers(present, missing): - transferts[src_dest].append(obj_id) + transfers[src_dest].append(obj_id) # Then run copiers for each of the required transferts. - for (src, dest), content_ids in transferts.items(): + for (src, dest), content_ids in transfers.items(): self.run_copier(self.objstorages[src], self.objstorages[dest], content_ids) def _compute_copies(self, content_id): """ From a content_id, return present and missing copies. Returns: A dictionary with keys 'present' and 'missing' that are mapped to lists of copies ids depending on whenever the content is present or missing on the copy. + The key 'ongoing' is associated with a dict that map to a copy + name the mtime of the ongoing status update. """ copies = self.archiver_db.content_archive_get(content_id) _, present, ongoing = copies - # Initialize the archival status with all known present - content_data = {'present': set(present), 'missing': set()} - # Add data about the ongoing items - for copy, mtime in ongoing.items(): - content_data[ - self._get_virtual_status('ongoing', mtime) - ].add(copy) - # Add to the archival status datas about servers that were not - # in the db; they are missing. - content_data['missing'].update( - set(self.objstorages.keys()) - set(content_data['present']) - ) - return content_data - - def _get_virtual_status(self, status, mtime): - """ Compute the virtual presence of a content. - - If the status is ongoing but the time is not elasped, the archiver - consider it will be present in the futur, and so consider it as - present. - However, if the time is elasped, the copy may have failed, so consider - the content as missing. - - Arguments: - status (string): One of ('present', 'missing', 'ongoing'). The - status of the content. - mtime (datetime): Time at which the content have been updated for - the last time. - - Returns: - The virtual status of the studied content, which is 'present' or - 'missing'. - - Raises: - ValueError: if the status is not one 'present', 'missing' - or 'ongoing' - """ - if status in ('present', 'missing'): - return status - - # If the status is 'ongoing' but there is still time, another worker - # may still be on the task. - if status == 'ongoing': - elapsed = time.time() - mtime - if elapsed <= self.archival_max_age: - return 'present' - else: - return 'missing' - else: - raise ValueError("status must be either 'present', 'missing' " - "or 'ongoing'") + set_present, set_ongoing = set(present), set(ongoing) + set_missing = set(self.objstorages) - set_present - set_ongoing + return {'present': set_present, 'missing': set_missing, + 'ongoing': ongoing} def _need_archival(self, content_data): """ Indicate if the content need to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ - nb_present = len(content_data.get('present', [])) - return nb_present < self.retention_policy + nb_presents = len(content_data.get('present', [])) + for copy, mtime in content_data.get('ongoing', {}).items(): + if not self._is_archival_delay_elasped(mtime): + nb_presents += 1 + return nb_presents < self.retention_policy + + def _is_archival_delay_elapsed(self, start_time): + """ Indicates if the archival delay is elapsed given the start_time + + Args: + start_time (float): time at which the archival started. + + Returns: + True if the archival delay is elasped, False otherwise + """ + elapsed = time.time() - start_time + return elapsed > self.archival_max_age def _choose_backup_servers(self, present, missing): """ Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Each destination server is unique so after archival, the retention policy requiremen will be fulfilled. However, the source server may be used multiple times. Yields: tuple (source, destination) for each required copy. """ # Transform from set to list to allow random selections missing = list(missing) present = list(present) nb_required = self.retention_policy - len(present) destinations = random.sample(missing, nb_required) sources = [random.choice(present) for dest in destinations] yield from zip(sources, destinations) def run_copier(self, source, destination, content_ids): """ Run a copier in order to archive the given contents Upload the given contents from the source to the destination. If the process fail, the whole content is considered uncopied and remains 'ongoing', waiting to be rescheduled as there is a delay. Args: source (ObjStorage): source storage to get the contents. destination (ObjStorage): Storage where the contents will be copied content_ids: list of content's id to archive. """ # Check if there is any error among the contents. content_status = self._get_contents_error(content_ids, source) # Iterates over the error detected. for content_id, real_status in content_status.items(): # Remove them from the to-archive list, # as they cannot be retrieved correclty. content_ids.remove(content_id) # Update their status to reflect their real state. self._content_archive_update(content_id, source, new_status=real_status) # Now perform the copy on the remaining contents ac = ArchiverCopier(source, destination, content_ids) if ac.run(): # Once the archival complete, update the database. for content_id in content_ids: self._content_archive_update(content_id, destination, new_status='present') def _get_contents_error(self, content_ids, storage): """ Indicates what is the error associated to a content when needed Check the given content on the given storage. If an error is detected, it will be reported through the returned dict. Args: content_ids: a list of content id to check storage: the storage where are the content to check. Returns: a dict that map {content_id -> error_status} for each content_id with an error. The `error_status` result may be 'missing' or 'corrupted'. """ content_status = {} for content_id in content_ids: try: storage.check(content_id) except Error: content_status[content_id] = 'corrupted' logger.error('Content is corrupted: %s' % content_id) except ObjNotFoundError: content_status[content_id] = 'missing' logger.error('A content referenced present is missing: %s' % content_id) return content_status def _content_archive_update(self, content_id, archive_id, new_status=None): """ Update the status of a archive content and set its mtime to now. Change the last modification time of an archived content and change its status to the given one. Args: content_id (string): The content id. archive_id (string): The id of the concerned archive. new_status (string): One of missing, ongoing or present, this status will replace the previous one. If not given, the function only changes the mtime of the content. """ db_obj_id = r'\x' + hashutil.hash_to_hex(content_id) self.archiver_db.content_archive_update( db_obj_id, archive_id, new_status ) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index a9062d0..6d4e756 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,308 +1,288 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os import time import json from nose.tools import istest from nose.plugins.attrib import attr from swh.core import hashutil from swh.core.tests.db_testing import DbsTestFixture from server_testing import ServerTestFixture from swh.storage.archiver import ArchiverDirector, ArchiverWorker from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.api.server import app TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestArchiver(DbsTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. """ TEST_DB_NAMES = [ 'softwareheritage-archiver-test', ] TEST_DB_DUMPS = [ os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), ] TEST_DB_DUMP_TYPES = [ 'pg_dump', ] def setUp(self): # Launch the backup server dest_root = tempfile.mkdtemp(prefix='remote') self.config = { 'storage_base': dest_root, 'storage_slicing': '0:2/2:4/4:6' } self.app = app super().setUp() # Retrieve connection (depends on the order in TEST_DB_NAMES) self.conn = self.conns[0] # archiver db's connection self.cursor = self.cursors[0] # Create source storage src_root = tempfile.mkdtemp() src_config = {'cls': 'pathslicing', 'args': {'root': src_root, 'slicing': '0:2/2:4/4:6'}} self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = {'cls': 'remote', 'args': {'base_url': self.url()}} self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = {'uffizi': self.src_storage, 'banco': self.dest_storage} # Create the archiver itself src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self.archiver = self._create_director( retention_policy=2, storages=self.archiver_storages ) # Create a base worker self.archiver_worker = self._create_worker() # Initializes and fill the tables. self.initialize_tables() def tearDown(self): self.empty_tables() super().tearDown() def initialize_tables(self): """ Initializes the database with a sample of items. """ # Add an archive (update existing one for technical reason, # altering enum cannot run in a transaction...) self.cursor.execute("""UPDATE archive SET url='{}' WHERE id='banco' """.format(self.url())) self.conn.commit() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content_archive') self.conn.commit() def _create_director(self, storages, batch_size=5000, archival_max_age=3600, retention_policy=2, asynchronous=False): config = { 'dbconn': ('str', self.conn), 'batch_max_size': ('int', batch_size), 'archival_max_age': ('int', archival_max_age), 'retention_policy': ('int', retention_policy), 'asynchronous': ('bool', asynchronous), 'storages': ('dict', self.archiver_storages) } return ArchiverDirector(config) def _create_worker(self, batch={}, retention_policy=2, archival_max_age=3600): config = { 'retention_policy': ('int', retention_policy), 'archival_max_age': ('int', archival_max_age), 'dbconn': ('str', self.conn), 'storages': ('dict', self.archiver_storages) } return ArchiverWorker(batch, config) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.cursor.execute(""" INSERT INTO content_archive VALUES('%s', '{}') """ % (db_obj_id)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. """ db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.archiver.archiver_storage.content_archive_update( db_obj_id, storage_name, status ) def _add_dated_content(self, obj_id, copies={}): """ Fully erase the previous copies field for the given content id This does not alter the contents into the objstorages. """ db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.cursor.execute(""" UPDATE TABLE content_archive SET copies='%s' WHERE content_id='%s' """ % (json.dumps(copies), db_obj_id)) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') director = self._create_director(self.archiver_storages, retention_policy=1) # Obj is present in only one archive but only one copy is required. director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) # Unit tests for archive worker - def vstatus(self, status, mtime): - return self.archiver_worker._get_virtual_status(status, mtime) - - @istest - def vstatus_present(self): - self.assertEquals( - self.vstatus('present', None), - 'present' - ) - - @istest - def vstatus_missing(self): - self.assertEquals( - self.vstatus('missing', None), - 'missing' - ) + def archival_elapsed(self, mtime): + return self.archiver_worker._is_archival_delay_elapsed(mtime) @istest def vstatus_ongoing_remaining(self): - self.assertEquals( - self.vstatus('ongoing', time.time()), - 'present' - ) + self.assertFalse(self.archival_elapsed(time.time())) @istest def vstatus_ongoing_elapsed(self): past_time = ( time.time() - self.archiver_worker.archival_max_age ) - self.assertEquals( - self.vstatus('ongoing', past_time), - 'missing' - ) + self.assertTrue(self.archival_elapsed(past_time)) def _status(self, status, mtime=None): """ Get a dict that match the copies structure """ return {'status': status, 'mtime': mtime or time.time()} @istest def need_archival_missing(self): """ A content should need archival when it is missing. """ status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker({}, retention_policy=2) self.assertEqual(worker._need_archival(status_copies), True) @istest def need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker({}, retention_policy=2) self.assertEqual(worker._need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker._compute_copies(obj_id)[status]) @istest def compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') @istest def compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker._choose_backup_servers(present, missing)) @istest def choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 )