Page MenuHomeSoftware Heritage

D2626.diff
No OneTemporary

D2626.diff

diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -28,6 +28,7 @@
min_batch_size:
content: 10000
content_bytes: 100000000
+ skipped_content: 10000
directory: 5000
revision: 1000
release: 10000
@@ -43,16 +44,18 @@
'content': min_batch_size.get('content', 10000),
'content_bytes': min_batch_size.get('content_bytes',
100*1024*1024),
+ 'skipped_content': min_batch_size.get('skipped_content', 10000),
'directory': min_batch_size.get('directory', 25000),
'revision': min_batch_size.get('revision', 100000),
'release': min_batch_size.get('release', 100000),
}
- self.object_types = ['content', 'directory', 'revision', 'release']
+ self.object_types = [
+ 'content', 'skipped_content', 'directory', 'revision', 'release']
self._objects = {k: deque() for k in self.object_types}
def __getattr__(self, key):
if key.endswith('_add'):
- object_type = key.split('_')[0]
+ object_type = key.rsplit('_', 1)[0]
if object_type in self.object_types:
return partial(
self.object_add, object_type=object_type
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -19,6 +19,7 @@
from swh.model.model import (
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content,
+ SkippedContent,
)
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url
@@ -167,6 +168,10 @@
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status']
+ @_prepared_insert_statement('content', _content_keys)
+ def content_add_one(self, content, *, statement) -> None:
+ self._add_one(statement, 'content', content, self._content_keys)
+
@_prepared_statement('SELECT * FROM content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
def content_get_from_pk(
@@ -203,10 +208,6 @@
self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
- @_prepared_insert_statement('content', _content_keys)
- def content_add_one(self, content, *, statement) -> None:
- self._add_one(statement, 'content', content, self._content_keys)
-
def content_index_add_one(self, main_algo: str, content: Content) -> None:
query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \
.format(algo=main_algo, cols=', '.join(self._content_pk),
@@ -221,6 +222,60 @@
algo=algo)
return list(self._execute_with_retries(query, [hash_]))
+ ##########################
+ # 'skipped_content' table
+ ##########################
+
+ _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
+ _skipped_content_keys = [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
+ 'ctime', 'status', 'reason', 'origin']
+ _magic_null_pk = b''
+ """
+ NULLs are not allowed in primary keys; instead use an empty
+ value
+ """
+
+ @_prepared_insert_statement('skipped_content', _skipped_content_keys)
+ def skipped_content_add_one(self, content, *, statement) -> None:
+ content = content.to_dict()
+ for key in self._skipped_content_pk:
+ if content[key] is None:
+ content[key] = self._magic_null_pk
+ content = SkippedContent.from_dict(content)
+ self._add_one(statement, 'skipped_content', content,
+ self._skipped_content_keys)
+
+ @_prepared_statement('SELECT * FROM skipped_content WHERE ' +
+ ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
+ def skipped_content_get_from_pk(
+ self, content_hashes: Dict[str, bytes], *, statement
+ ) -> Optional[Row]:
+ rows = list(self._execute_with_retries(
+ statement, [content_hashes[algo] or self._magic_null_pk
+ for algo in HASH_ALGORITHMS]))
+ assert len(rows) <= 1
+ if rows:
+ # TODO: convert _magic_null_pk back to None?
+ return rows[0]
+ else:
+ return None
+
+ ##########################
+ # 'skipped_content_by_*' tables
+ ##########################
+
+ def skipped_content_index_add_one(
+ self, main_algo: str, content: Content) -> None:
+ assert content.get_hash(main_algo) is not None
+ query = ('INSERT INTO skipped_content_by_{algo} ({cols}) '
+ 'VALUES ({values})').format(
+ algo=main_algo, cols=', '.join(self._content_pk),
+ values=', '.join('%s' for _ in self._content_pk))
+ self._execute_with_retries(
+ query, [content.get_hash(algo) or self._magic_null_pk
+ for algo in self._content_pk])
+
##########################
# 'revision' table
##########################
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -65,6 +65,20 @@
PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
);
+CREATE TABLE IF NOT EXISTS skipped_content (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ length bigint,
+ ctime timestamp,
+ -- creation time, i.e. time of (first) injection into the storage
+ status ascii,
+ reason text,
+ origin text,
+ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
+);
+
CREATE TABLE IF NOT EXISTS revision (
id blob PRIMARY KEY,
date microtimestamp_with_timezone,
@@ -184,19 +198,29 @@
sha256 blob,
blake2s256 blob,
PRIMARY KEY (({main_algo}), {other_algos})
-);'''
+);
+
+CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
+ sha1 blob,
+ sha1_git blob,
+ sha256 blob,
+ blake2s256 blob,
+ PRIMARY KEY (({main_algo}), {other_algos})
+);
+'''
-TABLES = ('content revision revision_parent release directory '
- 'directory_entry snapshot snapshot_branch origin_visit '
- 'origin tool_by_uuid tool object_count').split()
+TABLES = ('skipped_content content revision revision_parent release '
+ 'directory directory_entry snapshot snapshot_branch '
+ 'origin_visit origin tool_by_uuid tool object_count').split()
HASH_ALGORITHMS = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
for main_algo in HASH_ALGORITHMS:
- CREATE_TABLES_QUERIES.append(CONTENT_INDEX_TEMPLATE.format(
+ CREATE_TABLES_QUERIES.extend(CONTENT_INDEX_TEMPLATE.format(
main_algo=main_algo,
other_algos=', '.join(
[algo for algo in HASH_ALGORITHMS if algo != main_algo])
- ))
+ ).split('\n\n'))
TABLES.append('content_by_%s' % main_algo)
+ TABLES.append('skipped_content_by_%s' % main_algo)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -14,7 +14,8 @@
import dateutil
from swh.model.model import (
- Revision, Release, Directory, DirectoryEntry, Content, OriginVisit,
+ Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
+ OriginVisit,
)
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
@@ -116,7 +117,6 @@
summary = {
'content:add': count_content_added,
- 'skipped_content:add': count_contents - count_content_added,
}
if with_data:
@@ -208,10 +208,6 @@
result[content_metadata['sha1']].append(content_metadata)
return result
- def skipped_content_missing(self, contents):
- # TODO
- raise NotImplementedError('not yet supported for Cassandra')
-
def content_find(self, content):
# Find an algorithm that is common to all the requested contents.
# It will be used to do an initial filtering efficiently.
@@ -263,6 +259,46 @@
def content_get_random(self):
return self._cql_runner.content_get_random().sha1_git
+ def _skipped_content_add(self, contents):
+ contents = [SkippedContent.from_dict(c) for c in contents]
+
+ # Filter-out content already in the database.
+ contents = [
+ c for c in contents
+ if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())]
+
+ if self.journal_writer:
+ for content in contents:
+ content = content.to_dict()
+ if 'data' in content:
+ del content['data']
+ self.journal_writer.write_addition('content', content)
+
+ for content in contents:
+ # Add to index tables
+ for algo in HASH_ALGORITHMS:
+ if content.get_hash(algo) is not None:
+ self._cql_runner.skipped_content_index_add_one(
+ algo, content)
+
+ # Then to the main table
+ self._cql_runner.skipped_content_add_one(content)
+
+ return {
+ 'skipped_content:add': len(contents)
+ }
+
+ def skipped_content_add(self, content):
+ content = [c.copy() for c in content] # semi-shallow copy
+ for item in content:
+ item['ctime'] = now()
+ return self._skipped_content_add(content)
+
+ def skipped_content_missing(self, contents):
+ for content in contents:
+ if not self._cql_runner.skipped_content_get_from_pk(content):
+ yield content
+
def directory_add(self, directories):
directories = list(directories)
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -143,9 +143,9 @@
query = """SELECT * FROM (VALUES %s) AS t (%s)
WHERE not exists
(SELECT 1 FROM skipped_content s WHERE
- s.sha1 is not distinct from t.sha1 and
- s.sha1_git is not distinct from t.sha1_git and
- s.sha256 is not distinct from t.sha256);""" % \
+ s.sha1 is not distinct from t.sha1::sha1 and
+ s.sha1_git is not distinct from t.sha1_git::sha1 and
+ s.sha256 is not distinct from t.sha256::bytea);""" % \
((', '.join('%s' for _ in contents)),
', '.join(self.content_hash_keys))
cur.execute(query,
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
-from typing import Dict, Generator, Sequence, Set
+from typing import Dict, Generator, Iterable, Set
from swh.storage import get_storage
@@ -27,22 +27,30 @@
def __init__(self, storage):
self.storage = get_storage(**storage)
self.objects_seen = {
- 'content': set(), # set of content hashes (sha256) seen
- 'directory': set(),
- 'revision': set(),
+ 'content': set(), # sha256
+ 'skipped_content': set(), # sha1_git
+ 'directory': set(), # sha1_git
+ 'revision': set(), # sha1_git
}
def __getattr__(self, key):
return getattr(self.storage, key)
- def content_add(self, content: Sequence[Dict]) -> Dict:
+ def content_add(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
contents_to_add = self._filter_missing_contents(contents)
return self.storage.content_add(
x for x in contents if x['sha256'] in contents_to_add
)
- def directory_add(self, directories: Sequence[Dict]) -> Dict:
+ def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
+ contents = list(content)
+ contents_to_add = self._filter_missing_skipped_contents(contents)
+ return self.storage.skipped_content_add(
+ x for x in contents if x['sha1_git'] in contents_to_add
+ )
+
+ def directory_add(self, directories: Iterable[Dict]) -> Dict:
directories = list(directories)
missing_ids = self._filter_missing_ids(
'directory',
@@ -63,7 +71,7 @@
)
def _filter_missing_contents(
- self, content_hashes: Sequence[Dict]) -> Set[bytes]:
+ self, content_hashes: Iterable[Dict]) -> Set[bytes]:
"""Return only the content keys missing from swh
Args:
@@ -84,6 +92,26 @@
key_hash='sha256',
))
+ def _filter_missing_skipped_contents(
+ self, content_hashes: Iterable[Dict]) -> Set[bytes]:
+ """Return only the content keys missing from swh
+
+ Args:
+ content_hashes: List of sha1_git to check for existence in swh
+ storage
+
+ """
+ objects_seen = self.objects_seen['skipped_content']
+ missing_hashes = []
+ for hashes in content_hashes:
+ if hashes['sha1_git'] in objects_seen:
+ continue
+ objects_seen.add(hashes['sha1_git'])
+ missing_hashes.append(hashes)
+
+ return {c['sha1_git']
+ for c in self.storage.skipped_content_missing(missing_hashes)}
+
def _filter_missing_ids(
self,
object_type: str,
@@ -92,7 +120,7 @@
Args:
object_type: object type to use {revision, directory}
- ids: Sequence of object_type ids
+ ids: Iterable of object_type ids
Returns:
Missing ids from the storage for object_type
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -19,8 +19,8 @@
import attr
from swh.model.model import (
- Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin,
- SHA1_SIZE)
+ BaseContent, Content, SkippedContent, Directory, Revision, Release,
+ Snapshot, OriginVisit, Origin, SHA1_SIZE)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
@@ -75,47 +75,26 @@
return True
def _content_add(self, contents, with_data):
- content_with_data = []
- content_without_data = []
for content in contents:
if content.status is None:
content.status = 'visible'
+ if content.status == 'absent':
+ raise ValueError('content with status=absent')
if content.length is None:
- content.length = -1
- if content.status != 'absent':
- if self._content_key(content) not in self._contents:
- content_with_data.append(content)
- else:
- if self._content_key(content) not in self._skipped_contents:
- content_without_data.append(content)
+ raise ValueError('content with length=None')
if self.journal_writer:
- for content in content_with_data:
+ for content in contents:
content = attr.evolve(content, data=None)
self.journal_writer.write_addition('content', content)
- for content in content_without_data:
- self.journal_writer.write_addition('content', content)
-
- count_content_added, count_content_bytes_added = \
- self._content_add_present(content_with_data, with_data)
-
- count_skipped_content_added = self._content_add_absent(
- content_without_data
- )
summary = {
- 'content:add': count_content_added,
- 'skipped_content:add': count_skipped_content_added,
+ 'content:add': 0,
}
if with_data:
- summary['content:add:bytes'] = count_content_bytes_added
+ summary['content:add:bytes'] = 0
- return summary
-
- def _content_add_present(self, contents, with_data):
- count_content_added = 0
- count_content_bytes_added = 0
for content in contents:
key = self._content_key(content)
if key in self._contents:
@@ -133,40 +112,21 @@
('content', content.sha1))
self._contents[key] = content
bisect.insort(self._sorted_sha1s, content.sha1)
- count_content_added += 1
+ summary['content:add'] += 1
if with_data:
content_data = self._contents[key].data
self._contents[key] = attr.evolve(
self._contents[key],
data=None)
- count_content_bytes_added += len(content_data)
+ summary['content:add:bytes'] += len(content_data)
self.objstorage.add(content_data, content.sha1)
- return (count_content_added, count_content_bytes_added)
-
- def _content_add_absent(self, contents):
- count = 0
- skipped_content_missing = self.skipped_content_missing(contents)
- for content in skipped_content_missing:
- key = self._content_key(content)
- for algo in DEFAULT_ALGORITHMS:
- self._skipped_content_indexes[algo][content.get_hash(algo)] \
- .add(key)
- self._skipped_contents[key] = content
- count += 1
-
- return count
-
- def _content_to_model(self, contents):
- for content in contents:
- content = content.copy()
- content.pop('origin', None)
- yield Content.from_dict(content)
+ return summary
def content_add(self, content):
now = datetime.datetime.now(tz=datetime.timezone.utc)
- content = [attr.evolve(c, ctime=now)
- for c in self._content_to_model(content)]
+ content = [attr.evolve(Content.from_dict(c), ctime=now)
+ for c in content]
return self._content_add(content, with_data=True)
def content_update(self, content, keys=[]):
@@ -194,7 +154,7 @@
self._content_indexes[algorithm][hash_].add(new_key)
def content_add_metadata(self, content):
- content = list(self._content_to_model(content))
+ content = [Content.from_dict(c) for c in content]
return self._content_add(content, with_data=False)
def content_get(self, content):
@@ -308,6 +268,39 @@
if content not in self._content_indexes['sha1_git']:
yield content
+ def content_get_random(self):
+ return random.choice(list(self._content_indexes['sha1_git']))
+
+ def _skipped_content_add(self, contents):
+ for content in contents:
+ if content.status is None:
+ content = attr.evolve(content, status='absent')
+ if content.length is None:
+ content = attr.evolve(content, length=-1)
+ if content.status != 'absent':
+ raise ValueError(f'Content with status={content.status}')
+
+ if self.journal_writer:
+ for content in contents:
+ self.journal_writer.write_addition('content', content)
+
+ summary = {
+ 'skipped_content:add': 0
+ }
+
+ skipped_content_missing = self.skipped_content_missing(
+ [c.to_dict() for c in contents])
+ for content in skipped_content_missing:
+ key = self._content_key(content, allow_missing=True)
+ for algo in DEFAULT_ALGORITHMS:
+ if algo in content:
+ self._skipped_content_indexes[algo][content[algo]] \
+ .add(key)
+ self._skipped_contents[key] = content
+ summary['skipped_content:add'] += 1
+
+ return summary
+
def skipped_content_missing(self, contents):
for content in contents:
for (key, algorithm) in self._content_key_algorithm(content):
@@ -316,11 +309,17 @@
if key not in self._skipped_content_indexes[algorithm]:
# index must contain hashes of algos except blake2s256
# else the content is considered skipped
- yield content
+ yield {algo: content[algo]
+ for algo in DEFAULT_ALGORITHMS
+ if content[algo] is not None}
break
- def content_get_random(self):
- return random.choice(list(self._content_indexes['sha1_git']))
+ def skipped_content_add(self, content):
+ content = list(content)
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+ content = [attr.evolve(SkippedContent.from_dict(c), ctime=now)
+ for c in content]
+ return self._skipped_content_add(content)
def directory_add(self, directories):
directories = list(directories)
@@ -1021,15 +1020,15 @@
return person
@staticmethod
- def _content_key(content):
+ def _content_key(content, allow_missing=False):
"""A stable key for a content"""
- return tuple(getattr(content, key)
+ return tuple(getattr(content, key, None)
for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
def _content_key_algorithm(content):
""" A stable key and the algorithm for a content"""
- if isinstance(content, Content):
+ if isinstance(content, BaseContent):
content = content.to_dict()
return tuple((content.get(key), key)
for key in sorted(DEFAULT_ALGORITHMS))
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -29,14 +29,11 @@
following keys:
- data (bytes): the actual content
- - length (int): content length (default: -1)
+ - length (int): content length
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- - status (str): one of visible, hidden, absent
- - reason (str): if status = absent, the reason why
- - origin (int): if status = absent, the origin we saw the
- content in
+ - status (str): one of visible, hidden
Raises:
@@ -50,11 +47,10 @@
Since additions to both idempotent, that should not be a problem.
Returns:
- Summary dict with the following key and associated values:
+ Summary dict with the following keys and associated values:
content:add: New contents added
content:add:bytes: Sum of the contents' length data
- skipped_content:add: New skipped contents (no data) added
"""
...
@@ -253,20 +249,6 @@
"""
...
- @remote_api_endpoint('content/skipped/missing')
- def skipped_content_missing(self, contents):
- """List skipped_content missing from storage
-
- Args:
- content: iterable of dictionaries containing the data for each
- checksum algorithm.
-
- Returns:
- iterable: missing signatures
-
- """
- ...
-
@remote_api_endpoint('content/present')
def content_find(self, content):
"""Find a content hash in db.
@@ -296,6 +278,57 @@
"""
...
+ @remote_api_endpoint('content/skipped/add')
+ def skipped_content_add(self, content):
+ """Add contents to the skipped_content list, which contains
+ (partial) information about content missing from the archive.
+
+ Args:
+ contents (iterable): iterable of dictionaries representing
+ individual pieces of content to add. Each dictionary has the
+ following keys:
+
+ - length (Optional[int]): content length (default: -1)
+ - one key for each checksum algorithm in
+ :data:`swh.model.hashutil.ALGORITHMS`, mapped to the
+ corresponding checksum; each is optional
+ - status (str): must be "absent"
+ - reason (str): the reason why the content is absent
+ - origin (int): if status = absent, the origin we saw the
+ content in
+
+ Raises:
+
+ The following exceptions can occur:
+
+ - HashCollision in case of collision
+ - Any other exceptions raise by the backend
+
+ In case of errors, some content may have been stored in
+ the DB and in the objstorage.
+ Since additions to both idempotent, that should not be a problem.
+
+ Returns:
+ Summary dict with the following key and associated values:
+
+ skipped_content:add: New skipped contents (no data) added
+ """
+ ...
+
+ @remote_api_endpoint('content/skipped/missing')
+ def skipped_content_missing(self, contents):
+ """List skipped_content missing from storage
+
+ Args:
+ content: iterable of dictionaries containing the data for each
+ checksum algorithm.
+
+ Returns:
+ iterable: missing signatures
+
+ """
+ ...
+
@remote_api_endpoint('directory/add')
def directory_add(self, directories):
"""Add directories to the storage
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -63,15 +63,20 @@
return getattr(self.storage, key)
@swh_retry
- def content_add(self, content: List[Dict]) -> Dict:
+ def content_add(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
return self.storage.content_add(contents)
@swh_retry
- def content_add_metadata(self, content: List[Dict]) -> Dict:
+ def content_add_metadata(self, content: Iterable[Dict]) -> Dict:
contents = list(content)
return self.storage.content_add_metadata(contents)
+ @swh_retry
+ def skipped_content_add(self, content: Iterable[Dict]) -> Dict:
+ contents = list(content)
+ return self.storage.skipped_content_add(contents)
+
@swh_retry
def origin_add_one(self, origin: Dict) -> str:
return self.storage.origin_add_one(origin)
@@ -91,7 +96,7 @@
metadata=metadata, snapshot=snapshot)
@swh_retry
- def tool_add(self, tools: List[Dict]) -> List[Dict]:
+ def tool_add(self, tools: Iterable[Dict]) -> List[Dict]:
tools = list(tools)
return self.storage.tool_add(tools)
@@ -110,22 +115,22 @@
origin_url, ts, provider_id, tool_id, metadata)
@swh_retry
- def directory_add(self, directories: List[Dict]) -> Dict:
+ def directory_add(self, directories: Iterable[Dict]) -> Dict:
directories = list(directories)
return self.storage.directory_add(directories)
@swh_retry
- def revision_add(self, revisions: List[Dict]) -> Dict:
+ def revision_add(self, revisions: Iterable[Dict]) -> Dict:
revisions = list(revisions)
return self.storage.revision_add(revisions)
@swh_retry
- def release_add(self, releases: List[Dict]) -> Dict:
+ def release_add(self, releases: Iterable[Dict]) -> Dict:
releases = list(releases)
return self.storage.release_add(releases)
@swh_retry
- def snapshot_add(self, snapshot: List[Dict]) -> Dict:
+ def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict:
snapshots = list(snapshot)
return self.storage.snapshot_add(snapshots)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -128,106 +128,54 @@
return tuple([hash[k] for k in keys])
@staticmethod
- def _normalize_content(d):
+ def _content_normalize(d):
d = d.copy()
if 'status' not in d:
d['status'] = 'visible'
- if 'length' not in d:
- d['length'] = -1
-
return d
@staticmethod
- def _validate_content(d):
+ def _content_validate(d):
"""Sanity checks on status / reason / length, that postgresql
doesn't enforce."""
- if d['status'] not in ('visible', 'absent', 'hidden'):
+ if d['status'] not in ('visible', 'hidden'):
raise ValueError('Invalid content status: {}'.format(d['status']))
- if d['status'] != 'absent' and d.get('reason') is not None:
+ if d.get('reason') is not None:
raise ValueError(
- 'Must not provide a reason if content is not absent.')
+ 'Must not provide a reason if content is present.')
- if d['length'] < -1:
- raise ValueError('Content length must be positive or -1.')
+ if d['length'] is None or d['length'] < 0:
+ raise ValueError('Content length must be positive.')
- def _filter_new_content(self, content, db=None, cur=None):
- """Sort contents into buckets 'with data' and 'without data',
- and filter out those already in the database."""
- content_by_status = defaultdict(list)
- for d in content:
- content_by_status[d['status']].append(d)
-
- content_with_data = content_by_status['visible'] \
- + content_by_status['hidden']
- content_without_data = content_by_status['absent']
-
- missing_content = set(self.content_missing(content_with_data,
- db=db, cur=cur))
- missing_skipped = set(self._content_unique_key(hashes, db)
- for hashes in self.skipped_content_missing(
- content_without_data, db=db, cur=cur))
-
- content_with_data = [
- cont for cont in content_with_data
- if cont['sha1'] in missing_content]
- content_without_data = [
- cont for cont in content_without_data
- if self._content_unique_key(cont, db) in missing_skipped]
-
- summary = {
- 'content:add': len(missing_content),
- 'skipped_content:add': len(missing_skipped),
- }
-
- return (content_with_data, content_without_data, summary)
-
- def _content_add_metadata(self, db, cur,
- content_with_data, content_without_data):
+ def _content_add_metadata(self, db, cur, content):
"""Add content to the postgresql database but not the object storage.
"""
- if content_with_data:
- # create temporary table for metadata injection
- db.mktemp('content', cur)
+ # create temporary table for metadata injection
+ db.mktemp('content', cur)
- db.copy_to(content_with_data, 'tmp_content',
- db.content_add_keys, cur)
+ db.copy_to(content, 'tmp_content',
+ db.content_add_keys, cur)
- # move metadata in place
- try:
- db.content_add_from_temp(cur)
- except psycopg2.IntegrityError as e:
- from . import HashCollision
- if e.diag.sqlstate == '23505' and \
- e.diag.table_name == 'content':
- constraint_to_hash_name = {
- 'content_pkey': 'sha1',
- 'content_sha1_git_idx': 'sha1_git',
- 'content_sha256_idx': 'sha256',
- }
- colliding_hash_name = constraint_to_hash_name \
- .get(e.diag.constraint_name)
- raise HashCollision(colliding_hash_name) from None
- else:
- raise
-
- if content_without_data:
- content_without_data = \
- [cont.copy() for cont in content_without_data]
- origin_ids = db.origin_id_get_by_url(
- [cont.get('origin') for cont in content_without_data],
- cur=cur)
- for (cont, origin_id) in zip(content_without_data, origin_ids):
- if 'origin' in cont:
- cont['origin'] = origin_id
- db.mktemp('skipped_content', cur)
- db.copy_to(content_without_data, 'tmp_skipped_content',
- db.skipped_content_keys, cur)
-
- # move metadata in place
- db.skipped_content_add_from_temp(cur)
+ # move metadata in place
+ try:
+ db.content_add_from_temp(cur)
+ except psycopg2.IntegrityError as e:
+ from . import HashCollision
+ if e.diag.sqlstate == '23505' and \
+ e.diag.table_name == 'content':
+ constraint_to_hash_name = {
+ 'content_pkey': 'sha1',
+ 'content_sha1_git_idx': 'sha1_git',
+ 'content_sha256_idx': 'sha256',
+ }
+ colliding_hash_name = constraint_to_hash_name \
+ .get(e.diag.constraint_name)
+ raise HashCollision(colliding_hash_name) from None
+ else:
+ raise
@timed
@process_metrics
@@ -238,21 +186,19 @@
for item in content:
item['ctime'] = now
- content = [self._normalize_content(c) for c in content]
+ content = [self._content_normalize(c) for c in content]
for c in content:
- self._validate_content(c)
+ self._content_validate(c)
- (content_with_data, content_without_data, summary) = \
- self._filter_new_content(content, db, cur)
+ missing = list(self.content_missing(content, key_hash='sha1_git'))
+ content = [c for c in content if c['sha1_git'] in missing]
if self.journal_writer:
- for item in content_with_data:
+ for item in content:
if 'data' in item:
item = item.copy()
del item['data']
self.journal_writer.write_addition('content', item)
- for item in content_without_data:
- self.journal_writer.write_addition('content', item)
def add_to_objstorage():
"""Add to objstorage the new missing_content
@@ -264,7 +210,7 @@
"""
content_bytes_added = 0
data = {}
- for cont in content_with_data:
+ for cont in content:
if cont['sha1'] not in data:
data[cont['sha1']] = cont['data']
content_bytes_added += max(0, cont['length'])
@@ -278,15 +224,16 @@
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
- self._content_add_metadata(
- db, cur, content_with_data, content_without_data)
+ self._content_add_metadata(db, cur, content)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
content_bytes_added = added_to_objstorage.result()
- summary['content:add:bytes'] = content_bytes_added
- return summary
+ return {
+ 'content:add': len(content),
+ 'content:add:bytes': content_bytes_added,
+ }
@timed
@db_transaction()
@@ -308,23 +255,23 @@
@process_metrics
@db_transaction()
def content_add_metadata(self, content, db=None, cur=None):
- content = [self._normalize_content(c) for c in content]
+ content = [self._content_normalize(c) for c in content]
for c in content:
- self._validate_content(c)
+ self._content_validate(c)
- (content_with_data, content_without_data, summary) = \
- self._filter_new_content(content, db, cur)
+ missing = self.content_missing(content, key_hash='sha1_git')
+ content = [c for c in content if c['sha1_git'] in missing]
if self.journal_writer:
- for item in itertools.chain(content_with_data,
- content_without_data):
+ for item in itertools.chain(content):
assert 'data' not in content
self.journal_writer.write_addition('content', item)
- self._content_add_metadata(
- db, cur, content_with_data, content_without_data)
+ self._content_add_metadata(db, cur, content)
- return summary
+ return {
+ 'content:add': len(content),
+ }
@timed
def content_get(self, content):
@@ -423,12 +370,6 @@
for obj in db.content_missing_per_sha1_git(contents, cur):
yield obj[0]
- @timed
- @db_transaction_generator()
- def skipped_content_missing(self, contents, db=None, cur=None):
- for content in db.skipped_content_missing(contents, cur):
- yield dict(zip(db.content_hash_keys, content))
-
@timed
@db_transaction()
def content_find(self, content, db=None, cur=None):
@@ -449,6 +390,83 @@
def content_get_random(self, db=None, cur=None):
return db.content_get_random(cur)
+ @staticmethod
+ def _skipped_content_normalize(d):
+ d = d.copy()
+
+ if d.get('status') is None:
+ d['status'] = 'absent'
+
+ if d.get('length') is None:
+ d['length'] = -1
+
+ return d
+
+ @staticmethod
+ def _skipped_content_validate(d):
+ """Sanity checks on status / reason / length, that postgresql
+ doesn't enforce."""
+ if d['status'] != 'absent':
+ raise ValueError('Invalid content status: {}'.format(d['status']))
+
+ if d.get('reason') is None:
+ raise ValueError(
+ 'Must provide a reason if content is absent.')
+
+ if d['length'] < -1:
+ raise ValueError('Content length must be positive or -1.')
+
+ def _skipped_content_add_metadata(self, db, cur, content):
+ content = \
+ [cont.copy() for cont in content]
+ origin_ids = db.origin_id_get_by_url(
+ [cont.get('origin') for cont in content],
+ cur=cur)
+ for (cont, origin_id) in zip(content, origin_ids):
+ if 'origin' in cont:
+ cont['origin'] = origin_id
+ db.mktemp('skipped_content', cur)
+ db.copy_to(content, 'tmp_skipped_content',
+ db.skipped_content_keys, cur)
+
+ # move metadata in place
+ db.skipped_content_add_from_temp(cur)
+
+ @timed
+ @process_metrics
+ @db_transaction()
+ def skipped_content_add(self, content, db=None, cur=None):
+ content = [dict(c.items()) for c in content] # semi-shallow copy
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+ for item in content:
+ item['ctime'] = now
+
+ content = [self._skipped_content_normalize(c) for c in content]
+ for c in content:
+ self._skipped_content_validate(c)
+
+ missing_contents = self.skipped_content_missing(content)
+ content = [c for c in content
+ if any(all(c.get(algo) == missing_content.get(algo)
+ for algo in ALGORITHMS)
+ for missing_content in missing_contents)]
+
+ if self.journal_writer:
+ for item in content:
+ self.journal_writer.write_addition('content', item)
+
+ self._skipped_content_add_metadata(db, cur, content)
+
+ return {
+ 'skipped_content:add': len(content),
+ }
+
+ @timed
+ @db_transaction_generator()
+ def skipped_content_missing(self, contents, db=None, cur=None):
+ for content in db.skipped_content_missing(contents, cur):
+ yield dict(zip(db.content_hash_keys, content))
+
@timed
@process_metrics
@db_transaction()
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -58,7 +58,10 @@
@pytest.fixture
def swh_contents(swh_storage):
contents = gen_contents(n=20)
- swh_storage.content_add(contents)
+ swh_storage.content_add(
+ [c for c in contents if c['status'] != 'absent'])
+ swh_storage.skipped_content_add(
+ [c for c in contents if c['status'] == 'absent'])
return contents
@@ -218,6 +221,7 @@
return {
'content': [data.cont, data.cont2],
'content_metadata': [data.cont3],
+ 'skipped_content': [data.skipped_cont, data.skipped_cont2],
'person': [data.person],
'directory': [data.dir2, data.dir],
'revision': [data.revision],
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -27,7 +27,6 @@
assert s == {
'content:add': 1 + 1,
'content:add:bytes': contents[0]['length'] + contents[1]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing(
@@ -48,7 +47,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': contents[0]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing([contents[0]])
@@ -75,7 +73,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': contents[0]['length'],
- 'skipped_content:add': 0
}
missing_contents = storage.content_missing([contents[0]])
@@ -85,6 +82,55 @@
assert s == {}
+def test_buffering_proxy_storage_skipped_content_threshold_not_hit(
+ sample_data):
+ contents = sample_data['skipped_content']
+ storage = BufferingProxyStorage(
+ storage={'cls': 'memory'},
+ min_batch_size={
+ 'skipped_content': 10,
+ }
+ )
+ s = storage.skipped_content_add([contents[0], contents[1]])
+ assert s == {}
+
+ # contents have not been written to storage
+ missing_contents = storage.skipped_content_missing(
+ [contents[0], contents[1]])
+ assert {c['sha1'] for c in missing_contents} \
+ == {c['sha1'] for c in contents}
+
+ s = storage.flush()
+ assert s == {
+ 'skipped_content:add': 1 + 1
+ }
+
+ missing_contents = storage.skipped_content_missing(
+ [contents[0], contents[1]])
+ assert list(missing_contents) == []
+
+
+def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data):
+ contents = sample_data['skipped_content']
+ storage = BufferingProxyStorage(
+ storage={'cls': 'memory'},
+ min_batch_size={
+ 'skipped_content': 1,
+ }
+ )
+
+ s = storage.skipped_content_add([contents[0]])
+ assert s == {
+ 'skipped_content:add': 1
+ }
+
+ missing_contents = storage.skipped_content_missing([contents[0]])
+ assert list(missing_contents) == []
+
+ s = storage.flush()
+ assert s == {}
+
+
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
directories = sample_data['directory']
storage = BufferingProxyStorage(
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -177,11 +177,6 @@
def test_content_update(self):
pass
- @pytest.mark.skip(
- 'not implemented, see https://forge.softwareheritage.org/T1633')
- def test_skipped_content_add(self):
- pass
-
@pytest.mark.skip(
'The "person" table of the pgsql is a legacy thing, and not '
'supported by the cassandra backend.')
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -18,7 +18,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': sample_content['length'],
- 'skipped_content:add': 0
}
content = next(storage.content_get([sample_content['sha1']]))
@@ -28,7 +27,27 @@
assert s == {
'content:add': 0,
'content:add:bytes': 0,
- 'skipped_content:add': 0
+ }
+
+
+def test_filtering_proxy_storage_skipped_content(sample_data):
+ sample_content = sample_data['skipped_content'][0]
+ storage = FilteringProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.skipped_content_missing([sample_content]))
+ assert content['sha1'] == sample_content['sha1']
+
+ s = storage.skipped_content_add([sample_content])
+ assert s == {
+ 'skipped_content:add': 1,
+ }
+
+ content = list(storage.skipped_content_missing([sample_content]))
+ assert content == []
+
+ s = storage.skipped_content_add([sample_content])
+ assert s == {
+ 'skipped_content:add': 0,
}
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -32,7 +32,6 @@
assert s == {
'content:add': 1,
'content:add:bytes': sample_content['length'],
- 'skipped_content:add': 0
}
content = next(swh_storage.content_get([sample_content['sha1']]))
@@ -103,7 +102,6 @@
s = swh_storage.content_add_metadata([sample_content])
assert s == {
'content:add': 1,
- 'skipped_content:add': 0
}
content_metadata = swh_storage.content_get_metadata([pk])
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -141,7 +141,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': cont['length'],
- 'skipped_content:add': 0
}
assert list(swh_storage.content_get([cont['sha1']])) == \
@@ -168,7 +167,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont['length'],
- 'skipped_content:add': 0
}
swh_storage.refresh_stat_counters()
@@ -177,25 +175,35 @@
def test_content_add_validation(self, swh_storage):
cont = data.cont
+ with pytest.raises(ValueError, match='status'):
+ swh_storage.content_add([{**cont, 'status': 'absent'}])
+
with pytest.raises(ValueError, match='status'):
swh_storage.content_add([{**cont, 'status': 'foobar'}])
with pytest.raises(ValueError, match="(?i)length"):
swh_storage.content_add([{**cont, 'length': -2}])
+ with pytest.raises(
+ (ValueError, TypeError),
+ match="reason"):
+ swh_storage.content_add([{**cont, 'reason': 'foobar'}])
+
+ def test_skipped_content_add_validation(self, swh_storage):
+ cont = data.cont.copy()
+ del cont['data']
+
+ with pytest.raises(ValueError, match='status'):
+ swh_storage.skipped_content_add([{**cont, 'status': 'visible'}])
+
with pytest.raises((ValueError, psycopg2.IntegrityError),
match='reason') as cm:
- swh_storage.content_add([{**cont, 'status': 'absent'}])
+ swh_storage.skipped_content_add([{**cont, 'status': 'absent'}])
if type(cm.value) == psycopg2.IntegrityError:
assert cm.exception.pgcode == \
psycopg2.errorcodes.NOT_NULL_VIOLATION
- with pytest.raises(
- ValueError,
- match="^Must not provide a reason if content is not absent.$"):
- swh_storage.content_add([{**cont, 'reason': 'foobar'}])
-
def test_content_get_missing(self, swh_storage):
cont = data.cont
@@ -225,7 +233,6 @@
assert actual_result == {
'content:add': 2,
'content:add:bytes': cont['length'] + cont2['length'],
- 'skipped_content:add': 0
}
def test_content_add_twice(self, swh_storage):
@@ -233,7 +240,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont['length'],
- 'skipped_content:add': 0
}
assert len(swh_storage.journal_writer.objects) == 1
@@ -241,9 +247,8 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': data.cont2['length'],
- 'skipped_content:add': 0
}
- assert len(swh_storage.journal_writer.objects) == 2
+ assert 2 <= len(swh_storage.journal_writer.objects) <= 3
assert len(swh_storage.content_find(data.cont)) == 1
assert len(swh_storage.content_find(data.cont2)) == 1
@@ -286,7 +291,6 @@
actual_result = swh_storage.content_add_metadata([cont])
assert actual_result == {
'content:add': 1,
- 'skipped_content:add': 0
}
expected_cont = cont.copy()
@@ -308,7 +312,6 @@
actual_result = swh_storage.content_add_metadata([cont, cont2])
assert actual_result == {
'content:add': 2,
- 'skipped_content:add': 0
}
def test_content_add_metadata_collision(self, swh_storage):
@@ -336,13 +339,10 @@
assert len(missing) == 2
- actual_result = swh_storage.content_add([cont, cont, cont2])
+ actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
- assert actual_result == {
- 'content:add': 0,
- 'content:add:bytes': 0,
- 'skipped_content:add': 2,
- }
+ assert 2 <= actual_result.pop('skipped_content:add') <= 3
+ assert actual_result == {}
missing = list(swh_storage.skipped_content_missing([cont, cont2]))
@@ -3618,6 +3618,8 @@
swh_storage.origin_visit_add(
origin, obj['date'], obj['type'])
else:
+ if obj_type == 'content' and obj['status'] == 'absent':
+ obj_type = 'skipped_content'
method = getattr(swh_storage, obj_type + '_add')
try:
method([obj])
@@ -3727,7 +3729,6 @@
assert actual_result == {
'content:add': 1,
'content:add:bytes': cont['length'],
- 'skipped_content:add': 0
}
if hasattr(swh_storage, 'objstorage'):
@@ -3758,7 +3759,6 @@
assert actual_result == {
'content:add': 1,
- 'skipped_content:add': 0
}
if hasattr(swh_storage, 'objstorage'):
@@ -3778,13 +3778,10 @@
cont2 = data.skipped_cont2
cont2['blake2s256'] = None
- actual_result = swh_storage.content_add([cont, cont, cont2])
+ actual_result = swh_storage.skipped_content_add([cont, cont, cont2])
- assert actual_result == {
- 'content:add': 0,
- 'content:add:bytes': 0,
- 'skipped_content:add': 2,
- }
+ assert 2 <= actual_result.pop('skipped_content:add') <= 3
+ assert actual_result == {}
with db_transaction(swh_storage) as (_, cur):
cur.execute('SELECT sha1, sha1_git, sha256, blake2s256, '

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 3:07 AM (1 d, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219023

Event Timeline