Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123212
D2626.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
50 KB
Subscribers
None
D2626.diff
View Options
diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -28,6 +28,7 @@
min_batch_size:
content: 10000
content_bytes: 100000000
+ skipped_content: 10000
directory: 5000
revision: 1000
release: 10000
@@ -43,16 +44,18 @@
'content': min_batch_size.get('content', 10000),
'content_bytes': min_batch_size.get('content_bytes',
100*1024*1024),
+ 'skipped_content': min_batch_size.get('skipped_content', 10000),
'directory': min_batch_size.get('directory', 25000),
'revision': min_batch_size.get('revision', 100000),
'release': min_batch_size.get('release', 100000),
}
- self.object_types = ['content', 'directory', 'revision', 'release']
+ self.object_types = [
+ 'content', 'skipped_content', 'directory', 'revision', 'release']
self._objects = {k: deque() for k in self.object_types}
def __getattr__(self, key):
if key.endswith('_add'):
- object_type = key.split('_')[0]
+ object_type = key.rsplit('_', 1)[0]
if object_type in self.object_types:
return partial(
self.object_add, object_type=object_type
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -19,6 +19,7 @@
from swh.model.model import (
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content,
+ SkippedContent,
)
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url
@@ -167,6 +168,10 @@
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status']
+ @_prepared_insert_statement('content', _content_keys)
+ def content_add_one(self, content, *, statement) -> None:
+ self._add_one(statement, 'content', content, self._content_keys)
+
@_prepared_statement('SELECT * FROM content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
def content_get_from_pk(
@@ -203,10 +208,6 @@
self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
- @_prepared_insert_statement('content', _content_keys)
- def content_add_one(self, content, *, statement) -> None:
- self._add_one(statement, 'content', content, self._content_keys)
-
def content_index_add_one(self, main_algo: str, content: Content) -> None:
query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \
.format(algo=main_algo, cols=', '.join(self._content_pk),
@@ -221,6 +222,60 @@
algo=algo)
return list(self._execute_with_retries(query, [hash_]))
+ ##########################
+ # 'skipped_content' table
+ ##########################
+
+ _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
+ _skipped_content_keys = [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
+ 'ctime', 'status', 'reason', 'origin']
+ _magic_null_pk = b''
+ """
+ NULLs are not allowed in primary keys; instead use an empty
+ value
+ """
+
+ @_prepared_insert_statement('skipped_content', _skipped_content_keys)
+ def skipped_content_add_one(self, content, *, statement) -> None:
+ content = content.to_dict()
+ for key in self._skipped_content_pk:
+ if content[key] is None:
+ content[key] = self._magic_null_pk
+ content = SkippedContent.from_dict(content)
+ self._add_one(statement, 'skipped_content', content,
+ self._skipped_content_keys)
+
+ @_prepared_statement('SELECT * FROM skipped_content WHERE ' +
+ ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
+ def skipped_content_get_from_pk(
+ self, content_hashes: Dict[str, bytes], *, statement
+ ) -> Optional[Row]:
+ rows = list(self._execute_with_retries(
+ statement, [content_hashes[algo] or self._magic_null_pk
+ for algo in HASH_ALGORITHMS]))
+ assert len(rows) <= 1
+ if rows:
+ # TODO: convert _magic_null_pk back to None?
+ return rows[0]
+ else:
+ return None
+
+ ##########################
+ # 'skipped_content_by_*' tables
+ ##########################
+
+ def skipped_content_index_add_one(
+ self, main_algo: str, content: Content) -> None:
+ assert content.get_hash(main_algo) is not None
+ query = ('INSERT INTO skipped_content_by_{algo} ({cols}) '
+ 'VALUES ({values})').format(
+ algo=main_algo, cols=', '.join(self._content_pk),
+ values=', '.join('%s' for _ in self._content_pk))
+ self._execute_with_retries(
+ query, [content.get_hash(algo) or self._magic_null_pk
+ for algo in self._content_pk])
+
##########################
# 'revision' table
##########################
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -65,6 +65,20 @@
PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
);
+CREATE TABLE IF NOT EXISTS skipped_content (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ length bigint,
+ ctime timestamp,
+ -- creation time, i.e. time of (first) injection into the storage
+ status ascii,
+ reason text,
+ origin text,
+ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
+);
+
CREATE TABLE IF NOT EXISTS revision (
id blob PRIMARY KEY,
date microtimestamp_with_timezone,
@@ -184,19 +198,29 @@
sha256 blob,
blake2s256 blob,
PRIMARY KEY (({main_algo}), {other_algos})
-);'''
+);
+
+CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ PRIMARY KEY (({main_algo}), {other_algos})
+);
+'''
-TABLES = ('content revision revision_parent release directory '
- 'directory_entry snapshot snapshot_branch origin_visit '
- 'origin tool_by_uuid tool object_count').split()
+TABLES = ('skipped_content content revision revision_parent release '
+ 'directory directory_entry snapshot snapshot_branch '
+ 'origin_visit origin tool_by_uuid tool object_count').split()
HASH_ALGORITHMS = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
for main_algo in HASH_ALGORITHMS:
- CREATE_TABLES_QUERIES.append(CONTENT_INDEX_TEMPLATE.format(
+ CREATE_TABLES_QUERIES.extend(CONTENT_INDEX_TEMPLATE.format(
main_algo=main_algo,
other_algos=', '.join(
[algo for algo in HASH_ALGORITHMS if algo != main_algo])
- ))
+ ).split('\n\n'))
TABLES.append('content_by_%s' % main_algo)
+ TABLES.append('skipped_content_by_%s' % main_algo)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -14,7 +14,8 @@
import dateutil
from swh.model.model import (
- Revision, Release, Directory, DirectoryEntry, Content, OriginVisit,
+ Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
+ OriginVisit,
)
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
@@ -116,7 +117,6 @@
summary = {
'content:add': count_content_added,
- 'skipped_content:add': count_contents - count_content_added,
}
if with_data:
@@ -208,10 +208,6 @@
result[content_metadata['sha1']].append(content_metadata)
return result
- def skipped_content_missing(self, contents):
- # TODO
- raise NotImplementedError('not yet supported for Cassandra')
-
def content_find(self, content):
# Find an algorithm that is common to all the requested contents.
# It will be used to do an initial filtering efficiently.
@@ -263,6 +259,46 @@
def content_get_random(self):
return self._cql_runner.content_get_random().sha1_git
+ def _skipped_content_add(self, contents):
+ contents = [SkippedContent.from_dict(c) for c in contents]
+
+ # Filter out content already in the database.
+ contents = [
+ c for c in contents
+ if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())]
+
+ if self.journal_writer:
+ for content in contents:
+ content = content.to_dict()
+ if 'data' in content:
+ del content['data']
+ self.journal_writer.write_addition('content', content)
+
+ for content in contents:
+ # Add to index tables
+ for algo in HASH_ALGORITHMS:
+ if content.get_hash(algo) is not None:
+ self._cql_runner.skipped_content_index_add_one(
+ algo, content)
+
+ # Then to the main table
+ self._cql_runner.skipped_content_add_one(content)
+
+ return {
+ 'skipped_content:add': len(contents)
+ }
+
+ def skipped_content_add(self, content):
+ content = [c.copy() for c in content] # semi-shallow copy
+ for item in content:
+ item['ctime'] = now()
+ return self._skipped_content_add(content)
+
+ def skipped_content_missing(self, contents):
+ for content in contents:
+ if not self._cql_runner.skipped_content_get_from_pk(content):
+ yield content
+
def directory_add(self, directories):
directories = list(directories)
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -143,9 +143,9 @@
query = """SELECT * FROM (VALUES %s) AS t (%s)
WHERE not exists
(SELECT 1 FROM skipped_content s WHERE
- s.sha1 is not distinct from t.sha1 and
- s.sha1_git is not distinct from t.sha1_git and
- s.sha256 is not distinct from t.sha256);""" % \
+ s.sha1 is not distinct from t.sha1::sha1 and
+ s.sha1_git is not distinct from t.sha1_git::sha1 and
+ s.sha256 is not distinct from t.sha256::bytea);""" % \
((', '.join('%s' for _ in contents)),
', '.join(self.content_hash_keys))
cur.execute(query,
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
-from typing import Dict, Generator, Sequence, Set
+from typing import Dict, Generator, Iterable, Set
from swh.storage import get_storage
@@ -27,22 +27,30 @@
def __init__(self, storage):
self.storage = get_storage(**storage)
self.objects_seen = {
- 'content': set(), # set of content hashes (sha256) seen
- 'directory': set(),
- 'revision': set(),
+ 'content': set(), # sha256
+ 'skipped_content': set(), # sha1_git
+ 'directory': set(), # sha1_git
+ 'revision': set(), # sha1_git
}
def __getattr__(self, key):
return getattr(self.storage, key)
- def content_add(self, content: Sequence[Dict]) -> Dict:
+ def content_add(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
contents_to_add = self._filter_missing_contents(contents)
return self.storage.content_add(
x for x in contents if x['sha256'] in contents_to_add
)
- def directory_add(self, directories: Sequence[Dict]) -> Dict:
+ def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
+ contents = list(content)
+ contents_to_add = self._filter_missing_skipped_contents(contents)
+ return self.storage.skipped_content_add(
+ x for x in contents if x['sha1_git'] in contents_to_add
+ )
+
+ def directory_add(self, directories: Iterable[Dict]) -> Dict:
directories = list(directories)
missing_ids = self._filter_missing_ids(
'directory',
@@ -63,7 +71,7 @@
)
def _filter_missing_contents(
- self, content_hashes: Sequence[Dict]) -> Set[bytes]:
+ self, content_hashes: Iterable[Dict]) -> Set[bytes]:
"""Return only the content keys missing from swh
Args:
@@ -84,6 +92,26 @@
key_hash='sha256',
))
+ def _filter_missing_skipped_contents(
+ self, content_hashes: Iterable[Dict]) -> Set[bytes]:
+ """Return only the content keys missing from swh
+
+ Args:
+ content_hashes: Iterable of content hash dicts (each with a
+ sha1_git key) to check for existence in swh storage
+
+ """
+ objects_seen = self.objects_seen['skipped_content']
+ missing_hashes = []
+ for hashes in content_hashes:
+ if hashes['sha1_git'] in objects_seen:
+ continue
+ objects_seen.add(hashes['sha1_git'])
+ missing_hashes.append(hashes)
+
+ return {c['sha1_git']
+ for c in self.storage.skipped_content_missing(missing_hashes)}
+
def _filter_missing_ids(
self,
object_type: str,
@@ -92,7 +120,7 @@
Args:
object_type: object type to use {revision, directory}
- ids: Sequence of object_type ids
+ ids: Iterable of object_type ids
Returns:
Missing ids from the storage for object_type
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -19,8 +19,8 @@
import attr
from swh.model.model import (
- Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin,
- SHA1_SIZE)
+ BaseContent, Content, SkippedContent, Directory, Revision, Release,
+ Snapshot, OriginVisit, Origin, SHA1_SIZE)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
@@ -75,47 +75,26 @@
return True
def _content_add(self, contents, with_data):
- content_with_data = []
- content_without_data = []
for content in contents:
if content.status is None:
content.status = 'visible'
+ if content.status == 'absent':
+ raise ValueError('content with status=absent')
if content.length is None:
- content.length = -1
- if content.status != 'absent':
- if self._content_key(content) not in self._contents:
- content_with_data.append(content)
- else:
- if self._content_key(content) not in self._skipped_contents:
- content_without_data.append(content)
+ raise ValueError('content with length=None')
if self.journal_writer:
- for content in content_with_data:
+ for content in contents:
content = attr.evolve(content, data=None)
self.journal_writer.write_addition('content', content)
- for content in content_without_data:
- self.journal_writer.write_addition('content', content)
-
- count_content_added, count_content_bytes_added = \
- self._content_add_present(content_with_data, with_data)
-
- count_skipped_content_added = self._content_add_absent(
- content_without_data
- )
summary = {
- 'content:add': count_content_added,
- 'skipped_content:add': count_skipped_content_added,
+ 'content:add': 0,
}
if with_data:
- summary['content:add:bytes'] = count_content_bytes_added
+ summary['content:add:bytes'] = 0
- return summary
-
- def _content_add_present(self, contents, with_data):
- count_content_added = 0
- count_content_bytes_added = 0
for content in contents:
key = self._content_key(content)
if key in self._contents:
@@ -133,40 +112,21 @@
('content', content.sha1))
self._contents[key] = content
bisect.insort(self._sorted_sha1s, content.sha1)
- count_content_added += 1
+ summary['content:add'] += 1
if with_data:
content_data = self._contents[key].data
self._contents[key] = attr.evolve(
self._contents[key],
data=None)
- count_content_bytes_added += len(content_data)
+ summary['content:add:bytes'] += len(content_data)
self.objstorage.add(content_data, content.sha1)
- return (count_content_added, count_content_bytes_added)
-
- def _content_add_absent(self, contents):
- count = 0
- skipped_content_missing = self.skipped_content_missing(contents)
- for content in skipped_content_missing:
- key = self._content_key(content)
- for algo in DEFAULT_ALGORITHMS:
- self._skipped_content_indexes[algo][content.get_hash(algo)] \
- .add(key)
- self._skipped_contents[key] = content
- count += 1
-
- return count
-
- def _content_to_model(self, contents):
- for content in contents:
- content = content.copy()
- content.pop('origin', None)
- yield Content.from_dict(content)
+ return summary
def content_add(self, content):
now = datetime.datetime.now(tz=datetime.timezone.utc)
- content = [attr.evolve(c, ctime=now)
- for c in self._content_to_model(content)]
+ content = [attr.evolve(Content.from_dict(c), ctime=now)
+ for c in content]
return self._content_add(content, with_data=True)
def content_update(self, content, keys=[]):
@@ -194,7 +154,7 @@
self._content_indexes[algorithm][hash_].add(new_key)
def content_add_metadata(self, content):
- content = list(self._content_to_model(content))
+ content = [Content.from_dict(c) for c in content]
return self._content_add(content, with_data=False)
def content_get(self, content):
@@ -308,6 +268,39 @@
if content not in self._content_indexes['sha1_git']:
yield content
+ def content_get_random(self):
+ return random.choice(list(self._content_indexes['sha1_git']))
+
+ def _skipped_content_add(self, contents):
+ for content in contents:
+ if content.status is None:
+ content = attr.evolve(content, status='absent')
+ if content.length is None:
+ content = attr.evolve(content, length=-1)
+ if content.status != 'absent':
+ raise ValueError(f'Content with status={content.status}')
+
+ if self.journal_writer:
+ for content in contents:
+ self.journal_writer.write_addition('content', content)
+
+ summary = {
+ 'skipped_content:add': 0
+ }
+
+ skipped_content_missing = self.skipped_content_missing(
+ [c.to_dict() for c in contents])
+ for content in skipped_content_missing:
+ key = self._content_key(content, allow_missing=True)
+ for algo in DEFAULT_ALGORITHMS:
+ if algo in content:
+ self._skipped_content_indexes[algo][content[algo]] \
+ .add(key)
+ self._skipped_contents[key] = content
+ summary['skipped_content:add'] += 1
+
+ return summary
+
def skipped_content_missing(self, contents):
for content in contents:
for (key, algorithm) in self._content_key_algorithm(content):
@@ -316,11 +309,17 @@
if key not in self._skipped_content_indexes[algorithm]:
# index must contain hashes of algos except blake2s256
# else the content is considered skipped
- yield content
+ yield {algo: content[algo]
+ for algo in DEFAULT_ALGORITHMS
+ if content[algo] is not None}
break
- def content_get_random(self):
- return random.choice(list(self._content_indexes['sha1_git']))
+ def skipped_content_add(self, content):
+ content = list(content)
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+ content = [attr.evolve(SkippedContent.from_dict(c), ctime=now)
+ for c in content]
+ return self._skipped_content_add(content)
def directory_add(self, directories):
directories = list(directories)
@@ -1021,15 +1020,15 @@
return person
@staticmethod
- def _content_key(content):
+ def _content_key(content, allow_missing=False):
"""A stable key for a content"""
- return tuple(getattr(content, key)
+ return tuple(getattr(content, key, None)
for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
def _content_key_algorithm(content):
""" A stable key and the algorithm for a content"""
- if isinstance(content, Content):
+ if isinstance(content, BaseContent):
content = content.to_dict()
return tuple((content.get(key), key)
for key in sorted(DEFAULT_ALGORITHMS))
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -29,14 +29,11 @@
following keys:
- data (bytes): the actual content
- - length (int): content length (default: -1)
+ - length (int): content length
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- - status (str): one of visible, hidden, absent
- - reason (str): if status = absent, the reason why
- - origin (int): if status = absent, the origin we saw the
- content in
+ - status (str): one of visible, hidden
Raises:
@@ -50,11 +47,10 @@
Since additions to both idempotent, that should not be a problem.
Returns:
- Summary dict with the following key and associated values:
+ Summary dict with the following keys and associated values:
content:add: New contents added
content:add:bytes: Sum of the contents' length data
- skipped_content:add: New skipped contents (no data) added
"""
...
@@ -253,20 +249,6 @@
"""
...
- @remote_api_endpoint('content/skipped/missing')
- def skipped_content_missing(self, contents):
- """List skipped_content missing from storage
-
- Args:
- content: iterable of dictionaries containing the data for each
- checksum algorithm.
-
- Returns:
- iterable: missing signatures
-
- """
- ...
-
@remote_api_endpoint('content/present')
def content_find(self, content):
"""Find a content hash in db.
@@ -296,6 +278,57 @@
"""
...
+ @remote_api_endpoint('content/skipped/add')
+ def skipped_content_add(self, content):
+ """Add contents to the skipped_content list, which contains
+ (partial) information about content missing from the archive.
+
+ Args:
+ content (iterable): iterable of dictionaries representing
+ individual pieces of content to add. Each dictionary has the
+ following keys:
+
+ - length (Optional[int]): content length (default: -1)
+ - one key for each checksum algorithm in
+ :data:`swh.model.hashutil.ALGORITHMS`, mapped to the
+ corresponding checksum; each is optional
+ - status (str): must be "absent"
+ - reason (str): the reason why the content is absent
+ - origin (int): if status = absent, the origin we saw the
+ content in
+
+ Raises:
+
+ The following exceptions can occur:
+
+ - HashCollision in case of collision
+ - Any other exceptions raised by the backend
+
+ In case of errors, some content may already have been stored in
+ the DB.
+ Since additions are idempotent, that should not be a problem.
+
+ Returns:
+ Summary dict with the following key and associated value:
+
+ skipped_content:add: New skipped contents (no data) added
+ """
+ ...
+
+ @remote_api_endpoint('content/skipped/missing')
+ def skipped_content_missing(self, contents):
+ """List skipped_content missing from storage
+
+ Args:
+ contents: iterable of dictionaries containing the data for each
+ checksum algorithm.
+
+ Returns:
+ iterable: missing signatures
+
+ """
+ ...
+
@remote_api_endpoint('directory/add')
def directory_add(self, directories):
"""Add directories to the storage
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -63,15 +63,20 @@
return getattr(self.storage, key)
@swh_retry
- def content_add(self, content: List[Dict]) -> Dict:
+ def content_add(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
return self.storage.content_add(contents)
@swh_retry
- def content_add_metadata(self, content: List[Dict]) -> Dict:
+ def content_add_metadata(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
return self.storage.content_add_metadata(contents)
+ @swh_retry
+ def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
+ contents = list(content)
+ return self.storage.skipped_content_add(contents)
+
@swh_retry
def origin_add_one(self, origin: Dict) -> str:
return self.storage.origin_add_one(origin)
@@ -91,7 +96,7 @@
metadata=metadata, snapshot=snapshot)
@swh_retry
- def tool_add(self, tools: List[Dict]) -> List[Dict]:
+ def tool_add(self, tools: Iterable[Dict]) -> List[Dict]:
tools = list(tools)
return self.storage.tool_add(tools)
@@ -110,22 +115,22 @@
origin_url, ts, provider_id, tool_id, metadata)
@swh_retry
- def directory_add(self, directories: List[Dict]) -> Dict:
+ def directory_add(self, directories: Iterable[Dict]) -> Dict:
directories = list(directories)
return self.storage.directory_add(directories)
@swh_retry
- def revision_add(self, revisions: List[Dict]) -> Dict:
+ def revision_add(self, revisions: Iterable[Dict]) -> Dict:
revisions = list(revisions)
return self.storage.revision_add(revisions)
@swh_retry
- def release_add(self, releases: List[Dict]) -> Dict:
+ def release_add(self, releases: Iterable[Dict]) -> Dict:
releases = list(releases)
return self.storage.release_add(releases)
@swh_retry
- def snapshot_add(self, snapshot: List[Dict]) -> Dict:
+ def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict:
snapshots = list(snapshot)
return self.storage.snapshot_add(snapshots)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -128,106 +128,54 @@
return tuple([hash[k] for k in keys])
@staticmethod
- def _normalize_content(d):
+ def _content_normalize(d):
d = d.copy()
if 'status' not in d:
d['status'] = 'visible'
- if 'length' not in d:
- d['length'] = -1
-
return d
@staticmethod
- def _validate_content(d):
+ def _content_validate(d):
"""Sanity checks on status / reason / length, that postgresql
doesn't enforce."""
- if d['status'] not in ('visible', 'absent', 'hidden'):
+ if d['status'] not in ('visible', 'hidden'):
raise ValueError('Invalid content status: {}'.format(d['status']))
- if d['status'] != 'absent' and d.get('reason') is not None:
+ if d.get('reason') is not None:
raise ValueError(
- 'Must not provide a reason if content is not absent.')
+ 'Must not provide a reason if content is present.')
- if d['length'] < -1:
- raise ValueError('Content length must be positive or -1.')
+ if d['length'] is None or d['length'] < 0:
+ raise ValueError('Content length must be positive.')
- def _filter_new_content(self, content, db=None, cur=None):
- """Sort contents into buckets 'with data' and 'without data',
- and filter out those already in the database."""
- content_by_status = defaultdict(list)
- for d in content:
- content_by_status[d['status']].append(d)
-
- content_with_data = content_by_status['visible'] \
- + content_by_status['hidden']
- content_without_data = content_by_status['absent']
-
- missing_content = set(self.content_missing(content_with_data,
- db=db, cur=cur))
- missing_skipped = set(self._content_unique_key(hashes, db)
- for hashes in self.skipped_content_missing(
- content_without_data, db=db, cur=cur))
-
- content_with_data = [
- cont for cont in content_with_data
- if cont['sha1'] in missing_content]
- content_without_data = [
- cont for cont in content_without_data
- if self._content_unique_key(cont, db) in missing_skipped]
-
- summary = {
- 'content:add': len(missing_content),
- 'skipped_content:add': len(missing_skipped),
- }
-
- return (content_with_data, content_without_data, summary)
-
- def _content_add_metadata(self, db, cur,
- content_with_data, content_without_data):
+ def _content_add_metadata(self, db, cur, content):
"""Add content to the postgresql database but not the object storage.
"""
- if content_with_data:
- # create temporary table for metadata injection
- db.mktemp('content', cur)
+ # create temporary table for metadata injection
+ db.mktemp('content', cur)
- db.copy_to(content_with_data, 'tmp_content',
- db.content_add_keys, cur)
+ db.copy_to(content, 'tmp_content',
+ db.content_add_keys, cur)
- # move metadata in place
- try:
- db.content_add_from_temp(cur)
- except psycopg2.IntegrityError as e:
- from . import HashCollision
- if e.diag.sqlstate == '23505' and \
- e.diag.table_name == 'content':
- constraint_to_hash_name = {
- 'content_pkey': 'sha1',
- 'content_sha1_git_idx': 'sha1_git',
- 'content_sha256_idx': 'sha256',
- }
- colliding_hash_name = constraint_to_hash_name \
- .get(e.diag.constraint_name)
- raise HashCollision(colliding_hash_name) from None
- else:
- raise
-
- if content_without_data:
- content_without_data = \
- [cont.copy() for cont in content_without_data]
- origin_ids = db.origin_id_get_by_url(
- [cont.get('origin') for cont in content_without_data],
- cur=cur)
- for (cont, origin_id) in zip(content_without_data, origin_ids):
- if 'origin' in cont:
- cont['origin'] = origin_id
- db.mktemp('skipped_content', cur)
- db.copy_to(content_without_data, 'tmp_skipped_content',
- db.skipped_content_keys, cur)
-
- # move metadata in place
- db.skipped_content_add_from_temp(cur)
+ # move metadata in place
+ try:
+ db.content_add_from_temp(cur)
+ except psycopg2.IntegrityError as e:
+ from . import HashCollision
+ if e.diag.sqlstate == '23505' and \
+ e.diag.table_name == 'content':
+ constraint_to_hash_name = {
+ 'content_pkey': 'sha1',
+ 'content_sha1_git_idx': 'sha1_git',
+ 'content_sha256_idx': 'sha256',
+ }
+ colliding_hash_name = constraint_to_hash_name \
+ .get(e.diag.constraint_name)
+ raise HashCollision(colliding_hash_name) from None
+ else:
+ raise
@timed
@process_metrics
@@ -238,21 +186,19 @@
for item in content:
item['ctime'] = now
- content = [self._normalize_content(c) for c in content]
+ content = [self._content_normalize(c) for c in content]
for c in content:
- self._validate_content(c)
+ self._content_validate(c)
- (content_with_data, content_without_data, summary) = \
- self._filter_new_content(content, db, cur)
+ missing = list(self.content_missing(content, key_hash='sha1_git'))
+ content = [c for c in content if c['sha1_git'] in missing]
if self.journal_writer:
- for item in content_with_data:
+ for item in content:
if 'data' in item:
item = item.copy()
del item['data']
self.journal_writer.write_addition('content', item)
- for item in content_without_data:
- self.journal_writer.write_addition('content', item)
def add_to_objstorage():
"""Add to objstorage the new missing_content
@@ -264,7 +210,7 @@
"""
content_bytes_added = 0
data = {}
- for cont in content_with_data:
+ for cont in content:
if cont['sha1'] not in data:
data[cont['sha1']] = cont['data']
content_bytes_added += max(0, cont['length'])
@@ -278,15 +224,16 @@
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
- self._content_add_metadata(
- db, cur, content_with_data, content_without_data)
+ self._content_add_metadata(db, cur, content)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
content_bytes_added = added_to_objstorage.result()
- summary['content:add:bytes'] = content_bytes_added
- return summary
+ return {
+ 'content:add': len(content),
+ 'content:add:bytes': content_bytes_added,
+ }
@timed
@db_transaction()
@@ -308,23 +255,23 @@
@process_metrics
@db_transaction()
def content_add_metadata(self, content, db=None, cur=None):
- content = [self._normalize_content(c) for c in content]
+ content = [self._content_normalize(c) for c in content]
for c in content:
- self._validate_content(c)
+ self._content_validate(c)
- (content_with_data, content_without_data, summary) = \
- self._filter_new_content(content, db, cur)
+ missing = self.content_missing(content, key_hash='sha1_git')
+ content = [c for c in content if c['sha1_git'] in missing]
if self.journal_writer:
- for item in itertools.chain(content_with_data,
- content_without_data):
+ for item in itertools.chain(content):
assert 'data' not in content
self.journal_writer.write_addition('content', item)
- self._content_add_metadata(
- db, cur, content_with_data, content_without_data)
+ self._content_add_metadata(db, cur, content)
- return summary
+ return {
+ 'content:add': len(content),
+ }
@timed
def content_get(self, content):
@@ -423,12 +370,6 @@
for obj in db.content_missing_per_sha1_git(contents, cur):
yield obj[0]
- @timed
- @db_transaction_generator()
- def skipped_content_missing(self, contents, db=None, cur=None):
- for content in db.skipped_content_missing(contents, cur):
- yield dict(zip(db.content_hash_keys, content))
-
@timed
@db_transaction()
def content_find(self, content, db=None, cur=None):
@@ -449,6 +390,83 @@
def content_get_random(self, db=None, cur=None):
return db.content_get_random(cur)
+ @staticmethod
+ def _skipped_content_normalize(d):
+ d = d.copy()
+
+ if d.get('status') is None:
+ d['status'] = 'absent'
+
+ if d.get('length') is None:
+ d['length'] = -1
+
+ return d
+
+ @staticmethod
+ def _skipped_content_validate(d):
+ """Sanity checks on status / reason / length, that postgresql
+ doesn't enforce."""
+ if d['status'] != 'absent':
+ raise ValueError('Invalid content status: {}'.format(d['status']))
+
+ if d.get('reason') is None:
+ raise ValueError(
+ 'Must provide a reason if content is absent.')
+
+ if d['length'] < -1:
+ raise ValueError('Content length must be positive or -1.')
+
+ def _skipped_content_add_metadata(self, db, cur, content):
+ content = \
+ [cont.copy() for cont in content]
+ origin_ids = db.origin_id_get_by_url(
+ [cont.get('origin') for cont in content],
+ cur=cur)
+ for (cont, origin_id) in zip(content, origin_ids):
+ if 'origin' in cont:
+ cont['origin'] = origin_id
+ db.mktemp('skipped_content', cur)
+ db.copy_to(content, 'tmp_skipped_content',
+ db.skipped_content_keys, cur)
+
+ # move metadata in place
+ db.skipped_content_add_from_temp(cur)
+
+ @timed
+ @process_metrics
+ @db_transaction()
+ def skipped_content_add(self, content, db=None, cur=None):
+ content = [dict(c.items()) for c in content] # semi-shallow copy
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+ for item in content:
+ item['ctime'] = now
+
+ content = [self._skipped_content_normalize(c) for c in content]
+ for c in content:
+ self._skipped_content_validate(c)
+
+ missing_contents = self.skipped_content_missing(content)
+ content = [c for c in content
+ if any(all(c.get(algo) == missing_content.get(algo)
+ for algo in ALGORITHMS)
+ for missing_content in missing_contents)]
+
+ if self.journal_writer:
+ for item in content:
+ self.journal_writer.write_addition('content', item)
+
+ self._skipped_content_add_metadata(db, cur, content)
+
+ return {
+ 'skipped_content:add': len(content),
+ }
+
+ @timed
+ @db_transaction_generator()
+ def skipped_content_missing(self, contents, db=None, cur=None):
+ for content in db.skipped_content_missing(contents, cur):
+ yield dict(zip(db.content_hash_keys, content))
+
@timed
@process_metrics
@db_transaction()
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -58,7 +58,10 @@
@pytest.fixture
def swh_contents(swh_storage):
+ """Generate 20 contents and store them in swh_storage.
+
+ Contents whose status is 'absent' go through skipped_content_add, all
+ the others through content_add. The full generated list is returned.
+ """
contents = gen_contents(n=20)
- swh_storage.content_add(contents)
+ swh_storage.content_add(
+ [c for c in contents if c['status'] != 'absent'])
+ swh_storage.skipped_content_add(
+ [c for c in contents if c['status'] == 'absent'])
return contents
@@ -218,6 +221,7 @@
return {
'content': [data.cont, data.cont2],
'content_metadata': [data.cont3],
+ 'skipped_content': [data.skipped_cont, data.skipped_cont2],
'person': [data.person],
'directory': [data.dir2, data.dir],
'revision': [data.revision],
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -27,7 +27,6 @@
assert s == {
'content:add': 1 + 1,
'content:add:bytes': contents[0]['length'] + contents[1]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing(
@@ -48,7 +47,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': contents[0]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing([contents[0]])
@@ -75,7 +73,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': contents[0]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing([contents[0]])
@@ -85,6 +82,55 @@
assert s == {}
+def test_buffering_proxy_storage_skipped_content_threshold_not_hit(
+ sample_data):
+ """Skipped contents below the batch threshold stay buffered until flush.
+
+ With a 'skipped_content' threshold of 10, adding two skipped contents
+ returns an empty summary and both are still reported missing; flush()
+ then reports two additions and nothing is missing afterwards.
+ """
+ contents = sample_data['skipped_content']
+ storage = BufferingProxyStorage(
+ storage={'cls': 'memory'},
+ min_batch_size={
+ 'skipped_content': 10,
+ }
+ )
+ s = storage.skipped_content_add([contents[0], contents[1]])
+ assert s == {}
+
+ # contents have not been written to storage
+ missing_contents = storage.skipped_content_missing(
+ [contents[0], contents[1]])
+ assert {c['sha1'] for c in missing_contents} \
+ == {c['sha1'] for c in contents}
+
+ # flushing writes the buffered contents and reports them
+ s = storage.flush()
+ assert s == {
+ 'skipped_content:add': 1 + 1
+ }
+
+ missing_contents = storage.skipped_content_missing(
+ [contents[0], contents[1]])
+ assert list(missing_contents) == []
+
+
+def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data):
+ """Reaching the 'skipped_content' threshold writes the batch at once.
+
+ With a threshold of 1, adding one skipped content reports one addition
+ immediately, the content is no longer missing, and a later flush()
+ returns an empty summary.
+ """
+ contents = sample_data['skipped_content']
+ storage = BufferingProxyStorage(
+ storage={'cls': 'memory'},
+ min_batch_size={
+ 'skipped_content': 1,
+ }
+ )
+
+ s = storage.skipped_content_add([contents[0]])
+ assert s == {
+ 'skipped_content:add': 1
+ }
+
+ missing_contents = storage.skipped_content_missing([contents[0]])
+ assert list(missing_contents) == []
+
+ # nothing is left in the buffer
+ s = storage.flush()
+ assert s == {}
+
+
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
directories = sample_data['directory']
storage = BufferingProxyStorage(
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -177,11 +177,6 @@
def test_content_update(self):
pass
- @pytest.mark.skip(
- 'not implemented, see https://forge.softwareheritage.org/T1633')
- def test_skipped_content_add(self):
- pass
-
@pytest.mark.skip(
'The "person" table of the pgsql is a legacy thing, and not '
'supported by the cassandra backend.')
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -18,7 +18,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': sample_content['length'],
- 'skipped_content:add': 0
}
content = next(storage.content_get([sample_content['sha1']]))
@@ -28,7 +27,27 @@
assert s == {
'content:add': 0,
'content:add:bytes': 0,
- 'skipped_content:add': 0
+ }
+
+
+def test_filtering_proxy_storage_skipped_content(sample_data):
+ """A skipped content is added once; adding it again is a no-op.
+
+ The sample content is first reported missing, the first add reports one
+ addition, it is then no longer missing, and a second add of the same
+ content reports zero additions.
+ """
+ sample_content = sample_data['skipped_content'][0]
+ storage = FilteringProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.skipped_content_missing([sample_content]))
+ assert content['sha1'] == sample_content['sha1']
+
+ s = storage.skipped_content_add([sample_content])
+ assert s == {
+ 'skipped_content:add': 1,
+ }
+
+ content = list(storage.skipped_content_missing([sample_content]))
+ assert content == []
+
+ # second add of the same content
+ s = storage.skipped_content_add([sample_content])
+ assert s == {
+ 'skipped_content:add': 0,
}
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -32,7 +32,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': sample_content['length'],
- 'skipped_content:add': 0
}
content = next(swh_storage.content_get([sample_content['sha1']]))
@@ -103,7 +102,6 @@
s = swh_storage.content_add_metadata([sample_content])
assert s == {
'content:add': 1,
- 'skipped_content:add': 0
}
content_metadata = swh_storage.content_get_metadata([pk])
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -141,7 +141,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': cont['length'],
- 'skipped_content:add': 0
}
assert list(swh_storage.content_get([cont['sha1']])) == \
@@ -168,7 +167,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont['length'],
- 'skipped_content:add': 0
}
swh_storage.refresh_stat_counters()
@@ -177,25 +175,35 @@
def test_content_add_validation(self, swh_storage):
+ """content_add rejects an 'absent' or unknown status, a length of -2
+ and any content carrying a 'reason' entry."""
cont = data.cont
+ with pytest.raises(ValueError, match='status'):
+ swh_storage.content_add([{**cont, 'status': 'absent'}])
+
with pytest.raises(ValueError, match='status'):
swh_storage.content_add([{**cont, 'status': 'foobar'}])
with pytest.raises(ValueError, match="(?i)length"):
swh_storage.content_add([{**cont, 'length': -2}])
+ with pytest.raises(
+ (ValueError, TypeError),
+ match="reason"):
+ swh_storage.content_add([{**cont, 'reason': 'foobar'}])
+
+    def test_skipped_content_add_validation(self, swh_storage):
+        """skipped_content_add rejects a non-'absent' status and a missing
+        reason.
+
+        The missing reason may surface as a ValueError or as a
+        psycopg2.IntegrityError; in the latter case the error must be a
+        NOT NULL violation.
+        """
+        cont = data.cont.copy()
+        del cont['data']
+
+        with pytest.raises(ValueError, match='status'):
+            swh_storage.skipped_content_add([{**cont, 'status': 'visible'}])
+
         with pytest.raises((ValueError, psycopg2.IntegrityError),
                            match='reason') as cm:
-            swh_storage.content_add([{**cont, 'status': 'absent'}])
+            swh_storage.skipped_content_add([{**cont, 'status': 'absent'}])
-        if type(cm.value) == psycopg2.IntegrityError:
-            assert cm.exception.pgcode == \
+        # pytest.raises yields an ExceptionInfo: the raised exception is
+        # available as `.value` (there is no `.exception` attribute).
+        if isinstance(cm.value, psycopg2.IntegrityError):
+            assert cm.value.pgcode == \
                 psycopg2.errorcodes.NOT_NULL_VIOLATION
- with pytest.raises(
- ValueError,
- match="^Must not provide a reason if content is not absent.$"):
- swh_storage.content_add([{**cont, 'reason': 'foobar'}])
-
def test_content_get_missing(self, swh_storage):
cont = data.cont
@@ -225,7 +233,6 @@
assert actual_result == {
'content:add': 2,
'content:add:bytes': cont['length'] + cont2['length'],
- 'skipped_content:add': 0
}
def test_content_add_twice(self, swh_storage):
@@ -233,7 +240,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont['length'],
- 'skipped_content:add': 0
}
assert len(swh_storage.journal_writer.objects) == 1
@@ -241,9 +247,8 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont2['length'],
- 'skipped_content:add': 0
}
- assert len(swh_storage.journal_writer.objects) == 2
+ assert 2 <= len(swh_storage.journal_writer.objects) <= 3
assert len(swh_storage.content_find(data.cont)) == 1
assert len(swh_storage.content_find(data.cont2)) == 1
@@ -286,7 +291,6 @@
actual_result = swh_storage.content_add_metadata([cont])
assert actual_result == {
'content:add': 1,
- 'skipped_content:add': 0
}
expected_cont = cont.copy()
@@ -308,7 +312,6 @@
actual_result = swh_storage.content_add_metadata([cont, cont2])
assert actual_result == {
'content:add': 2,
- 'skipped_content:add': 0
}
def test_content_add_metadata_collision(self, swh_storage):
@@ -336,13 +339,10 @@
assert len(missing) == 2
- actual_result = swh_storage.content_add([cont, cont, cont2])
+ actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
- assert actual_result == {
- 'content:add': 0,
- 'content:add:bytes': 0,
- 'skipped_content:add': 2,
- }
+ assert 2 <= actual_result.pop('skipped_content:add') <= 3
+ assert actual_result == {}
missing = list(swh_storage.skipped_content_missing([cont, cont2]))
@@ -3618,6 +3618,8 @@
swh_storage.origin_visit_add(
origin, obj['date'], obj['type'])
else:
+ if obj_type == 'content' and obj['status'] == 'absent':
+ obj_type = 'skipped_content'
method = getattr(swh_storage, obj_type + '_add')
try:
method([obj])
@@ -3727,7 +3729,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': cont['length'],
- 'skipped_content:add': 0
}
if hasattr(swh_storage, 'objstorage'):
@@ -3758,7 +3759,6 @@
assert actual_result == {
'content:add': 1,
- 'skipped_content:add': 0
}
if hasattr(swh_storage, 'objstorage'):
@@ -3778,13 +3778,10 @@
cont2 = data.skipped_cont2
cont2['blake2s256'] = None
- actual_result = swh_storage.content_add([cont, cont, cont2])
+ actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
- assert actual_result == {
- 'content:add': 0,
- 'content:add:bytes': 0,
- 'skipped_content:add': 2,
- }
+ assert 2 <= actual_result.pop('skipped_content:add') <= 3
+ assert actual_result == {}
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 18, 3:07 AM (1 d, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219023
Attached To
D2626: Split 'content_add' method into 'content_add' and 'skipped_content_add'.
Event Timeline
Log In to Comment