Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 22 Lines | |||||
from swh.model.model import Content, Sha1Git, SkippedContent | from swh.model.model import Content, Sha1Git, SkippedContent | ||||
from swh.storage.cassandra import CassandraStorage | from swh.storage.cassandra import CassandraStorage | ||||
from swh.storage.cassandra.model import ( | from swh.storage.cassandra.model import ( | ||||
BaseRow, | BaseRow, | ||||
ContentRow, | ContentRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
ExtIDRow, | |||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
ObjectCountRow, | ObjectCountRow, | ||||
OriginRow, | OriginRow, | ||||
OriginVisitRow, | OriginVisitRow, | ||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
RawExtrinsicMetadataRow, | RawExtrinsicMetadataRow, | ||||
ReleaseRow, | ReleaseRow, | ||||
▲ Show 20 Lines • Show All 120 Lines • ▼ Show 20 Lines | def __init__(self): | ||||
self._snapshots = Table(SnapshotRow) | self._snapshots = Table(SnapshotRow) | ||||
self._snapshot_branches = Table(SnapshotBranchRow) | self._snapshot_branches = Table(SnapshotBranchRow) | ||||
self._origins = Table(OriginRow) | self._origins = Table(OriginRow) | ||||
self._origin_visits = Table(OriginVisitRow) | self._origin_visits = Table(OriginVisitRow) | ||||
self._origin_visit_statuses = Table(OriginVisitStatusRow) | self._origin_visit_statuses = Table(OriginVisitStatusRow) | ||||
self._metadata_authorities = Table(MetadataAuthorityRow) | self._metadata_authorities = Table(MetadataAuthorityRow) | ||||
self._metadata_fetchers = Table(MetadataFetcherRow) | self._metadata_fetchers = Table(MetadataFetcherRow) | ||||
self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow) | self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow) | ||||
self._extid = Table(ExtIDRow) | |||||
self._extid_index = {} | |||||
self._stat_counters = defaultdict(int) | self._stat_counters = defaultdict(int) | ||||
def increment_counter(self, object_type: str, nb: int): | def increment_counter(self, object_type: str, nb: int): | ||||
self._stat_counters[object_type] += nb | self._stat_counters[object_type] += nb | ||||
def stat_counters(self) -> Iterable[ObjectCountRow]: | def stat_counters(self) -> Iterable[ObjectCountRow]: | ||||
for (object_type, count) in self._stat_counters.items(): | for (object_type, count) in self._stat_counters.items(): | ||||
yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) | yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) | ||||
▲ Show 20 Lines • Show All 432 Lines • ▼ Show 20 Lines | class InMemoryCqlRunner: | ||||
) -> Iterable[RawExtrinsicMetadataRow]: | ) -> Iterable[RawExtrinsicMetadataRow]: | ||||
metadata = self._raw_extrinsic_metadata.get_from_partition_key((target,)) | metadata = self._raw_extrinsic_metadata.get_from_partition_key((target,)) | ||||
return ( | return ( | ||||
m | m | ||||
for m in metadata | for m in metadata | ||||
if m.authority_type == authority_type and m.authority_url == authority_url | if m.authority_type == authority_type and m.authority_url == authority_url | ||||
) | ) | ||||
######################### | |||||
# 'extid' table | |||||
######################### | |||||
def _extid_add_finalize(self, extid: ExtIDRow) -> None: | |||||
self._extid.insert(extid) | |||||
self.increment_counter("extid", 1) | |||||
def extid_add_prepare(self, extid: ExtIDRow): | |||||
finalizer = functools.partial(self._extid_add_finalize, extid) | |||||
return (self._extid.token(self._extid.partition_key(extid)), finalizer) | |||||
def extid_extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: | |||||
self._extid_index[("extid", extid.extid_type, extid.extid)] = token | |||||
def extid_target_index_add_one(self, extid: ExtIDRow, token: int) -> None: | |||||
self._extid_index[("target", extid.target_type, extid.target)] = token | |||||
def extid_get_from_pk( | |||||
self, extid_type, extid, target_type, target, | |||||
) -> Optional[ExtIDRow]: | |||||
primary_key = self._extid.primary_key_from_dict( | |||||
dict( | |||||
extid_type=extid_type, | |||||
extid=extid, | |||||
target_type=target_type, | |||||
target=target, | |||||
) | |||||
) | |||||
return self._extid.get_from_primary_key(primary_key) | |||||
def extid_get_from_token(self, token: int) -> Iterable[ExtIDRow]: | |||||
return self._extid.get_from_token(token) | |||||
def extid_get_tokens_from_extid(self, extid_type: str, extid: bytes): | |||||
return [self._extid_index.get(("extid", extid_type, extid))] | |||||
def extid_get_tokens_from_target(self, target_type: str, target: bytes): | |||||
return [self._extid_index.get(("target", target_type, target))] | |||||
class InMemoryStorage(CassandraStorage): | class InMemoryStorage(CassandraStorage): | ||||
_cql_runner: InMemoryCqlRunner # type: ignore | _cql_runner: InMemoryCqlRunner # type: ignore | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
def reset(self): | def reset(self): | ||||
self._cql_runner = InMemoryCqlRunner() | self._cql_runner = InMemoryCqlRunner() | ||||
self.objstorage = ObjStorage({"cls": "memory"}) | self.objstorage = ObjStorage({"cls": "memory"}) | ||||
def check_config(self, *, check_write: bool) -> bool: | def check_config(self, *, check_write: bool) -> bool: | ||||
return True | return True |