diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -5,8 +5,7 @@
 
 import logging
 import click
-
-from datetime import datetime
+import time
 
 from swh.core import hashutil, config
 from swh.objstorage import PathSlicingObjStorage
@@ -198,38 +197,31 @@
         At least all the content that don't have enough copies on the
         backups servers are distributed into these batches.
         """
-        # Get the data about each content referenced into the archiver.
-        missing_copy = {}
-        for content_id in self.archiver_storage.content_archive_ls():
-            db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])
-
-            # Fetch the datas about archival status of the content
-            backups = self.archiver_storage.content_archive_get(
-                content=db_content_id
-            )
-            for _content_id, server_id, status, mtime in backups:
-                virtual_status = self.get_virtual_status(status, mtime)
-                server_data = (server_id, self.slave_objstorages[server_id])
-
-                missing_copy.setdefault(
-                    db_content_id,
-                    {'present': [], 'missing': []}
-                ).setdefault(virtual_status, []).append(server_data)
-
-            # Check the content before archival.
-            try:
-                self.master_objstorage.check(content_id[0])
-            except Exception as e:
-                # Exception can be Error or ObjNotFoundError.
-                logger.error(e)
-                # TODO Do something to restore the content?
-
-            if len(missing_copy) >= self.config['batch_max_size']:
-                yield missing_copy
-                missing_copy = {}
-
-        if len(missing_copy) > 0:
-            yield missing_copy
+        contents = {}
+        # Get the archive servers as (archive_id, archive_url) pairs
+        archives = list(self.archiver_storage.archive_ls())
+        # Get all the contents referenced in the archiver tables
+        for content_id, copies in self.archiver_storage.content_archive_get():
+            content_id = r'\x' + hashutil.hash_to_hex(content_id)
+            data = {'present': [], 'missing': []}
+            # For each archive server, check the current content status
+            for archive_id, archive_url in archives:
+                if archive_id not in copies:
+                    data['missing'].append((archive_id, archive_url))
+                else:
+                    copy = copies[archive_id]
+                    vstatus = self.get_virtual_status(copy['status'],
+                                                      copy['mtime'])
+                    data[vstatus].append((archive_id, archive_url))
+
+            contents[content_id] = data
+
+            if len(contents) >= self.config['batch_max_size']:
+                yield contents
+                contents = {}
+
+        if contents:
+            yield contents
 
     def get_virtual_status(self, status, mtime):
         """ Compute the virtual presence of a content.
@@ -260,8 +252,7 @@
         # If the status is 'ongoing' but there is still time, another worker
         # may still be on the task.
         if status == 'ongoing':
-            mtime = mtime.replace(tzinfo=None)
-            elapsed = (datetime.now() - mtime).total_seconds()
+            elapsed = int(time.time()) - mtime
             if elapsed <= self.config['archival_max_age']:
                 return 'present'
             else:
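
Note: with mtimes now stored as plain Unix timestamps, the 'ongoing' freshness
check reduces to integer arithmetic. A minimal standalone sketch of the
decision implemented by get_virtual_status above (the function name and the
archival_max_age default here are illustrative, not part of the patch):

    import time

    def virtual_status(status, mtime, archival_max_age=3600):
        # 'present' and 'missing' are taken at face value; an 'ongoing'
        # copy is still counted as present while a worker may be active,
        # i.e. while its mtime is recent enough.
        if status in ('present', 'missing'):
            return status
        elif status == 'ongoing':
            elapsed = int(time.time()) - mtime  # mtime is a Unix timestamp
            return 'present' if elapsed <= archival_max_age else 'missing'
        raise ValueError('unexpected status: %s' % status)
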
@@ -288,7 +279,7 @@
 
     conf.update(cl_config)
     # Create connection data and run the archiver.
     archiver = ArchiverDirector(conf['dbconn'], conf)
-    logger.info("Starting an archival at", datetime.now())
+    logger.info("Starting an archival at %s", time.time())
     archiver.run()
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
--- a/swh/storage/archiver/storage.py
+++ b/swh/storage/archiver/storage.py
@@ -4,8 +4,9 @@
 # See top-level LICENSE file for more information
 
 import psycopg2
+import time
 
-from ..common import db_transaction_generator
+from ..common import db_transaction_generator, db_transaction
 from ..db import Db
 from ..exc import StorageDBError
@@ -38,21 +39,7 @@
         yield from self.db.archive_ls(cur)
 
     @db_transaction_generator
-    def content_archive_ls(self, cur=None):
-        """ Get the archival status of the content
-
-        Get an iterable over all the content that is referenced
-        in a backup server.
-
-        Yields:
-            the sha1 of each content referenced at least one time
-            in the database of archiveal status.
-
-        """
-        yield from self.db.content_archive_ls(cur)
-
-    @db_transaction_generator
-    def content_archive_get(self, content=None, archive=None, cur=None):
+    def content_archive_get(self, content=None, cur=None):
         """ Get the archival status of a content in a specific server.
 
         Retreive from the database the archival status of the given content
@@ -66,24 +53,30 @@
         Yields:
-            A tuple (content_id, server_id, archival status, mtime, tzinfo).
+            A tuple (content_id, copies_json) for each content.
         """
-        yield from self.db.content_archive_get(content, archive, cur)
+        yield from self.db.content_archive_get(content, cur)
 
-    @db_transaction_generator
+    @db_transaction
     def content_archive_update(self, content_id, archive_id,
                                new_status=None, cur=None):
-        """ Update the status of a archive content and set it's mtime to now()
+        """ Update the status of an archived content and set its mtime to now.
 
-        Change the last modification time of an archived content and change
-        its status to the given one.
+        Change the status of an archived content for the given archive and
+        set its mtime to the current time.
 
         Args:
-            content_id (string): The content id.
-            archive_id (string): The id of the concerned archive.
-            new_status (string): One of missing, ongoing or present, this
-                status will replace the previous one. If not given, the
-                function only changes the mtime of the content.
+            content_id (str): content sha1
+            archive_id (str): name of the archive
+            new_status (str): one of 'missing', 'present' or 'ongoing'.
+                This status will replace the previous one. If not given,
+                the function only changes the mtime of the content for
+                the given archive.
        """
-        yield from self.db.content_archive_update(content_id,
-                                                  archive_id,
-                                                  new_status,
-                                                  cur)
+        # FIXME: check how to alter the json object directly in postgres
+        # Get the current copies data and alter it
+        _, copies = next(self.db.content_archive_get(content_id, cur))
+        if new_status is not None:
+            copies[archive_id]['status'] = new_status
+        copies[archive_id]['mtime'] = int(time.time())
+
+        # Then save the new data
+        self.db.content_archive_update(content_id, copies, cur)
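
The copies column replaces the per-archive rows with one JSON mapping per
content, so an update becomes a read-modify-write of that mapping. A minimal
sketch of the cycle performed by content_archive_update above (the sample row
and the archive name 'banco' are illustrative):

    import json
    import time

    # One row of content_archive: (content_id, copies), with copies keyed
    # by archive server id.
    row = (r'\x34973274ccef6ab4dfaaf86599792fa9c3fe4689',
           {'banco': {'status': 'ongoing', 'mtime': 1466424242}})

    content_id, copies = row
    copies['banco']['status'] = 'present'   # new_status, when given
    copies['banco']['mtime'] = int(time.time())

    # The mapping is then serialized back for the UPDATE query.
    print(content_id, json.dumps(copies))
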
""" - yield from self.db.content_archive_update(content_id, - archive_id, - new_status, - cur) + # FIXME check how to alter direclty the json object with postgres + # Get the data and alter it + copies = self.db.content_archive_get(content_id)['copies'] + if new_status is not None: + copies[archive_id]['status'] = new_status + copies[archive_id]['mtime'] = int(time.time()) + + # Then save the new data + self.db.content_archive_update(content_id, copies) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -5,8 +5,7 @@ import random import logging - -from datetime import datetime +import time from swh.objstorage import PathSlicingObjStorage from swh.objstorage.api.client import RemoteObjStorage @@ -115,19 +114,20 @@ A dictionary that contains all the required data : 'content_id', 'archive_id', 'status', and 'mtime' """ + archive = server[0] t, = list( - self.archiver_storage.content_archive_get(content_id, server[0]) + self.archiver_storage.content_archive_get(content_id) ) return { 'content_id': t[0], - 'archive_id': t[1], - 'status': t[2], - 'mtime': t[3] + 'archive_id': archive, + 'status': t[1][archive]['status'], + 'mtime': t[1][archive]['mtime'] } def _content_archive_update(self, content_id, archive_id, new_status=None): - """ Update the status of a archive content and set it's mtime to now() + """ Update the status of a archive content and set its mtime to now. Change the last modification time of an archived content and change its status to the given one. @@ -167,8 +167,7 @@ # If the content is ongoing but still have time, there is # another worker working on this content. elif status == 'ongoing': - mtime = mtime.replace(tzinfo=None) - elapsed = (datetime.now() - mtime).total_seconds() + elapsed = int(time.time()) - mtime if elapsed <= self.config['archival_max_age']: return False return True diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -659,80 +659,42 @@ """) yield from cursor_to_bytes(cur) - def content_archive_ls(self, cur=None): - """ Get the archival status of the content - - Get an iterable over all the content that is referenced - in a backup server. - - Yields: - the sha1 of each content referenced at least one time - in the database of archiveal status. - """ - cur = self._cursor(cur) - cur.execute("""SELECT DISTINCT content_id - FROM content_archive""") - yield from cursor_to_bytes(cur) - - def content_archive_get(self, content=None, archive=None, cur=None): + def content_archive_get(self, content=None, cur=None): """ Get the archival status of a content in a specific server. Retreive from the database the archival status of the given content in the given archive server. Args: - content: the sha1 of the content. May be None for any id. - archive: the database id of the server we're looking into - may be None for any server. + content: the sha1 of the content. May be None for all contents. Yields: - A tuple (content_id, server_id, archival status, mtime, tzinfo). + A tuple (content_id, copies_json). 
""" - query = """SELECT content_id, archive_id, status, mtime + query = """SELECT content_id, copies FROM content_archive """ - conditions = [] - if content: - conditions.append("content_id='%s'" % content) - if archive: - conditions.append("archive_id='%s'" % archive) - - if conditions: - query = """%s - WHERE %s - """ % (query, ' and '.join(conditions)) + if content is not None: + query += "WHERE content_id='%s'" % content cur = self._cursor(cur) cur.execute(query) yield from cursor_to_bytes(cur) - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of a archive content and set it's mtime to now() + def content_archive_update(self, content_id, copies, cur=None): + """ Update the status of an archive content and set it's mtime to now() Change the last modification time of an archived content and change its status to the given one. Args: - content_id (string): The content id. - archive_id (string): The id of the concerned archive. - new_status (string): One of missing, ongoing or present, this - status will replace the previous one. If not given, the - function only changes the mtime of the content. + content_id (str): The content id. + copies (dict): dictionary that match the json expected in the table """ query = """UPDATE content_archive - SET %(fields)s - WHERE content_id='%(content_id)s' - and archive_id='%(archive_id)s' - """ - fields = [] - if new_status: - fields.append("status='%s'" % new_status) - fields.append("mtime=now()") - - d = {'fields': ', '.join(fields), - 'content_id': content_id, - 'archive_id': archive_id} + SET copies=%s + WHERE content_id='%s' + """ % (jsonize(copies), content_id) cur = self._cursor(cur) - cur.execute(query % d) + cur.execute(query) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -6,10 +6,11 @@ import tempfile import unittest import os +import time +import json from nose.tools import istest from nose.plugins.attrib import attr -from datetime import datetime, timedelta from swh.core import hashutil from swh.core.tests.db_testing import DbsTestFixture @@ -91,16 +92,20 @@ self.cursor.execute('DELETE FROM content_archive') self.conn.commit() - def __add_content(self, content_data, status='missing', date='now()'): - # Add the content + def __add_content(self, content_data, status='missing', date=None): + # Add the content to the storage content = hashutil.hashdata(content_data) content.update({'data': content_data}) self.storage.content_add([content]) # Then update database content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) + copies = {'banco': { + 'status': status, + 'mtime': date or int(time.time()) # if date is None, use now() + }} self.cursor.execute("""INSERT INTO content_archive - VALUES('%s'::sha1, 'banco', '%s', %s) - """ % (content_id, status, date)) + VALUES('%s'::sha1, '%s') + """ % (content_id, json.dumps(copies))) return content['sha1'] def __get_missing(self): @@ -136,7 +141,7 @@ config=config) # Integration test - + @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. 
""" @@ -145,12 +150,13 @@ # before, the content should not be there try: self.remote_objstorage.content_get(content_id) - except: + except ObjNotFoundError: pass + else: + self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.remote_objstorage.content_get(content_id) - # After the run, the content should be archived after the archiver run. self.assertEquals(content_data, remote_data) @istest @@ -194,16 +200,15 @@ @istest def vstatus_ongoing_remaining(self): - current_time = datetime.now() self.assertEquals( - self.vstatus('ongoing', current_time), + self.vstatus('ongoing', int(time.time())), 'present' ) @istest def vstatus_ongoing_elapsed(self): - past_time = datetime.now() - timedelta( - seconds=self.archiver.config['archival_max_age'] + 1 + past_time = ( + int(time.time()) - self.archiver.config['archival_max_age'] - 1 ) self.assertEquals( self.vstatus('ongoing', past_time), @@ -235,7 +240,7 @@ """ An ongoing archival with remaining time shouldnt need archival. """ id = self.__add_content(b'need_archival_ongoing_remaining', - status='ongoing', date="'%s'" % datetime.now()) + status='ongoing') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @@ -247,9 +252,9 @@ id = self.__add_content( b'archive_ongoing_elapsed', status='ongoing', - date="'%s'" % (datetime.now() - timedelta( - seconds=self.archiver.config['archival_max_age'] + 1 - )) + date=( + int(time.time()) - self.archiver.config['archival_max_age'] - 1 + ) ) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker()