Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | def _content_get_from_hash(self, algo, hash_) -> Iterable: | ||||
# Query the main table ('content'). | # Query the main table ('content'). | ||||
res = self._cql_runner.content_get_from_token(token) | res = self._cql_runner.content_get_from_token(token) | ||||
for row in res: | for row in res: | ||||
# re-check the hash (in case of murmur3 collision) | # re-check the hash (in case of murmur3 collision) | ||||
if getattr(row, algo) == hash_: | if getattr(row, algo) == hash_: | ||||
yield row | yield row | ||||
def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [ | contents = [ | ||||
c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict()) | c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict()) | ||||
] | ] | ||||
if with_data: | if with_data: | ||||
# First insert to the objstorage, if the endpoint is | # First insert to the objstorage, if the endpoint is | ||||
# `content_add` (as opposed to `content_add_metadata`). | # `content_add` (as opposed to `content_add_metadata`). | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]: | ||||
"content:add": content_add, | "content:add": content_add, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary["content:add:bytes"] = content_add_bytes | summary["content:add:bytes"] = content_add_bytes | ||||
return summary | return summary | ||||
def content_add(self, content: List[Content]) -> Dict[str, int]:
    """Add contents together with their data.

    Entries of ``content`` sharing the same
    ``(sha1, sha1_git, sha256, blake2s256)`` tuple are collapsed into one
    (the last occurrence wins), each survivor gets a fresh ``ctime`` from
    ``now()``, and the result is handed to ``_content_add`` with
    ``with_data=True``.

    Returns:
        the summary dict returned by ``_content_add``.
    """
    # Key on the full hash tuple so later duplicates overwrite earlier ones.
    unique_by_hashes = {}
    for item in content:
        hashes = (item.sha1, item.sha1_git, item.sha256, item.blake2s256)
        unique_by_hashes[hashes] = item

    stamped = []
    for item in unique_by_hashes.values():
        stamped.append(attr.evolve(item, ctime=now()))

    return self._content_add(stamped, with_data=True)
def content_update(
    self, contents: List[Dict[str, Any]], keys: List[str] = []
) -> None:
    """Unsupported operation on this backend: always raises.

    Both arguments are accepted but never read, so the shared default list
    for ``keys`` is never mutated.

    Raises:
        NotImplementedError: unconditionally.
    """
    message = "content_update is not supported by the Cassandra backend"
    raise NotImplementedError(message)
def content_add_metadata(self, content: List[Content]) -> Dict[str, int]:
    """Add contents without their data.

    The list is forwarded unchanged to ``_content_add`` with
    ``with_data=False``; no deduplication or ``ctime`` stamping happens in
    this method.

    Returns:
        the summary dict returned by ``_content_add``.
    """
    summary = self._content_add(content, with_data=False)
    return summary
def content_get_data(self, content: Sha1) -> Optional[bytes]:
    """Return whatever ``self.objstorage.content_get`` yields for ``content``.

    Args:
        content: identifier passed as-is to the object storage.
    """
    # FIXME: Make this method support slicing the `data`
    data = self.objstorage.content_get(content)
    return data
def content_get_partition( | def content_get_partition( | ||||
self, | self, | ||||
▲ Show 20 Lines • Show All 98 Lines • ▼ Show 20 Lines | ) -> Iterable[Sha1Git]: | ||||
[{"sha1_git": c} for c in contents], key_hash="sha1_git" | [{"sha1_git": c} for c in contents], key_hash="sha1_git" | ||||
) | ) | ||||
def content_get_random(self) -> Sha1Git:
    """Return the ``sha1_git`` of the content picked by the CQL runner.

    Raises:
        AssertionError: if ``content_get_random()`` on the runner returned a
            falsy value (presumably because no content is stored; the
            runner is not visible from here).
    """
    content = self._cql_runner.content_get_random()
    # Explicit raise instead of an `assert` statement: asserts are removed
    # under `python -O`, which would turn this case into an AttributeError
    # on `.sha1_git`. Exception type and message are unchanged.
    if not content:
        raise AssertionError("Could not find any content")
    return content.sha1_git
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict[str, int]:
    """Insert the skipped contents that are not yet known to the CQL runner.

    Steps, in order: drop entries for which
    ``skipped_content_get_from_pk`` returns a truthy value, send the
    remaining ones to the journal writer, then for each of them prepare the
    main-table insertion, fill the per-algorithm index tables, and finally
    run the prepared insertion.

    Returns:
        ``{"skipped_content:add": <number of entries inserted>}``.
    """
    # Filter-out content already in the database.
    new_contents = []
    for candidate in contents:
        if self._cql_runner.skipped_content_get_from_pk(candidate.to_dict()):
            continue
        new_contents.append(candidate)

    self.journal_writer.skipped_content_add(new_contents)

    for skipped in new_contents:
        # Compute token of the row in the main table
        row = SkippedContentRow.from_dict({"origin": None, **skipped.to_dict()})
        token, finalize_insertion = self._cql_runner.skipped_content_add_prepare(
            row
        )

        # Then add to index tables
        for algo in HASH_ALGORITHMS:
            self._cql_runner.skipped_content_index_add_one(algo, skipped, token)

        # Then to the main table
        finalize_insertion()

    return {"skipped_content:add": len(new_contents)}
def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
    """Stamp each skipped content with a fresh ``ctime`` and insert them.

    Returns:
        the summary dict returned by ``_skipped_content_add``.
    """
    stamped = []
    for item in content:
        stamped.append(attr.evolve(item, ctime=now()))
    return self._skipped_content_add(stamped)
def skipped_content_missing(
    self, contents: List[Dict[str, Any]]
) -> Iterable[Dict[str, Any]]:
    """Lazily yield the hashes of the entries the CQL runner does not know.

    For every dict in ``contents`` for which
    ``skipped_content_get_from_pk`` returns a falsy value, a new dict
    restricted to the ``DEFAULT_ALGORITHMS`` keys is yielded.
    """
    for entry in contents:
        if self._cql_runner.skipped_content_get_from_pk(entry):
            # Already present: nothing to report for this entry.
            continue
        hashes = {}
        for algo in DEFAULT_ALGORITHMS:
            hashes[algo] = entry[algo]
        yield hashes
def directory_add(self, directories: List[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict[str, int]: | ||||
to_add = {d.id: d for d in directories}.values() | to_add = {d.id: d for d in directories}.values() | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in to_add]) | missing = self.directory_missing([dir_.id for dir_ in to_add]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
for directory in directories: | for directory in directories: | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
yield from self._directory_ls(directory, recursive) | yield from self._directory_ls(directory, recursive) | ||||
def directory_get_random(self) -> Sha1Git:
    """Return the ``id`` of the directory picked by the CQL runner.

    Raises:
        AssertionError: if ``directory_get_random()`` on the runner returned
            a falsy value (presumably because no directory is stored; the
            runner is not visible from here).
    """
    directory = self._cql_runner.directory_get_random()
    # Explicit raise instead of an `assert` statement: asserts are removed
    # under `python -O`, which would turn this case into an AttributeError
    # on `.id`. Exception type and message are unchanged.
    if not directory:
        raise AssertionError("Could not find any directory")
    return directory.id
def revision_add(self, revisions: List[Revision]) -> Dict: | def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
to_add = {r.id: r for r in revisions}.values() | to_add = {r.id: r for r in revisions}.values() | ||||
missing = self.revision_missing([rev.id for rev in to_add]) | missing = self.revision_missing([rev.id for rev in to_add]) | ||||
revisions = [rev for rev in revisions if rev.id in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
self.journal_writer.revision_add(revisions) | self.journal_writer.revision_add(revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
revobject = converters.revision_to_db(revision) | revobject = converters.revision_to_db(revision) | ||||
▲ Show 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: | ||||
seen: Set[Sha1Git] = set() | seen: Set[Sha1Git] = set() | ||||
yield from self._get_parent_revs(revisions, seen, limit, True) | yield from self._get_parent_revs(revisions, seen, limit, True) | ||||
def revision_get_random(self) -> Sha1Git:
    """Return the ``id`` of the revision picked by the CQL runner.

    Raises:
        AssertionError: if ``revision_get_random()`` on the runner returned
            a falsy value (presumably because no revision is stored; the
            runner is not visible from here).
    """
    revision = self._cql_runner.revision_get_random()
    # Explicit raise instead of an `assert` statement: asserts are removed
    # under `python -O`, which would turn this case into an AttributeError
    # on `.id`. Exception type and message are unchanged.
    if not revision:
        raise AssertionError("Could not find any revision")
    return revision.id
def release_add(self, releases: List[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict[str, int]: | ||||
to_add = {r.id: r for r in releases}.values() | to_add = {r.id: r for r in releases}.values() | ||||
missing = set(self.release_missing([rel.id for rel in to_add])) | missing = set(self.release_missing([rel.id for rel in to_add])) | ||||
releases = [rel for rel in to_add if rel.id in missing] | releases = [rel for rel in to_add if rel.id in missing] | ||||
self.journal_writer.release_add(releases) | self.journal_writer.release_add(releases) | ||||
for release in releases: | for release in releases: | ||||
if release: | if release: | ||||
self._cql_runner.release_add_one(converters.release_to_db(release)) | self._cql_runner.release_add_one(converters.release_to_db(release)) | ||||
Show All 12 Lines | def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: | ||||
return [rels.get(rel_id) for rel_id in releases] | return [rels.get(rel_id) for rel_id in releases] | ||||
def release_get_random(self) -> Sha1Git:
    """Return the ``id`` of the release picked by the CQL runner.

    Raises:
        AssertionError: if ``release_get_random()`` on the runner returned a
            falsy value (presumably because no release is stored; the
            runner is not visible from here).
    """
    release = self._cql_runner.release_get_random()
    # Explicit raise instead of an `assert` statement: asserts are removed
    # under `python -O`, which would turn this case into an AttributeError
    # on `.id`. Exception type and message are unchanged.
    if not release:
        raise AssertionError("Could not find any release")
    return release.id
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: | ||||
to_add = {s.id: s for s in snapshots}.values() | to_add = {s.id: s for s in snapshots}.values() | ||||
missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) | missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) | ||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
# Add branches | # Add branches | ||||
▲ Show 20 Lines • Show All 786 Lines • Show Last 20 Lines |