diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.core[db,http] >= 0.0.65
 swh.model >= 0.0.51
-swh.objstorage >= 0.0.17
+swh.objstorage >= 0.0.40
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -17,14 +17,13 @@
     Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
     OriginVisit, Snapshot, Origin
 )
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
 try:
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153
 
+from swh.storage.objstorage import ObjStorage
 
 from .. import HashCollision
 from ..exc import StorageArgumentException
@@ -49,12 +48,11 @@
                  port=9042, journal_writer=None):
         self._cql_runner = CqlRunner(hosts, keyspace, port)
 
-        self.objstorage = get_objstorage(**objstorage)
-
         if journal_writer:
             self.journal_writer = get_journal_writer(**journal_writer)
         else:
             self.journal_writer = None
+        self.objstorage = ObjStorage(objstorage)
 
     def check_config(self, *, check_write):
         self._cql_runner.check_read()
@@ -73,26 +71,20 @@
                     del cont['data']
                 self.journal_writer.write_addition('content', cont)
 
-        count_contents = 0
-        count_content_added = 0
-        count_content_bytes_added = 0
-
-        for content in contents:
+        if with_data:
             # First insert to the objstorage, if the endpoint is
             # `content_add` (as opposed to `content_add_metadata`).
             # TODO: this should probably be done in concurrently to inserting
             # in index tables (but still before the main table; so an entry is
             # only added to the main table after everything else was
             # successfully inserted.
-            count_contents += 1
-            if content.status != 'absent':
-                count_content_added += 1
-                if with_data:
-                    content_data = content.data
-                    if content_data is None:
-                        raise StorageArgumentException('Missing data')
-                    count_content_bytes_added += len(content_data)
-                    self.objstorage.add(content_data, content.sha1)
+            summary = self.objstorage.content_add(
+                c for c in contents if c.status != 'absent')
+            content_add_bytes = summary['content:add:bytes']
+
+        content_add = 0
+        for content in contents:
+            content_add += 1
 
             # Then add to index tables
             for algo in HASH_ALGORITHMS:
@@ -117,11 +109,11 @@
                     raise HashCollision(algo, content.get_hash(algo), pks)
 
         summary = {
-            'content:add': count_content_added,
+            'content:add': content_add,
         }
 
         if with_data:
-            summary['content:add:bytes'] = count_content_bytes_added
+            summary['content:add:bytes'] = content_add_bytes
 
         return summary
 
@@ -139,14 +131,7 @@
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-        for obj_id in content:
-            try:
-                data = self.objstorage.get(obj_id)
-            except ObjNotFoundError:
-                yield None
-                continue
-
-            yield {'sha1': obj_id, 'data': data}
+        yield from self.objstorage.content_get(content)
 
     def content_get_partition(
             self, partition_id: int, nb_partitions: int, limit: int = 1000,
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -22,8 +22,7 @@
     BaseContent, Content, SkippedContent, Directory, Revision,
     Release, Snapshot, OriginVisit, Origin, SHA1_SIZE)
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
+from swh.storage.objstorage import ObjStorage
 
 from . import HashCollision
 from .exc import StorageArgumentException
@@ -71,7 +70,7 @@
         # ideally we would want a skip list for both fast inserts and searches
         self._sorted_sha1s = []
 
-        self.objstorage = get_objstorage('memory', {})
+        self.objstorage = ObjStorage({'cls': 'memory', 'args': {}})
 
     def check_config(self, *, check_write):
         return True
@@ -83,12 +82,13 @@
                 content = attr.evolve(content, data=None)
                 self.journal_writer.write_addition('content', content)
 
-        summary = {
-            'content:add': 0,
-        }
-
+        content_add = 0
+        content_add_bytes = 0
         if with_data:
-            summary['content:add:bytes'] = 0
+            summary = self.objstorage.content_add(
+                c for c in contents
+                if c.status != 'absent')
+            content_add_bytes = summary['content:add:bytes']
 
         for content in contents:
             key = self._content_key(content)
@@ -106,14 +106,16 @@
                     ('content', content.sha1))
             self._contents[key] = content
             bisect.insort(self._sorted_sha1s, content.sha1)
-            summary['content:add'] += 1
-            if with_data:
-                content_data = self._contents[key].data
-                self._contents[key] = attr.evolve(
-                    self._contents[key],
-                    data=None)
-                summary['content:add:bytes'] += len(content_data)
-                self.objstorage.add(content_data, content.sha1)
+            self._contents[key] = attr.evolve(
+                self._contents[key],
+                data=None)
+            content_add += 1
+
+        summary = {
+            'content:add': content_add,
+        }
+        if with_data:
+            summary['content:add:bytes'] = content_add_bytes
 
         return summary
 
@@ -154,14 +156,7 @@
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-        for obj_id in content:
-            try:
-                data = self.objstorage.get(obj_id)
-            except ObjNotFoundError:
-                yield None
-                continue
-
-            yield {'sha1': obj_id, 'data': data}
+        yield from self.objstorage.content_get(content)
 
     def content_get_range(self, start, end, limit=1000):
         if limit is None:
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Generator, Iterable
+
+from swh.model.model import Content
+from swh.objstorage import get_objstorage
+from swh.objstorage.exc import ObjNotFoundError
+
+from .exc import StorageArgumentException
+
+
+class ObjStorage:
+    """Objstorage collaborator in charge of adding contents to, and
+    retrieving them from, the objstorage.
+
+    Attributes not defined here are delegated to the wrapped objstorage.
+
+    """
+    def __init__(self, objstorage_config: Dict):
+        self.objstorage = get_objstorage(**objstorage_config)
+
+    def __getattr__(self, key):
+        # Guard: if `objstorage` is not set (e.g. an instance built via
+        # __new__ by copy or pickle), looking it up below would re-enter
+        # __getattr__ and recurse until RecursionError.
+        if key == 'objstorage':
+            raise AttributeError(key)
+        return getattr(self.objstorage, key)
+
+    def __contains__(self, obj_id):
+        # Special methods are looked up on the type, bypassing
+        # __getattr__, so membership tests must be delegated explicitly.
+        return obj_id in self.objstorage
+
+    def content_get(self, contents: Iterable[bytes]) -> Generator:
+        """Retrieve content data from the objstorage.
+
+        Args:
+            contents: sha1 identifiers of the contents to retrieve
+
+        Yields:
+            a dict with keys 'sha1' and 'data' for each content found,
+            or None for each content missing from the objstorage
+
+        """
+        for obj_id in contents:
+            try:
+                data = self.objstorage.get(obj_id)
+            except ObjNotFoundError:
+                yield None
+                continue
+
+            yield {'sha1': obj_id, 'data': data}
+
+    def content_add(self, contents: Iterable[Content]) -> Dict:
+        """Add contents to the objstorage.
+
+        Args:
+            contents: contents to add; duplicated sha1s are only sent once
+
+        Raises:
+            StorageArgumentException: if any content has no data
+
+        Returns:
+            The summary dict of content and content bytes added to the
+            objstorage.
+
+        """
+        contents = list(contents)
+        if any(cont.data is None for cont in contents):
+            raise StorageArgumentException('Missing data')
+        summary = self.objstorage.add_batch({
+            cont.sha1: cont.data
+            for cont in contents
+        })
+        return {
+            'content:add': summary['object:add'],
+            'content:add:bytes': summary['object:add:bytes']
+        }
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -25,13 +25,12 @@
     Snapshot, Origin, SHA1_SIZE
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
 try:
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153
 
+from swh.storage.objstorage import ObjStorage
 from . import converters, HashCollision
 from .common import db_transaction_generator, db_transaction
@@ -96,7 +95,6 @@
         except psycopg2.OperationalError as e:
             raise StorageDBError(e)
 
-        self.objstorage = get_objstorage(**objstorage)
         if journal_writer:
             if get_journal_writer is None:
                 raise EnvironmentError(
@@ -105,6 +103,7 @@
             self.journal_writer = get_journal_writer(**journal_writer)
         else:
             self.journal_writer = None
+        self.objstorage = ObjStorage(objstorage)
 
     def get_db(self):
         if self._db:
@@ -207,18 +206,8 @@
                 objstorage. Content present twice is only sent once.
 
             """
-            content_bytes_added = 0
-            data = {}
-            for cont in content:
-                if cont.sha1 not in data:
-                    data[cont.sha1] = cont.data
-                    content_bytes_added += max(0, cont.length)
-
-            # FIXME: Since we do the filtering anyway now, we might as
-            # well make the objstorage's add_batch call return what we
-            # want here (real bytes added)... that'd simplify this...
-            self.objstorage.add_batch(data)
-            return content_bytes_added
+            summary = self.objstorage.content_add(content)
+            return summary['content:add:bytes']
 
         with ThreadPoolExecutor(max_workers=1) as executor:
             added_to_objstorage = executor.submit(add_to_objstorage)
@@ -278,15 +267,7 @@
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-
-        for obj_id in content:
-            try:
-                data = self.objstorage.get(obj_id)
-            except ObjNotFoundError:
-                yield None
-                continue
-
-            yield {'sha1': obj_id, 'data': data}
+        yield from self.objstorage.content_get(content)
 
     @timed
     @db_transaction()
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019  The Software Heritage developers
+# Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3647,7 +3647,7 @@
     # This test is only relevant on the local storage, with an actual
     # objstorage raising an exception
     def test_content_add_objstorage_exception(self, swh_storage):
-        swh_storage.objstorage.add = Mock(
+        swh_storage.objstorage.content_add = Mock(
             side_effect=Exception('mocked broken objstorage')
         )
 
@@ -3746,7 +3746,7 @@
         }
 
         if hasattr(swh_storage, 'objstorage'):
-            assert cont['sha1'] in swh_storage.objstorage
+            assert cont['sha1'] in swh_storage.objstorage.objstorage
 
         with db_transaction(swh_storage) as (_, cur):
             cur.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -3776,7 +3776,7 @@
         }
 
         if hasattr(swh_storage, 'objstorage'):
-            assert cont['sha1'] not in swh_storage.objstorage
+            assert cont['sha1'] not in swh_storage.objstorage.objstorage
         with db_transaction(swh_storage) as (_, cur):
             cur.execute('SELECT sha1, sha1_git, sha256, length, status'
                         ' FROM content WHERE sha1 = %s',