diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.65 swh.model >= 0.0.51 -swh.objstorage >= 0.0.17 +swh.objstorage >= 0.0.40 diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -17,14 +17,13 @@ Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, OriginVisit, Snapshot ) -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError try: from swh.journal.writer import get_journal_writer except ImportError: get_journal_writer = None # type: ignore # mypy limitation, see https://github.com/python/mypy/issues/1153 +from swh.storage.objstorage import ObjStorage from .. import HashCollision from ..exc import StorageArgumentException @@ -49,12 +48,11 @@ port=9042, journal_writer=None): self._cql_runner = CqlRunner(hosts, keyspace, port) - self.objstorage = get_objstorage(**objstorage) - if journal_writer: self.journal_writer = get_journal_writer(**journal_writer) else: self.journal_writer = None + self.objstorage = ObjStorage(objstorage) def check_config(self, *, check_write): self._cql_runner.check_read() @@ -78,24 +76,20 @@ del content['data'] self.journal_writer.write_addition('content', content) - count_contents = 0 - count_content_added = 0 - count_content_bytes_added = 0 - - for content in contents: + if with_data: # First insert to the objstorage, if the endpoint is # `content_add` (as opposed to `content_add_metadata`). # TODO: this should probably be done in concurrently to inserting # in index tables (but still before the main table; so an entry is # only added to the main table after everything else was # successfully inserted. 
- count_contents += 1 - if content.status != 'absent': - count_content_added += 1 - if with_data: - content_data = content.data - count_content_bytes_added += len(content_data) - self.objstorage.add(content_data, content.sha1) + summary = self.objstorage.content_add( + c.to_dict() for c in contents if c.status != 'absent') + content_add_bytes = summary['content:add:bytes'] + + content_add = 0 + for content in contents: + content_add += 1 # Then add to index tables for algo in HASH_ALGORITHMS: @@ -120,11 +114,11 @@ raise HashCollision(algo, content.get_hash(algo), pks) summary = { - 'content:add': count_content_added, + 'content:add': content_add, } if with_data: - summary['content:add:bytes'] = count_content_bytes_added + summary['content:add:bytes'] = content_add_bytes return summary @@ -145,14 +139,7 @@ if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise StorageArgumentException( "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) - for obj_id in content: - try: - data = self.objstorage.get(obj_id) - except ObjNotFoundError: - yield None - continue - - yield {'sha1': obj_id, 'data': data} + yield from self.objstorage.content_get(content) def content_get_partition( self, partition_id: int, nb_partitions: int, limit: int = 1000, diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -22,8 +22,7 @@ BaseContent, Content, SkippedContent, Directory, Revision, Release, Snapshot, OriginVisit, Origin, SHA1_SIZE) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError +from swh.storage.objstorage import ObjStorage from . 
import HashCollision from .exc import StorageArgumentException @@ -71,7 +70,7 @@ # ideally we would want a skip list for both fast inserts and searches self._sorted_sha1s = [] - self.objstorage = get_objstorage('memory', {}) + self.objstorage = ObjStorage({'cls': 'memory', 'args': {}}) def check_config(self, *, check_write): return True @@ -93,12 +92,13 @@ raise StorageArgumentException(*e.args) self.journal_writer.write_addition('content', content) - summary = { - 'content:add': 0, - } - + content_add = 0 + content_add_bytes = 0 if with_data: - summary['content:add:bytes'] = 0 + summary = self.objstorage.content_add( + c.to_dict() for c in contents + if c.status != 'absent') + content_add_bytes = summary['content:add:bytes'] for content in contents: key = self._content_key(content) @@ -116,17 +116,19 @@ ('content', content.sha1)) self._contents[key] = content bisect.insort(self._sorted_sha1s, content.sha1) - summary['content:add'] += 1 - if with_data: - content_data = self._contents[key].data - try: - self._contents[key] = attr.evolve( - self._contents[key], - data=None) - except (KeyError, TypeError, ValueError) as e: - raise StorageArgumentException(*e.args) - summary['content:add:bytes'] += len(content_data) - self.objstorage.add(content_data, content.sha1) + try: + self._contents[key] = attr.evolve( + self._contents[key], + data=None) + except (KeyError, TypeError, ValueError) as e: + raise StorageArgumentException(*e.args) + content_add += 1 + + summary = { + 'content:add': content_add, + } + if content_add_bytes: + summary['content:add:bytes'] = content_add_bytes return summary @@ -175,14 +177,7 @@ if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise StorageArgumentException( "Sending at most %s contents." 
% BULK_BLOCK_CONTENT_LEN_MAX) - for obj_id in content: - try: - data = self.objstorage.get(obj_id) - except ObjNotFoundError: - yield None - continue - - yield {'sha1': obj_id, 'data': data} + yield from self.objstorage.content_get(content) def content_get_range(self, start, end, limit=1000): if limit is None: diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage.py new file mode 100644 --- /dev/null +++ b/swh/storage/objstorage.py @@ -0,0 +1,57 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Generator, Iterable + +from swh.objstorage import get_objstorage +from swh.objstorage.exc import ObjNotFoundError + + +class ObjStorage: + """Objstorage collaborator in charge of adding objects to + the objstorage. + + """ + def __init__(self, objstorage_config: Dict): + self.objstorage = get_objstorage(**objstorage_config) + + def __getattr__(self, key): + return getattr(self.objstorage, key) + + def content_get(self, contents: Iterable[bytes]) -> Generator: + """Retrieve content data from the objstorage + + Args: + contents: List of contents to retrieve data from + + """ + for obj_id in contents: + try: + data = self.objstorage.get(obj_id) + except ObjNotFoundError: + yield None + continue + + yield {'sha1': obj_id, 'data': data} + + def content_add(self, contents: Iterable[Dict]) -> Dict: + """Add contents to the objstorage. + + Args: + contents: List of contents to add + + Returns: + The summary dict of content and content bytes added to the + objstorage. 
+ + """ + summary = self.objstorage.add_batch({ + cont['sha1']: cont['data'] + for cont in contents + }) + return { + 'content:add': summary['object:add'], + 'content:add:bytes': summary['object:add:bytes'] + } diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -21,13 +21,12 @@ from swh.model.model import SHA1_SIZE from swh.model.hashutil import ALGORITHMS, hash_to_bytes, hash_to_hex -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError try: from swh.journal.writer import get_journal_writer except ImportError: get_journal_writer = None # type: ignore # mypy limitation, see https://github.com/python/mypy/issues/1153 +from swh.storage.objstorage import ObjStorage from . import converters, HashCollision from .common import db_transaction_generator, db_transaction @@ -92,7 +91,6 @@ except psycopg2.OperationalError as e: raise StorageDBError(e) - self.objstorage = get_objstorage(**objstorage) if journal_writer: if get_journal_writer is None: raise EnvironmentError( @@ -101,6 +99,7 @@ self.journal_writer = get_journal_writer(**journal_writer) else: self.journal_writer = None + self.objstorage = ObjStorage(objstorage) def get_db(self): if self._db: @@ -233,18 +232,8 @@ objstorage. Content present twice is only sent once. """ - content_bytes_added = 0 - data = {} - for cont in content: - if cont['sha1'] not in data: - data[cont['sha1']] = cont['data'] - content_bytes_added += max(0, cont['length']) - - # FIXME: Since we do the filtering anyway now, we might as - # well make the objstorage's add_batch call return what we - # want here (real bytes added)... that'd simplify this... 
- self.objstorage.add_batch(data) - return content_bytes_added + summary = self.objstorage.content_add(content) + return summary['content:add:bytes'] with ThreadPoolExecutor(max_workers=1) as executor: added_to_objstorage = executor.submit(add_to_objstorage) @@ -305,15 +294,7 @@ if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise StorageArgumentException( "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) - - for obj_id in content: - try: - data = self.objstorage.get(obj_id) - except ObjNotFoundError: - yield None - continue - - yield {'sha1': obj_id, 'data': data} + yield from self.objstorage.content_get(content) @timed @db_transaction() diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py --- a/swh/storage/tests/test_filter.py +++ b/swh/storage/tests/test_filter.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -26,7 +26,6 @@ s = storage.content_add([sample_content]) assert s == { 'content:add': 0, - 'content:add:bytes': 0, } diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -3626,7 +3626,7 @@ # This test is only relevant on the local storage, with an actual # objstorage raising an exception def test_content_add_objstorage_exception(self, swh_storage): - swh_storage.objstorage.add = Mock( + swh_storage.objstorage.content_add = Mock( side_effect=Exception('mocked broken objstorage') ) @@ -3725,7 +3725,7 @@ } if hasattr(swh_storage, 'objstorage'): - assert cont['sha1'] in swh_storage.objstorage + assert cont['sha1'] in swh_storage.objstorage.objstorage with db_transaction(swh_storage) as (_, cur): cur.execute('SELECT sha1, sha1_git, sha256, 
length, status' @@ -3755,7 +3755,7 @@ } if hasattr(swh_storage, 'objstorage'): - assert cont['sha1'] not in swh_storage.objstorage + assert cont['sha1'] not in swh_storage.objstorage.objstorage with db_transaction(swh_storage) as (_, cur): cur.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s',