diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -9,7 +9,6 @@ import json from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from typing import Any, Dict, Iterable, List, Optional, Union @@ -185,43 +184,27 @@ @timed @process_metrics - @db_transaction() def content_add( - self, content: Iterable[Content], db=None, cur=None) -> Dict: + self, content: Iterable[Content]) -> Dict: now = datetime.datetime.now(tz=datetime.timezone.utc) contents = [attr.evolve(c, ctime=now) for c in content] - missing = list(self.content_missing( - map(Content.to_dict, contents), key_hash='sha1_git', - db=db, cur=cur, - )) - contents = [c for c in contents if c.sha1_git in missing] - - self.journal_writer.content_add(contents) - - def add_to_objstorage(): - """Add to objstorage the new missing_content - - Returns: - Sum of all the content's data length pushed to the - objstorage. Content present twice is only sent once. - - """ - summary = self.objstorage.content_add(contents) - return summary['content:add:bytes'] - - with ThreadPoolExecutor(max_workers=1) as executor: - added_to_objstorage = executor.submit(add_to_objstorage) + objstorage_summary = self.objstorage.content_add(contents) - self._content_add_metadata(db, cur, contents) + with self.db() as db: + with db.transaction() as cur: + missing = list(self.content_missing( + map(Content.to_dict, contents), key_hash='sha1_git', + db=db, cur=cur, + )) + contents = [c for c in contents if c.sha1_git in missing] - # Wait for objstorage addition before returning from the - # transaction, bubbling up any exception - content_bytes_added = added_to_objstorage.result() + self.journal_writer.content_add(contents) + self._content_add_metadata(db, cur, contents) return { 'content:add': len(contents), - 'content:add:bytes': content_bytes_added, + 'content:add:bytes': objstorage_summary['content:add:bytes'], } @timed