Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 20 Lines | from typing import ( | ||||
Generic, | Generic, | ||||
Hashable, | Hashable, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Set, | Set, | ||||
Tuple, | Tuple, | ||||
Type, | |||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import SWHID | from swh.model.identifiers import SWHID | ||||
Show All 13 Lines | from swh.model.model import ( | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | MetadataTargetType, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Sha1, | Sha1, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.cassandra.model import BaseRow | |||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
ListOrder, | ListOrder, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
) | ) | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | def iter_after(self, start_key: Any) -> Iterator[SortedListItem]: | ||||
for item in it: | for item in it: | ||||
if self.key(item) > start_key: # type: ignore | if self.key(item) > start_key: # type: ignore | ||||
yield item | yield item | ||||
break | break | ||||
yield from it | yield from it | ||||
TRow = TypeVar("TRow", bound=BaseRow) | |||||
class Table(Generic[TRow]): | |||||
def __init__(self, row_class: Type[TRow]): | |||||
self.row_class = row_class | |||||
self.primary_key_cols = row_class.PARTITION_KEY + row_class.CLUSTERING_KEY | |||||
# Map from tokens to clustering keys to rows | |||||
# These are not actually partitions (or rather, there is one partition | |||||
# for each token) and they aren't sorted. | |||||
# But it is good enough if we don't care about performance; | |||||
# and makes the code a lot simpler. | |||||
self.data: Dict[int, Dict[Tuple, TRow]] = defaultdict(dict) | |||||
def __repr__(self): | |||||
return f"<__module__.Table[{self.row_class.__name__}] object>" | |||||
def partition_key(self, row: Union[TRow, Dict[str, Any]]) -> Tuple: | |||||
"""Returns the partition key of a row (ie. the cells which get hashed | |||||
into the token.""" | |||||
if isinstance(row, dict): | |||||
row_d = row | |||||
else: | |||||
row_d = row.to_dict() | |||||
return tuple(row_d[col] for col in self.row_class.PARTITION_KEY) | |||||
def clustering_key(self, row: Union[TRow, Dict[str, Any]]) -> Tuple: | |||||
"""Returns the clustering key of a row (ie. the cells which are used | |||||
for sorting rows within a partition.""" | |||||
if isinstance(row, dict): | |||||
row_d = row | |||||
else: | |||||
row_d = row.to_dict() | |||||
return tuple(row_d[col] for col in self.row_class.CLUSTERING_KEY) | |||||
def primary_key(self, row): | |||||
return self.partition_key(row) + self.clustering_key(row) | |||||
def primary_key_from_dict(self, d: Dict[str, Any]) -> Tuple: | |||||
"""Returns the primary key (ie. concatenation of partition key and | |||||
clustering key) of the given dictionary interpreted as a row.""" | |||||
return tuple(d[col] for col in self.primary_key_cols) | |||||
def token(self, key: Tuple): | |||||
anlambert: of a row, in a row ? | |||||
"""Returns the token of a row (ie. the hash of its partition key).""" | |||||
return hash(key) | |||||
def get_partition(self, token: int) -> Dict[Tuple, TRow]: | |||||
"""Returns the partition that contains this token.""" | |||||
return self.data[token] | |||||
def insert(self, row: TRow): | |||||
partition = self.data[self.token(self.partition_key(row))] | |||||
partition[self.clustering_key(row)] = row | |||||
def split_primary_key(self, key: Tuple) -> Tuple[Tuple, Tuple]: | |||||
"""Returns (partition_key, clustering_key) from a partition key""" | |||||
assert len(key) == len(self.primary_key_cols) | |||||
partition_key = key[0 : len(self.row_class.PARTITION_KEY)] | |||||
clustering_key = key[len(self.row_class.PARTITION_KEY) :] | |||||
return (partition_key, clustering_key) | |||||
def get_from_primary_key(self, primary_key: Tuple) -> Optional[TRow]: | |||||
"""Returns at most one row, from its primary key.""" | |||||
(partition_key, clustering_key) = self.split_primary_key(primary_key) | |||||
token = self.token(partition_key) | |||||
partition = self.get_partition(token) | |||||
return partition.get(clustering_key) | |||||
def get_from_token(self, token: int) -> Iterable[TRow]: | |||||
"""Returns all rows whose token (ie. non-cryptographic hash of the | |||||
partition key) is the one passed as argument.""" | |||||
return (v for (k, v) in sorted(self.get_partition(token).items())) | |||||
def iter_all(self) -> Iterator[Tuple[Tuple, TRow]]: | |||||
return ( | |||||
(self.primary_key(row), row) | |||||
for (token, partition) in self.data.items() | |||||
for (clustering_key, row) in partition.items() | |||||
) | |||||
class InMemoryStorage: | class InMemoryStorage: | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
def reset(self): | def reset(self): | ||||
self._contents = {} | self._contents = {} | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = {} | self._skipped_contents = {} | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
▲ Show 20 Lines • Show All 1,151 Lines • Show Last 20 Lines |
of a row, in a row ?