diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -76,7 +76,7 @@
             copies.
         """
         contents = []
-        for content in self._get_unarchived_content():
+        for content in self._get_unarchived_content_id():
             contents.append(content)
             if len(contents) > self.config['batch_max_size']:
                 yield contents
@@ -84,21 +84,11 @@
         if len(contents) > 0:
             yield contents
 
-    def _get_unarchived_content(self):
-        """ Get all the content ids in the db that needs more copies
-
-        Yields:
-            sha1 of contents that needs to be archived.
-        """
-        for content_id, present, _ongoing in self._get_all_contents():
-            if len(present) < self.config['retention_policy']:
-                yield content_id
-            else:
-                continue
-
-    def _get_all_contents(self):
+    def _get_unarchived_content_id(self):
         """ Get batchs from the archiver db and yield it as continous stream
 
+        Content returned are those that need to have more copies.
+
         Yields:
             Datas about a content as a tuple
             (content_id, present_copies, ongoing_copies) where ongoing_copies
@@ -107,13 +97,16 @@
         last_object = b''
         while True:
             archiver_contents = list(
-                self.archiver_storage.content_archive_get_copies(last_object)
+                self.archiver_storage.content_archive_get_unarchived_copies(
+                    last_content=last_object,
+                    retention_policy=self.config['retention_policy']
+                )
             )
             if not archiver_contents:
                 return
-            for content in archiver_contents:
-                last_object = content[0]
-                yield content
+            for content_id, _present, _ongoing in archiver_contents:
+                last_object = content_id
+                yield content_id
 
 
 def launch():
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
--- a/swh/storage/archiver/storage.py
+++ b/swh/storage/archiver/storage.py
@@ -53,13 +53,13 @@
         return self.db.content_archive_get(content_id, cur)
 
     @db_transaction_generator
-    def content_archive_get_copies(self, previous_content=None, limit=1000,
+    def content_archive_get_copies(self, last_content=None, limit=1000,
                                    cur=None):
-        """Get the list of copies for `limit` contents starting after
-        `previous_content`.
+        """ Get the list of copies for `limit` contents starting after
+        `last_content`.
 
         Args:
-            previous_content: sha1 of the last content retrieved. May be None
+            last_content: sha1 of the last content retrieved. May be None
                               to start at the beginning.
             limit: number of contents to retrieve. Can be None to retrieve all
                 objects (will be slow).
@@ -69,9 +69,32 @@
             ongoing_copies is a dict mapping copy to mtime.
 
         """
-        yield from self.db.content_archive_get_copies(previous_content, limit,
+        yield from self.db.content_archive_get_copies(last_content, limit,
                                                       cur)
 
+    @db_transaction_generator
+    def content_archive_get_unarchived_copies(
+            self, retention_policy, last_content=None,
+            limit=1000, cur=None):
+        """ Get the list of copies for `limit` contents starting after
+        `last_content`. Yields only copies with number of present
+        smaller than `retention_policy`.
+
+        Args:
+            last_content: sha1 of the last content retrieved. May be None
+                          to start at the beginning.
+            retention_policy: number of present copies required.
+            limit: number of contents to retrieve. Can be None to retrieve all
+                   objects (will be slow).
+
+        Yields:
+            A tuple (content_id, present_copies, ongoing_copies), where
+            ongoing_copies is a dict mapping copy to mtime.
+
+        """
+        yield from self.db.content_archive_get_unarchived_copies(
+            retention_policy, last_content, limit, cur)
+
     @db_transaction
     def content_archive_update(self, content_id, archive_id,
                                new_status=None, cur=None):
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -702,13 +702,13 @@
         content_id, present, ongoing, mtimes = cur.fetchone()
         return (content_id, present, dict(zip(ongoing, mtimes)))
 
-    def content_archive_get_copies(self, previous_content=None, limit=1000,
+    def content_archive_get_copies(self, last_content=None, limit=1000,
                                    cur=None):
         """Get the list of copies for `limit` contents starting after
-        `previous_content`.
+        `last_content`.
 
         Args:
-            previous_content: sha1 of the last content retrieved. May be None
+            last_content: sha1 of the last content retrieved. May be None
                               to start at the beginning.
             limit: number of contents to retrieve. Can be None to retrieve all
                 objects (will be slow).
@@ -744,11 +744,64 @@
                    LIMIT %s
                 """
 
-        if previous_content is None:
-            previous_content = b''
+        if last_content is None:
+            last_content = b''
 
         cur = self._cursor(cur)
-        cur.execute(query, (previous_content, limit))
+        cur.execute(query, (last_content, limit))
+        for content_id, present, ongoing, mtimes in cursor_to_bytes(cur):
+            yield (content_id, present, dict(zip(ongoing, mtimes)))
+
+    def content_archive_get_unarchived_copies(
+            self, retention_policy, last_content=None,
+            limit=1000, cur=None):
+        """ Get the list of copies for `limit` contents starting after
+        `last_content`. Yields only copies with number of present
+        smaller than `retention_policy`.
+
+        Args:
+            last_content: sha1 of the last content retrieved. May be None
+                          to start at the beginning.
+            retention_policy: number of present copies required.
+            limit: number of contents to retrieve. Can be None to retrieve all
+                   objects (will be slow).
+
+        Yields:
+            A tuple (content_id, present_copies, ongoing_copies), where
+            ongoing_copies is a dict mapping copy to mtime.
+
+        """
+
+        query = """SELECT content_id,
+                          array(
+                            SELECT key
+                            FROM jsonb_each(copies)
+                            WHERE value->>'status' = 'present'
+                            ORDER BY key
+                          ) AS present,
+                          array(
+                            SELECT key
+                            FROM jsonb_each(copies)
+                            WHERE value->>'status' = 'ongoing'
+                            ORDER BY key
+                          ) AS ongoing,
+                          array(
+                            SELECT value->'mtime'
+                            FROM jsonb_each(copies)
+                            WHERE value->>'status' = 'ongoing'
+                            ORDER BY key
+                          ) AS ongoing_mtime
+                   FROM content_archive
+                   WHERE content_id > %s AND num_present < %s
+                   ORDER BY content_id
+                   LIMIT %s
+                """
+
+        if last_content is None:
+            last_content = b''
+
+        cur = self._cursor(cur)
+        cur.execute(query, (last_content, retention_policy, limit))
         for content_id, present, ongoing, mtimes in cursor_to_bytes(cur):
             yield (content_id, present, dict(zip(ongoing, mtimes)))
 