Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 31 Lines | from typing import ( | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import SWHID | from swh.model.identifiers import SWHID | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | |||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Origin, | Origin, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | MetadataTargetType, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | |||||
from swh.storage.cassandra import CassandraStorage | from swh.storage.cassandra import CassandraStorage | ||||
from swh.storage.cassandra.model import ( | from swh.storage.cassandra.model import ( | ||||
BaseRow, | BaseRow, | ||||
ContentRow, | ContentRow, | ||||
ObjectCountRow, | ObjectCountRow, | ||||
SkippedContentRow, | |||||
) | ) | ||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
ListOrder, | ListOrder, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
) | ) | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
▲ Show 20 Lines • Show All 149 Lines • ▼ Show 20 Lines | def iter_all(self) -> Iterator[Tuple[Tuple, TRow]]: | ||||
for (clustering_key, row) in partition.items() | for (clustering_key, row) in partition.items() | ||||
) | ) | ||||
class InMemoryCqlRunner: | class InMemoryCqlRunner: | ||||
def __init__(self):
    """Create the empty in-memory tables, hash indexes and counters."""
    # Rows of the 'content' table, and the per-algorithm indexes mapping a
    # hash to the set of tokens of the rows carrying that hash.
    self._contents = Table(ContentRow)
    self._content_indexes = defaultdict(lambda: defaultdict(set))

    # Skipped contents live in their own table. It is declared with
    # SkippedContentRow (not ContentRow), which is the row type every
    # skipped_content_* method of this class is annotated with.
    self._skipped_contents = Table(SkippedContentRow)
    self._skipped_content_indexes = defaultdict(lambda: defaultdict(set))

    # object_type -> number of objects counted so far.
    self._stat_counters = defaultdict(int)
def increment_counter(self, object_type: str, nb: int):
    """Add ``nb`` to the counter kept for ``object_type``."""
    counters = self._stat_counters
    counters[object_type] = counters[object_type] + nb
def stat_counters(self) -> Iterable[ObjectCountRow]:
    """Yield one ``ObjectCountRow`` per object type that has a counter.

    Every row is emitted with ``partition_key=0``.
    """
    yield from (
        ObjectCountRow(partition_key=0, object_type=kind, count=total)
        for (kind, total) in self._stat_counters.items()
    )
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | def content_index_add_one(self, algo: str, content: Content, token: int) -> None: | ||||
self._content_indexes[algo][content.get_hash(algo)].add(token) | self._content_indexes[algo][content.get_hash(algo)].add(token) | ||||
def content_get_tokens_from_single_hash(
    self, algo: str, hash_: bytes
) -> Iterable[int]:
    """Return the tokens indexed under ``hash_`` for algorithm ``algo``.

    The index is a nested defaultdict, so an unknown hash yields an empty
    set (and creates that empty entry in the index).
    """
    tokens_by_hash = self._content_indexes[algo]
    return tokens_by_hash[hash_]
########################## | ########################## | ||||
# 'skipped_content' table | |||||
########################## | |||||
def _skipped_content_add_finalize(self, content: SkippedContentRow) -> None:
    """Insert ``content`` in the skipped-content table and count it."""
    table = self._skipped_contents
    table.insert(content)
    self.increment_counter("skipped_content", 1)
def skipped_content_add_prepare(self, content: SkippedContentRow):
    """Prepare the insertion of a skipped-content row without performing it.

    Returns:
        A ``(token, finalizer)`` pair: the token computed from the row's
        partition key, and a zero-argument callable which, once called,
        inserts the row and increments the ``skipped_content`` counter.
    """
    # Both the partition key and the token are computed through the
    # skipped-content table: the row belongs to that table, not to the
    # 'content' one.
    partition_key = self._skipped_contents.partition_key(content)
    token = self._skipped_contents.token(partition_key)
    finalizer = functools.partial(self._skipped_content_add_finalize, content)
    return (token, finalizer)
def skipped_content_get_from_pk(
    self, content_hashes: Dict[str, bytes]
) -> Optional[SkippedContentRow]:
    """Look up a skipped-content row by the primary key built from
    ``content_hashes``."""
    table = self._skipped_contents
    return table.get_from_primary_key(table.primary_key_from_dict(content_hashes))
########################## | |||||
# 'skipped_content_by_*' tables | |||||
########################## | |||||
def skipped_content_index_add_one(
    self, algo: str, content: SkippedContent, token: int
) -> None:
    """Record ``token`` in the ``algo`` index, under the hash that
    ``content`` has for that algorithm."""
    tokens_by_hash = self._skipped_content_indexes[algo]
    tokens_by_hash[content.get_hash(algo)].add(token)
########################## | |||||
# 'directory' table | # 'directory' table | ||||
########################## | ########################## | ||||
def directory_missing(self, ids: List[bytes]) -> List[bytes]:
    """Return ``ids`` unchanged, i.e. report every requested id as missing.

    NOTE(review): presumably a placeholder while directories are not stored
    by this runner -- confirm against the callers.
    """
    return ids
########################## | ########################## | ||||
# 'revision' table | # 'revision' table | ||||
Show All 14 Lines | class InMemoryStorage(CassandraStorage): | ||||
_cql_runner: InMemoryCqlRunner # type: ignore | _cql_runner: InMemoryCqlRunner # type: ignore | ||||
def __init__(self, journal_writer=None):
    """Start from an empty state, then wrap ``journal_writer`` (possibly
    ``None``) in a ``JournalWriter``."""
    self.reset()
    wrapped_writer = JournalWriter(journal_writer)
    self.journal_writer = wrapped_writer
def reset(self): | def reset(self): | ||||
self._cql_runner = InMemoryCqlRunner() | self._cql_runner = InMemoryCqlRunner() | ||||
self._skipped_contents = {} | |||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | |||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
Show All 29 Lines | def reset(self): | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
def check_config(self, *, check_write: bool) -> bool:
    """Always report the configuration as valid; ``check_write`` is ignored."""
    return True
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
    """Store the skipped contents that are not known yet.

    All of ``contents`` are sent to the journal writer first; only the ones
    reported by ``skipped_content_missing`` are then stored and indexed.

    Returns:
        ``{"skipped_content:add": <number of objects stored>}``
    """
    self.journal_writer.skipped_content_add(contents)

    requested_hashes = [c.hashes() for c in contents]
    absent_keys = {
        self._content_key(hashes)
        for hashes in self.skipped_content_missing(requested_hashes)
    }
    new_contents = [c for c in contents if self._content_key(c) in absent_keys]

    for skipped in new_contents:
        key = self._content_key(skipped)
        # Index the object under each hash it actually carries.
        for algo in DEFAULT_ALGORITHMS:
            hash_ = skipped.get_hash(algo)
            if hash_:
                self._skipped_content_indexes[algo][hash_].add(key)
        self._skipped_contents[key] = skipped

    self._cql_runner.increment_counter("skipped_content", len(new_contents))

    return {"skipped_content:add": len(new_contents)}
def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
    """Add skipped contents, after replacing the ``ctime`` of each one with
    the value returned by ``now()``.

    Returns the summary dict produced by ``_skipped_content_add``.
    """
    stamped = []
    for skipped in content:
        stamped.append(attr.evolve(skipped, ctime=now()))
    return self._skipped_content_add(stamped)
def skipped_content_missing(
    self, contents: List[Dict[str, Any]]
) -> Iterable[Dict[str, Any]]:
    """Yield the hashes of the requested contents that are not stored.

    A requested content counts as known when at least one stored skipped
    content has the same hash for every algorithm of its key, ``blake2s256``
    excepted. For each unknown content, a dict holding its
    ``DEFAULT_ALGORITHMS`` entries is yielded.
    """
    for content in contents:
        # (algorithm, hash) pairs to compare; blake2s256 is left out.
        criteria = [
            (algorithm, key)
            for (algorithm, key) in self._content_key(content)
            if algorithm != "blake2s256"
        ]
        known = any(
            all(stored.get_hash(algorithm) == key for (algorithm, key) in criteria)
            for stored in self._skipped_contents.values()
        )
        if known:
            continue
        yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
def directory_add(self, directories: List[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
directories = [dir_ for dir_ in directories if dir_.id not in self._directories] | directories = [dir_ for dir_ in directories if dir_.id not in self._directories] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
▲ Show 20 Lines • Show All 814 Lines • ▼ Show 20 Lines | def _person_add(self, person): | ||||
key = ("person", person.fullname) | key = ("person", person.fullname) | ||||
if key not in self._objects: | if key not in self._objects: | ||||
self._persons[person.fullname] = person | self._persons[person.fullname] = person | ||||
self._objects[key].append(key) | self._objects[key].append(key) | ||||
return self._persons[person.fullname] | return self._persons[person.fullname] | ||||
@staticmethod
def _content_key(content):
    """Build a stable, hashable key for a content.

    ``content`` is either a ``BaseContent`` (converted with ``to_dict()``)
    or a mapping of hashes. The key is a tuple of ``(algorithm, hash)``
    pairs ordered by algorithm name, covering ``DEFAULT_ALGORITHMS``.
    """
    hashes = content.to_dict() if isinstance(content, BaseContent) else content
    return tuple((algo, hashes.get(algo)) for algo in sorted(DEFAULT_ALGORITHMS))
@staticmethod
def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey:
    """Return the ``(name, version)`` pair used as key for ``fetcher``."""
    name, version = fetcher.name, fetcher.version
    return (name, version)
@staticmethod
def _metadata_authority_key(authority: MetadataAuthority) -> Hashable:
    """Return the ``(type, url)`` pair used as key for ``authority``."""
    kind, url = authority.type, authority.url
    return (kind, url)
def diff_directories(self, from_dir, to_dir, track_renaming=False): | def diff_directories(self, from_dir, to_dir, track_renaming=False): | ||||
Show All 16 Lines |