Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/archiver/director.py
| # Copyright (C) 2015 The Software Heritage developers | # Copyright (C) 2015 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| import swh | import swh | ||||
| import logging | |||||
| from datetime import datetime | from datetime import datetime | ||||
| from swh.core import hashutil, config | from swh.core import hashutil | ||||
| from swh.scheduler.celery_backend.config import app | from swh.scheduler.celery_backend.config import app | ||||
| from . import tasks # NOQA | from . import tasks # NOQA | ||||
| from ..db import cursor_to_bytes | |||||
| DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
| 'objstorage_path': '/tmp/swh-storage/objects', | 'objstorage_path': '/tmp/swh-storage/objects', | ||||
| 'batch_max_size': 50, | 'batch_max_size': 50, | ||||
| 'archival_max_age': 3600, | 'archival_max_age': 3600, | ||||
| 'retention_policy': 2, | 'retention_policy': 2, | ||||
| 'asynchronous': True, | 'asynchronous': True, | ||||
| 'dbname': 'softwareheritage', | 'dbname': 'softwareheritage', | ||||
| 'user': 'root' | 'user': 'root' | ||||
| } | } | ||||
| task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' | task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' | ||||
| logger = logging.getLogger() | |||||
| class ArchiverDirector(): | class ArchiverDirector(): | ||||
| """Process the files in order to know which one is needed as backup. | """Process the files in order to know which one is needed as backup. | ||||
| The archiver director processes the files in the local storage in order | The archiver director processes the files in the local storage in order | ||||
| to know which one needs archival and it delegates this task to | to know which one needs archival and it delegates this task to | ||||
| archiver workers. | archiver workers. | ||||
| Attributes: | Attributes: | ||||
| master_storage: the local storage of the master server. | master_storage: the local storage of the master server. | ||||
| slave_storages: Iterable of remote obj storages to the slaves servers | slave_storages: Iterable of remote obj storages to the slaves servers | ||||
| used for backup. | used for backup. | ||||
| batch_max_size: The number of content items that can be given | config: Archiver_configuration. A dictionary that must contain | ||||
| to the same archiver worker. | the following keys. | ||||
| archival_max_age: Delay given to the worker to copy all the files | objstorage_path (string): the path of the objstorage of the | ||||
| in a given batch. | master. | ||||
| retention_policy: Required number of copies for the content to | batch_max_size (int): The number of content items that can be | ||||
| be considered safe. | given to the same archiver worker. | ||||
ardumont: typo: to copy all | |||||
| archival_max_age (int): Delay given to the worker to copy all | |||||
| the files in a given batch. | |||||
| retention_policy (int): Required number of copies for the | |||||
| content to be considered safe. | |||||
| asynchronous (boolean): Indicate whenever the archival should | |||||
| run in asynchronous mode or not. | |||||
Done Inline Actionstypo: must contain ardumont: typo: must contain | |||||
| """ | """ | ||||
| def __init__(self, db_conn, config): | def __init__(self, db_conn, config): | ||||
| """ Constructor of the archiver director. | """ Constructor of the archiver director. | ||||
| Args: | Args: | ||||
| db_conn: db_conn: Either a libpq connection string, | db_conn: db_conn: Either a libpq connection string, | ||||
| or a psycopg2 connection. | or a psycopg2 connection. | ||||
| config: Archiver_configuration. A dictionnary that must contains | config: Archiver_configuration. A dictionary that must contain | ||||
| the following keys. | the following keys. | ||||
| objstorage_path (string): the path of the objstorage of the | objstorage_path (string): the path of the objstorage of the | ||||
| master. | master. | ||||
| batch_max_size (int): The number of content items that can be | batch_max_size (int): The number of content items that can be | ||||
| given to the same archiver worker. | given to the same archiver worker. | ||||
| archival_max_age (int): Delay given to the worker to cpy all | archival_max_age (int): Delay given to the worker to copy all | ||||
| the files in a given batch. | the files in a given batch. | ||||
| retention_policy (int): Required number of copies for the | retention_policy (int): Required number of copies for the | ||||
| content to be considered safe. | content to be considered safe. | ||||
| asynchronous (boolean): Indicate whenever the archival should | asynchronous (boolean): Indicate whenever the archival should | ||||
| run in asynchronous mode or not. | run in asynchronous mode or not. | ||||
| """ | """ | ||||
| # Get the local storage of the master and remote ones for the slaves. | # Get the local storage of the master and remote ones for the slaves. | ||||
| self.master_storage_args = [db_conn, config['objstorage_path']] | self.master_storage_args = [db_conn, config['objstorage_path']] | ||||
| master_storage = swh.storage.get_storage('local_storage', | master_storage = swh.storage.get_storage('local_storage', | ||||
| self.master_storage_args) | self.master_storage_args) | ||||
| slaves = { | slaves = { | ||||
| id: url | id: url | ||||
| for id, url | for id, url | ||||
| in master_storage.db.archive_ls() | in master_storage.db.archive_ls() | ||||
| } | } | ||||
| # TODO Database should be initialized somehow before going in | # TODO Database should be initialized somehow before going in | ||||
| # production. For now, assumes that the database contains | # production. For now, assumes that the database contains | ||||
| # datas for all the current content. | # datas for all the current content. | ||||
| self.master_storage = master_storage | self.master_storage = master_storage | ||||
| self.slave_storages = slaves | self.slave_storages = slaves | ||||
| self.batch_max_size = config['batch_max_size'] | self.config = config | ||||
| self.archival_max_age = config['archival_max_age'] | |||||
| self.retention_policy = config['retention_policy'] | |||||
| self.is_asynchronous = config['asynchronous'] | |||||
| def run(self): | def run(self): | ||||
| """ Run the archiver director. | """ Run the archiver director. | ||||
| The archiver director will check all the contents of the archiver | The archiver director will check all the contents of the archiver | ||||
| database and do the required backup jobs. | database and do the required backup jobs. | ||||
| """ | """ | ||||
| run_fn = (self.run_async_worker | if self.config['asynchronous']: | ||||
| if self.is_asynchronous | run_fn = self.run_async_worker | ||||
| else self.run_sync_worker) | else: | ||||
| run_fn = self.run_sync_worker | |||||
| for batch in self.get_unarchived_content(): | for batch in self.get_unarchived_content(): | ||||
| run_fn(batch) | run_fn(batch) | ||||
| def run_async_worker(self, batch): | def run_async_worker(self, batch): | ||||
| """ Produce a worker that will be added to the task queue. | """ Produce a worker that will be added to the task queue. | ||||
| """ | """ | ||||
| task = app.tasks[task_name] | task = app.tasks[task_name] | ||||
| task.delay(batch, self.master_storage_args, | task.delay(batch, self.master_storage_args, | ||||
| self.slave_storages, self.retention_policy) | self.slave_storages, self.config['retention_policy']) | ||||
| def run_sync_worker(self, batch): | def run_sync_worker(self, batch): | ||||
| """ Run synchronously a worker on the given batch. | """ Run synchronously a worker on the given batch. | ||||
| """ | """ | ||||
| task = app.tasks[task_name] | task = app.tasks[task_name] | ||||
| task(batch, self.master_storage_args, | task(batch, self.master_storage_args, | ||||
| self.slave_storages, self.retention_policy) | self.slave_storages, self.config) | ||||
| def get_unarchived_content(self): | def get_unarchived_content(self): | ||||
| """ get all the contents that needs to be archived. | """ get all the contents that needs to be archived. | ||||
| Yields: | Yields: | ||||
| A batch of contents. Batches are dictionnaries which associates | A batch of contents. Batches are dictionaries which associate | ||||
| a content id to the data about servers that contains it or not. | a content id to the data about servers that contains it or not. | ||||
| {'id1': | {'id1': | ||||
| {'present': [('slave1', 'slave1_url')], | {'present': [('slave1', 'slave1_url')], | ||||
| 'missing': [('slave2', 'slave2_url'), | 'missing': [('slave2', 'slave2_url'), | ||||
| ('slave3', 'slave3_url')] | ('slave3', 'slave3_url')] | ||||
| }, | }, | ||||
| 'id2': | 'id2': | ||||
| {'present': [], | {'present': [], | ||||
| 'missing': [ | 'missing': [ | ||||
| ('slave1', 'slave1_url'), | ('slave1', 'slave1_url'), | ||||
| ('slave2', 'slave2_url'), | ('slave2', 'slave2_url'), | ||||
| ('slave3', 'slave3_url') | ('slave3', 'slave3_url') | ||||
| ] | ]} | ||||
| } | } | ||||
| Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) | Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) | ||||
| are ids and urls of the storage slaves. | are ids and urls of the storage slaves. | ||||
| At least all the content that don't have enough copies on the | At least all the content that don't have enough copies on the | ||||
| backups servers are distributed into these batches. | backups servers are distributed into these batches. | ||||
| """ | """ | ||||
| # Get the data about each content referenced into the archiver. | # Get the data about each content referenced into the archiver. | ||||
| missing_copy = {} | missing_copy = {} | ||||
| for content_id in self.master_storage.db.content_archive_ls(): | for content_id in self.master_storage.db.content_archive_ls(): | ||||
| # Do some initializations | |||||
| db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) | db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) | ||||
| # Query in order to know in which servers the content is saved. | # Fetch the datas about archival status of the content | ||||
| backups = self.master_storage.db.content_archive_get( | backups = self.master_storage.db.content_archive_get( | ||||
| content=db_content_id | content=db_content_id | ||||
| ) | ) | ||||
| for _content_id, server_id, status, mtime in backups: | for _content_id, server_id, status, mtime in backups: | ||||
| virtual_status = self.get_virtual_status(status, mtime) | |||||
| # If the content is ongoing but still have time, there is | |||||
| # another worker working on this content. | |||||
| if status == 'ongoing': | |||||
| mtime = mtime.replace(tzinfo=None) | |||||
| elapsed = (datetime.now() - mtime).total_seconds() | |||||
| if elapsed < self.archival_max_age: | |||||
| continue | |||||
| server_data = (server_id, self.slave_storages[server_id]) | server_data = (server_id, self.slave_storages[server_id]) | ||||
| missing_copy.setdefault( | missing_copy.setdefault( | ||||
| db_content_id, | db_content_id, | ||||
| {'present': [], 'missing': []} | {'present': [], 'missing': []} | ||||
| ).setdefault(status, []).append(server_data) | ).setdefault(virtual_status, []).append(server_data) | ||||
| # Check the content before archival. | # Check the content before archival. | ||||
| # TODO catch exception and try to restore the file from an | try: | ||||
| # archive? | |||||
| self.master_storage.objstorage.check(content_id[0]) | self.master_storage.objstorage.check(content_id[0]) | ||||
| except Exception as e: | |||||
| # Exception can be Error or ObjNotFoundError. | |||||
| logger.error(e) | |||||
| # TODO Do something to restore the content? | |||||
| if len(missing_copy) >= self.batch_max_size: | if len(missing_copy) >= self.config['batch_max_size']: | ||||
| yield missing_copy | yield missing_copy | ||||
| missing_copy = {} | missing_copy = {} | ||||
| if len(missing_copy) > 0: | if len(missing_copy) > 0: | ||||
| yield missing_copy | yield missing_copy | ||||
| def get_virtual_status(self, status, mtime): | |||||
| """ Compute the virtual presence of a content. | |||||
| def initialize_content_archive(db, sample_size, names=['Local']): | If the status is ongoing but the time is not elapsed, the archiver | ||||
| """ Initialize the content_archive table with a sample. | consider it will be present in the futur, and so consider it as | ||||
| present. | |||||
| From the content table, get a sample of id, and fill the | However, if the time is elapsed, the copy may have failed, so consider | ||||
| content_archive table with those id in order to create a test sample | the content as missing. | ||||
| for the archiver. | |||||
| Arguments: | |||||
| Args: | status (string): One of ('present', 'missing', 'ongoing'). The | ||||
| db: The database of the storage. | status of the content. | ||||
| sample_size (int): The size of the sample to create. | mtime (datetime): Time at which the content have been updated for | ||||
| names: A list of archive names. Those archives must already exists. | the last time. | ||||
| Archival status of the archives content will be erased on db. | |||||
| Returns: | Returns: | ||||
| Tha amount of entry created. | The virtual status of the studied content, which is 'present' or | ||||
| """ | 'missing'. | ||||
| with db.transaction() as cur: | |||||
| cur.execute('DELETE FROM content_archive') | |||||
| with db.transaction() as cur: | |||||
| cur.execute('SELECT sha1 from content limit %d' % sample_size) | |||||
| ids = list(cursor_to_bytes(cur)) | |||||
| for id, in ids: | |||||
| tid = r'\x' + hashutil.hash_to_hex(id) | |||||
| with db.transaction() as cur: | |||||
| for name in names: | |||||
| s = """INSERT INTO content_archive | |||||
| VALUES('%s'::sha1, '%s', 'missing', now()) | |||||
| """ % (tid, name) | |||||
| cur.execute(s) | |||||
| print('Initialized database with', sample_size * len(names), 'items') | |||||
| return sample_size * len(names) | |||||
| def add_content_to_objstore(director, source, content_ids): | |||||
| """ Fill the objstore according to the database | |||||
| Get the current status of the database and fill the objstorage of the | |||||
| master storage according to these data. | |||||
| Content are fetched from the source, which is a storage. | |||||
| Args: | Raises: | ||||
| director (ArchiverDirector): The archiver director containing | ValueError: if the status is not one of 'present', 'missing' | ||||
| the master storage to fill. | or 'ongoing' | ||||
| source (Storage): A storage that contains the content for all the | """ | ||||
| ids in content_ids. | if status in ('present', 'missing'): | ||||
| content_ids: A list of ids that should be added to the master object | return status | ||||
| storage. | |||||
| """ | |||||
| for res in source.content_get(content_ids): | |||||
| content_data = res['data'] | |||||
| director.master_storage.objstorage.add_bytes(content_data) | |||||
| if __name__ == '__main__': | |||||
| import sys | |||||
| conf = config.read(sys.argv[1], DEFAULT_CONFIG) | |||||
| cstring = 'dbname={} user={}'.format(conf['dbname'], conf['user']) | |||||
| director = ArchiverDirector(cstring, conf) | # If the status is 'ongoing' but there is still time, another worker | ||||
| director.run() | # may still be on the task. | ||||
| if status == 'ongoing': | |||||
| mtime = mtime.replace(tzinfo=None) | |||||
| elapsed = (datetime.now() - mtime).total_seconds() | |||||
| if elapsed <= self.config['archival_max_age']: | |||||
| return 'present' | |||||
| else: | |||||
| return 'missing' | |||||
| else: | |||||
| raise ValueError("status must be either 'present', 'missing' " | |||||
| "or 'ongoing'") | |||||
typo: to copy all