swh/storage/archiver/director.py
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import swh
import logging

from datetime import datetime

from swh.core import hashutil
from swh.scheduler.celery_backend.config import app

from . import tasks  # NOQA

DEFAULT_CONFIG = {
    'objstorage_path': '/tmp/swh-storage/objects',
    'batch_max_size': 50,
    'archival_max_age': 3600,
    'retention_policy': 2,
    'asynchronous': True,

    'dbname': 'softwareheritage',
    'user': 'root'
}

task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'

logger = logging.getLogger()

class ArchiverDirector():
    """Process the files in order to know which ones need a backup copy.

    The archiver director processes the files in the local storage in order
    to know which ones need archival, and it delegates this task to
    archiver workers.

    Attributes:
        master_storage: the local storage of the master server.
        slave_storages: Iterable of remote obj storages to the slave servers
            used for backup.
        config: Archiver configuration. A dictionary that must contain
            the following keys:
            objstorage_path (string): the path of the objstorage of the
                master.
            batch_max_size (int): The number of content items that can be
                given to the same archiver worker.
            archival_max_age (int): Delay given to the worker to copy all
                the files in a given batch.
            retention_policy (int): Required number of copies for the
                content to be considered safe.
            asynchronous (boolean): Indicate whether the archival should
                run in asynchronous mode or not.
""" | """ | ||||
def __init__(self, db_conn, config): | def __init__(self, db_conn, config): | ||||
""" Constructor of the archiver director. | """ Constructor of the archiver director. | ||||
Args: | Args: | ||||
db_conn: db_conn: Either a libpq connection string, | db_conn: db_conn: Either a libpq connection string, | ||||
or a psycopg2 connection. | or a psycopg2 connection. | ||||
config: Archiver_configuration. A dictionnary that must contains | config: Archiver_configuration. A dictionary that must contains | ||||
the following keys. | the following keys. | ||||
objstorage_path (string): the path of the objstorage of the | objstorage_path (string): the path of the objstorage of the | ||||
master. | master. | ||||
batch_max_size (int): The number of content items that can be | batch_max_size (int): The number of content items that can be | ||||
given to the same archiver worker. | given to the same archiver worker. | ||||
archival_max_age (int): Delay given to the worker to cpy all | archival_max_age (int): Delay given to the worker to copy all | ||||
the files in a given batch. | the files in a given batch. | ||||
retention_policy (int): Required number of copies for the | retention_policy (int): Required number of copies for the | ||||
content to be considered safe. | content to be considered safe. | ||||
asynchronous (boolean): Indicate whenever the archival should | asynchronous (boolean): Indicate whenever the archival should | ||||
run in asynchronous mode or not. | run in asynchronous mode or not. | ||||
""" | """ | ||||
        # Get the local storage of the master and remote ones for the slaves.
        self.master_storage_args = [db_conn, config['objstorage_path']]
        master_storage = swh.storage.get_storage('local_storage',
                                                 self.master_storage_args)
        slaves = {
            id: url
            for id, url
            in master_storage.db.archive_ls()
        }

        # TODO Database should be initialized somehow before going in
        # production. For now, assume that the database contains
        # data for all the current content.

        self.master_storage = master_storage
        self.slave_storages = slaves
        self.config = config

    def run(self):
        """ Run the archiver director.

        The archiver director will check all the contents of the archiver
        database and do the required backup jobs.
        """
        if self.config['asynchronous']:
            run_fn = self.run_async_worker
        else:
            run_fn = self.run_sync_worker
        for batch in self.get_unarchived_content():
            run_fn(batch)

    def run_async_worker(self, batch):
        """ Produce a worker that will be added to the task queue.
        """
        task = app.tasks[task_name]
        task.delay(batch, self.master_storage_args,
                   self.slave_storages, self.config['retention_policy'])

    def run_sync_worker(self, batch):
        """ Run synchronously a worker on the given batch.
        """
        task = app.tasks[task_name]
        task(batch, self.master_storage_args,
             self.slave_storages, self.config)

    def get_unarchived_content(self):
        """ Get all the contents that need to be archived.

        Yields:
            A batch of contents. Batches are dictionaries which associate
            a content id to the data about servers that contain it or not.

            {'id1':
                {'present': [('slave1', 'slave1_url')],
                 'missing': [('slave2', 'slave2_url'),
                             ('slave3', 'slave3_url')]
                },
             'id2':
                {'present': [],
                 'missing': [
                     ('slave1', 'slave1_url'),
                     ('slave2', 'slave2_url'),
                     ('slave3', 'slave3_url')
                 ]}
            }

            Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
            are ids and urls of the storage slaves.

            At least all the contents that don't have enough copies on the
            backup servers are distributed into these batches.
        """
        # Get the data about each content referenced into the archiver.
        missing_copy = {}
        for content_id in self.master_storage.db.content_archive_ls():
            db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])

            # Fetch the data about archival status of the content
            backups = self.master_storage.db.content_archive_get(
                content=db_content_id
            )
            for _content_id, server_id, status, mtime in backups:
                virtual_status = self.get_virtual_status(status, mtime)
                server_data = (server_id, self.slave_storages[server_id])

                missing_copy.setdefault(
                    db_content_id,
                    {'present': [], 'missing': []}
                ).setdefault(virtual_status, []).append(server_data)

            # Check the content before archival.
            try:
                self.master_storage.objstorage.check(content_id[0])
            except Exception as e:
                # Exception can be Error or ObjNotFoundError.
                logger.error(e)
                # TODO Do something to restore the content?

            if len(missing_copy) >= self.config['batch_max_size']:
                yield missing_copy
                missing_copy = {}

        if len(missing_copy) > 0:
            yield missing_copy

    def get_virtual_status(self, status, mtime):
        """ Compute the virtual presence of a content.

        If the status is ongoing but the time is not elapsed, the archiver
        considers it will be present in the future, and so considers it as
        present.
        However, if the time is elapsed, the copy may have failed, so consider
        the content as missing.

        Arguments:
            status (string): One of ('present', 'missing', 'ongoing'). The
                status of the content.
            mtime (datetime): Time at which the content has been updated for
                the last time.

        Returns:
            The virtual status of the studied content, which is 'present' or
            'missing'.

        Raises:
            ValueError: if the status is not one of 'present', 'missing'
                or 'ongoing'
        """
        if status in ('present', 'missing'):
            return status

        # If the status is 'ongoing' but there is still time, another worker
        # may still be on the task.
        if status == 'ongoing':
            mtime = mtime.replace(tzinfo=None)
            elapsed = (datetime.now() - mtime).total_seconds()
            if elapsed <= self.config['archival_max_age']:
                return 'present'
            else:
                return 'missing'
        else:
            raise ValueError("status must be either 'present', 'missing' "
                             "or 'ongoing'")


The changeset also removes the module-level test helpers and the command-line entry
point shown below, together with the 'config' import from swh.core and the
cursor_to_bytes import from ..db that they relied on:


def initialize_content_archive(db, sample_size, names=['Local']):
    """ Initialize the content_archive table with a sample.

    From the content table, get a sample of ids, and fill the
    content_archive table with those ids in order to create a test sample
    for the archiver.

    Args:
        db: The database of the storage.
        sample_size (int): The size of the sample to create.
        names: A list of archive names. Those archives must already exist.
            Archival status of the archives' content will be erased on db.

    Returns:
        The number of entries created.
    """
    with db.transaction() as cur:
        cur.execute('DELETE FROM content_archive')

    with db.transaction() as cur:
        cur.execute('SELECT sha1 from content limit %d' % sample_size)
        ids = list(cursor_to_bytes(cur))

    for id, in ids:
        tid = r'\x' + hashutil.hash_to_hex(id)

        with db.transaction() as cur:
            for name in names:
                s = """INSERT INTO content_archive
                       VALUES('%s'::sha1, '%s', 'missing', now())
                    """ % (tid, name)
                cur.execute(s)

    print('Initialized database with', sample_size * len(names), 'items')
    return sample_size * len(names)


def add_content_to_objstore(director, source, content_ids):
    """ Fill the objstore according to the database.

    Get the current status of the database and fill the objstorage of the
    master storage according to these data.

    Content is fetched from the source, which is a storage.

    Args:
        director (ArchiverDirector): The archiver director containing
            the master storage to fill.
        source (Storage): A storage that contains the content for all the
            ids in content_ids.
        content_ids: A list of ids that should be added to the master object
            storage.
    """
    for res in source.content_get(content_ids):
        content_data = res['data']
        director.master_storage.objstorage.add_bytes(content_data)


if __name__ == '__main__':
    import sys

    conf = config.read(sys.argv[1], DEFAULT_CONFIG)
    cstring = 'dbname={} user={}'.format(conf['dbname'], conf['user'])

    director = ArchiverDirector(cstring, conf)
    director.run()
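
To make the virtual-status rule concrete, here is a small illustration of how the new
get_virtual_status method classifies copies; the director instance is assumed to be
built as in the sketch above, with the default archival_max_age of 3600 seconds, and
the timestamps are invented for the example:

    from datetime import datetime, timedelta

    # An 'ongoing' copy updated ten minutes ago is still within archival_max_age,
    # so another worker is assumed to be handling it: it counts as 'present'.
    recent = datetime.now() - timedelta(minutes=10)
    assert director.get_virtual_status('ongoing', recent) == 'present'

    # An 'ongoing' copy last updated two hours ago has exceeded archival_max_age,
    # so the copy is presumed to have failed and is counted as 'missing'.
    stale = datetime.now() - timedelta(hours=2)
    assert director.get_virtual_status('ongoing', stale) == 'missing'

    # 'present' and 'missing' statuses are returned as-is.
    assert director.get_virtual_status('missing', datetime.now()) == 'missing'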