diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -5,8 +5,7 @@
 import logging
 import click
-
-from datetime import datetime
+import time
 
 from swh.core import hashutil, config
 from swh.objstorage import PathSlicingObjStorage
@@ -198,38 +197,31 @@
         At least all the content that don't have enough copies on the
         backups servers are distributed into these batches.
         """
-        # Get the data about each content referenced into the archiver.
-        missing_copy = {}
-        for content_id in self.archiver_storage.content_archive_ls():
-            db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])
-
-            # Fetch the datas about archival status of the content
-            backups = self.archiver_storage.content_archive_get(
-                content=db_content_id
-            )
-            for _content_id, server_id, status, mtime in backups:
-                virtual_status = self.get_virtual_status(status, mtime)
-                server_data = (server_id, self.slave_objstorages[server_id])
-
-                missing_copy.setdefault(
-                    db_content_id,
-                    {'present': [], 'missing': []}
-                ).setdefault(virtual_status, []).append(server_data)
-
-            # Check the content before archival.
-            try:
-                self.master_objstorage.check(content_id[0])
-            except Exception as e:
-                # Exception can be Error or ObjNotFoundError.
-                logger.error(e)
-                # TODO Do something to restore the content?
-
-            if len(missing_copy) >= self.config['batch_max_size']:
-                yield missing_copy
-                missing_copy = {}
-
-        if len(missing_copy) > 0:
-            yield missing_copy
+        contents = {}
+        # Get the archive servers (id, url) known to the archiver.
+        archives = [(k, v) for k, v in self.archiver_storage.archive_ls()]
+        # Get all the contents referenced in the archiver tables.
+        for content_id, copies in self.archiver_storage.content_archive_get():
+            content_id = r'\x' + hashutil.hash_to_hex(content_id)
+            data = {'present': [], 'missing': []}
+            # For each archive server, check the current content status.
+            for archive_id, archive_url in archives:
+                if archive_id not in copies:
+                    data['missing'].append((archive_id, archive_url))
+                else:
+                    copy = copies[archive_id]
+                    vstatus = self.get_virtual_status(copy['status'],
+                                                      copy['mtime'])
+                    data[vstatus].append((archive_id, archive_url))
+
+            contents[content_id] = data
+
+            if len(contents) >= self.config['batch_max_size']:
+                yield contents
+                contents = {}
+
+        if len(contents) > 0:
+            yield contents
 
     def get_virtual_status(self, status, mtime):
         """ Compute the virtual presence of a content.
@@ -260,8 +252,7 @@
         # If the status is 'ongoing' but there is still time, another worker
         # may still be on the task.
         if status == 'ongoing':
-            mtime = mtime.replace(tzinfo=None)
-            elapsed = (datetime.now() - mtime).total_seconds()
+            elapsed = int(time.time()) - mtime
             if elapsed <= self.config['archival_max_age']:
                 return 'present'
             else:
@@ -288,7 +279,7 @@
     conf.update(cl_config)
 
     # Create connection data and run the archiver.
     archiver = ArchiverDirector(conf['dbconn'], conf)
-    logger.info("Starting an archival at", datetime.now())
+    logger.info("Starting an archival at %s", time.time())
     archiver.run()
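
Reviewer note on the new data model: content_archive_get() now yields (content_id, copies) pairs, where copies is the parsed JSONB column mapping each archive name to its status and a Unix-epoch mtime. A batch yielded by get_contents_to_archive() therefore has roughly the shape sketched below; the sha1, archive names and URLs are illustrative, not taken from this patch.

    # Sketch of one yielded batch (illustrative values only).
    batch = {
        r'\x34973274ccef6ab4dfaaf86599792fa9c3fe4689': {
            'present': [('banco', 'http://banco.example.org:5003')],
            'missing': [('remote2', 'http://remote2.example.org:5003')],
        },
    }
    for content_id, data in batch.items():
        print(content_id, '->', len(data['missing']), 'copies to create')
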
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
--- a/swh/storage/archiver/storage.py
+++ b/swh/storage/archiver/storage.py
@@ -5,7 +5,7 @@
 import psycopg2
 
-from ..common import db_transaction_generator
+from ..common import db_transaction_generator, db_transaction
 from ..db import Db
 from ..exc import StorageDBError
@@ -38,21 +38,7 @@
         yield from self.db.archive_ls(cur)
 
     @db_transaction_generator
-    def content_archive_ls(self, cur=None):
-        """ Get the archival status of the content
-
-        Get an iterable over all the content that is referenced
-        in a backup server.
-
-        Yields:
-            the sha1 of each content referenced at least one time
-            in the database of archiveal status.
-
-        """
-        yield from self.db.content_archive_ls(cur)
-
-    @db_transaction_generator
-    def content_archive_get(self, content=None, archive=None, cur=None):
+    def content_archive_get(self, content=None, cur=None):
         """ Get the archival status of a content in a specific server.
 
-        Retreive from the database the archival status of the given content
-        in the given archive server.
+        Retrieve from the database the archival status of the given content
+        in each archive server.
 
         Args:
+            content: the sha1 of the content. May be None for all contents.
 
         Yields:
-            A tuple (content_id, server_id, archival status, mtime, tzinfo).
+            A tuple (content_id, copies_json).
         """
-        yield from self.db.content_archive_get(content, archive, cur)
+        yield from self.db.content_archive_get(content, cur)
 
@@ -66,24 +52,22 @@
-    @db_transaction_generator
+    @db_transaction
     def content_archive_update(self, content_id, archive_id,
                                new_status=None, cur=None):
-        """ Update the status of a archive content and set it's mtime to now()
+        """ Update the status of an archived content and set its mtime to now
 
-        Change the last modification time of an archived content and change
-        its status to the given one.
+        Change the status of an archived content for the given archive and
+        set its mtime to the current time.
 
         Args:
-            content_id (string): The content id.
-            archive_id (string): The id of the concerned archive.
-            new_status (string): One of missing, ongoing or present, this
-                status will replace the previous one. If not given, the
-                function only changes the mtime of the content.
+            content_id (str): content sha1
+            archive_id (str): name of the archive
+            new_status (str): one of 'missing', 'present' or 'ongoing'.
+                This status will replace the previous one. If not given,
+                the function only changes the mtime of the content for
+                the given archive.
         """
-        yield from self.db.content_archive_update(content_id,
-                                                  archive_id,
-                                                  new_status,
-                                                  cur)
+        self.db.content_archive_update(content_id, archive_id, new_status,
+                                       cur)
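
Reviewer note on the decorator switch: @db_transaction_generator wraps a generator, so nothing runs until the caller iterates it, while @db_transaction executes and commits on the call itself; that is why content_archive_update() now calls into self.db directly instead of yielding. A self-contained sketch of the calling convention, using stand-in decorators rather than the real ones from swh.storage.common:

    # Stand-in decorators for illustration; the real implementations also
    # open a cursor and manage the transaction.
    def db_transaction(f):
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)      # runs immediately
        return wrapper

    def db_transaction_generator(f):
        def wrapper(*args, **kwargs):
            yield from f(*args, **kwargs)  # runs only when iterated
        return wrapper

    @db_transaction
    def update():
        print('updated')

    @db_transaction_generator
    def listing():
        yield from ['a', 'b']

    update()                # prints 'updated' with no further ceremony
    print(list(listing()))  # the generator form must be consumed to run
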
""" - yield from self.db.content_archive_update(content_id, - archive_id, - new_status, - cur) + self.db.content_archive_update(content_id, archive_id, new_status, cur) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -5,8 +5,7 @@ import random import logging - -from datetime import datetime +import time from swh.objstorage import PathSlicingObjStorage from swh.objstorage.api.client import RemoteObjStorage @@ -115,19 +114,20 @@ A dictionary that contains all the required data : 'content_id', 'archive_id', 'status', and 'mtime' """ + archive = server[0] t, = list( - self.archiver_storage.content_archive_get(content_id, server[0]) + self.archiver_storage.content_archive_get(content_id) ) return { 'content_id': t[0], - 'archive_id': t[1], - 'status': t[2], - 'mtime': t[3] + 'archive_id': archive, + 'status': t[1][archive]['status'], + 'mtime': t[1][archive]['mtime'] } def _content_archive_update(self, content_id, archive_id, new_status=None): - """ Update the status of a archive content and set it's mtime to now() + """ Update the status of a archive content and set its mtime to now. Change the last modification time of an archived content and change its status to the given one. @@ -167,8 +167,7 @@ # If the content is ongoing but still have time, there is # another worker working on this content. elif status == 'ongoing': - mtime = mtime.replace(tzinfo=None) - elapsed = (datetime.now() - mtime).total_seconds() + elapsed = int(time.time()) - mtime if elapsed <= self.config['archival_max_age']: return False return True diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -10,6 +10,7 @@ import psycopg2 import psycopg2.extras import tempfile +import time from contextlib import contextmanager @@ -659,48 +660,23 @@ """) yield from cursor_to_bytes(cur) - def content_archive_ls(self, cur=None): - """ Get the archival status of the content - - Get an iterable over all the content that is referenced - in a backup server. - - Yields: - the sha1 of each content referenced at least one time - in the database of archiveal status. - """ - cur = self._cursor(cur) - cur.execute("""SELECT DISTINCT content_id - FROM content_archive""") - yield from cursor_to_bytes(cur) - - def content_archive_get(self, content=None, archive=None, cur=None): + def content_archive_get(self, content=None, cur=None): """ Get the archival status of a content in a specific server. - Retreive from the database the archival status of the given content + Retrieve from the database the archival status of the given content in the given archive server. Args: - content: the sha1 of the content. May be None for any id. - archive: the database id of the server we're looking into - may be None for any server. + content: the sha1 of the content. May be None for all contents. Yields: - A tuple (content_id, server_id, archival status, mtime, tzinfo). + A tuple (content_id, copies_json). 
""" - query = """SELECT content_id, archive_id, status, mtime + query = """SELECT content_id, copies FROM content_archive """ - conditions = [] - if content: - conditions.append("content_id='%s'" % content) - if archive: - conditions.append("archive_id='%s'" % archive) - - if conditions: - query = """%s - WHERE %s - """ % (query, ' and '.join(conditions)) + if content is not None: + query += "WHERE content_id='%s'" % content cur = self._cursor(cur) cur.execute(query) @@ -708,31 +684,34 @@ def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): - """ Update the status of a archive content and set it's mtime to now() + """ Update the status of an archive content and set its mtime to - Change the last modification time of an archived content and change - its status to the given one. + Change the mtime of an archived content for the given archive and set + it's mtime to the current time. Args: - content_id (string): The content id. - archive_id (string): The id of the concerned archive. - new_status (string): One of missing, ongoing or present, this - status will replace the previous one. If not given, the - function only changes the mtime of the content. + content_id (str): content sha1 + archive_id (str): name of the archive + new_status (str): one of 'missing', 'present' or 'ongoing'. + this status will replace the previous one. If not given, + the function only change the mtime of the content for the + given archive. """ - query = """UPDATE content_archive - SET %(fields)s - WHERE content_id='%(content_id)s' - and archive_id='%(archive_id)s' - """ - fields = [] - if new_status: - fields.append("status='%s'" % new_status) - fields.append("mtime=now()") - - d = {'fields': ', '.join(fields), - 'content_id': content_id, - 'archive_id': archive_id} - - cur = self._cursor(cur) - cur.execute(query % d) + if new_status is not None: + query = """UPDATE content_archive + SET copies=jsonb_set( + copies, '{%s}', + '{"status":"%s", "mtime":%d}' + ) + WHERE content_id='%s' + """ % (archive_id, + new_status, int(time.time()), + content_id) + else: + query = """ UPDATE content_archive + SET copies=jsonb_set(copies, '{%s,mtime}', '%d') + WHERE content_id='%s' + """ % (archive_id, int(time.time())) + + cur = self._cursor(cur) + cur.execute(query) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -6,10 +6,11 @@ import tempfile import unittest import os +import time +import json from nose.tools import istest from nose.plugins.attrib import attr -from datetime import datetime, timedelta from swh.core import hashutil from swh.core.tests.db_testing import DbsTestFixture @@ -91,16 +92,20 @@ self.cursor.execute('DELETE FROM content_archive') self.conn.commit() - def __add_content(self, content_data, status='missing', date='now()'): - # Add the content + def __add_content(self, content_data, status='missing', date=None): + # Add the content to the storage content = hashutil.hashdata(content_data) content.update({'data': content_data}) self.storage.content_add([content]) # Then update database content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) + copies = {'banco': { + 'status': status, + 'mtime': date or int(time.time()) # if date is None, use now() + }} self.cursor.execute("""INSERT INTO content_archive - VALUES('%s'::sha1, 'banco', '%s', %s) - """ % (content_id, status, date)) + VALUES('%s'::sha1, '%s') + """ % (content_id, json.dumps(copies))) return 
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -6,10 +6,12 @@
 import tempfile
 import unittest
 import os
+import time
+import json
 
 from nose.tools import istest
 from nose.plugins.attrib import attr
-from datetime import datetime, timedelta
 
 from swh.core import hashutil
 from swh.core.tests.db_testing import DbsTestFixture
+from swh.objstorage.exc import ObjNotFoundError
@@ -91,16 +92,20 @@
         self.cursor.execute('DELETE FROM content_archive')
         self.conn.commit()
 
-    def __add_content(self, content_data, status='missing', date='now()'):
-        # Add the content
+    def __add_content(self, content_data, status='missing', date=None):
+        # Add the content to the storage
         content = hashutil.hashdata(content_data)
         content.update({'data': content_data})
         self.storage.content_add([content])
         # Then update database
         content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
+        copies = {'banco': {
+            'status': status,
+            'mtime': date or int(time.time())  # if date is None, use now()
+        }}
         self.cursor.execute("""INSERT INTO content_archive
-                               VALUES('%s'::sha1, 'banco', '%s', %s)
-                            """ % (content_id, status, date))
+                               VALUES('%s'::sha1, '%s')
+                            """ % (content_id, json.dumps(copies)))
         return content['sha1']
 
     def __get_missing(self):
@@ -136,7 +141,7 @@
                                     config=config)
 
     # Integration test
-
+
     @istest
     def archive_missing_content(self):
         """ Run archiver on a missing content should archive it.
         """
@@ -145,12 +150,13 @@
         # before, the content should not be there
         try:
             self.remote_objstorage.content_get(content_id)
-        except:
+        except ObjNotFoundError:
             pass
+        else:
+            self.fail('Content should not be present before archival')
         self.archiver.run()
         # now the content should be present on remote objstorage
         remote_data = self.remote_objstorage.content_get(content_id)
-        # After the run, the content should be archived after the archiver run.
         self.assertEquals(content_data, remote_data)
@@ -194,16 +200,15 @@
     @istest
     def vstatus_ongoing_remaining(self):
-        current_time = datetime.now()
         self.assertEquals(
-            self.vstatus('ongoing', current_time),
+            self.vstatus('ongoing', int(time.time())),
             'present'
         )
 
     @istest
     def vstatus_ongoing_elapsed(self):
-        past_time = datetime.now() - timedelta(
-            seconds=self.archiver.config['archival_max_age'] + 1
+        past_time = (
+            int(time.time()) - self.archiver.config['archival_max_age'] - 1
         )
         self.assertEquals(
             self.vstatus('ongoing', past_time),
@@ -235,7 +240,7 @@
         """ An ongoing archival with remaining time shouldnt need archival.
         """
         id = self.__add_content(b'need_archival_ongoing_remaining',
-                                status='ongoing', date="'%s'" % datetime.now())
+                                status='ongoing')
         id = r'\x' + hashutil.hash_to_hex(id)
         worker = self.__create_worker()
         self.assertEqual(worker.need_archival(id, self.storage_data), False)
@@ -247,9 +252,9 @@
         id = self.__add_content(
             b'archive_ongoing_elapsed',
             status='ongoing',
-            date="'%s'" % (datetime.now() - timedelta(
-                seconds=self.archiver.config['archival_max_age'] + 1
-            ))
+            date=(
+                int(time.time()) - self.archiver.config['archival_max_age'] - 1
+            )
         )
         id = r'\x' + hashutil.hash_to_hex(id)
         worker = self.__create_worker()
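
For reference, the content_archive table now carries two columns, content_id and a copies JSONB document, and the test helper builds the latter as sketched below ('banco' is the archive name used throughout the test fixture; the printed mtime will vary):

    import json
    import time

    # Shape of the value __add_content() stores in content_archive.copies.
    copies = {'banco': {'status': 'ongoing', 'mtime': int(time.time())}}
    print(json.dumps(copies))
    # e.g. {"banco": {"status": "ongoing", "mtime": 1462983000}}
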