Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | from swh.model.model import ( | ||||
MetadataTargetType, | MetadataTargetType, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Sha1, | Sha1, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.cassandra import CassandraStorage | from swh.storage.cassandra import CassandraStorage | ||||
from swh.storage.cassandra.model import BaseRow | from swh.storage.cassandra.model import BaseRow, ObjectCountRow | ||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
ListOrder, | ListOrder, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
) | ) | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
▲ Show 20 Lines • Show All 145 Lines • ▼ Show 20 Lines | class Table(Generic[TRow]): | ||||
def iter_all(self) -> Iterator[Tuple[Tuple, TRow]]: | def iter_all(self) -> Iterator[Tuple[Tuple, TRow]]: | ||||
return ( | return ( | ||||
(self.primary_key(row), row) | (self.primary_key(row), row) | ||||
for (token, partition) in self.data.items() | for (token, partition) in self.data.items() | ||||
for (clustering_key, row) in partition.items() | for (clustering_key, row) in partition.items() | ||||
) | ) | ||||
class InMemoryCqlRunner: | |||||
def __init__(self): | |||||
self._stat_counters = defaultdict(int) | |||||
def increment_counter(self, object_type: str, nb: int): | |||||
self._stat_counters[object_type] += nb | |||||
def stat_counters(self) -> Iterable[ObjectCountRow]: | |||||
for (object_type, count) in self._stat_counters.items(): | |||||
yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) | |||||
class InMemoryStorage(CassandraStorage): | class InMemoryStorage(CassandraStorage): | ||||
_cql_runner: InMemoryCqlRunner # type: ignore | |||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
def reset(self): | def reset(self): | ||||
self._cql_runner = InMemoryCqlRunner() | |||||
self._contents = {} | self._contents = {} | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = {} | self._skipped_contents = {} | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
hash_ = content.get_hash(algorithm) | hash_ = content.get_hash(algorithm) | ||||
self._content_indexes[algorithm][hash_].add(key) | self._content_indexes[algorithm][hash_].add(key) | ||||
self._objects[content.sha1_git].append(("content", content.sha1)) | self._objects[content.sha1_git].append(("content", content.sha1)) | ||||
self._contents[key] = content | self._contents[key] = content | ||||
self._sorted_sha1s.add(content.sha1) | self._sorted_sha1s.add(content.sha1) | ||||
self._contents[key] = attr.evolve(self._contents[key], data=None) | self._contents[key] = attr.evolve(self._contents[key], data=None) | ||||
content_add += 1 | content_add += 1 | ||||
self._cql_runner.increment_counter("content", content_add) | |||||
summary = { | summary = { | ||||
"content:add": content_add, | "content:add": content_add, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary["content:add:bytes"] = content_add_bytes | summary["content:add:bytes"] = content_add_bytes | ||||
return summary | return summary | ||||
▲ Show 20 Lines • Show All 140 Lines • ▼ Show 20 Lines | def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: | ||||
for content in contents: | for content in contents: | ||||
key = self._content_key(content) | key = self._content_key(content) | ||||
for algo in DEFAULT_ALGORITHMS: | for algo in DEFAULT_ALGORITHMS: | ||||
if content.get_hash(algo): | if content.get_hash(algo): | ||||
self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) | self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) | ||||
self._skipped_contents[key] = content | self._skipped_contents[key] = content | ||||
summary["skipped_content:add"] += 1 | summary["skipped_content:add"] += 1 | ||||
self._cql_runner.increment_counter("skipped_content", len(contents)) | |||||
return summary | return summary | ||||
def skipped_content_add(self, content: List[SkippedContent]) -> Dict: | def skipped_content_add(self, content: List[SkippedContent]) -> Dict: | ||||
content = [attr.evolve(c, ctime=now()) for c in content] | content = [attr.evolve(c, ctime=now()) for c in content] | ||||
return self._skipped_content_add(content) | return self._skipped_content_add(content) | ||||
def skipped_content_missing( | def skipped_content_missing( | ||||
self, contents: List[Dict[str, Any]] | self, contents: List[Dict[str, Any]] | ||||
Show All 16 Lines | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append(("directory", directory.id)) | self._objects[directory.id].append(("directory", directory.id)) | ||||
self._cql_runner.increment_counter("directory", len(directories)) | |||||
return {"directory:add": count} | return {"directory:add": count} | ||||
def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: | def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
for id in directories: | for id in directories: | ||||
if id not in self._directories: | if id not in self._directories: | ||||
yield id | yield id | ||||
def _directory_ls(self, directory_id, recursive, prefix=b""): | def _directory_ls(self, directory_id, recursive, prefix=b""): | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
revision, | revision, | ||||
committer=self._person_add(revision.committer), | committer=self._person_add(revision.committer), | ||||
author=self._person_add(revision.author), | author=self._person_add(revision.author), | ||||
) | ) | ||||
self._revisions[revision.id] = revision | self._revisions[revision.id] = revision | ||||
self._objects[revision.id].append(("revision", revision.id)) | self._objects[revision.id].append(("revision", revision.id)) | ||||
count += 1 | count += 1 | ||||
self._cql_runner.increment_counter("revision", len(revisions)) | |||||
return {"revision:add": count} | return {"revision:add": count} | ||||
def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: | def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
for id in revisions: | for id in revisions: | ||||
if id not in self._revisions: | if id not in self._revisions: | ||||
yield id | yield id | ||||
def revision_get( | def revision_get( | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def release_add(self, releases: List[Release]) -> Dict: | ||||
self.journal_writer.release_add(to_add) | self.journal_writer.release_add(to_add) | ||||
for rel in to_add: | for rel in to_add: | ||||
if rel.author: | if rel.author: | ||||
self._person_add(rel.author) | self._person_add(rel.author) | ||||
self._objects[rel.id].append(("release", rel.id)) | self._objects[rel.id].append(("release", rel.id)) | ||||
self._releases[rel.id] = rel | self._releases[rel.id] = rel | ||||
self._cql_runner.increment_counter("release", len(to_add)) | |||||
return {"release:add": len(to_add)} | return {"release:add": len(to_add)} | ||||
def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: | def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
yield from (rel for rel in releases if rel not in self._releases) | yield from (rel for rel in releases if rel not in self._releases) | ||||
def release_get( | def release_get( | ||||
self, releases: List[Sha1Git] | self, releases: List[Sha1Git] | ||||
) -> Iterable[Optional[Dict[str, Any]]]: | ) -> Iterable[Optional[Dict[str, Any]]]: | ||||
Show All 10 Lines | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | ||||
count = 0 | count = 0 | ||||
snapshots = [snap for snap in snapshots if snap.id not in self._snapshots] | snapshots = [snap for snap in snapshots if snap.id not in self._snapshots] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
self._snapshots[snapshot.id] = snapshot | self._snapshots[snapshot.id] = snapshot | ||||
self._objects[snapshot.id].append(("snapshot", snapshot.id)) | self._objects[snapshot.id].append(("snapshot", snapshot.id)) | ||||
count += 1 | count += 1 | ||||
self._cql_runner.increment_counter("snapshot", len(snapshots)) | |||||
return {"snapshot:add": count} | return {"snapshot:add": count} | ||||
def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]: | def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
for id in snapshots: | for id in snapshots: | ||||
if id not in self._snapshots: | if id not in self._snapshots: | ||||
yield id | yield id | ||||
def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]: | def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]: | ||||
▲ Show 20 Lines • Show All 178 Lines • ▼ Show 20 Lines | ) -> int: | ||||
return len(actual_page.results) | return len(actual_page.results) | ||||
def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | ||||
added = 0 | added = 0 | ||||
for origin in origins: | for origin in origins: | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
self.origin_add_one(origin) | self.origin_add_one(origin) | ||||
added += 1 | added += 1 | ||||
anlambert: I think it should rather be
```lang=python
self._cql_runner.increment_counter("origin", added)… | |||||
self._cql_runner.increment_counter("origin", added) | |||||
return {"origin:add": added} | return {"origin:add": added} | ||||
def origin_add_one(self, origin: Origin) -> str: | def origin_add_one(self, origin: Origin) -> str: | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
self.journal_writer.origin_add([origin]) | self.journal_writer.origin_add([origin]) | ||||
self._origins[origin.url] = origin | self._origins[origin.url] = origin | ||||
self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | ||||
self._origin_visits[origin.url] = [] | self._origin_visits[origin.url] = [] | ||||
Show All 32 Lines | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
visit=visit.visit, | visit=visit.visit, | ||||
date=visit.date, | date=visit.date, | ||||
status="created", | status="created", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
) | ) | ||||
all_visits.append(visit) | all_visits.append(visit) | ||||
self._cql_runner.increment_counter("origin_visit", len(all_visits)) | |||||
return all_visits | return all_visits | ||||
def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None: | def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None: | ||||
"""Add an origin visit status without checks. If already present, do nothing. | """Add an origin visit status without checks. If already present, do nothing. | ||||
""" | """ | ||||
self.journal_writer.origin_visit_status_add([visit_status]) | self.journal_writer.origin_visit_status_add([visit_status]) | ||||
visit_key = (visit_status.origin, visit_status.visit) | visit_key = (visit_status.origin, visit_status.visit) | ||||
▲ Show 20 Lines • Show All 226 Lines • ▼ Show 20 Lines | ) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]: | ||||
if ( | if ( | ||||
origin_visit.date > back_in_the_day | origin_visit.date > back_in_the_day | ||||
and latest_visit_status.status == "full" | and latest_visit_status.status == "full" | ||||
): | ): | ||||
return origin_visit, latest_visit_status | return origin_visit, latest_visit_status | ||||
else: | else: | ||||
return None | return None | ||||
def stat_counters(self): | |||||
keys = ( | |||||
"content", | |||||
"directory", | |||||
"origin", | |||||
"origin_visit", | |||||
"person", | |||||
"release", | |||||
"revision", | |||||
"skipped_content", | |||||
"snapshot", | |||||
) | |||||
stats = {key: 0 for key in keys} | |||||
stats.update( | |||||
collections.Counter( | |||||
obj_type | |||||
for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | |||||
) | |||||
) | |||||
return stats | |||||
def refresh_stat_counters(self): | |||||
pass | |||||
def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None: | def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None: | ||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_key = self._metadata_authority_key(metadata_entry.authority) | authority_key = self._metadata_authority_key(metadata_entry.authority) | ||||
if authority_key not in self._metadata_authorities: | if authority_key not in self._metadata_authorities: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"Unknown authority {metadata_entry.authority}" | f"Unknown authority {metadata_entry.authority}" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 184 Lines • Show Last 20 Lines |
I think it should rather be