diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql
--- a/sql/swh-schema.sql
+++ b/sql/swh-schema.sql
@@ -14,7 +14,7 @@
 );
 
 insert into dbversion(version, release, description)
-      values(68, now(), 'Work In Progress');
+      values(69, now(), 'Work In Progress');
 
 -- a SHA1 checksum (not necessarily originating from Git)
 create domain sha1 as bytea check (length(value) = 20);
@@ -426,3 +426,29 @@
 );
 
 create index on release(target, target_type);
+
+
+-- In order to archive the content of the object storage, add
+-- some tables to keep track of what has already been archived.
+
+CREATE DOMAIN archive_id AS TEXT;
+
+CREATE TABLE archives (
+  id archive_id PRIMARY KEY,
+  url TEXT
+);
+
+CREATE TYPE archive_status AS ENUM (
+  'missing',
+  'ongoing',
+  'present'
+);
+
+CREATE TABLE content_archive (
+  content_id sha1 REFERENCES content(sha1),
+  archive_id archive_id REFERENCES archives(id),
+  status archive_status,
+  mtime timestamptz,
+  PRIMARY KEY (content_id, archive_id)
+);
+
diff --git a/sql/upgrades/069.sql b/sql/upgrades/069.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/069.sql
@@ -0,0 +1,28 @@
+-- SWH DB schema upgrade
+-- from_version: 68
+-- to_version: 69
+-- description: add tables for the archiver.
+
+insert into dbversion(version, release, description)
+      values(69, now(), 'Work In Progress');
+
+CREATE DOMAIN archive_id AS TEXT;
+
+CREATE TABLE archives (
+  id archive_id PRIMARY KEY,
+  url TEXT
+);
+
+CREATE TYPE archive_status AS ENUM (
+  'missing',
+  'ongoing',
+  'present'
+);
+
+CREATE TABLE content_archive (
+  content_id sha1 REFERENCES content(sha1),
+  archive_id archive_id REFERENCES archives(id),
+  status archive_status,
+  mtime timestamptz,
+  PRIMARY KEY (content_id, archive_id)
+);
diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/archiver/__init__.py
@@ -0,0 +1,3 @@
+from .director import ArchiverDirector  # NOQA
+from .worker import ArchiverWorker  # NOQA
+from .copier import ArchiverCopier  # NOQA
diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/archiver/copier.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core import hashutil
+from ..objstorage.api.client import RemoteObjStorage
+
+
+class ArchiverCopier():
+    """ This archiver copies some files into a remote objstorage
+    in order to get a backup.
+
+    Attributes:
+        content_ids: A list of content ids (hex-encoded sha1, prefixed by
+            the two bytea marker characters) this copier has to archive.
+        server (RemoteObjStorage): The remote object storage that is used
+            to backup content.
+        master_storage (Storage): The master storage that contains the data
+            the copier needs to archive.
+    """
+    def __init__(self, destination, content, master_storage):
+        """ Create a Copier for the archiver
+
+        Args:
+            destination: A tuple (archive_name, archive_url) that represents a
+                remote object storage as in the 'archives' table.
+            content: A list of content ids, as found in the batches of the
+                director, that this copier has to archive.
+            master_storage (Storage): The master storage of the system that
+                contains the data to archive.
+        """
+        _name, self.url = destination
+        self.content_ids = content
+        self.server = RemoteObjStorage(self.url)
+        self.master_storage = master_storage
+
+    def run(self):
+        """ Do the copy on the backup storage.
+
+        Run the archiver copier in order to copy the required content
+        into the current destination.
+        The content which corresponds to the ids in self.content_ids
+        will be fetched from the master_storage and then copied into
+        the backup object storage.
+
+        Returns:
+            A boolean that indicates if the whole content has been copied.
+        """
+        # Strip the leading '\x' of the textual sha1 representation. Keep
+        # self.content_ids untouched so that run() can safely be re-run.
+        sha1s = [hashutil.hex_to_hash(content_id[2:])
+                 for content_id in self.content_ids]
+        try:
+            contents = self.master_storage.content_get(sha1s)
+            for content in contents:
+                self.server.content_add(content['data'])
+        except Exception:
+            # Best effort: report the failure so that the caller does not
+            # mark these contents as archived.
+            return False
+
+        return True
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/archiver/director.py
@@ -0,0 +1,252 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import swh
+
+from datetime import datetime
+
+from swh.core import hashutil, config
+from swh.scheduler.celery_backend.config import app
+from . import tasks  # NOQA
+from ..db import cursor_to_bytes
+
+
+DEFAULT_CONFIG = {
+    'objstorage_path': '/tmp/swh-storage/objects',
+    'batch_max_size': 50,
+    'archival_max_age': 3600,
+    'retention_policy': 2,
+    'asynchronous': True,
+
+    'dbname': 'softwareheritage',
+    'user': 'root'
+}
+
+task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
+
+
+class ArchiverDirector():
+    """Process the files in order to know which one is needed as backup.
+
+    The archiver director processes the files in the local storage in order
+    to know which one needs archival and it delegates this task to
+    archiver workers.
+
+    Attributes:
+        master_storage: the local storage of the master server.
+        slave_storages: A map that associates the id of each slave server
+            used for backup to its url.
+        batch_max_size: The number of content items that can be given
+            to the same archiver worker.
+        archival_max_age: Delay given to the worker to copy all the files
+            in a given batch.
+        retention_policy: Required number of copies for the content to
+            be considered safe.
+    """
+
+    def __init__(self, db_conn, config):
+        """ Constructor of the archiver director.
+
+        Args:
+            db_conn: Either a libpq connection string,
+                or a psycopg2 connection.
+            config: Archiver_configuration. A dictionary that must contain
+                the following keys.
+                objstorage_path (string): the path of the objstorage of the
+                    master.
+                batch_max_size (int): The number of content items that can be
+                    given to the same archiver worker.
+                archival_max_age (int): Delay given to the worker to copy all
+                    the files in a given batch.
+                retention_policy (int): Required number of copies for the
+                    content to be considered safe.
+                asynchronous (boolean): Indicate whenever the archival should
+                    run in asynchronous mode or not.
+        """
+        # Get the local storage of the master and remote ones for the slaves.
+        self.master_storage_args = [db_conn, config['objstorage_path']]
+        master_storage = swh.storage.get_storage('local_storage',
+                                                 self.master_storage_args)
+        slaves = {
+            archive_id: url
+            for archive_id, url
+            in master_storage.db.archive_ls()
+        }
+
+        # TODO Database should be initialized somehow before going in
+        # production. For now, assumes that the database contains
+        # data for all the current content.
+
+        self.master_storage = master_storage
+        self.slave_storages = slaves
+        self.batch_max_size = config['batch_max_size']
+        self.archival_max_age = config['archival_max_age']
+        self.retention_policy = config['retention_policy']
+        self.is_asynchronous = config['asynchronous']
+
+    def run(self):
+        """ Run the archiver director.
+
+        The archiver director will check all the contents of the archiver
+        database and do the required backup jobs.
+        """
+        run_fn = (self.run_async_worker
+                  if self.is_asynchronous
+                  else self.run_sync_worker)
+        for batch in self.get_unarchived_content():
+            run_fn(batch)
+
+    def run_async_worker(self, batch):
+        """ Produce a worker that will be added to the task queue.
+        """
+        task = app.tasks[task_name]
+        task.delay(batch, self.master_storage_args,
+                   self.slave_storages, self.retention_policy,
+                   self.archival_max_age)
+
+    def run_sync_worker(self, batch):
+        """ Run synchronously a worker on the given batch.
+        """
+        task = app.tasks[task_name]
+        task(batch, self.master_storage_args,
+             self.slave_storages, self.retention_policy,
+             self.archival_max_age)
+
+    def get_unarchived_content(self):
+        """ Get all the contents that need to be archived.
+
+        Yields:
+            A batch of contents. Batches are dictionaries which associate
+            a content id to the data about servers that contains it or not.
+
+            {'id1':
+                {'present': [('slave1', 'slave1_url')],
+                 'missing': [('slave2', 'slave2_url'),
+                             ('slave3', 'slave3_url')]
+                },
+             'id2':
+                {'present': [],
+                 'missing': [
+                     ('slave1', 'slave1_url'),
+                     ('slave2', 'slave2_url'),
+                     ('slave3', 'slave3_url')
+                 ]
+                }
+
+            Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
+            are ids and urls of the storage slaves.
+
+            At least all the content that don't have enough copies on the
+            backups servers are distributed into these batches.
+        """
+        # Get the data about each content referenced into the archiver.
+        missing_copy = {}
+        for content_id in self.master_storage.db.content_archive_ls():
+            # Do some initializations
+            db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])
+
+            # Query in order to know in which servers the content is saved.
+            backups = self.master_storage.db.content_archive_get(
+                content=db_content_id
+            )
+            for _content_id, server_id, status, mtime in backups:
+
+                # If the content is ongoing but still have time, there is
+                # another worker working on this content.
+                if status == 'ongoing':
+                    # Compare in mtime's own timezone instead of
+                    # dropping it: now() and mtime may not share one.
+                    now = datetime.now(tz=mtime.tzinfo)
+                    elapsed = (now - mtime).total_seconds()
+                    if elapsed < self.archival_max_age:
+                        continue
+                    # The previous attempt timed out: the copy has to be
+                    # done again, so report it as missing to the worker.
+                    status = 'missing'
+                server_data = (server_id, self.slave_storages[server_id])
+                missing_copy.setdefault(
+                    db_content_id,
+                    {'present': [], 'missing': []}
+                ).setdefault(status, []).append(server_data)
+
+            # Check the content before archival.
+            # TODO catch exception and try to restore the file from an
+            # archive?
+            self.master_storage.objstorage.check(content_id[0])
+
+            if len(missing_copy) >= self.batch_max_size:
+                yield missing_copy
+                missing_copy = {}
+
+        if len(missing_copy) > 0:
+            yield missing_copy
+
+
+def initialize_content_archive(db, sample_size, names=('Local',)):
+    """ Initialize the content_archive table with a sample.
+
+    From the content table, get a sample of id, and fill the
+    content_archive table with those id in order to create a test sample
+    for the archiver.
+
+    Args:
+        db: The database of the storage.
+        sample_size (int): The size of the sample to create.
+        names: A list of archive names. Those archives must already exist.
+            Archival status of the archives content will be erased on db.
+
+    Returns:
+        The amount of entries created.
+    """
+    with db.transaction() as cur:
+        cur.execute('DELETE FROM content_archive')
+
+    with db.transaction() as cur:
+        cur.execute('SELECT sha1 from content limit %s', (sample_size,))
+        ids = list(cursor_to_bytes(cur))
+
+    for sha1, in ids:
+        tid = r'\x' + hashutil.hash_to_hex(sha1)
+
+        with db.transaction() as cur:
+            for name in names:
+                cur.execute("""INSERT INTO content_archive
+                               VALUES(%s::sha1, %s, 'missing', now())
+                            """, (tid, name))
+
+    # The content table may hold fewer than sample_size rows.
+    count = len(ids) * len(names)
+    print('Initialized database with', count, 'items')
+    return count
+
+
+def add_content_to_objstore(director, source, content_ids):
+    """ Fill the objstore according to the database
+
+    Get the current status of the database and fill the objstorage of the
+    master storage according to these data.
+    Content are fetched from the source, which is a storage.
+
+    Args:
+        director (ArchiverDirector): The archiver director containing
+            the master storage to fill.
+        source (Storage): A storage that contains the content for all the
+            ids in content_ids.
+        content_ids: A list of ids that should be added to the master object
+            storage.
+    """
+    for res in source.content_get(content_ids):
+        content_data = res['data']
+        director.master_storage.objstorage.add_bytes(content_data)
+
+
+if __name__ == '__main__':
+    import sys
+
+    conf = config.read(sys.argv[1], DEFAULT_CONFIG)
+    cstring = 'dbname={} user={}'.format(conf['dbname'], conf['user'])
+
+    director = ArchiverDirector(cstring, conf)
+    director.run()
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/archiver/tasks.py
@@ -0,0 +1,25 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from .worker import ArchiverWorker
+
+
+class SWHArchiverTask(Task):
+    """ Main task that archive a batch of content.
+    """
+    task_queue = 'swh_storage_archive_worker'
+
+    def run(self, batch, master_storage_args,
+            slave_storages, retention_policy, archival_max_age=3600):
+        aw = ArchiverWorker(batch, master_storage_args,
+                            slave_storages, retention_policy,
+                            archival_max_age)
+        # NOTE(review): ArchiverWorker.run() has no return statement, so it
+        # returns None and the message below is never logged -- confirm the
+        # intended contract (and that self.log is callable) before relying
+        # on it.
+        if aw.run():
+            self.log("Successful backup for a batch of size %s" % len(batch))
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/archiver/worker.py
@@ -0,0 +1,215 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+
+from .copier import ArchiverCopier
+from .. import get_storage
+
+from datetime import datetime
+
+
+class ArchiverWorker():  # This class should probably extend a Celery Task.
+    """ Do the required backups on a given batch of contents.
+
+    Process the content of a content batch in order to do the needed backups on
+    the slaves servers.
+
+    Attributes:
+        batch: The content this worker has to archive, which is a dictionary
+            that associates a content's sha1 id to the list of servers where
+            the content is present or missing
+            (see ArchiverDirector::get_unarchived_content).
+        master_storage: The master storage, where the content to archive
+            is located.
+        slave_storages: A map that associates server_id to the remote server.
+        retention_policy: The required number of copies for a content to be
+            considered safe.
+        archival_max_age: Delay, in seconds, after which an 'ongoing' copy
+            is considered stale and is done again.
+    """
+    def __init__(self, batch, master_storage_args,
+                 slave_storages, retention_policy, archival_max_age=3600):
+        """ Constructor of the ArchiverWorker class.
+
+        Args:
+            batch: A batch of content, which is a dictionary that associates
+                a content's sha1 id to the list of servers where the content
+                is present or missing.
+            master_storage_args: The arguments used to instantiate the
+                master storage, which holds the whole content.
+            slave_storages: A map that associates server_id to the remote
+                server.
+            retention_policy: The required number of copies for a content to
+                be considered safe.
+            archival_max_age (int): Delay, in seconds, after which an
+                'ongoing' copy is considered stale and is done again.
+        """
+        self.batch = batch
+        self.master_storage = get_storage('local_storage', master_storage_args)
+        self.slave_storages = slave_storages
+        self.retention_policy = retention_policy
+        self.archival_max_age = archival_max_age
+
+    def __choose_backup_servers(self, allowed_storage, backup_number):
+        """ Choose the slave servers for archival.
+
+        Choose the given amount of servers among those which don't already
+        contain a copy of the content.
+
+        Args:
+            allowed_storage: servers where the content is not already present.
+            backup_number (int): The number of servers we have to choose in
+                order to fulfill the objective.
+
+        Returns:
+            A list of the chosen servers (possibly empty).
+        """
+        # In case there is not enough backup servers to get all the backups
+        # we need, just do our best.
+        # TODO such situation can only be caused by an incorrect configuration
+        # setting. Do a verification previously.
+        # A content may already have more copies than required, in which
+        # case the requested number is negative: never go below zero, as
+        # random.sample() rejects negative sample sizes.
+        backup_number = max(0, min(backup_number, len(allowed_storage)))
+
+        # TODO Find a better (or a good) policy to choose the backup servers.
+        # The random choice should be equivalently distributed between
+        # servers for a great amount of data, but don't take care of servers
+        # capacities.
+        return random.sample(allowed_storage, backup_number)
+
+    def __get_archival_status(self, content_id, server):
+        """ Get the archival status of a content in a given archive.
+
+        Args:
+            content_id (string): The content id.
+            server: A tuple (archive_id, archive_url).
+
+        Returns:
+            A dictionary with keys content_id, archive_id, status and mtime,
+            or None if the content is not referenced for this archive.
+        """
+        rows = list(
+            self.master_storage.db.content_archive_get(content_id, server[0])
+        )
+        if not rows:
+            return None
+        t = rows[0]
+        return {
+            'content_id': t[0],
+            'archive_id': t[1],
+            'status': t[2],
+            'mtime': t[3]
+        }
+
+    def __content_archive_update(self, content_id, archive_id,
+                                 new_status=None):
+        """ Update the status of an archived content and set its mtime to now()
+
+        Change the last modification time of an archived content and change
+        its status to the given one.
+
+        Args:
+            content_id (string): The content id.
+            archive_id (string): The id of the concerned archive.
+            new_status (string): One of missing, ongoing or present, this
+                status will replace the previous one. If not given, the
+                function only changes the mtime of the content.
+        """
+        assignments = []
+        params = []
+        if new_status:
+            assignments.append('status=%s')
+            params.append(new_status)
+        assignments.append('mtime=now()')
+        params.extend([content_id, archive_id])
+
+        # Values are passed as query parameters rather than interpolated
+        # into the SQL string, so that the driver takes care of quoting.
+        query = """UPDATE content_archive
+                   SET {}
+                   WHERE content_id=%s
+                   and archive_id=%s
+                """.format(', '.join(assignments))
+
+        with self.master_storage.db.transaction() as cur:
+            cur.execute(query, params)
+
+    def run(self):
+        """ Do the task expected from the archiver worker.
+
+        Process the content in the batch, ensure that the elements still need
+        an archival, and spawn copiers to copy files in each destinations.
+        """
+
+        def content_filter(content, destination):
+            """ Indicates whenever a content need archivage.
+
+            Filter function that returns True if a given content
+            still require to be archived.
+
+            Args:
+                content (str): The content id.
+                destination: A tuple (archive_id, archive_url).
+            """
+            archival_status = self.__get_archival_status(
+                content,
+                destination
+            )
+            if archival_status:
+                status = archival_status['status']
+                # If the archive is already present, no need to backup.
+                if status == 'present':
+                    return False
+                # If the content is ongoing but still have time, there is
+                # another worker working on this content.
+                elif status == 'ongoing':
+                    mtime = archival_status['mtime']
+                    now = datetime.now(tz=mtime.tzinfo)
+                    elapsed = (now - mtime).total_seconds()
+                    if elapsed < self.archival_max_age:
+                        return False
+                return True
+            else:
+                # TODO this is an error case, the content should always exists.
+                return None
+
+        slaves_copy = {}
+        for content_id in self.batch:
+            # Choose some servers to upload the content
+            server_data = self.batch[content_id]
+
+            backup_servers = self.__choose_backup_servers(
+                server_data['missing'],
+                self.retention_policy - len(server_data['present'])
+            )
+            # Fill the map destination -> content to upload
+            for server in backup_servers:
+                # Servers may come back as (unhashable) lists, e.g. once the
+                # batch went through a JSON task serialization.
+                slaves_copy.setdefault(tuple(server), []).append(content_id)
+
+        # At this point, check the archival status of the content in order to
+        # know if it is still needed.
+        for destination in slaves_copy:
+            contents = []
+            for content in slaves_copy[destination]:
+                if content_filter(content, destination):
+                    contents.append(content)
+            slaves_copy[destination] = contents
+
+        # Spawn a copier for each destination that will copy all the
+        # needed content.
+        for destination in slaves_copy:
+            ac = ArchiverCopier(
+                destination, slaves_copy[destination],
+                self.master_storage)
+            if ac.run():
+                # Once the archival complete, update the database.
+                for content_id in slaves_copy[destination]:
+                    self.__content_archive_update(content_id, destination[0],
+                                                  new_status='present')
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -624,3 +624,67 @@
         if not data:
             return None
         return line_to_bytes(data)
+
+    def archive_ls(self, cur=None):
+        """ Get all the archives registered on the server.
+
+        Yields:
+            a tuple (server_id, server_url) for each archive server.
+        """
+        cur = self._cursor(cur)
+        cur.execute("""SELECT id, url
+                       FROM archives
+                    """)
+        yield from cursor_to_bytes(cur)
+
+    def content_archive_ls(self, cur=None):
+        """ Get the archival status of the content
+
+        Get an iterable over all the content that is referenced
+        in a backup server.
+
+        Yields:
+            the sha1 of each content referenced at least one time
+            in the database of archival status.
+        """
+        cur = self._cursor(cur)
+        cur.execute("""SELECT DISTINCT content_id
+                       FROM content_archive""")
+        yield from cursor_to_bytes(cur)
+
+    def content_archive_get(self, content=None, archive=None, cur=None):
+        """ Get the archival status of a content in a specific server.
+
+        Retrieve from the database the archival status of the given content
+        in the given archive server.
+
+        Args:
+            content: the sha1 of the content. May be None for any id.
+            archive: the database id of the server we're looking into
+                may be None for any server.
+
+        Yields:
+            A tuple (content_id, server_id, archival status, mtime).
+        """
+        query = """SELECT content_id, archive_id, status, mtime
+                   FROM content_archive
+                """
+        # Values are bound as query parameters, never interpolated into
+        # the SQL text, so the driver takes care of the quoting.
+        conditions = []
+        params = []
+        if content:
+            conditions.append("content_id=%s")
+            params.append(content)
+        if archive:
+            conditions.append("archive_id=%s")
+            params.append(archive)
+
+        if conditions:
+            query = """%s
+                       WHERE %s
+                    """ % (query, ' and '.join(conditions))
+
+        cur = self._cursor(cur)
+        cur.execute(query, params or None)
+        yield from cursor_to_bytes(cur)
diff --git a/swh/storage/tests/manual_test_archiver.py b/swh/storage/tests/manual_test_archiver.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/manual_test_archiver.py
@@ -0,0 +1,97 @@
+import string
+import random
+
+from swh.core import hashutil
+from swh.storage import Storage
+from swh.storage.db import cursor_to_bytes
+
+from swh.storage.archiver import ArchiverDirector
+
+
+def rs(size=6, chars=string.ascii_uppercase + string.ascii_lowercase):
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+def mc(data):
+    data = bytes(data, 'utf8')
+    content = hashutil.hashdata(data)
+    content.update({'data': data})
+    return content
+
+
+random.seed(0)
+
+
+def initialize_content_archive(db, sample_size, names=('Local',)):
+    """ Initialize the content_archive table with a sample.
+
+    From the content table, get a sample of id, and fill the
+    content_archive table with those id in order to create a test sample
+    for the archiver.
+
+    Args:
+        db: The database of the storage.
+        sample_size (int): The size of the sample to create.
+        names: A list of archive names. Those archives must already exist.
+            Archival status of the archives content will be erased on db.
+
+    Returns:
+        The amount of entries created.
+    """
+    with db.transaction() as cur:
+        cur.execute('DELETE FROM content_archive')
+
+    with db.transaction() as cur:
+        cur.execute('SELECT sha1 from content limit %s', (sample_size,))
+        ids = list(cursor_to_bytes(cur))
+
+    for sha1, in ids:
+        tid = r'\x' + hashutil.hash_to_hex(sha1)
+
+        with db.transaction() as cur:
+            for name in names:
+                cur.execute("""INSERT INTO content_archive
+                               VALUES(%s::sha1, %s, 'missing', now())
+                            """, (tid, name))
+
+    # The content table may hold fewer than sample_size rows.
+    count = len(ids) * len(names)
+    print('Initialized database with', count, 'items')
+    return count
+
+CONTENT_SIZE = 10
+
+# Local database
+dbname = 'softwareheritage-dev'
+user = 'qcampos'
+cstring = 'dbname=%s user=%s' % (dbname, user)
+# Archiver config
+config = {
+    'objstorage_path': '/tmp/swh/storage-dev/2',
+    'archival_max_age': 3600,
+    'batch_max_size': 10,
+    'retention_policy': 1,
+    'asynchronous': False
+}
+
+# Grand-palais's storage
+loc = Storage(cstring, config['objstorage_path'])
+
+
+# Add the content
+contents = [mc(rs(100)) for _ in range(CONTENT_SIZE)]
+loc.content_add(contents)
+initialize_content_archive(loc.db, CONTENT_SIZE, ['petit-palais'])
+
+# Launch the archiver
+archiver = ArchiverDirector(cstring, config)
+archiver.run()
+
+
+def clean():
+    # Clean all
+    with loc.db.transaction() as cur:
+        cur.execute('delete from content_archive')
+        cur.execute('delete from content')
+    import os
+    os.system("rm -r /tmp/swh/storage-dev/2/*")
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_archiver.py
@@ -0,0 +1,163 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+import os
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+from datetime import datetime, timedelta, timezone
+
+from swh.core import hashutil
+from swh.core.tests.db_testing import DbTestFixture
+from server_testing import ServerTestFixture
+
+from swh.storage import Storage
+from swh.storage.exc import ObjNotFoundError
+from swh.storage.archiver import ArchiverDirector
+from swh.storage.objstorage.api.client import RemoteObjStorage
+from swh.storage.objstorage.api.server import app
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+
+
+@attr('db')
+class TestArchiver(DbTestFixture, ServerTestFixture,
+                   unittest.TestCase):
+    """ Test the objstorage archiver.
+    """
+
+    TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
+
+    def setUp(self):
+        # Launch the backup server
+        self.backup_objroot = tempfile.mkdtemp()
+        self.config = {'storage_base': self.backup_objroot,
+                       'storage_depth': 3}
+        self.app = app
+        super().setUp()
+
+        # Launch a client to check objects presence
+        print("url", self.url())
+        self.remote_objstorage = RemoteObjStorage(self.url())
+        # Create the local storage.
+        self.objroot = tempfile.mkdtemp()
+        self.storage = Storage(self.conn, self.objroot)
+        # Initializes and fill the tables.
+        self.initialize_tables()
+        # Create the archiver
+        self.archiver = self.__create_director()
+
+    def tearDown(self):
+        self.empty_tables()
+        super().tearDown()
+
+    def initialize_tables(self):
+        """ Initializes the database with a sample of items.
+        """
+        # Add an archive
+        self.cursor.execute("""INSERT INTO archives(id, url)
+                               VALUES('Local', 'http://localhost:{}/')
+                            """.format(self.port))
+        self.conn.commit()
+
+    def empty_tables(self):
+        # Remove all content
+        self.cursor.execute('DELETE FROM content_archive')
+        self.cursor.execute('DELETE FROM archives')
+        self.conn.commit()
+
+    def __add_content(self, content_data, status='missing', date='now()'):
+        # Add the content
+        content = hashutil.hashdata(content_data)
+        content.update({'data': content_data})
+        self.storage.content_add([content])
+        # Then update database
+        content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
+        self.cursor.execute("""INSERT INTO content_archive
+                               VALUES('%s'::sha1, 'Local', '%s', %s)
+                            """ % (content_id, status, date))
+        return content['sha1']
+
+    def __get_missing(self):
+        self.cursor.execute("""SELECT content_id
+                            FROM content_archive
+                            WHERE status='missing'""")
+        return self.cursor.fetchall()
+
+    def __create_director(self, batch_size=5000, archival_max_age=3600,
+                          retention_policy=1, asynchronous=False):
+        config = {
+            'objstorage_path': self.objroot,
+            'batch_max_size': batch_size,
+            'archival_max_age': archival_max_age,
+            'retention_policy': retention_policy,
+            'asynchronous': asynchronous  # Avoid depending on queue for tests.
+        }
+        director = ArchiverDirector(self.conn, config)
+        return director
+
+    @istest
+    def archive_missing_content(self):
+        """ Run archiver on a missing content should archive it.
+        """
+        content_data = b'archive_missing_content'
+        id = self.__add_content(content_data)
+        # After the run, the content should be in the archive.
+        self.archiver.run()
+        remote_data = self.remote_objstorage.content_get(id)
+        self.assertEqual(content_data, remote_data)
+
+    @istest
+    def archive_present_content(self):
+        """ A content that is not 'missing' shouldn't be archived.
+        """
+        id = self.__add_content(b'archive_present_content', status='present')
+        # After the run, the content should NOT be in the archive.
+        self.archiver.run()
+        with self.assertRaises(ObjNotFoundError):
+            self.remote_objstorage.content_get(id)
+
+    @istest
+    def archive_ongoing_remaining(self):
+        """ A content that is ongoing and still have some time
+        to be archived should not be rescheduled.
+        """
+        id = self.__add_content(b'archive_ongoing_remaining', status='ongoing')
+        items = [x for batch in self.archiver.get_unarchived_content()
+                 for x in batch]
+        id = r'\x' + hashutil.hash_to_hex(id)
+        self.assertNotIn(id, items)
+
+    @istest
+    def archive_ongoing_elapsed(self):
+        """ A content that is ongoing but with elapsed time should
+        be rescheduled.
+        """
+        # Create an ongoing archive content with time elapsed by 1s.
+        id = self.__add_content(
+            b'archive_ongoing_elapsed',
+            status='ongoing',
+            date="'%s'" % (datetime.now(timezone.utc) - timedelta(
+                seconds=self.archiver.archival_max_age + 1
+            ))
+        )
+        items = [x for batch in self.archiver.get_unarchived_content()
+                 for x in batch]
+        id = r'\x' + hashutil.hash_to_hex(id)
+        self.assertIn(id, items)
+
+    @istest
+    def archive_already_enough(self):
+        """ A content missing should not be archived if there
+        is already enough copies.
+        """
+        id = self.__add_content(b'archive_alread_enough')
+        director = self.__create_director(retention_policy=0)
+        director.run()
+        with self.assertRaises(ObjNotFoundError):
+            self.remote_objstorage.content_get(id)