Page MenuHomeSoftware Heritage

D2643.id9461.diff
No OneTemporary

D2643.id9461.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
swh.core[db,http] >= 0.0.65
swh.model >= 0.0.51
-swh.objstorage >= 0.0.17
+swh.objstorage >= 0.0.40
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -17,14 +17,13 @@
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
OriginVisit, Snapshot, Origin
)
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
try:
from swh.journal.writer import get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
+from swh.storage.objstorage import ObjStorage
from .. import HashCollision
from ..exc import StorageArgumentException
@@ -49,12 +48,11 @@
port=9042, journal_writer=None):
self._cql_runner = CqlRunner(hosts, keyspace, port)
- self.objstorage = get_objstorage(**objstorage)
-
if journal_writer:
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
+ self.objstorage = ObjStorage(objstorage)
def check_config(self, *, check_write):
self._cql_runner.check_read()
@@ -73,26 +71,20 @@
del cont['data']
self.journal_writer.write_addition('content', cont)
- count_contents = 0
- count_content_added = 0
- count_content_bytes_added = 0
-
- for content in contents:
+ if with_data:
# First insert to the objstorage, if the endpoint is
# `content_add` (as opposed to `content_add_metadata`).
# TODO: this should probably be done in concurrently to inserting
# in index tables (but still before the main table; so an entry is
# only added to the main table after everything else was
# successfully inserted.
- count_contents += 1
- if content.status != 'absent':
- count_content_added += 1
- if with_data:
- content_data = content.data
- if content_data is None:
- raise StorageArgumentException('Missing data')
- count_content_bytes_added += len(content_data)
- self.objstorage.add(content_data, content.sha1)
+ summary = self.objstorage.content_add(
+ c for c in contents if c.status != 'absent')
+ content_add_bytes = summary['content:add:bytes']
+
+ content_add = 0
+ for content in contents:
+ content_add += 1
# Then add to index tables
for algo in HASH_ALGORITHMS:
@@ -117,11 +109,11 @@
raise HashCollision(algo, content.get_hash(algo), pks)
summary = {
- 'content:add': count_content_added,
+ 'content:add': content_add,
}
if with_data:
- summary['content:add:bytes'] = count_content_bytes_added
+ summary['content:add:bytes'] = content_add_bytes
return summary
@@ -139,14 +131,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
def content_get_partition(
self, partition_id: int, nb_partitions: int, limit: int = 1000,
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -22,8 +22,7 @@
BaseContent, Content, SkippedContent, Directory, Revision, Release,
Snapshot, OriginVisit, Origin, SHA1_SIZE)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
+from swh.storage.objstorage import ObjStorage
from . import HashCollision
from .exc import StorageArgumentException
@@ -71,7 +70,7 @@
# ideally we would want a skip list for both fast inserts and searches
self._sorted_sha1s = []
- self.objstorage = get_objstorage('memory', {})
+ self.objstorage = ObjStorage({'cls': 'memory', 'args': {}})
def check_config(self, *, check_write):
return True
@@ -83,12 +82,13 @@
content = attr.evolve(content, data=None)
self.journal_writer.write_addition('content', content)
- summary = {
- 'content:add': 0,
- }
-
+ content_add = 0
+ content_add_bytes = 0
if with_data:
- summary['content:add:bytes'] = 0
+ summary = self.objstorage.content_add(
+ c for c in contents
+ if c.status != 'absent')
+ content_add_bytes = summary['content:add:bytes']
for content in contents:
key = self._content_key(content)
@@ -106,14 +106,16 @@
('content', content.sha1))
self._contents[key] = content
bisect.insort(self._sorted_sha1s, content.sha1)
- summary['content:add'] += 1
- if with_data:
- content_data = self._contents[key].data
- self._contents[key] = attr.evolve(
- self._contents[key],
- data=None)
- summary['content:add:bytes'] += len(content_data)
- self.objstorage.add(content_data, content.sha1)
+ self._contents[key] = attr.evolve(
+ self._contents[key],
+ data=None)
+ content_add += 1
+
+ summary = {
+ 'content:add': content_add,
+ }
+ if with_data:
+ summary['content:add:bytes'] = content_add_bytes
return summary
@@ -154,14 +156,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
def content_get_range(self, start, end, limit=1000):
if limit is None:
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Generator, Iterable
+
+from swh.model.model import Content
+from swh.objstorage import get_objstorage
+from swh.objstorage.exc import ObjNotFoundError
+
+from .exc import StorageArgumentException
+
+
+class ObjStorage:
+ """Objstorage collaborator in charge of adding objects to and
+ retrieving objects from the objstorage.
+
+ """
+ def __init__(self, objstorage_config: Dict):
+ self.objstorage = get_objstorage(**objstorage_config)
+
+ def __getattr__(self, key):
+ return getattr(self.objstorage, key)
+
+ def content_get(self, contents: Iterable[bytes]) -> Generator:
+ """Retrieve content data from the objstorage
+
+ Args:
+ contents: Iterable of content ids (sha1) to retrieve data for
+
+ """
+ for obj_id in contents:
+ try:
+ data = self.objstorage.get(obj_id)
+ except ObjNotFoundError:
+ yield None
+ continue
+
+ yield {'sha1': obj_id, 'data': data}
+
+ def content_add(self, contents: Iterable[Content]) -> Dict:
+ """Add contents to the objstorage.
+
+ Args:
+ contents: List of contents to add
+
+ Returns:
+ The summary dict of content and content bytes added to the
+ objstorage.
+
+ """
+ contents = list(contents)
+ if any(cont.data is None for cont in contents):
+ raise StorageArgumentException('Missing data')
+ summary = self.objstorage.add_batch({
+ cont.sha1: cont.data
+ for cont in contents
+ })
+ return {
+ 'content:add': summary['object:add'],
+ 'content:add:bytes': summary['object:add:bytes']
+ }
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -25,13 +25,12 @@
Snapshot, Origin, SHA1_SIZE
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
try:
from swh.journal.writer import get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
+from swh.storage.objstorage import ObjStorage
from . import converters, HashCollision
from .common import db_transaction_generator, db_transaction
@@ -96,7 +95,6 @@
except psycopg2.OperationalError as e:
raise StorageDBError(e)
- self.objstorage = get_objstorage(**objstorage)
if journal_writer:
if get_journal_writer is None:
raise EnvironmentError(
@@ -105,6 +103,7 @@
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
+ self.objstorage = ObjStorage(objstorage)
def get_db(self):
if self._db:
@@ -207,18 +206,8 @@
objstorage. Content present twice is only sent once.
"""
- content_bytes_added = 0
- data = {}
- for cont in content:
- if cont.sha1 not in data:
- data[cont.sha1] = cont.data
- content_bytes_added += max(0, cont.length)
-
- # FIXME: Since we do the filtering anyway now, we might as
- # well make the objstorage's add_batch call return what we
- # want here (real bytes added)... that'd simplify this...
- self.objstorage.add_batch(data)
- return content_bytes_added
+ summary = self.objstorage.content_add(content)
+ return summary['content:add:bytes']
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
@@ -278,15 +267,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
@timed
@db_transaction()
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3647,7 +3647,7 @@
# This test is only relevant on the local storage, with an actual
# objstorage raising an exception
def test_content_add_objstorage_exception(self, swh_storage):
- swh_storage.objstorage.add = Mock(
+ swh_storage.objstorage.content_add = Mock(
side_effect=Exception('mocked broken objstorage')
)
@@ -3746,7 +3746,7 @@
}
if hasattr(swh_storage, 'objstorage'):
- assert cont['sha1'] in swh_storage.objstorage
+ assert cont['sha1'] in swh_storage.objstorage.objstorage
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -3776,7 +3776,7 @@
}
if hasattr(swh_storage, 'objstorage'):
- assert cont['sha1'] not in swh_storage.objstorage
+ assert cont['sha1'] not in swh_storage.objstorage.objstorage
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:16 PM (6 d, 7 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229342

Event Timeline