Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124322
D81.id266.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
D81.id266.diff
View Options
diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
--- a/swh/storage/archiver/copier.py
+++ b/swh/storage/archiver/copier.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -30,7 +30,7 @@
master_storage (Storage): The master storage of the system that
contains the data to archive.
"""
- _name, self.url = destination
+ self.url = destination
self.content_ids = content
self.server = RemoteObjStorage(self.url)
self.master_objstorage = master_objstorage
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -102,11 +102,7 @@
# Get the slave storages
self.db_conn_archiver = db_conn_archiver
self.archiver_storage = ArchiverStorage(db_conn_archiver)
- self.slave_objstorages = {
- id: url
- for id, url
- in self.archiver_storage.archive_ls()
- }
+ self.slave_objstorages = dict(self.archiver_storage.archive_ls())
# Check that there is enough backup servers for the retention policy
if config['retention_policy'] > len(self.slave_objstorages) + 1:
raise ValueError(
@@ -147,7 +143,7 @@
else:
run_fn = self.run_sync_worker
- for batch in self.get_unarchived_content():
+ for batch in self.get_unarchived_content_batch():
run_fn(batch)
def run_async_worker(self, batch):
@@ -155,9 +151,9 @@
"""
task = app.tasks[task_name]
task.delay(batch,
+ slave_objstorages=self.slave_objstorages,
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
- slave_objstorages=self.slave_objstorages,
config=self.config)
def run_sync_worker(self, batch):
@@ -165,75 +161,86 @@
"""
task = app.tasks[task_name]
task(batch,
+ slave_objstorages=self.slave_objstorages,
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
- slave_objstorages=self.slave_objstorages,
config=self.config)
- def get_unarchived_content(self):
- """ Get contents that need to be archived.
+ def get_unarchived_content_batch(self):
+ """ Get batches of contents that need to be archived
Yields:
A batch of contents. Batches are dictionaries which associates
- a content id to the data about servers that contains it or not.
-
- {'id1':
- {'present': [('slave1', 'slave1_url')],
- 'missing': [('slave2', 'slave2_url'),
- ('slave3', 'slave3_url')]
- },
- 'id2':
- {'present': [],
- 'missing': [
- ('slave1', 'slave1_url'),
- ('slave2', 'slave2_url'),
- ('slave3', 'slave3_url')
- ]}
+ a content sha1 to a list of server ids where it is already present
+ or archival is ongoing.
+
+ {
+ 'id1': ['slave1', 'slave3'],
+ 'id2': ['slave2']
}
- Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
- are ids and urls of the storage slaves.
+ Where keys (idX) are sha1 of the content and slaveX are ids of the
+ slave storages.
At least all the content that don't have enough copies on the
backups servers are distributed into these batches.
"""
contents = {}
- # Get the archives
- archives = dict(self.archiver_storage.archive_ls())
- # Get all the contents referenced into the archiver tables
- last_object = b''
- while True:
- archived_contents = list(
- self.archiver_storage.content_archive_get_copies(last_object))
+ for content_id, presents in self._get_unarchived_content():
+ contents[content_id] = presents
+
+ if len(contents) > self.config['batch_max_size']:
+ yield contents
+ contents = {}
- if not archived_contents:
- break
+ if len(contents) > 0:
+ yield contents
- for content_id, present, ongoing in archived_contents:
- last_object = content_id
- data = {
- 'present': set(present),
- 'missing': set(archives) - set(present) - set(ongoing),
- }
+ def _get_unarchived_content(self):
+ """ Get contents that need to be archived.
- for archive_id, mtime in ongoing.items():
- status = self.get_virtual_status('ongoing', mtime)
- data[status].add(archive_id)
+ Yields:
+ Tuple that represents the archival status of contents that needs
+ to be archived, as (content_id, archive_present), where
+ `archive_present` is a list of archive servers where the content is
+ already present.
+ """
+ # Iterates over the contents referenced into the archiver.
+ for content in self._get_all_contents():
+ content_id, present_copies, ongoing_copies = content
+ # Construct the present list
+ virtualy_present_copies = {
+ ids
+ for ids in ongoing_copies
+ if self.get_virtual_status('ongoing', ongoing_copies[ids])
+ }
+ presents = set(present_copies) | virtualy_present_copies
- if not data['missing']:
- continue
+ # If there is already enough copies, don't schedule it.
+ if len(presents) >= self.config['retention_policy']:
+ continue
- contents[r'\x%s' % hashutil.hash_to_hex(content_id)] = {
- k: [(archive_id, archives[archive_id]) for archive_id in v]
- for k, v in data.items()
- }
+ # Otherwise, yield it
+ yield r'\x%s' % hashutil.hash_to_hex(content_id), presents
- if len(contents) >= self.config['batch_max_size']:
- yield contents
- contents = {}
+ def _get_all_contents(self):
+ """ Get batchs from the archiver db and yield it as continous stream
- if len(contents) > 0:
- yield contents
+ Yields:
+ Data about a content as a tuple
+ (content_id, present_copies, ongoing_copies) where ongoing_copies
+ is a dict mapping copy to mtime.
+ """
+ last_object = b''
+ while True:
+ archiver_contents = list(
+ self.archiver_storage.content_archive_get_copies(last_object)
+ )
+ if not archiver_contents:
+ return
+ for content in archiver_contents:
+ last_object = content[0]
+ yield content
def get_virtual_status(self, status, mtime):
""" Compute the virtual presence of a content.
@@ -258,7 +265,7 @@
ValueError: if the status is not one 'present', 'missing'
or 'ongoing'
"""
- if status in ('present', 'missing'):
+ if status in {'present', 'missing'}:
return status
# If the status is 'ongoing' but there is still time, another worker
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
--- a/swh/storage/archiver/tasks.py
+++ b/swh/storage/archiver/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -12,9 +12,10 @@
"""
task_queue = 'swh_storage_archive_worker'
- def run(self, batch, archiver_args, master_objstorage_args,
- slave_objstorages, config):
- aw = ArchiverWorker(batch, archiver_args, master_objstorage_args,
- slave_objstorages, config)
+ def run(self, batch, slave_objstorages, archiver_args,
+ master_objstorage_args, config):
+ aw = ArchiverWorker(batch, slave_objstorages,
+ archiver_args, master_objstorage_args,
+ config)
if aw.run():
self.log("Successful backup for a batch of size %s" % len(batch))
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
--- a/swh/storage/archiver/worker.py
+++ b/swh/storage/archiver/worker.py
@@ -1,11 +1,9 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import random
import logging
-import time
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
@@ -14,9 +12,6 @@
from .copier import ArchiverCopier
-logger = logging.getLogger()
-
-
class ArchiverWorker():
""" Do the required backups on a given batch of contents.
@@ -45,19 +40,20 @@
asynchronous (boolean): Indicate whenever the archival should
run in asynchronous mode or not.
"""
- def __init__(self, batch, archiver_args, master_objstorage_args,
- slave_objstorages, config):
+ def __init__(self, batch, slave_objstorages,
+ archiver_args, master_objstorage_args,
+ config):
""" Constructor of the ArchiverWorker class.
Args:
batch: A batch of content, which is a dictionary that associates
a content's sha1 id to the list of servers where the content
is present.
+ slave_objstorages: A map that associates server_id to the remote
+ server.
archiver_args: The archiver's arguments to establish connection to
db.
master_objstorage_args: The master storage arguments.
- slave_objstorages: A map that associates server_id to the remote
- server.
config: Archiver_configuration. A dictionary that must contains
the following keys.
objstorage_path (string): the path of the objstorage of the
@@ -72,8 +68,8 @@
run in asynchronous mode or not.
"""
self.batch = batch
- self.archiver_storage = ArchiverStorage(archiver_args)
self.slave_objstorages = slave_objstorages
+ self.archiver_storage = ArchiverStorage(archiver_args)
self.config = config
if config['objstorage_type'] == 'local_objstorage':
@@ -82,49 +78,55 @@
master_objstorage = RemoteObjStorage(**master_objstorage_args)
self.master_objstorage = master_objstorage
- def _choose_backup_servers(self, allowed_storage, backup_number):
- """ Choose the slave servers for archival.
-
- Choose the given amount of servers among those which don't already
- contain a copy of the content.
+ def run(self):
+ """ Do the task expected from the archiver worker.
- Args:
- allowed_storage: servers when the content is not already present.
- backup_number (int): The number of servers we have to choose in
- order to fullfill the objective.
+ Process the content in the batch, ensure that the elements still need
+ an archival, and spawn copiers to copy files in each destinations.
"""
- # In case there is not enough backup servers to get all the backups
- # we need, just do our best.
- # Such situation should not happen.
- backup_number = min(backup_number, len(allowed_storage))
+ # Batch the contents to the required servers
+ contents = {} # Map {archive_id -> [contents]}
+ for content_id, present_copies in self.batch.items():
+ backup_servers = self._select_archive_servers(present_copies)
- # TODO Find a better (or a good) policy to choose the backup servers.
- # The random choice should be equivalently distributed between
- # servers for a great amount of data, but don't take care of servers
- # capacities.
- return random.sample(allowed_storage, backup_number)
+ for server in backup_servers:
+ # Add the content to the map {archive_id -> [content_ids]}
+ contents.setdefault(server, []).append(content_id)
+
+ # Set the archival status of the items to ongoing.
+ self._content_archive_update(
+ content_id,
+ server,
+ new_status='ongoing'
+ )
+
+ # For each destination, spawn a copier
+ for destination, content_ids in contents.items():
+ server_url = self.slave_objstorages[destination]
+ try:
+ self.run_copier(server_url, content_ids)
+ except:
+ logging.error('Unable to copy a batch to %s' % destination)
- def _get_archival_status(self, content_id, server):
- """ Get the archival status of the required content.
+ def _select_archive_servers(self, already_present_copies):
+ """ Select some backup servers
+
+ Select backup servers that do not contain the content among the
+ existing ones.
+
+ Args:
+ already_present_copies list(str): list of the servers that already
+ contain the object.
- Attributes:
- content_id (string): Sha1 of the content.
- server: Tuple (archive_id, archive_url) of the archive server.
Returns:
- A dictionary that contains all the required data : 'content_id',
- 'archive_id', 'status', and 'mtime'
+ a list of enough backup server ids to respect the retention policy.
"""
- archive = server[0]
- t, = list(
- self.archiver_storage.content_archive_get(content_id)
- )
- status_for_archive = t[1].get(archive, {})
- return {
- 'content_id': content_id,
- 'archive_id': archive,
- 'status': status_for_archive.get('status', 'missing'),
- 'mtime': status_for_archive.get('mtime', 0),
- }
+ nb_missing = len(self.slave_objstorages) - len(already_present_copies)
+ allowed_servers = [server for server in self.slave_objstorages
+ if server not in already_present_copies]
+ # Select any server for the backup
+ backup_servers = allowed_servers[:nb_missing]
+ return backup_servers
def _content_archive_update(self, content_id, archive_id,
new_status=None):
@@ -146,91 +148,6 @@
new_status
)
- def need_archival(self, content, destination):
- """ Indicates whenever a content need archivage.
-
- Filter function that returns True if a given content
- still require to be archived.
-
- Args:
- content (str): Sha1 of a content.
- destination: Tuple (archive id, archive url).
- """
- archival_status = self._get_archival_status(
- content,
- destination
- )
- status = archival_status['status']
- mtime = archival_status['mtime']
- # If the archive is already present, no need to backup.
- if status == 'present':
- return False
- # If the content is ongoing but still have time, there is
- # another worker working on this content.
- elif status == 'ongoing':
- elapsed = int(time.time()) - mtime
- if elapsed <= self.config['archival_max_age']:
- return False
- return True
-
- def sort_content_by_archive(self):
- """ Create a map {archive_server -> list of content)
-
- Create a mapping that associate to a archive server all the
- contents that needs to be archived in it by the current worker.
-
- The map is in the form of :
- {
- (archive_1, archive_1_url): [content1, content2, content_3]
- (archive_2, archive_2_url): [content1, content3]
- }
-
- Returns:
- The created mapping.
- """
- slaves_copy = {}
- for content_id in self.batch:
- # Choose some servers to upload the content among the missing ones.
- server_data = self.batch[content_id]
- nb_present = len(server_data['present'])
- nb_backup = self.config['retention_policy'] - nb_present
- backup_servers = self._choose_backup_servers(
- server_data['missing'],
- nb_backup
- )
- # Fill the map destination -> content to upload
- for server in backup_servers:
- slaves_copy.setdefault(server, []).append(content_id)
- return slaves_copy
-
- def run(self):
- """ Do the task expected from the archiver worker.
-
- Process the content in the batch, ensure that the elements still need
- an archival, and spawn copiers to copy files in each destinations.
- """
- # Get a map (archive -> [contents])
- slaves_copy = self.sort_content_by_archive()
-
- # At this point, re-check the archival status in order to know if the
- # job have been done by another worker.
- for destination in slaves_copy:
- # list() is needed because filter's result will be consumed twice.
- slaves_copy[destination] = list(filter(
- lambda content_id: self.need_archival(content_id, destination),
- slaves_copy[destination]
- ))
- for content_id in slaves_copy[destination]:
- self._content_archive_update(content_id, destination[0],
- new_status='ongoing')
-
- # Spawn a copier for each destination
- for destination in slaves_copy:
- try:
- self.run_copier(destination, slaves_copy[destination])
- except:
- logger.error('Unable to copy a batch to %s' % destination)
-
def run_copier(self, destination, contents):
""" Run a copier in order to archive the given contents
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -101,7 +101,7 @@
content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
copies = {'banco': {
'status': status,
- 'mtime': date or int(time.time()) # if date is None, use now()
+ 'mtime': date or time.time() # if date is None, use now()
}}
self.cursor.execute("""INSERT INTO content_archive
VALUES('%s'::sha1, '%s')
@@ -130,14 +130,16 @@
config=config)
return director
- def __create_worker(self, batch={}, config={}):
+ def __create_worker(self, batch={}, slave_objstorages=None, config={}):
mobjstorage_args = self.archiver.master_objstorage_args
if not config:
config = self.archiver.config
+ if not slave_objstorages:
+ slave_objstorages = [self.storage_data]
return ArchiverWorker(batch,
archiver_args=self.conn,
master_objstorage_args=mobjstorage_args,
- slave_objstorages=[self.storage_data],
+ slave_objstorages=slave_objstorages,
config=config)
# Integration test
@@ -201,14 +203,14 @@
@istest
def vstatus_ongoing_remaining(self):
self.assertEquals(
- self.vstatus('ongoing', int(time.time())),
+ self.vstatus('ongoing', time.time()),
'present'
)
@istest
def vstatus_ongoing_elapsed(self):
past_time = (
- int(time.time()) - self.archiver.config['archival_max_age'] - 1
+ time.time() - self.archiver.config['archival_max_age'] - 1
)
self.assertEquals(
self.vstatus('ongoing', past_time),
@@ -216,65 +218,40 @@
)
# Unit tests for archive worker
+ def _initialize_testdata_worker(self):
+ storages = ['server1', 'server2']
+ batch = {
+ 'id1': ['server1', 'server2'],
+ 'id2': ['server2'],
+ 'id3': []
+ }
+ worker = self.__create_worker(
+ batch=batch,
+ slave_objstorages=storages,
+ config=self.archiver.config.copy().update(retention_policy=2)
+ )
+ return storages, batch, worker
@istest
- def need_archival_missing(self):
- """ A content should still need archival when it is missing.
- """
- id = self.__add_content(b'need_archival_missing', status='missing')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), True)
-
- @istest
- def need_archival_present(self):
- """ A content should still need archival when it is missing
- """
- id = self.__add_content(b'need_archival_missing', status='present')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), False)
-
- @istest
- def need_archival_ongoing_remaining(self):
- """ An ongoing archival with remaining time shouldnt need archival.
+ def select_server_none(self):
+ """ Content with enough copies should not need any more
"""
- id = self.__add_content(b'need_archival_ongoing_remaining',
- status='ongoing')
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), False)
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id1'])
+ self.assertEquals(mapping, [])
@istest
- def need_archival_ongoing_elasped(self):
- """ An ongoing archival with elapsed time should be scheduled again.
+ def select_server_single(self):
+ """ Content with not enough copies should need at least one
"""
- id = self.__add_content(
- b'archive_ongoing_elapsed',
- status='ongoing',
- date=(
- int(time.time()) - self.archiver.config['archival_max_age'] - 1
- )
- )
- id = r'\x' + hashutil.hash_to_hex(id)
- worker = self.__create_worker()
- self.assertEqual(worker.need_archival(id, self.storage_data), True)
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id2'])
+ self.assertEquals(mapping, ['server1'])
@istest
- def content_sorting_by_archiver(self):
- """ Check that the content is correctly sorted.
+ def select_server_all(self):
+ """ Content with no copies will need at least `retention_policy` ones
"""
- batch = {
- 'id1': {
- 'present': [('slave1', 'slave1_url')],
- 'missing': []
- },
- 'id2': {
- 'present': [],
- 'missing': [('slave1', 'slave1_url')]
- }
- }
- worker = self.__create_worker(batch=batch)
- mapping = worker.sort_content_by_archive()
- self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
- self.assertIn('id2', mapping[('slave1', 'slave1_url')])
+ storages, batch, worker = self._initialize_testdata_worker()
+ mapping = worker._select_archive_servers(batch['id3'])
+ self.assertEquals(mapping, ['server1', 'server2'])
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 6:54 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228741
Attached To
D81: Refactor and optimize the archiver
Event Timeline
Log In to Comment