diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -28,6 +28,7 @@ min_batch_size: content: 10000 content_bytes: 100000000 + skipped_content: 10000 directory: 5000 revision: 1000 release: 10000 @@ -43,16 +44,18 @@ 'content': min_batch_size.get('content', 10000), 'content_bytes': min_batch_size.get('content_bytes', 100*1024*1024), + 'skipped_content': min_batch_size.get('skipped_content', 10000), 'directory': min_batch_size.get('directory', 25000), 'revision': min_batch_size.get('revision', 100000), 'release': min_batch_size.get('release', 100000), } - self.object_types = ['content', 'directory', 'revision', 'release'] + self.object_types = [ + 'content', 'skipped_content', 'directory', 'revision', 'release'] self._objects = {k: deque() for k in self.object_types} def __getattr__(self, key): if key.endswith('_add'): - object_type = key.split('_')[0] + object_type = key.rsplit('_', 1)[0] if object_type in self.object_types: return partial( self.object_add, object_type=object_type diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -167,6 +167,10 @@ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status'] + @_prepared_insert_statement('content', _content_keys) + def content_add_one(self, content, *, statement) -> None: + self._add_one(statement, 'content', content, self._content_keys) + @_prepared_statement('SELECT * FROM content WHERE ' + ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) def content_get_from_pk( @@ -203,10 +207,6 @@ self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) - @_prepared_insert_statement('content', _content_keys) - def content_add_one(self, content, *, statement) -> None: - self._add_one(statement, 'content', content, self._content_keys) - def content_index_add_one(self, main_algo: str, content: Content) -> None: query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \ .format(algo=main_algo, cols=', '.join(self._content_pk), @@ -221,6 +221,33 @@ algo=algo) return list(self._execute_with_retries(query, [hash_])) + ########################## + # 'skipped_content' table + ########################## + + _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] + _skipped_content_keys = [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', + 'ctime', 'status', 'reason', 'origin'] + + @_prepared_insert_statement('skipped_content', _skipped_content_keys) + def skipped_content_add_one(self, content, *, statement) -> None: + self._add_one(statement, 'skipped_content', content, + self._skipped_content_keys) + + ########################## + # 'skipped_content_by_*' tables + ########################## + + def skipped_content_index_add_one( + self, main_algo: str, content: Content) -> None: + query = ('INSERT INTO skipped_content_by_{algo} ({cols}) ' + 'VALUES ({values})').format( + algo=main_algo, cols=', '.join(self._content_pk), + values=', '.join('%s' for _ in self._content_pk)) + self._execute_with_retries( + query, [content.get_hash(algo) for algo in self._content_pk]) + ########################## # 'revision' table ########################## diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py --- a/swh/storage/cassandra/schema.py +++ b/swh/storage/cassandra/schema.py @@ -65,6 +65,20 @@ PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) ); +CREATE TABLE IF NOT EXISTS skipped_content ( + sha1 blob, + sha1_git blob, + sha256 blob, + blake2s256 blob, + length bigint, + ctime timestamp, + -- creation time, i.e. time of (first) injection into the storage + status ascii, + reason text, + origin text, + PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) +); + CREATE TABLE IF NOT EXISTS revision ( id blob PRIMARY KEY, date microtimestamp_with_timezone, @@ -184,19 +198,29 @@ sha256 blob, blake2s256 blob, PRIMARY KEY (({main_algo}), {other_algos}) -);''' +); + +CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} ( + sha1 blob, + sha1_git blob, + sha256 blob, + blake2s256 blob, + PRIMARY KEY (({main_algo}), {other_algos}) +); +''' -TABLES = ('content revision revision_parent release directory ' - 'directory_entry snapshot snapshot_branch origin_visit ' - 'origin tool_by_uuid tool object_count').split() +TABLES = ('skipped_content content revision revision_parent release ' + 'directory directory_entry snapshot snapshot_branch ' + 'origin_visit origin tool_by_uuid tool object_count').split() HASH_ALGORITHMS = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] for main_algo in HASH_ALGORITHMS: - CREATE_TABLES_QUERIES.append(CONTENT_INDEX_TEMPLATE.format( + CREATE_TABLES_QUERIES.extend(CONTENT_INDEX_TEMPLATE.format( main_algo=main_algo, other_algos=', '.join( [algo for algo in HASH_ALGORITHMS if algo != main_algo]) - )) + ).split('\n\n')) TABLES.append('content_by_%s' % main_algo) + TABLES.append('skipped_content_by_%s' % main_algo) diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -14,7 +14,8 @@ import dateutil from swh.model.model import ( - Revision, Release, Directory, DirectoryEntry, Content, OriginVisit, + Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, + OriginVisit, ) from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError @@ -116,7 +117,6 @@ summary = { 'content:add': count_content_added, - 'skipped_content:add': count_contents - count_content_added, } if with_data: @@ -208,10 +208,6 @@ result[content_metadata['sha1']].append(content_metadata) return result - def skipped_content_missing(self, contents): - # TODO - raise NotImplementedError('not yet supported for Cassandra') - def content_find(self, content): # Find an algorithm that is common to all the requested contents. # It will be used to do an initial filtering efficiently. @@ -263,6 +259,42 @@ def content_get_random(self): return self._cql_runner.content_get_random().sha1_git + def _skipped_content_add(self, contents): + contents = [SkippedContent.from_dict(c) for c in contents] + + # Filter-out content already in the database. + contents = [c for c in contents + if not self._cql_runner.content_get_from_pk(c.to_dict())] + + if self.journal_writer: + for content in contents: + content = content.to_dict() + if 'data' in content: + del content['data'] + self.journal_writer.write_addition('content', content) + + for content in contents: + # Add to index tables + for algo in HASH_ALGORITHMS: + self._cql_runner.skipped_content_index_add_one(algo, content) + + # Then to the main table + self._cql_runner.skipped_content_add_one(content) + + return { + 'skipped_content:add': len(contents) + } + + def skipped_content_add(self, content): + content = [c.copy() for c in content] # semi-shallow copy + for item in content: + item['ctime'] = now() + return self._skipped_content_add(content) + + def skipped_content_missing(self, contents): + # TODO + raise NotImplementedError('not yet supported for Cassandra') + def directory_add(self, directories): directories = list(directories) diff --git a/swh/storage/filter.py b/swh/storage/filter.py --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information -from typing import Dict, Generator, Sequence, Set +from typing import Dict, Generator, Iterable, Set from swh.storage import get_storage @@ -27,22 +27,30 @@ def __init__(self, storage): self.storage = get_storage(**storage) self.objects_seen = { - 'content': set(), # set of content hashes (sha256) seen - 'directory': set(), - 'revision': set(), + 'content': set(), # sha256 + 'skipped_content': set(), # sha1_git + 'directory': set(), # sha1_git + 'revision': set(), # sha1_git } def __getattr__(self, key): return getattr(self.storage, key) - def content_add(self, content: Sequence[Dict]) -> Dict: + def content_add(self, content: Iterable[Dict]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_contents(contents) return self.storage.content_add( x for x in contents if x['sha256'] in contents_to_add ) - def directory_add(self, directories: Sequence[Dict]) -> Dict: + def skipped_content_add(self, content: Iterable[Dict]) -> Dict: + contents = list(content) + contents_to_add = self._filter_missing_skipped_contents(contents) + return self.storage.skipped_content_add( + x for x in contents if x['sha1_git'] in contents_to_add + ) + + def directory_add(self, directories: Iterable[Dict]) -> Dict: directories = list(directories) missing_ids = self._filter_missing_ids( 'directory', @@ -63,7 +71,7 @@ ) def _filter_missing_contents( - self, content_hashes: Sequence[Dict]) -> Set[bytes]: + self, content_hashes: Iterable[Dict]) -> Set[bytes]: """Return only the content keys missing from swh Args: @@ -84,6 +92,26 @@ key_hash='sha256', )) + def _filter_missing_skipped_contents( + self, content_hashes: Iterable[Dict]) -> Set[bytes]: + """Return only the content keys missing from swh + + Args: + content_hashes: List of sha1_git to check for existence in swh + storage + + """ + objects_seen = self.objects_seen['skipped_content'] + missing_hashes = [] + for hashes in content_hashes: + if hashes['sha1_git'] in objects_seen: + continue + objects_seen.add(hashes['sha1_git']) + missing_hashes.append(hashes) + + return {c['sha1_git'] + for c in self.storage.skipped_content_missing(missing_hashes)} + def _filter_missing_ids( self, object_type: str, @@ -92,7 +120,7 @@ Args: object_type: object type to use {revision, directory} - ids: Sequence of object_type ids + ids: Iterable of object_type ids Returns: Missing ids from the storage for object_type diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -19,8 +19,8 @@ import attr from swh.model.model import ( - Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin, - SHA1_SIZE) + BaseContent, Content, SkippedContent, Directory, Revision, Release, + Snapshot, OriginVisit, Origin, SHA1_SIZE) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError @@ -75,47 +75,26 @@ return True def _content_add(self, contents, with_data): - content_with_data = [] - content_without_data = [] for content in contents: if content.status is None: content.status = 'visible' + if content.status == 'absent': + raise ValueError('content with status=absent') if content.length is None: - content.length = -1 - if content.status != 'absent': - if self._content_key(content) not in self._contents: - content_with_data.append(content) - else: - if self._content_key(content) not in self._skipped_contents: - content_without_data.append(content) + raise ValueError('content with length=None') if self.journal_writer: - for content in content_with_data: + for content in contents: content = attr.evolve(content, data=None) self.journal_writer.write_addition('content', content) - for content in content_without_data: - self.journal_writer.write_addition('content', content) - - count_content_added, count_content_bytes_added = \ - self._content_add_present(content_with_data, with_data) - - count_skipped_content_added = self._content_add_absent( - content_without_data - ) summary = { - 'content:add': count_content_added, - 'skipped_content:add': count_skipped_content_added, + 'content:add': 0, } if with_data: - summary['content:add:bytes'] = count_content_bytes_added + summary['content:add:bytes'] = 0 - return summary - - def _content_add_present(self, contents, with_data): - count_content_added = 0 - count_content_bytes_added = 0 for content in contents: key = self._content_key(content) if key in self._contents: @@ -133,40 +112,21 @@ ('content', content.sha1)) self._contents[key] = content bisect.insort(self._sorted_sha1s, content.sha1) - count_content_added += 1 + summary['content:add'] += 1 if with_data: content_data = self._contents[key].data self._contents[key] = attr.evolve( self._contents[key], data=None) - count_content_bytes_added += len(content_data) + summary['content:add:bytes'] += len(content_data) self.objstorage.add(content_data, content.sha1) - return (count_content_added, count_content_bytes_added) - - def _content_add_absent(self, contents): - count = 0 - skipped_content_missing = self.skipped_content_missing(contents) - for content in skipped_content_missing: - key = self._content_key(content) - for algo in DEFAULT_ALGORITHMS: - self._skipped_content_indexes[algo][content.get_hash(algo)] \ - .add(key) - self._skipped_contents[key] = content - count += 1 - - return count - - def _content_to_model(self, contents): - for content in contents: - content = content.copy() - content.pop('origin', None) - yield Content.from_dict(content) + return summary def content_add(self, content): now = datetime.datetime.now(tz=datetime.timezone.utc) - content = [attr.evolve(c, ctime=now) - for c in self._content_to_model(content)] + content = [attr.evolve(Content.from_dict(c), ctime=now) + for c in content] return self._content_add(content, with_data=True) def content_update(self, content, keys=[]): @@ -194,7 +154,7 @@ self._content_indexes[algorithm][hash_].add(new_key) def content_add_metadata(self, content): - content = list(self._content_to_model(content)) + content = [Content.from_dict(c) for c in content] return self._content_add(content, with_data=False) def content_get(self, content): @@ -308,6 +268,39 @@ if content not in self._content_indexes['sha1_git']: yield content + def content_get_random(self): + return random.choice(list(self._content_indexes['sha1_git'])) + + def _skipped_content_add(self, contents): + for content in contents: + if content.status is None: + content = attr.evolve(content, status='absent') + if content.length is None: + content = attr.evolve(content, length=-1) + if content.status != 'absent': + raise ValueError(f'Content with status={content.status}') + + if self.journal_writer: + for content in contents: + self.journal_writer.write_addition('content', content) + + summary = { + 'skipped_content:add': 0 + } + + skipped_content_missing = self.skipped_content_missing( + [c.to_dict() for c in contents]) + for content in skipped_content_missing: + key = self._content_key(content, allow_missing=True) + for algo in DEFAULT_ALGORITHMS: + if algo in content: + self._skipped_content_indexes[algo][content[algo]] \ + .add(key) + self._skipped_contents[key] = content + summary['skipped_content:add'] += 1 + + return summary + def skipped_content_missing(self, contents): for content in contents: for (key, algorithm) in self._content_key_algorithm(content): @@ -316,11 +309,17 @@ if key not in self._skipped_content_indexes[algorithm]: # index must contain hashes of algos except blake2s256 # else the content is considered skipped - yield content + yield {algo: content[algo] + for algo in DEFAULT_ALGORITHMS + if content[algo] is not None} break - def content_get_random(self): - return random.choice(list(self._content_indexes['sha1_git'])) + def skipped_content_add(self, content): + content = list(content) + now = datetime.datetime.now(tz=datetime.timezone.utc) + content = [attr.evolve(SkippedContent.from_dict(c), ctime=now) + for c in content] + return self._skipped_content_add(content) def directory_add(self, directories): directories = list(directories) @@ -1021,15 +1020,15 @@ return person @staticmethod - def _content_key(content): + def _content_key(content, allow_missing=False): """A stable key for a content""" - return tuple(getattr(content, key) + return tuple(getattr(content, key, None) for key in sorted(DEFAULT_ALGORITHMS)) @staticmethod def _content_key_algorithm(content): """ A stable key and the algorithm for a content""" - if isinstance(content, Content): + if isinstance(content, BaseContent): content = content.to_dict() return tuple((content.get(key), key) for key in sorted(DEFAULT_ALGORITHMS)) diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -29,14 +29,11 @@ following keys: - data (bytes): the actual content - - length (int): content length (default: -1) + - length (int): content length - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - - status (str): one of visible, hidden, absent - - reason (str): if status = absent, the reason why - - origin (int): if status = absent, the origin we saw the - content in + - status (str): one of visible, hidden Raises: @@ -50,11 +47,10 @@ Since additions to both idempotent, that should not be a problem. Returns: - Summary dict with the following key and associated values: + Summary dict with the following keys and associated values: content:add: New contents added content:add:bytes: Sum of the contents' length data - skipped_content:add: New skipped contents (no data) added """ ... @@ -253,20 +249,6 @@ """ ... - @remote_api_endpoint('content/skipped/missing') - def skipped_content_missing(self, contents): - """List skipped_content missing from storage - - Args: - content: iterable of dictionaries containing the data for each - checksum algorithm. - - Returns: - iterable: missing signatures - - """ - ... - @remote_api_endpoint('content/present') def content_find(self, content): """Find a content hash in db. @@ -296,6 +278,57 @@ """ ... + @remote_api_endpoint('content/skipped/add') + def skipped_content_add(self, content): + """Add contents to the skipped_content list, which contains + (partial) information about content missing from the archive. + + Args: + contents (iterable): iterable of dictionaries representing + individual pieces of content to add. Each dictionary has the + following keys: + + - length (Optional[int]): content length (default: -1) + - one key for each checksum algorithm in + :data:`swh.model.hashutil.ALGORITHMS`, mapped to the + corresponding checksum; each is optional + - status (str): must be "absent" + - reason (str): the reason why the content is absent + - origin (int): if status = absent, the origin we saw the + content in + + Raises: + + The following exceptions can occur: + + - HashCollision in case of collision + - Any other exceptions raise by the backend + + In case of errors, some content may have been stored in + the DB and in the objstorage. + Since additions to both idempotent, that should not be a problem. + + Returns: + Summary dict with the following key and associated values: + + skipped_content:add: New skipped contents (no data) added + """ + ... + + @remote_api_endpoint('content/skipped/missing') + def skipped_content_missing(self, contents): + """List skipped_content missing from storage + + Args: + content: iterable of dictionaries containing the data for each + checksum algorithm. + + Returns: + iterable: missing signatures + + """ + ... + @remote_api_endpoint('directory/add') def directory_add(self, directories): """Add directories to the storage diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -63,15 +63,20 @@ return getattr(self.storage, key) @swh_retry - def content_add(self, content: List[Dict]) -> Dict: + def content_add(self, content: Iterable[Dict]) -> Dict: contents = list(content) return self.storage.content_add(contents) @swh_retry - def content_add_metadata(self, content: List[Dict]) -> Dict: + def content_add_metadata(self, content: Iterable[Dict]) -> Dict: contents = list(content) return self.storage.content_add_metadata(contents) + @swh_retry + def skipped_content_add(self, content: Iterable[Dict]) -> Dict: + contents = list(content) + return self.storage.skipped_content_add(contents) + @swh_retry def origin_add_one(self, origin: Dict) -> str: return self.storage.origin_add_one(origin) @@ -91,7 +96,7 @@ metadata=metadata, snapshot=snapshot) @swh_retry - def tool_add(self, tools: List[Dict]) -> List[Dict]: + def tool_add(self, tools: Iterable[Dict]) -> List[Dict]: tools = list(tools) return self.storage.tool_add(tools) @@ -110,22 +115,22 @@ origin_url, ts, provider_id, tool_id, metadata) @swh_retry - def directory_add(self, directories: List[Dict]) -> Dict: + def directory_add(self, directories: Iterable[Dict]) -> Dict: directories = list(directories) return self.storage.directory_add(directories) @swh_retry - def revision_add(self, revisions: List[Dict]) -> Dict: + def revision_add(self, revisions: Iterable[Dict]) -> Dict: revisions = list(revisions) return self.storage.revision_add(revisions) @swh_retry - def release_add(self, releases: List[Dict]) -> Dict: + def release_add(self, releases: Iterable[Dict]) -> Dict: releases = list(releases) return self.storage.release_add(releases) @swh_retry - def snapshot_add(self, snapshot: List[Dict]) -> Dict: + def snapshot_add(self, snapshot: Iterable[Dict]) -> Dict: snapshots = list(snapshot) return self.storage.snapshot_add(snapshots) diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -128,106 +128,54 @@ return tuple([hash[k] for k in keys]) @staticmethod - def _normalize_content(d): + def _content_normalize(d): d = d.copy() if 'status' not in d: d['status'] = 'visible' - if 'length' not in d: - d['length'] = -1 - return d @staticmethod - def _validate_content(d): + def _content_validate(d): """Sanity checks on status / reason / length, that postgresql doesn't enforce.""" - if d['status'] not in ('visible', 'absent', 'hidden'): + if d['status'] not in ('visible', 'hidden'): raise ValueError('Invalid content status: {}'.format(d['status'])) - if d['status'] != 'absent' and d.get('reason') is not None: + if d.get('reason') is not None: raise ValueError( - 'Must not provide a reason if content is not absent.') + 'Must not provide a reason if content is present.') - if d['length'] < -1: - raise ValueError('Content length must be positive or -1.') + if d['length'] is None or d['length'] < 0: + raise ValueError('Content length must be positive.') - def _filter_new_content(self, content, db=None, cur=None): - """Sort contents into buckets 'with data' and 'without data', - and filter out those already in the database.""" - content_by_status = defaultdict(list) - for d in content: - content_by_status[d['status']].append(d) - - content_with_data = content_by_status['visible'] \ - + content_by_status['hidden'] - content_without_data = content_by_status['absent'] - - missing_content = set(self.content_missing(content_with_data, - db=db, cur=cur)) - missing_skipped = set(self._content_unique_key(hashes, db) - for hashes in self.skipped_content_missing( - content_without_data, db=db, cur=cur)) - - content_with_data = [ - cont for cont in content_with_data - if cont['sha1'] in missing_content] - content_without_data = [ - cont for cont in content_without_data - if self._content_unique_key(cont, db) in missing_skipped] - - summary = { - 'content:add': len(missing_content), - 'skipped_content:add': len(missing_skipped), - } - - return (content_with_data, content_without_data, summary) - - def _content_add_metadata(self, db, cur, - content_with_data, content_without_data): + def _content_add_metadata(self, db, cur, content): """Add content to the postgresql database but not the object storage. """ - if content_with_data: - # create temporary table for metadata injection - db.mktemp('content', cur) + # create temporary table for metadata injection + db.mktemp('content', cur) - db.copy_to(content_with_data, 'tmp_content', - db.content_add_keys, cur) + db.copy_to(content, 'tmp_content', + db.content_add_keys, cur) - # move metadata in place - try: - db.content_add_from_temp(cur) - except psycopg2.IntegrityError as e: - from . import HashCollision - if e.diag.sqlstate == '23505' and \ - e.diag.table_name == 'content': - constraint_to_hash_name = { - 'content_pkey': 'sha1', - 'content_sha1_git_idx': 'sha1_git', - 'content_sha256_idx': 'sha256', - } - colliding_hash_name = constraint_to_hash_name \ - .get(e.diag.constraint_name) - raise HashCollision(colliding_hash_name) from None - else: - raise - - if content_without_data: - content_without_data = \ - [cont.copy() for cont in content_without_data] - origin_ids = db.origin_id_get_by_url( - [cont.get('origin') for cont in content_without_data], - cur=cur) - for (cont, origin_id) in zip(content_without_data, origin_ids): - if 'origin' in cont: - cont['origin'] = origin_id - db.mktemp('skipped_content', cur) - db.copy_to(content_without_data, 'tmp_skipped_content', - db.skipped_content_keys, cur) - - # move metadata in place - db.skipped_content_add_from_temp(cur) + # move metadata in place + try: + db.content_add_from_temp(cur) + except psycopg2.IntegrityError as e: + from . import HashCollision + if e.diag.sqlstate == '23505' and \ + e.diag.table_name == 'content': + constraint_to_hash_name = { + 'content_pkey': 'sha1', + 'content_sha1_git_idx': 'sha1_git', + 'content_sha256_idx': 'sha256', + } + colliding_hash_name = constraint_to_hash_name \ + .get(e.diag.constraint_name) + raise HashCollision(colliding_hash_name) from None + else: + raise @timed @process_metrics @@ -238,21 +186,19 @@ for item in content: item['ctime'] = now - content = [self._normalize_content(c) for c in content] + content = [self._content_normalize(c) for c in content] for c in content: - self._validate_content(c) + self._content_validate(c) - (content_with_data, content_without_data, summary) = \ - self._filter_new_content(content, db, cur) + missing = list(self.content_missing(content, key_hash='sha1_git')) + content = [c for c in content if c['sha1_git'] in missing] if self.journal_writer: - for item in content_with_data: + for item in content: if 'data' in item: item = item.copy() del item['data'] self.journal_writer.write_addition('content', item) - for item in content_without_data: - self.journal_writer.write_addition('content', item) def add_to_objstorage(): """Add to objstorage the new missing_content @@ -264,7 +210,7 @@ """ content_bytes_added = 0 data = {} - for cont in content_with_data: + for cont in content: if cont['sha1'] not in data: data[cont['sha1']] = cont['data'] content_bytes_added += max(0, cont['length']) @@ -278,15 +224,16 @@ with ThreadPoolExecutor(max_workers=1) as executor: added_to_objstorage = executor.submit(add_to_objstorage) - self._content_add_metadata( - db, cur, content_with_data, content_without_data) + self._content_add_metadata(db, cur, content) # Wait for objstorage addition before returning from the # transaction, bubbling up any exception content_bytes_added = added_to_objstorage.result() - summary['content:add:bytes'] = content_bytes_added - return summary + return { + 'content:add': len(content), + 'content:add:bytes': content_bytes_added, + } @timed @db_transaction() @@ -308,23 +255,23 @@ @process_metrics @db_transaction() def content_add_metadata(self, content, db=None, cur=None): - content = [self._normalize_content(c) for c in content] + content = [self._content_normalize(c) for c in content] for c in content: - self._validate_content(c) + self._content_validate(c) - (content_with_data, content_without_data, summary) = \ - self._filter_new_content(content, db, cur) + missing = self.content_missing(content, key_hash='sha1_git') + content = [c for c in content if c['sha1_git'] in missing] if self.journal_writer: - for item in itertools.chain(content_with_data, - content_without_data): + for item in itertools.chain(content): assert 'data' not in content self.journal_writer.write_addition('content', item) - self._content_add_metadata( - db, cur, content_with_data, content_without_data) + self._content_add_metadata(db, cur, content) - return summary + return { + 'content:add': len(content), + } @timed def content_get(self, content): @@ -423,12 +370,6 @@ for obj in db.content_missing_per_sha1_git(contents, cur): yield obj[0] - @timed - @db_transaction_generator() - def skipped_content_missing(self, contents, db=None, cur=None): - for content in db.skipped_content_missing(contents, cur): - yield dict(zip(db.content_hash_keys, content)) - @timed @db_transaction() def content_find(self, content, db=None, cur=None): @@ -449,6 +390,83 @@ def content_get_random(self, db=None, cur=None): return db.content_get_random(cur) + @staticmethod + def _skipped_content_normalize(d): + d = d.copy() + + if 'status' not in d: + d['status'] = 'absent' + + if 'length' not in d: + d['length'] = -1 + + return d + + @staticmethod + def _skipped_content_validate(d): + """Sanity checks on status / reason / length, that postgresql + doesn't enforce.""" + if d['status'] != 'absent': + raise ValueError('Invalid content status: {}'.format(d['status'])) + + if d.get('reason') is None: + raise ValueError( + 'Must provide a reason if content is absent.') + + if d['length'] < -1: + raise ValueError('Content length must be positive or -1.') + + def _skipped_content_add_metadata(self, db, cur, content): + content = \ + [cont.copy() for cont in content] + origin_ids = db.origin_id_get_by_url( + [cont.get('origin') for cont in content], + cur=cur) + for (cont, origin_id) in zip(content, origin_ids): + if 'origin' in cont: + cont['origin'] = origin_id + db.mktemp('skipped_content', cur) + db.copy_to(content, 'tmp_skipped_content', + db.skipped_content_keys, cur) + + # move metadata in place + db.skipped_content_add_from_temp(cur) + + @timed + @process_metrics + @db_transaction() + def skipped_content_add(self, content, db=None, cur=None): + content = [dict(c.items()) for c in content] # semi-shallow copy + now = datetime.datetime.now(tz=datetime.timezone.utc) + for item in content: + item['ctime'] = now + + content = [self._skipped_content_normalize(c) for c in content] + for c in content: + self._skipped_content_validate(c) + + missing_contents = self.skipped_content_missing(content) + content = [c for c in content + if any(all(c.get(algo) == missing_content.get(algo) + for algo in ALGORITHMS) + for missing_content in missing_contents)] + + if self.journal_writer: + for item in content: + self.journal_writer.write_addition('content', item) + + self._skipped_content_add_metadata(db, cur, content) + + return { + 'skipped_content:add': len(content), + } + + @timed + @db_transaction_generator() + def skipped_content_missing(self, contents, db=None, cur=None): + for content in db.skipped_content_missing(contents, cur): + yield dict(zip(db.content_hash_keys, content)) + @timed @process_metrics @db_transaction() diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -58,7 +58,10 @@ @pytest.fixture def swh_contents(swh_storage): contents = gen_contents(n=20) - swh_storage.content_add(contents) + swh_storage.content_add( + [c for c in contents if c['status'] != 'absent']) + swh_storage.skipped_content_add( + [c for c in contents if c['status'] == 'absent']) return contents @@ -218,6 +221,7 @@ return { 'content': [data.cont, data.cont2], 'content_metadata': [data.cont3], + 'skipped_content': [data.skipped_cont, data.skipped_cont2], 'person': [data.person], 'directory': [data.dir2, data.dir], 'revision': [data.revision], diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -27,7 +27,6 @@ assert s == { 'content:add': 1 + 1, 'content:add:bytes': contents[0]['length'] + contents[1]['length'], - 'skipped_content:add': 0 } missing_contents = storage.content_missing( @@ -48,7 +47,6 @@ assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], - 'skipped_content:add': 0 } missing_contents = storage.content_missing([contents[0]]) @@ -75,7 +73,6 @@ assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], - 'skipped_content:add': 0 } missing_contents = storage.content_missing([contents[0]]) @@ -85,6 +82,55 @@ assert s == {} +def test_buffering_proxy_storage_skipped_content_threshold_not_hit( + sample_data): + contents = sample_data['skipped_content'] + storage = BufferingProxyStorage( + storage={'cls': 'memory'}, + min_batch_size={ + 'skipped_content': 10, + } + ) + s = storage.skipped_content_add([contents[0], contents[1]]) + assert s == {} + + # contents have not been written to storage + missing_contents = storage.skipped_content_missing( + [contents[0], contents[1]]) + assert {c['sha1'] for c in missing_contents} \ + == {c['sha1'] for c in contents} + + s = storage.flush() + assert s == { + 'skipped_content:add': 1 + 1 + } + + missing_contents = storage.skipped_content_missing( + [contents[0], contents[1]]) + assert list(missing_contents) == [] + + +def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data): + contents = sample_data['skipped_content'] + storage = BufferingProxyStorage( + storage={'cls': 'memory'}, + min_batch_size={ + 'skipped_content': 1, + } + ) + + s = storage.skipped_content_add([contents[0]]) + assert s == { + 'skipped_content:add': 1 + } + + missing_contents = storage.skipped_content_missing([contents[0]]) + assert list(missing_contents) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data): directories = sample_data['directory'] storage = BufferingProxyStorage( diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py --- a/swh/storage/tests/test_filter.py +++ b/swh/storage/tests/test_filter.py @@ -18,7 +18,6 @@ assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], - 'skipped_content:add': 0 } content = next(storage.content_get([sample_content['sha1']])) @@ -28,7 +27,27 @@ assert s == { 'content:add': 0, 'content:add:bytes': 0, - 'skipped_content:add': 0 + } + + +def test_filtering_proxy_storage_skipped_content(sample_data): + sample_content = sample_data['skipped_content'][0] + storage = FilteringProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.skipped_content_missing([sample_content])) + assert content['sha1'] == sample_content['sha1'] + + s = storage.skipped_content_add([sample_content]) + assert s == { + 'skipped_content:add': 1, + } + + content = list(storage.skipped_content_missing([sample_content])) + assert content == [] + + s = storage.skipped_content_add([sample_content]) + assert s == { + 'skipped_content:add': 0, } diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -32,7 +32,6 @@ assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], - 'skipped_content:add': 0 } content = next(swh_storage.content_get([sample_content['sha1']])) @@ -103,7 +102,6 @@ s = swh_storage.content_add_metadata([sample_content]) assert s == { 'content:add': 1, - 'skipped_content:add': 0 } content_metadata = swh_storage.content_get_metadata([pk]) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -141,7 +141,6 @@ assert actual_result == { 'content:add': 1, 'content:add:bytes': cont['length'], - 'skipped_content:add': 0 } assert list(swh_storage.content_get([cont['sha1']])) == \ @@ -168,7 +167,6 @@ assert actual_result == { 'content:add': 1, 'content:add:bytes': data.cont['length'], - 'skipped_content:add': 0 } swh_storage.refresh_stat_counters() @@ -177,25 +175,35 @@ def test_content_add_validation(self, swh_storage): cont = data.cont + with pytest.raises(ValueError, match='status'): + swh_storage.content_add([{**cont, 'status': 'absent'}]) + with pytest.raises(ValueError, match='status'): swh_storage.content_add([{**cont, 'status': 'foobar'}]) with pytest.raises(ValueError, match="(?i)length"): swh_storage.content_add([{**cont, 'length': -2}]) + with pytest.raises( + (ValueError, TypeError), + match="reason"): + swh_storage.content_add([{**cont, 'reason': 'foobar'}]) + + def test_skipped_content_add_validation(self, swh_storage): + cont = data.cont.copy() + del cont['data'] + + with pytest.raises(ValueError, match='status'): + swh_storage.skipped_content_add([{**cont, 'status': 'visible'}]) + with pytest.raises((ValueError, psycopg2.IntegrityError), match='reason') as cm: - swh_storage.content_add([{**cont, 'status': 'absent'}]) + swh_storage.skipped_content_add([{**cont, 'status': 'absent'}]) if type(cm.value) == psycopg2.IntegrityError: assert cm.exception.pgcode == \ psycopg2.errorcodes.NOT_NULL_VIOLATION - with pytest.raises( - ValueError, - match="^Must not provide a reason if content is not absent.$"): - swh_storage.content_add([{**cont, 'reason': 'foobar'}]) - def test_content_get_missing(self, swh_storage): cont = data.cont @@ -225,7 +233,6 @@ assert actual_result == { 'content:add': 2, 'content:add:bytes': cont['length'] + cont2['length'], - 'skipped_content:add': 0 } def test_content_add_twice(self, swh_storage): @@ -233,7 +240,6 @@ assert actual_result == { 'content:add': 1, 'content:add:bytes': data.cont['length'], - 'skipped_content:add': 0 } assert len(swh_storage.journal_writer.objects) == 1 @@ -241,9 +247,8 @@ assert actual_result == { 'content:add': 1, 'content:add:bytes': data.cont2['length'], - 'skipped_content:add': 0 } - assert len(swh_storage.journal_writer.objects) == 2 + assert 2 <= len(swh_storage.journal_writer.objects) <= 3 assert len(swh_storage.content_find(data.cont)) == 1 assert len(swh_storage.content_find(data.cont2)) == 1 @@ -286,7 +291,6 @@ actual_result = swh_storage.content_add_metadata([cont]) assert actual_result == { 'content:add': 1, - 'skipped_content:add': 0 } expected_cont = cont.copy() @@ -308,7 +312,6 @@ actual_result = swh_storage.content_add_metadata([cont, cont2]) assert actual_result == { 'content:add': 2, - 'skipped_content:add': 0 } def test_content_add_metadata_collision(self, swh_storage): @@ -336,13 +339,10 @@ assert len(missing) == 2 - actual_result = swh_storage.content_add([cont, cont, cont2]) + actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) - assert actual_result == { - 'content:add': 0, - 'content:add:bytes': 0, - 'skipped_content:add': 2, - } + assert 2 <= actual_result.pop('skipped_content:add') <= 3 + assert actual_result == {} missing = list(swh_storage.skipped_content_missing([cont, cont2])) @@ -3618,6 +3618,8 @@ swh_storage.origin_visit_add( origin, obj['date'], obj['type']) else: + if obj_type == 'content' and obj['status'] == 'absent': + obj_type = 'skipped_content' method = getattr(swh_storage, obj_type + '_add') try: method([obj]) @@ -3727,7 +3729,6 @@ assert actual_result == { 'content:add': 1, 'content:add:bytes': cont['length'], - 'skipped_content:add': 0 } if hasattr(swh_storage, 'objstorage'): @@ -3758,7 +3759,6 @@ assert actual_result == { 'content:add': 1, - 'skipped_content:add': 0 } if hasattr(swh_storage, 'objstorage'): @@ -3778,13 +3778,10 @@ cont2 = data.skipped_cont2 cont2['blake2s256'] = None - actual_result = swh_storage.content_add([cont, cont, cont2]) + actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) - assert actual_result == { - 'content:add': 0, - 'content:add:bytes': 0, - 'skipped_content:add': 2, - } + assert 2 <= actual_result.pop('skipped_content:add') <= 3 + assert actual_result == {} with db_transaction(swh_storage) as (_, cur): cur.execute('SELECT sha1, sha1_git, sha256, blake2s256, '