diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -37,7 +37,6 @@ from swh.core.api.serializers import msgpack_loads, msgpack_dumps from swh.model.identifiers import SWHID from swh.model.model import ( - BaseContent, Content, SkippedContent, Directory, @@ -54,13 +53,13 @@ RawExtrinsicMetadata, Sha1Git, ) -from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.storage.cassandra import CassandraStorage from swh.storage.cassandra.model import ( BaseRow, ContentRow, ObjectCountRow, + SkippedContentRow, ) from swh.storage.interface import ( ListOrder, @@ -226,6 +225,8 @@ def __init__(self): self._contents = Table(ContentRow) self._content_indexes = defaultdict(lambda: defaultdict(set)) + self._skipped_contents = Table(SkippedContentRow) + self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) self._stat_counters = defaultdict(int) def increment_counter(self, object_type: str, nb: int): @@ -297,6 +298,36 @@ ) -> Iterable[int]: return self._content_indexes[algo][hash_] + ########################## + # 'skipped_content' table + ########################## + + def _skipped_content_add_finalize(self, content: SkippedContentRow) -> None: + self._skipped_contents.insert(content) + self.increment_counter("skipped_content", 1) + + def skipped_content_add_prepare(self, content: SkippedContentRow): + finalizer = functools.partial(self._skipped_content_add_finalize, content) + return ( + self._skipped_contents.token(self._skipped_contents.partition_key(content)), + finalizer, + ) + + def skipped_content_get_from_pk( + self, content_hashes: Dict[str, bytes] + ) -> Optional[SkippedContentRow]: + primary_key = self._skipped_contents.primary_key_from_dict(content_hashes) + return self._skipped_contents.get_from_primary_key(primary_key) + + ########################## + # 'skipped_content_by_*' tables + ########################## + + def skipped_content_index_add_one( + self, algo: str, content: 
SkippedContent, token: int + ) -> None: + self._skipped_content_indexes[algo][content.get_hash(algo)].add(token) + ########################## # 'directory' table ########################## @@ -328,8 +359,6 @@ def reset(self): self._cql_runner = InMemoryCqlRunner() - self._skipped_contents = {} - self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) self._directories = {} self._revisions = {} self._releases = {} @@ -375,46 +404,6 @@ def check_config(self, *, check_write: bool) -> bool: return True - def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: - self.journal_writer.skipped_content_add(contents) - - summary = {"skipped_content:add": 0} - - missing_contents = self.skipped_content_missing([c.hashes() for c in contents]) - missing = {self._content_key(c) for c in missing_contents} - contents = [c for c in contents if self._content_key(c) in missing] - for content in contents: - key = self._content_key(content) - for algo in DEFAULT_ALGORITHMS: - if content.get_hash(algo): - self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) - self._skipped_contents[key] = content - summary["skipped_content:add"] += 1 - - self._cql_runner.increment_counter("skipped_content", len(contents)) - - return summary - - def skipped_content_add(self, content: List[SkippedContent]) -> Dict: - content = [attr.evolve(c, ctime=now()) for c in content] - return self._skipped_content_add(content) - - def skipped_content_missing( - self, contents: List[Dict[str, Any]] - ) -> Iterable[Dict[str, Any]]: - for content in contents: - matches = list(self._skipped_contents.values()) - for (algorithm, key) in self._content_key(content): - if algorithm == "blake2s256": - continue - # Filter out skipped contents with the same hash - matches = [ - match for match in matches if match.get_hash(algorithm) == key - ] - # if none of the contents match - if not matches: - yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} - def directory_add(self, 
directories: List[Directory]) -> Dict: directories = [dir_ for dir_ in directories if dir_.id not in self._directories] self.journal_writer.directory_add(directories) @@ -1244,13 +1233,6 @@ return self._persons[person.fullname] - @staticmethod - def _content_key(content): - """ A stable key and the algorithm for a content""" - if isinstance(content, BaseContent): - content = content.to_dict() - return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) - @staticmethod def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey: return (fetcher.name, fetcher.version) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -206,7 +206,6 @@ assert got_persons == expected_persons for attr_ in ( - "skipped_contents", "directories", "revisions", "releases", @@ -221,7 +220,10 @@ got_objects = sorted(getattr(dst, f"_{attr_}").items()) assert got_objects == expected_objects, f"Mismatch object list for {attr_}" - for attr_ in ("contents",): + for attr_ in ( + "contents", + "skipped_contents", + ): if exclude and attr_ in exclude: continue expected_objects = [ @@ -378,7 +380,6 @@ assert got_persons == expected_persons for attr_ in ( - "skipped_contents", "directories", "revisions", "releases", @@ -395,7 +396,10 @@ ] assert got_objects == expected_objects, f"Mismatch object list for {attr_}" - for attr_ in ("contents",): + for attr_ in ( + "contents", + "skipped_contents", + ): expected_objects = [ (id, nullify_ctime(maybe_anonymize(attr_, obj))) for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all())