D81.id266.diff

diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
--- a/swh/storage/archiver/copier.py
+++ b/swh/storage/archiver/copier.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -30,7 +30,7 @@
master_storage (Storage): The master storage of the system that
contains the data to archive.
"""
- _name, self.url = destination
+ self.url = destination
self.content_ids = content
self.server = RemoteObjStorage(self.url)
self.master_objstorage = master_objstorage
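
Note: with this change the copier no longer receives an (id, url) pair as its destination; the worker resolves the slave id to its URL before spawning the copier, so the copier gets the URL directly. A minimal, self-contained sketch of what the signature change amounts to (toy classes, not the actual swh.storage.archiver code):

class CopierBefore:
    def __init__(self, destination, content):
        _name, self.url = destination   # had to unpack and discard the server id
        self.content_ids = content

class CopierAfter:
    def __init__(self, destination, content):
        self.url = destination          # destination is already the URL
        self.content_ids = content

before = CopierBefore(('slave1', 'http://slave1/objstorage'), [b'sha1-a'])
after = CopierAfter('http://slave1/objstorage', [b'sha1-a'])
assert before.url == after.url == 'http://slave1/objstorage'
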
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -102,11 +102,7 @@
# Get the slave storages
self.db_conn_archiver = db_conn_archiver
self.archiver_storage = ArchiverStorage(db_conn_archiver)
- self.slave_objstorages = {
- id: url
- for id, url
- in self.archiver_storage.archive_ls()
- }
+ self.slave_objstorages = dict(self.archiver_storage.archive_ls())
# Check that there is enough backup servers for the retention policy
if config['retention_policy'] > len(self.slave_objstorages) + 1:
raise ValueError(
@@ -147,7 +143,7 @@
else:
run_fn = self.run_sync_worker
- for batch in self.get_unarchived_content():
+ for batch in self.get_unarchived_content_batch():
run_fn(batch)
def run_async_worker(self, batch):
@@ -155,9 +151,9 @@
"""
task = app.tasks[task_name]
task.delay(batch,
+ slave_objstorages=self.slave_objstorages,
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
- slave_objstorages=self.slave_objstorages,
config=self.config)
def run_sync_worker(self, batch):
@@ -165,75 +161,86 @@
"""
task = app.tasks[task_name]
task(batch,
+ slave_objstorages=self.slave_objstorages,
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
- slave_objstorages=self.slave_objstorages,
config=self.config)
- def get_unarchived_content(self):
- """ Get contents that need to be archived.
+ def get_unarchived_content_batch(self):
+ """ Get batches of contents that need to be archived
Yields:
A batch of contents. Batches are dictionaries which associates
- a content id to the data about servers that contains it or not.
-
- {'id1':
- {'present': [('slave1', 'slave1_url')],
- 'missing': [('slave2', 'slave2_url'),
- ('slave3', 'slave3_url')]
- },
- 'id2':
- {'present': [],
- 'missing': [
- ('slave1', 'slave1_url'),
- ('slave2', 'slave2_url'),
- ('slave3', 'slave3_url')
- ]}
+ a content sha1 to the list of server ids where it is already
+ present or where archival is ongoing.
+
+ {
+ 'id1': ['slave1', 'slave3'],
+ 'id2': ['slave2']
}
- Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
- are ids and urls of the storage slaves.
+ Where keys (idX) are sha1 of the content and slaveX are ids of
+ the slave storages.
At least all the content that don't have enough copies on the
backups servers are distributed into these batches.
"""
contents = {}
- # Get the archives
- archives = dict(self.archiver_storage.archive_ls())
- # Get all the contents referenced into the archiver tables
- last_object = b''
- while True:
- archived_contents = list(
- self.archiver_storage.content_archive_get_copies(last_object))
+ for content_id, presents in self._get_unarchived_content():
+ contents[content_id] = presents
+
+ if len(contents) >= self.config['batch_max_size']:
+ yield contents
+ contents = {}
- if not archived_contents:
- break
+ if len(contents) > 0:
+ yield contents
- for content_id, present, ongoing in archived_contents:
- last_object = content_id
- data = {
- 'present': set(present),
- 'missing': set(archives) - set(present) - set(ongoing),
- }
+ def _get_unarchived_content(self):
+ """ Get contents that need to be archived.
- for archive_id, mtime in ongoing.items():
- status = self.get_virtual_status('ongoing', mtime)
- data[status].add(archive_id)
+ Yields:
+ Tuples that represent the archival status of contents that need
+ to be archived, as (content_id, archive_present), where
+ `archive_present` is the list of archive servers where the content
+ is already present.
+ """
+ # Iterates over the contents referenced into the archiver.
+ for content in self._get_all_contents():
+ content_id, present_copies, ongoing_copies = content
+ # Construct the present list
+ virtualy_present_copies = {
+ ids
+ for ids in ongoing_copies
+ if self.get_virtual_status('ongoing', ongoing_copies[ids]) == 'present'
+ }
+ presents = set(present_copies) | virtualy_present_copies
- if not data['missing']:
- continue
+ # If there are already enough copies, don't schedule it.
+ if len(presents) >= self.config['retention_policy']:
+ continue
- contents[r'\x%s' % hashutil.hash_to_hex(content_id)] = {
- k: [(archive_id, archives[archive_id]) for archive_id in v]
- for k, v in data.items()
- }
+ # Otherwise, yield it
+ yield r'\x%s' % hashutil.hash_to_hex(content_id), presents
- if len(contents) >= self.config['batch_max_size']:
- yield contents
- contents = {}
+ def _get_all_contents(self):
+ """ Get batchs from the archiver db and yield it as continous stream
- if len(contents) > 0:
- yield contents
+ Yields:
+ Data about a content as a tuple
+ (content_id, present_copies, ongoing_copies), where ongoing_copies
+ is a dict mapping a copy id to its mtime.
+ """
+ last_object = b''
+ while True:
+ archiver_contents = list(
+ self.archiver_storage.content_archive_get_copies(last_object)
+ )
+ if not archiver_contents:
+ return
+ for content in archiver_contents:
+ last_object = content[0]
+ yield content
def get_virtual_status(self, status, mtime):
""" Compute the virtual presence of a content.
@@ -258,7 +265,7 @@
ValueError: if the status is not one 'present', 'missing'
or 'ongoing'
"""
- if status in ('present', 'missing'):
+ if status in {'present', 'missing'}:
return status
# If the status is 'ongoing' but there is still time, another worker
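
Note: get_unarchived_content_batch now separates three concerns: _get_all_contents pages through the archiver db using the last seen content id as a cursor, _get_unarchived_content filters and yields (content_id, presents) pairs, and the public generator groups those pairs into dict batches bounded by batch_max_size. A standalone sketch of the batching pattern (simplified, not the swh code itself):

def batch_stream(pairs, batch_max_size):
    """Group an iterable of (content_id, presents) pairs into dict batches."""
    batch = {}
    for content_id, presents in pairs:
        batch[content_id] = presents
        if len(batch) >= batch_max_size:
            yield batch
            batch = {}
    if batch:
        # emit the final, possibly smaller, batch
        yield batch

stream = [('id%d' % i, ['server1']) for i in range(5)]
print(list(batch_stream(stream, 2)))
# -> three batches of sizes 2, 2 and 1
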
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
--- a/swh/storage/archiver/tasks.py
+++ b/swh/storage/archiver/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -12,9 +12,10 @@
"""
task_queue = 'swh_storage_archive_worker'
- def run(self, batch, archiver_args, master_objstorage_args,
- slave_objstorages, config):
- aw = ArchiverWorker(batch, archiver_args, master_objstorage_args,
- slave_objstorages, config)
+ def run(self, batch, slave_objstorages, archiver_args,
+ master_objstorage_args, config):
+ aw = ArchiverWorker(batch, slave_objstorages,
+ archiver_args, master_objstorage_args,
+ config)
if aw.run():
self.log("Successful backup for a batch of size %s" % len(batch))
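
Note: the parameter reordering above only matters for positional callers. Both run_async_worker and run_sync_worker in director.py pass everything except the batch by keyword, so they are unaffected. A toy illustration (hypothetical argument values, not the actual Celery task):

def run(batch, slave_objstorages, archiver_args, master_objstorage_args, config):
    return (batch, slave_objstorages, archiver_args, master_objstorage_args, config)

result = run({'id1': []},
             archiver_args='dbname=softwareheritage-archiver',
             master_objstorage_args={'url': 'http://master/objstorage'},
             slave_objstorages={'slave1': 'http://slave1/objstorage'},
             config={'retention_policy': 2})
assert result[1] == {'slave1': 'http://slave1/objstorage'}  # keyword order is irrelevant
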
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
--- a/swh/storage/archiver/worker.py
+++ b/swh/storage/archiver/worker.py
@@ -1,11 +1,9 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import random
import logging
-import time
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
@@ -14,9 +12,6 @@
from .copier import ArchiverCopier
-logger = logging.getLogger()
-
-
class ArchiverWorker():
""" Do the required backups on a given batch of contents.
@@ -45,19 +40,20 @@
asynchronous (boolean): Indicate whenever the archival should
run in asynchronous mode or not.
"""
- def __init__(self, batch, archiver_args, master_objstorage_args,
- slave_objstorages, config):
+ def __init__(self, batch, slave_objstorages,
+ archiver_args, master_objstorage_args,
+ config):
""" Constructor of the ArchiverWorker class.
Args:
batch: A batch of content, which is a dictionary that associates
a content's sha1 id to the list of servers where the content
is present.
+ slave_objstorages: A map that associates server_id to the remote
+ server.
archiver_args: The archiver's arguments to establish connection to
db.
master_objstorage_args: The master storage arguments.
- slave_objstorages: A map that associates server_id to the remote
- server.
config: Archiver_configuration. A dictionary that must contains
the following keys.
objstorage_path (string): the path of the objstorage of the
@@ -72,8 +68,8 @@
run in asynchronous mode or not.
"""
self.batch = batch
- self.archiver_storage = ArchiverStorage(archiver_args)
self.slave_objstorages = slave_objstorages
+ self.archiver_storage = ArchiverStorage(archiver_args)
self.config = config
if config['objstorage_type'] == 'local_objstorage':
@@ -82,49 +78,55 @@
master_objstorage = RemoteObjStorage(**master_objstorage_args)
self.master_objstorage = master_objstorage
- def _choose_backup_servers(self, allowed_storage, backup_number):
- """ Choose the slave servers for archival.
-
- Choose the given amount of servers among those which don't already
- contain a copy of the content.
+ def run(self):
+ """ Do the task expected from the archiver worker.
- Args:
- allowed_storage: servers when the content is not already present.
- backup_number (int): The number of servers we have to choose in
- order to fullfill the objective.
+ Process the contents in the batch, ensure that the elements still
+ need archival, and spawn copiers to copy files to each destination.
"""
- # In case there is not enough backup servers to get all the backups
- # we need, just do our best.
- # Such situation should not happen.
- backup_number = min(backup_number, len(allowed_storage))
+ # Batch the contents to the required servers
+ contents = {} # Map {archive_id -> [contents]}
+ for content_id, present_copies in self.batch.items():
+ backup_servers = self._select_archive_servers(present_copies)
- # TODO Find a better (or a good) policy to choose the backup servers.
- # The random choice should be equivalently distributed between
- # servers for a great amount of data, but don't take care of servers
- # capacities.
- return random.sample(allowed_storage, backup_number)
+ for server in backup_servers:
+ # Add the content to the map {archive_id -> [content_ids]}
+ contents.setdefault(server, []).append(content_id)
+
+ # Set the archival status of the items to ongoing.
+ self._content_archive_update(
+ content_id,
+ server,
+ new_status='ongoing'
+ )
+
+ # For each destination, spawn a copier
+ for destination, content_ids in contents.items():
+ server_url = self.slave_objstorages[destination]
+ try:
+ self.run_copier(server_url, content_ids)
+ except Exception:
+ logging.error('Unable to copy a batch to %s' % destination)
- def _get_archival_status(self, content_id, server):
- """ Get the archival status of the required content.
+ def _select_archive_servers(self, already_present_copies):
+ """ Select some backup servers
+
+ Select, among the existing servers, backup servers that do not
+ already contain the content.
+
+ Args:
+ already_present_copies (list(str)): list of the servers that
+ already contain the object.
- Attributes:
- content_id (string): Sha1 of the content.
- server: Tuple (archive_id, archive_url) of the archive server.
Returns:
- A dictionary that contains all the required data : 'content_id',
- 'archive_id', 'status', and 'mtime'
+ a list of backup server ids sufficient to respect the retention policy.
"""
- archive = server[0]
- t, = list(
- self.archiver_storage.content_archive_get(content_id)
- )
- status_for_archive = t[1].get(archive, {})
- return {
- 'content_id': content_id,
- 'archive_id': archive,
- 'status': status_for_archive.get('status', 'missing'),
- 'mtime': status_for_archive.get('mtime', 0),
- }
+ nb_missing = len(self.slave_objstorages) - len(already_present_copies)
+ allowed_servers = [server for server in self.slave_objstorages
+ if server not in already_present_copies]
+ # Select any server for the backup
+ backup_servers = allowed_servers[:nb_missing]
+ return backup_servers
def _content_archive_update(self, content_id, archive_id,
new_status=None):
@@ -146,91 +148,6 @@
new_status
)
- def need_archival(self, content, destination):
- """ Indicates whenever a content need archivage.
-
- Filter function that returns True if a given content
- still require to be archived.
-
- Args:
- content (str): Sha1 of a content.
- destination: Tuple (archive id, archive url).
- """
- archival_status = self._get_archival_status(
- content,
- destination
- )
- status = archival_status['status']
- mtime = archival_status['mtime']
- # If the archive is already present, no need to backup.
- if status == 'present':
- return False
- # If the content is ongoing but still have time, there is
- # another worker working on this content.
- elif status == 'ongoing':
- elapsed = int(time.time()) - mtime
- if elapsed <= self.config['archival_max_age']:
- return False
- return True
-
- def sort_content_by_archive(self):
- """ Create a map {archive_server -> list of content)
-
- Create a mapping that associate to a archive server all the
- contents that needs to be archived in it by the current worker.
-
- The map is in the form of :
- {
- (archive_1, archive_1_url): [content1, content2, content_3]
- (archive_2, archive_2_url): [content1, content3]
- }
-
- Returns:
- The created mapping.
- """
- slaves_copy = {}
- for content_id in self.batch:
- # Choose some servers to upload the content among the missing ones.
- server_data = self.batch[content_id]
- nb_present = len(server_data['present'])
- nb_backup = self.config['retention_policy'] - nb_present
- backup_servers = self._choose_backup_servers(
- server_data['missing'],
- nb_backup
- )
- # Fill the map destination -> content to upload
- for server in backup_servers:
- slaves_copy.setdefault(server, []).append(content_id)
- return slaves_copy
-
- def run(self):
- """ Do the task expected from the archiver worker.
-
- Process the content in the batch, ensure that the elements still need
- an archival, and spawn copiers to copy files in each destinations.
- """
- # Get a map (archive -> [contents])
- slaves_copy = self.sort_content_by_archive()
-
- # At this point, re-check the archival status in order to know if the
- # job have been done by another worker.
- for destination in slaves_copy:
- # list() is needed because filter's result will be consumed twice.
- slaves_copy[destination] = list(filter(
- lambda content_id: self.need_archival(content_id, destination),
- slaves_copy[destination]
- ))
- for content_id in slaves_copy[destination]:
- self._content_archive_update(content_id, destination[0],
- new_status='ongoing')
-
- # Spawn a copier for each destination
- for destination in slaves_copy:
- try:
- self.run_copier(destination, slaves_copy[destination])
- except:
- logger.error('Unable to copy a batch to %s' % destination)
-
def run_copier(self, destination, contents):
""" Run a copier in order to archive the given contents
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -101,7 +101,7 @@
content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
copies = {'banco': {
'status': status,
- 'mtime': date or int(time.time()) # if date is None, use now()
+ 'mtime': date or time.time() # if date is None, use now()
}}
self.cursor.execute("""INSERT INTO content_archive
VALUES('%s'::sha1, '%s')
@@ -130,14 +130,16 @@
config=config)
return director
- def __create_worker(self, batch={}, config={}):
+ def __create_worker(self, batch={}, slave_objstorages=None, config={}):
mobjstorage_args = self.archiver.master_objstorage_args
if not config:
config = self.archiver.config
+ if not slave_objstorages:
+ slave_objstorages = [self.storage_data]
return ArchiverWorker(batch,
archiver_args=self.conn,
master_objstorage_args=mobjstorage_args,
- slave_objstorages=[self.storage_data],
+ slave_objstorages=slave_objstorages,
config=config)
# Integration test
@@ -201,14 +203,14 @@
@istest
def vstatus_ongoing_remaining(self):
self.assertEquals(
- self.vstatus('ongoing', int(time.time())),
+ self.vstatus('ongoing', time.time()),
'present'
)
@istest
def vstatus_ongoing_elapsed(self):
past_time = (
- int(time.time()) - self.archiver.config['archival_max_age'] - 1
+ time.time() - self.archiver.config['archival_max_age'] - 1
)
self.assertEquals(
self.vstatus('ongoing', past_time),
@@ -216,65 +218,40 @@
)
# Unit tests for archive worker
+ def _initialize_testdata_worker(self):
+ storages = ['server1', 'server2']
+ batch = {
+ 'id1': ['server1', 'server2'],
+ 'id2': ['server2'],
+ 'id3': []
+ }
+ worker = self.__create_worker(
+ batch=batch,
+ slave_objstorages=storages,
+ config=dict(self.archiver.config, retention_policy=2)
+ )
+ return storages, batch, worker
@istest
- def need_archival_missing(self):
- """ A content should still need archival when it is missing.
- """
- id = self.__add_content(b'need_archival_missing', status='missing')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), True)
-
- @istest
- def need_archival_present(self):
- """ A content should still need archival when it is missing
- """
- id = self.__add_content(b'need_archival_missing', status='present')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), False)
-
- @istest
- def need_archival_ongoing_remaining(self):
- """ An ongoing archival with remaining time shouldnt need archival.
+ def select_server_none(self):
+ """ Content with enough copies should not need any more
"""
- id = self.__add_content(b'need_archival_ongoing_remaining',
- status='ongoing')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), False)
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id1'])
+ self.assertEquals(mapping, [])
@istest
- def need_archival_ongoing_elasped(self):
- """ An ongoing archival with elapsed time should be scheduled again.
+ def select_server_single(self):
+ """ Content with not enough copies should need at least one
"""
- id = self.__add_content(
- b'archive_ongoing_elapsed',
- status='ongoing',
- date=(
- int(time.time()) - self.archiver.config['archival_max_age'] - 1
- )
- )
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), True)
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id2'])
+ self.assertEquals(mapping, ['server1'])
@istest
- def content_sorting_by_archiver(self):
- """ Check that the content is correctly sorted.
+ def select_server_all(self):
+ """ Content with no copies will need at least `retention_policy` ones
"""
- batch = {
- 'id1': {
- 'present': [('slave1', 'slave1_url')],
- 'missing': []
- },
- 'id2': {
- 'present': [],
- 'missing': [('slave1', 'slave1_url')]
- }
- }
- worker = self.__create_worker(batch=batch)
- mapping = worker.sort_content_by_archive()
- self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
- self.assertIn('id2', mapping[('slave1', 'slave1_url')])
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id3'])
+ self.assertEquals(mapping, ['server1', 'server2'])
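
Note on _initialize_testdata_worker: the per-test config overrides retention_policy on a copy of the director config. Since dict.update() mutates in place and returns None, building the override with dict(base, key=value) keeps the new value and leaves the base config untouched; a minimal sketch:

base_config = {'retention_policy': 1, 'batch_max_size': 50}

broken = base_config.copy().update(retention_policy=2)
assert broken is None  # update() returns None, the override would be lost

test_config = dict(base_config, retention_policy=2)
assert test_config['retention_policy'] == 2
assert base_config['retention_policy'] == 1  # base config unchanged
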
