diff --git a/manual_test_archiver.py b/manual_test_archiver.py
new file mode 100644
index 0000000..26d4f4c
--- /dev/null
+++ b/manual_test_archiver.py
@@ -0,0 +1,95 @@
+import os
+import random
+import string
+
+from swh.core import hashutil
+from swh.storage import Storage
+from swh.storage.db import cursor_to_bytes
+
+from swh.storage.archiver import ArchiverDirector
+
+
+def rs(size=6, chars=string.ascii_uppercase + string.ascii_lowercase):
+    """ Generate a random string of the given size.
+    """
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+def mc(data):
+    """ Make a content dict (hashes plus data) from a string.
+    """
+    data = bytes(data, 'utf8')
+    content = hashutil.hashdata(data)
+    content.update({'data': data})
+    return content
+
+
+def initialize_content_archive(db, sample_size, names=['Local']):
+    """ Initialize the content_archive table with a sample.
+
+    From the content table, take a sample of ids and fill the
+    content_archive table with them, in order to create a test sample
+    for the archiver.
+
+    Args:
+        db: The database of the storage.
+        sample_size (int): The size of the sample to create.
+        names: A list of archive names. Those archives must already exist.
+            The archival status of their contents will be erased in the db.
+
+    Returns:
+        The number of entries created.
+    """
+    with db.transaction() as cur:
+        cur.execute('DELETE FROM content_archive')
+
+    with db.transaction() as cur:
+        cur.execute('SELECT sha1 FROM content LIMIT %s', (sample_size,))
+        ids = list(cursor_to_bytes(cur))
+
+    for content_id, in ids:
+        tid = r'\x' + hashutil.hash_to_hex(content_id)
+
+        with db.transaction() as cur:
+            for name in names:
+                cur.execute("""INSERT INTO content_archive
+                               VALUES(%s::sha1, %s, 'missing', now())
+                            """, (tid, name))
+
+    print('Initialized database with', sample_size * len(names), 'items')
+    return sample_size * len(names)
+
+
+def clean():
+    """ Clean the database and the objstorage of the global storage `loc`.
+    """
+    with loc.db.transaction() as cur:
+        cur.execute('DELETE FROM content_archive')
+        cur.execute('DELETE FROM content')
+    os.system("rm -r /tmp/swh/storage-dev/2/*")
+
+
+CONTENT_SIZE = 10
+
+if __name__ == '__main__':
+    random.seed(0)
+    # Local database
+    dbname = 'softwareheritage-dev'
+    user = 'qcampos'
+    cstring = 'dbname=%s user=%s' % (dbname, user)
+    # Archiver config
+    config = {
+        'objstorage_path': '/tmp/swh/storage-dev/2',
+        'archival_max_age': 3600,
+        'batch_max_size': 10,
+        'retention_policy': 1,
+        'asynchronous': False
+    }
+
+    # Grand-palais's storage
+    loc = Storage(cstring, config['objstorage_path'])
+
+    # Add the contents
+    contents = [mc(rs(100)) for _ in range(CONTENT_SIZE)]
+    loc.content_add(contents)
+    initialize_content_archive(loc.db, CONTENT_SIZE, ['petit-palais'])
+
+    # Launch the archiver
+    archiver = ArchiverDirector(cstring, config)
+    archiver.run()
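+
+    # Optional sanity check, a sketch only: assuming a successful run
+    # flips each handled row's status from 'missing' to 'present', count
+    # how many entries were actually archived.
+    with loc.db.transaction() as cur:
+        cur.execute("SELECT count(*) FROM content_archive"
+                    " WHERE status = 'present'")
+        print('Archived entries:', cur.fetchone()[0])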
diff --git a/test_archiver.py b/test_archiver.py
new file mode 100644
index 0000000..4efa399
--- /dev/null
+++ b/test_archiver.py
@@ -0,0 +1,162 @@
+# Copyright (C) 2015  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import tempfile
+import unittest
+
+from datetime import datetime, timedelta
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.core.tests.db_testing import DbTestFixture
+from server_testing import ServerTestFixture
+
+from swh.storage import Storage
+from swh.storage.exc import ObjNotFoundError
+from swh.storage.archiver import ArchiverDirector
+from swh.storage.objstorage.api.client import RemoteObjStorage
+from swh.storage.objstorage.api.server import app
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+
+
+@attr('db')
+class TestArchiver(DbTestFixture, ServerTestFixture,
+                   unittest.TestCase):
+    """ Test the objstorage archiver.
+    """
+
+    TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
+
+    def setUp(self):
+        # Launch the backup server
+        self.backup_objroot = tempfile.mkdtemp()
+        self.config = {'storage_base': self.backup_objroot,
+                       'storage_depth': 3}
+        self.app = app
+        super().setUp()
+
+        # Launch a client to check object presence
+        self.remote_objstorage = RemoteObjStorage(self.url())
+        # Create the local storage.
+        self.objroot = tempfile.mkdtemp()
+        self.storage = Storage(self.conn, self.objroot)
+        # Initialize and fill the tables.
+        self.initialize_tables()
+        # Create the archiver
+        self.archiver = self.__create_director()
+
+    def tearDown(self):
+        self.empty_tables()
+        super().tearDown()
+
+    def initialize_tables(self):
+        """ Initialize the database with a sample of items.
+        """
+        # Add an archive
+        self.cursor.execute("""INSERT INTO archives(id, url)
+                               VALUES('Local', 'http://localhost:{}/')
+                            """.format(self.port))
+        self.conn.commit()
+
+    def empty_tables(self):
+        # Remove all content
+        self.cursor.execute('DELETE FROM content_archive')
+        self.cursor.execute('DELETE FROM archives')
+        self.conn.commit()
+
+    def __add_content(self, content_data, status='missing', date='now()'):
+        # Add the content to the storage
+        content = hashutil.hashdata(content_data)
+        content.update({'data': content_data})
+        self.storage.content_add([content])
+        # Then update the database
+        content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
+        self.cursor.execute("""INSERT INTO content_archive
+                               VALUES('%s'::sha1, 'Local', '%s', %s)
+                            """ % (content_id, status, date))
+        return content['sha1']
+
+    def __get_missing(self):
+        self.cursor.execute("""SELECT content_id
+                               FROM content_archive
+                               WHERE status='missing'""")
+        return self.cursor.fetchall()
+
+    def __create_director(self, batch_size=5000, archival_max_age=3600,
+                          retention_policy=1):
+        config = {
+            'objstorage_path': self.objroot,
+            'batch_max_size': batch_size,
+            'archival_max_age': archival_max_age,
+            'retention_policy': retention_policy
+        }
+        director = ArchiverDirector(self.conn, config)
+        return director
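+
+    # Illustration only: the tests below tweak these knobs per scenario;
+    # e.g. __create_director(retention_policy=0) yields a director for
+    # which every content already has enough copies (see
+    # archive_already_enough below).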
+ """ + id = self.__add_content(b'archive_ongoing_remaining', status='ongoing') + items = [x for batch in self.archiver.get_unarchived_content() + for x in batch] + id = r'\x' + hashutil.hash_to_hex(id) + self.assertNotIn(id, items) + + @istest + def archive_ongoing_elapsed(self): + """ A content that is ongoing but with elapsed time should + be rescheduled. + """ + # Create an ongoing archive content with time elapsed by 1s. + id = self.__add_content( + b'archive_ongoing_elapsed', + status='ongoing', + date="'%s'" % (datetime.now() - timedelta( + seconds=self.archiver.archival_max_age + 1 + )) + ) + items = [x for batch in self.archiver.get_unarchived_content() + for x in batch] + id = r'\x' + hashutil.hash_to_hex(id) + self.assertIn(id, items) + + @istest + def archive_already_enough(self): + """ A content missing should not be archived if there + is already enough copies. + """ + id = self.__add_content(b'archive_alread_enough') + director = self.__create_director(retention_policy=0) + director.run() + with self.assertRaises(ObjNotFoundError): + self.remote_objstorage.content_get(id)