diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -76,7 +76,7 @@
             copies.
         """
         contents = []
-        for content in self._get_unarchived_content():
+        for content in self._get_unarchived_contents():
             contents.append(content)
             if len(contents) > self.config['batch_max_size']:
                 yield contents
@@ -84,21 +84,11 @@
         if len(contents) > 0:
             yield contents
 
-    def _get_unarchived_content(self):
-        """ Get all the content ids in the db that needs more copies
-
-        Yields:
-            sha1 of contents that needs to be archived.
-        """
-        for content_id, present, _ongoing in self._get_all_contents():
-            if len(present) < self.config['retention_policy']:
-                yield content_id
-            else:
-                continue
-
-    def _get_all_contents(self):
+    def _get_unarchived_contents(self):
         """ Get batchs from the archiver db and yield it as continous stream
 
+        Contents returned are those that need more copies.
+
         Yields:
             Datas about a content as a tuple
             (content_id, present_copies, ongoing_copies) where ongoing_copies
@@ -107,13 +97,16 @@
         last_object = b''
         while True:
             archiver_contents = list(
-                self.archiver_storage.content_archive_get_copies(last_object)
+                self.archiver_storage.content_archive_get_unarchived_copies(
+                    previous_content=last_object,
+                    retention_policy=self.config['retention_policy']
+                )
             )
             if not archiver_contents:
                 return
-            for content in archiver_contents:
-                last_object = content[0]
-                yield content
+            for content_id, _presents, _ongoings in archiver_contents:
+                last_object = content_id
+                yield content_id
 
 
 def launch():
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
--- a/swh/storage/archiver/storage.py
+++ b/swh/storage/archiver/storage.py
@@ -72,6 +72,29 @@
         yield from self.db.content_archive_get_copies(previous_content,
                                                       limit, cur)
 
+    @db_transaction_generator
+    def content_archive_get_unarchived_copies(
+            self, retention_policy, previous_content=None,
+            limit=1000, cur=None):
+        """ Get the list of copies for `limit` contents starting after
+        `previous_content`. Yields only contents whose number of present
+        copies is smaller than `retention_policy`.
+
+        Args:
+            previous_content: sha1 of the last content retrieved. May be None
+                to start at the beginning.
+            retention_policy: number of present copies required.
+            limit: number of contents to retrieve. Can be None to retrieve all
+                objects (will be slow).
+
+        Yields:
+            A tuple (content_id, present_copies, ongoing_copies), where
+            ongoing_copies is a dict mapping copy to mtime.
+
+        """
+        yield from self.db.content_archive_get_unarchived_copies(
+            retention_policy, previous_content, limit, cur)
+
     @db_transaction
     def content_archive_update(self, content_id, archive_id,
                                new_status=None, cur=None):
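
Note: a minimal usage sketch of the new ArchiverStorage method, for review
purposes only (the `archiver_storage` instance and the literal argument
values are hypothetical, not part of the diff):

    # Hedged sketch: iterate over contents that have fewer than two
    # 'present' copies, starting from the beginning of the table.
    for content_id, present, ongoing in \
            archiver_storage.content_archive_get_unarchived_copies(
                retention_policy=2, previous_content=None, limit=10):
        # present: copy names whose status is 'present' (sorted by key)
        # ongoing: dict mapping each in-flight copy name to its mtime
        print(content_id, len(present), sorted(ongoing))
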
""" + print(batch) self.batch = batch config = self.parse_config_file() self.retention_policy = config['retention_policy'] diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -752,6 +752,59 @@ for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) + def content_archive_get_unarchived_copies( + self, retention_policy, previous_content=None, + limit=1000, cur=None): + """ Get the list of copies for `limit` contents starting after + `previous_content`. Yields only copies with number of present + smaller than `retention policy`. + + Args: + previous_content: sha1 of the last content retrieved. May be None + to start at the beginning. + retention_policy: number of presentcopies required. + limit: number of contents to retrieve. Can be None to retrieve all + objects (will be slow). + + Yields: + A tuple (content_id, present_copies, ongoing_copies), where + ongoing_copies is a dict mapping copy to mtime. + + """ + + query = """SELECT content_id, + array( + SELECT key + FROM jsonb_each(copies) + WHERE value->>'status' = 'present' + ORDER BY key + ) AS present, + array( + SELECT key + FROM jsonb_each(copies) + WHERE value->>'status' = 'ongoing' + ORDER BY key + ) AS ongoing, + array( + SELECT value->'mtime' + FROM jsonb_each(copies) + WHERE value->>'status' = 'ongoing' + ORDER BY key + ) AS ongoing_mtime + FROM content_archive + WHERE content_id > %s AND num_present < %s + ORDER BY content_id + LIMIT %s + """ + + if previous_content is None: + previous_content = b'' + + cur = self._cursor(cur) + cur.execute(query, (previous_content, retention_policy, limit)) + for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): + yield (content_id, present, dict(zip(ongoing, mtimes))) + def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to