Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 19 Lines | from typing import ( | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Generic, | Generic, | ||||
Hashable, | Hashable, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Set, | |||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import SWHID | from swh.model.identifiers import SWHID | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Revision, | |||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Origin, | Origin, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | MetadataTargetType, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.storage.cassandra import CassandraStorage | from swh.storage.cassandra import CassandraStorage | ||||
from swh.storage.cassandra.model import ( | from swh.storage.cassandra.model import ( | ||||
BaseRow, | BaseRow, | ||||
ContentRow, | ContentRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
ObjectCountRow, | ObjectCountRow, | ||||
RevisionRow, | |||||
RevisionParentRow, | |||||
SkippedContentRow, | SkippedContentRow, | ||||
) | ) | ||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
ListOrder, | ListOrder, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 164 Lines • ▼ Show 20 Lines | |||||
class InMemoryCqlRunner: | class InMemoryCqlRunner: | ||||
def __init__(self): | def __init__(self): | ||||
self._contents = Table(ContentRow) | self._contents = Table(ContentRow) | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = Table(ContentRow) | self._skipped_contents = Table(ContentRow) | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._directories = Table(DirectoryRow) | self._directories = Table(DirectoryRow) | ||||
self._directory_entries = Table(DirectoryEntryRow) | self._directory_entries = Table(DirectoryEntryRow) | ||||
self._revisions = Table(RevisionRow) | |||||
self._revision_parents = Table(RevisionParentRow) | |||||
self._stat_counters = defaultdict(int) | self._stat_counters = defaultdict(int) | ||||
def increment_counter(self, object_type: str, nb: int): | def increment_counter(self, object_type: str, nb: int): | ||||
self._stat_counters[object_type] += nb | self._stat_counters[object_type] += nb | ||||
def stat_counters(self) -> Iterable[ObjectCountRow]: | def stat_counters(self) -> Iterable[ObjectCountRow]: | ||||
for (object_type, count) in self._stat_counters.items(): | for (object_type, count) in self._stat_counters.items(): | ||||
yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) | yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) | ||||
Show All 38 Lines | class InMemoryCqlRunner: | ||||
# 'content_by_*' tables | # 'content_by_*' tables | ||||
########################## | ########################## | ||||
def content_missing_by_sha1_git(self, ids: List[bytes]) -> List[bytes]: | def content_missing_by_sha1_git(self, ids: List[bytes]) -> List[bytes]: | ||||
missing = [] | missing = [] | ||||
for id_ in ids: | for id_ in ids: | ||||
if id_ not in self._content_indexes["sha1_git"]: | if id_ not in self._content_indexes["sha1_git"]: | ||||
missing.append(id_) | missing.append(id_) | ||||
return missing | return missing | ||||
def content_index_add_one(self, algo: str, content: Content, token: int) -> None: | def content_index_add_one(self, algo: str, content: Content, token: int) -> None: | ||||
self._content_indexes[algo][content.get_hash(algo)].add(token) | self._content_indexes[algo][content.get_hash(algo)].add(token) | ||||
def content_get_tokens_from_single_hash( | def content_get_tokens_from_single_hash( | ||||
self, algo: str, hash_: bytes | self, algo: str, hash_: bytes | ||||
) -> Iterable[int]: | ) -> Iterable[int]: | ||||
Show All 33 Lines | class InMemoryCqlRunner: | ||||
# 'directory' table | # 'directory' table | ||||
########################## | ########################## | ||||
def directory_missing(self, ids: List[bytes]) -> List[bytes]: | def directory_missing(self, ids: List[bytes]) -> List[bytes]: | ||||
missing = [] | missing = [] | ||||
for id_ in ids: | for id_ in ids: | ||||
if self._directories.get_from_primary_key((id_,)) is None: | if self._directories.get_from_primary_key((id_,)) is None: | ||||
missing.append(id_) | missing.append(id_) | ||||
return missing | return missing | ||||
def directory_add_one(self, directory: DirectoryRow) -> None: | def directory_add_one(self, directory: DirectoryRow) -> None: | ||||
self._directories.insert(directory) | self._directories.insert(directory) | ||||
self.increment_counter("directory", 1) | self.increment_counter("directory", 1) | ||||
def directory_get_random(self) -> Optional[DirectoryRow]: | def directory_get_random(self) -> Optional[DirectoryRow]: | ||||
return self._directories.get_random() | return self._directories.get_random() | ||||
Show All 10 Lines | class InMemoryCqlRunner: | ||||
) -> Iterable[DirectoryEntryRow]: | ) -> Iterable[DirectoryEntryRow]: | ||||
for id_ in directory_ids: | for id_ in directory_ids: | ||||
yield from self._directory_entries.get_from_partition_key((id_,)) | yield from self._directory_entries.get_from_partition_key((id_,)) | ||||
########################## | ########################## | ||||
# 'revision' table | # 'revision' table | ||||
########################## | ########################## | ||||
def revision_missing(self, ids: List[bytes]) -> List[bytes]: | def revision_missing(self, ids: List[bytes]) -> Iterable[bytes]: | ||||
return ids | missing = [] | ||||
for id_ in ids: | |||||
if self._revisions.get_from_primary_key((id_,)) is None: | |||||
missing.append(id_) | |||||
return missing | |||||
def revision_add_one(self, revision: RevisionRow) -> None: | |||||
self._revisions.insert(revision) | |||||
self.increment_counter("revision", 1) | |||||
def revision_get_ids(self, revision_ids) -> Iterable[int]: | |||||
for id_ in revision_ids: | |||||
if self._revisions.get_from_primary_key((id_,)) is not None: | |||||
yield id_ | |||||
def revision_get(self, revision_ids: List[Sha1Git]) -> Iterable[RevisionRow]: | |||||
for id_ in revision_ids: | |||||
row = self._revisions.get_from_primary_key((id_,)) | |||||
if row: | |||||
yield row | |||||
def revision_get_random(self) -> Optional[RevisionRow]: | |||||
return self._revisions.get_random() | |||||
########################## | |||||
# 'revision_parent' table | |||||
########################## | |||||
def revision_parent_add_one(self, revision_parent: RevisionParentRow) -> None: | |||||
self._revision_parents.insert(revision_parent) | |||||
def revision_parent_get(self, revision_id: Sha1Git) -> Iterable[bytes]: | |||||
for parent in self._revision_parents.get_from_partition_key((revision_id,)): | |||||
yield parent.parent_id | |||||
########################## | ########################## | ||||
# 'release' table | # 'release' table | ||||
########################## | ########################## | ||||
def release_missing(self, ids: List[bytes]) -> List[bytes]: | def release_missing(self, ids: List[bytes]) -> List[bytes]: | ||||
return ids | return ids | ||||
class InMemoryStorage(CassandraStorage): | class InMemoryStorage(CassandraStorage): | ||||
_cql_runner: InMemoryCqlRunner # type: ignore | _cql_runner: InMemoryCqlRunner # type: ignore | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
def reset(self): | def reset(self): | ||||
self._cql_runner = InMemoryCqlRunner() | self._cql_runner = InMemoryCqlRunner() | ||||
self._revisions = {} | |||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
self._persons = {} | self._persons = {} | ||||
Show All 27 Lines | def reset(self): | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
def check_config(self, *, check_write: bool) -> bool: | def check_config(self, *, check_write: bool) -> bool: | ||||
return True | return True | ||||
def revision_add(self, revisions: List[Revision]) -> Dict: | |||||
revisions = [rev for rev in revisions if rev.id not in self._revisions] | |||||
self.journal_writer.revision_add(revisions) | |||||
count = 0 | |||||
for revision in revisions: | |||||
revision = attr.evolve( | |||||
revision, | |||||
committer=self._person_add(revision.committer), | |||||
author=self._person_add(revision.author), | |||||
) | |||||
self._revisions[revision.id] = revision | |||||
self._objects[revision.id].append(("revision", revision.id)) | |||||
count += 1 | |||||
self._cql_runner.increment_counter("revision", len(revisions)) | |||||
return {"revision:add": count} | |||||
def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: | |||||
for id in revisions: | |||||
if id not in self._revisions: | |||||
yield id | |||||
def revision_get( | |||||
self, revisions: List[Sha1Git] | |||||
) -> Iterable[Optional[Dict[str, Any]]]: | |||||
for id in revisions: | |||||
if id in self._revisions: | |||||
yield self._revisions.get(id).to_dict() | |||||
else: | |||||
yield None | |||||
def __get_parent_revs( | |||||
self, rev_id: Sha1Git, seen: Set[Sha1Git], limit: Optional[int] | |||||
) -> Iterable[Dict[str, Any]]: | |||||
if limit and len(seen) >= limit: | |||||
return | |||||
if rev_id in seen or rev_id not in self._revisions: | |||||
return | |||||
seen.add(rev_id) | |||||
yield self._revisions[rev_id].to_dict() | |||||
for parent in self._revisions[rev_id].parents: | |||||
yield from self.__get_parent_revs(parent, seen, limit) | |||||
def revision_log( | |||||
self, revisions: List[Sha1Git], limit: Optional[int] = None | |||||
) -> Iterable[Optional[Dict[str, Any]]]: | |||||
seen: Set[Sha1Git] = set() | |||||
for rev_id in revisions: | |||||
yield from self.__get_parent_revs(rev_id, seen, limit) | |||||
def revision_shortlog( | |||||
self, revisions: List[Sha1Git], limit: Optional[int] = None | |||||
) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: | |||||
yield from ( | |||||
(rev["id"], rev["parents"]) if rev else None | |||||
for rev in self.revision_log(revisions, limit) | |||||
) | |||||
def revision_get_random(self) -> Sha1Git: | |||||
return random.choice(list(self._revisions)) | |||||
def release_add(self, releases: List[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
to_add = [] | to_add = [] | ||||
for rel in releases: | for rel in releases: | ||||
if rel.id not in self._releases and rel not in to_add: | if rel.id not in self._releases and rel not in to_add: | ||||
to_add.append(rel) | to_add.append(rel) | ||||
self.journal_writer.release_add(to_add) | self.journal_writer.release_add(to_add) | ||||
for rel in to_add: | for rel in to_add: | ||||
▲ Show 20 Lines • Show All 709 Lines • Show Last 20 Lines |