Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345280
D2643.id9461.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
D2643.id9461.diff
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
swh.core[db,http] >= 0.0.65
swh.model >= 0.0.51
-swh.objstorage >= 0.0.17
+swh.objstorage >= 0.0.40
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -17,14 +17,13 @@
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
OriginVisit, Snapshot, Origin
)
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
try:
from swh.journal.writer import get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
+from swh.storage.objstorage import ObjStorage
from .. import HashCollision
from ..exc import StorageArgumentException
@@ -49,12 +48,11 @@
port=9042, journal_writer=None):
self._cql_runner = CqlRunner(hosts, keyspace, port)
- self.objstorage = get_objstorage(**objstorage)
-
if journal_writer:
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
+ self.objstorage = ObjStorage(objstorage)
def check_config(self, *, check_write):
self._cql_runner.check_read()
@@ -73,26 +71,20 @@
del cont['data']
self.journal_writer.write_addition('content', cont)
- count_contents = 0
- count_content_added = 0
- count_content_bytes_added = 0
-
- for content in contents:
+ if with_data:
# First insert to the objstorage, if the endpoint is
# `content_add` (as opposed to `content_add_metadata`).
# TODO: this should probably be done in concurrently to inserting
# in index tables (but still before the main table; so an entry is
# only added to the main table after everything else was
# successfully inserted.
- count_contents += 1
- if content.status != 'absent':
- count_content_added += 1
- if with_data:
- content_data = content.data
- if content_data is None:
- raise StorageArgumentException('Missing data')
- count_content_bytes_added += len(content_data)
- self.objstorage.add(content_data, content.sha1)
+ summary = self.objstorage.content_add(
+ c for c in contents if c.status != 'absent')
+ content_add_bytes = summary['content:add:bytes']
+
+ content_add = 0
+ for content in contents:
+ content_add += 1
# Then add to index tables
for algo in HASH_ALGORITHMS:
@@ -117,11 +109,11 @@
raise HashCollision(algo, content.get_hash(algo), pks)
summary = {
- 'content:add': count_content_added,
+ 'content:add': content_add,
}
if with_data:
- summary['content:add:bytes'] = count_content_bytes_added
+ summary['content:add:bytes'] = content_add_bytes
return summary
@@ -139,14 +131,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
def content_get_partition(
self, partition_id: int, nb_partitions: int, limit: int = 1000,
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -22,8 +22,7 @@
BaseContent, Content, SkippedContent, Directory, Revision, Release,
Snapshot, OriginVisit, Origin, SHA1_SIZE)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
+from swh.storage.objstorage import ObjStorage
from . import HashCollision
from .exc import StorageArgumentException
@@ -71,7 +70,7 @@
# ideally we would want a skip list for both fast inserts and searches
self._sorted_sha1s = []
- self.objstorage = get_objstorage('memory', {})
+ self.objstorage = ObjStorage({'cls': 'memory', 'args': {}})
def check_config(self, *, check_write):
return True
@@ -83,12 +82,13 @@
content = attr.evolve(content, data=None)
self.journal_writer.write_addition('content', content)
- summary = {
- 'content:add': 0,
- }
-
+ content_add = 0
+ content_add_bytes = 0
if with_data:
- summary['content:add:bytes'] = 0
+ summary = self.objstorage.content_add(
+ c for c in contents
+ if c.status != 'absent')
+ content_add_bytes = summary['content:add:bytes']
for content in contents:
key = self._content_key(content)
@@ -106,14 +106,16 @@
('content', content.sha1))
self._contents[key] = content
bisect.insort(self._sorted_sha1s, content.sha1)
- summary['content:add'] += 1
- if with_data:
- content_data = self._contents[key].data
- self._contents[key] = attr.evolve(
- self._contents[key],
- data=None)
- summary['content:add:bytes'] += len(content_data)
- self.objstorage.add(content_data, content.sha1)
+ self._contents[key] = attr.evolve(
+ self._contents[key],
+ data=None)
+ content_add += 1
+
+ summary = {
+ 'content:add': content_add,
+ }
+ if with_data:
+ summary['content:add:bytes'] = content_add_bytes
return summary
@@ -154,14 +156,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
def content_get_range(self, start, end, limit=1000):
if limit is None:
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Generator, Iterable
+
+from swh.model.model import Content
+from swh.objstorage import get_objstorage
+from swh.objstorage.exc import ObjNotFoundError
+
+from .exc import StorageArgumentException
+
+
+class ObjStorage:
+    """Objstorage collaborator in charge of adding objects to
+    the objstorage and of retrieving them from it.
+
+    Attribute lookups not resolved on this class are delegated to the
+    wrapped objstorage instance (see :meth:`__getattr__`).
+
+    """
+    def __init__(self, objstorage_config: Dict):
+        """
+        Args:
+            objstorage_config: keyword arguments passed as-is to
+              ``get_objstorage`` (e.g. ``{'cls': 'memory', 'args': {}}``)
+
+        """
+        self.objstorage = get_objstorage(**objstorage_config)
+
+    def __getattr__(self, key):
+        """Delegate unknown attributes to the wrapped objstorage."""
+        # __getattr__ only runs when normal lookup fails. If the wrapped
+        # instance itself is missing (object built through __new__, e.g.
+        # by copy/pickle, or __init__ raised), reading self.objstorage
+        # below would re-enter this method forever; fail cleanly instead.
+        if key == 'objstorage':
+            raise AttributeError(key)
+        return getattr(self.objstorage, key)
+
+    def content_get(self, contents: Iterable[bytes]) -> Generator:
+        """Retrieve content data from the objstorage.
+
+        Args:
+            contents: sha1 identifiers of the contents to retrieve
+
+        Yields:
+            In input order, ``{'sha1': obj_id, 'data': data}`` for each
+            identifier found in the objstorage, or ``None`` for each
+            identifier it does not hold.
+
+        """
+        for obj_id in contents:
+            try:
+                data = self.objstorage.get(obj_id)
+            except ObjNotFoundError:
+                yield None
+            else:
+                yield {'sha1': obj_id, 'data': data}
+
+    def content_add(self, contents: Iterable[Content]) -> Dict:
+        """Add contents to the objstorage.
+
+        Contents sharing the same sha1 are sent only once.
+
+        Args:
+            contents: contents to add; each must carry its ``data``
+
+        Raises:
+            StorageArgumentException: if any content lacks ``data``; in
+              that case nothing is sent to the objstorage.
+
+        Returns:
+            The summary dict of content and content bytes added to the
+            objstorage, with ``content:add`` and ``content:add:bytes`` keys.
+
+        """
+        contents = list(contents)
+        if any(cont.data is None for cont in contents):
+            raise StorageArgumentException('Missing data')
+        summary = self.objstorage.add_batch({
+            cont.sha1: cont.data
+            for cont in contents
+        })
+        return {
+            'content:add': summary['object:add'],
+            'content:add:bytes': summary['object:add:bytes'],
+        }
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -25,13 +25,12 @@
Snapshot, Origin, SHA1_SIZE
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
try:
from swh.journal.writer import get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
+from swh.storage.objstorage import ObjStorage
from . import converters, HashCollision
from .common import db_transaction_generator, db_transaction
@@ -96,7 +95,6 @@
except psycopg2.OperationalError as e:
raise StorageDBError(e)
- self.objstorage = get_objstorage(**objstorage)
if journal_writer:
if get_journal_writer is None:
raise EnvironmentError(
@@ -105,6 +103,7 @@
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
+ self.objstorage = ObjStorage(objstorage)
def get_db(self):
if self._db:
@@ -207,18 +206,8 @@
objstorage. Content present twice is only sent once.
"""
- content_bytes_added = 0
- data = {}
- for cont in content:
- if cont.sha1 not in data:
- data[cont.sha1] = cont.data
- content_bytes_added += max(0, cont.length)
-
- # FIXME: Since we do the filtering anyway now, we might as
- # well make the objstorage's add_batch call return what we
- # want here (real bytes added)... that'd simplify this...
- self.objstorage.add_batch(data)
- return content_bytes_added
+ summary = self.objstorage.content_add(content)
+ return summary['content:add:bytes']
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
@@ -278,15 +267,7 @@
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-
- for obj_id in content:
- try:
- data = self.objstorage.get(obj_id)
- except ObjNotFoundError:
- yield None
- continue
-
- yield {'sha1': obj_id, 'data': data}
+ yield from self.objstorage.content_get(content)
@timed
@db_transaction()
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3647,7 +3647,7 @@
# This test is only relevant on the local storage, with an actual
# objstorage raising an exception
def test_content_add_objstorage_exception(self, swh_storage):
- swh_storage.objstorage.add = Mock(
+ swh_storage.objstorage.content_add = Mock(
side_effect=Exception('mocked broken objstorage')
)
@@ -3746,7 +3746,7 @@
}
if hasattr(swh_storage, 'objstorage'):
- assert cont['sha1'] in swh_storage.objstorage
+ assert cont['sha1'] in swh_storage.objstorage.objstorage
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -3776,7 +3776,7 @@
}
if hasattr(swh_storage, 'objstorage'):
- assert cont['sha1'] not in swh_storage.objstorage
+ assert cont['sha1'] not in swh_storage.objstorage.objstorage
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:16 PM (5 d, 15 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229342
Attached To
D2643: storages: Refactor objstorage operations with a dedicated collaborator
Event Timeline
Log In to Comment