diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -54,6 +54,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, @@ -68,6 +69,7 @@ SkippedContentRow, SnapshotBranchRow, SnapshotRow, + content_index_table_name, ) from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS @@ -354,10 +356,14 @@ return self._get_random_row(ContentRow, statement) @_prepared_statement( - ( - "SELECT token({0}) AS tok, {1} FROM content " - "WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?" - ).format(", ".join(ContentRow.PARTITION_KEY), ", ".join(ContentRow.cols())) + """ + SELECT token({pk}) AS tok, {cols} FROM {table} + WHERE token({pk}) >= ? AND token({pk}) <= ? LIMIT ? + """.format( + pk=", ".join(ContentRow.PARTITION_KEY), + cols=", ".join(ContentRow.cols()), + table=ContentRow.TABLE, + ) ) def content_get_token_range( self, start: int, end: int, limit: int, *, statement @@ -373,7 +379,11 @@ ########################## @_prepared_statement( - "SELECT sha1_git AS id FROM content_by_sha1_git WHERE sha1_git IN ?" + f""" + SELECT sha1_git AS id + FROM {content_index_table_name("sha1_git", skipped_content=False)} + WHERE sha1_git IN ? + """ ) def content_missing_by_sha1_git( self, ids: List[bytes], *, statement @@ -383,16 +393,22 @@ def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" - query = ( - f"INSERT INTO content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" - ) + query = f""" + INSERT INTO {content_index_table_name(algo, skipped_content=False)} + ({algo}, target_token) + VALUES (%s, %s) + """ self._execute_with_retries(query, [content.get_hash(algo), token]) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f"SELECT target_token FROM content_by_{algo} WHERE {algo} = %s" + query = f""" + SELECT target_token + FROM {content_index_table_name(algo, skipped_content=False)} + WHERE {algo} = %s + """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) @@ -492,7 +508,11 @@ self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f"SELECT target_token FROM skipped_content_by_{algo} WHERE {algo} = %s" + query = f""" + SELECT target_token + FROM {content_index_table_name(algo, skipped_content=True)} + WHERE {algo} = %s + """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) @@ -509,7 +529,7 @@ def revision_add_one(self, revision: RevisionRow, *, statement) -> None: self._add_one(statement, revision) - @_prepared_statement("SELECT id FROM revision WHERE id IN ?") + @_prepared_statement(f"SELECT id FROM {RevisionRow.TABLE} WHERE id IN ?") def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: return ( row["id"] for row in self._execute_with_retries(statement, [revision_ids]) @@ -537,7 +557,9 @@ ) -> None: self._add_one(statement, revision_parent) - @_prepared_statement("SELECT parent_id FROM revision_parent WHERE id = ?") + @_prepared_statement( + f"SELECT parent_id FROM {RevisionParentRow.TABLE} WHERE id = ?" + ) def revision_parent_get( self, revision_id: Sha1Git, *, statement ) -> Iterable[bytes]: @@ -628,9 +650,11 @@ self._add_one(statement, branch) @_prepared_statement( - "SELECT ascii_bins_count(target_type) AS counts " - "FROM snapshot_branch " - "WHERE snapshot_id = ? AND name >= ?" + f""" + SELECT ascii_bins_count(target_type) AS counts + FROM {SnapshotBranchRow.TABLE} + WHERE snapshot_id = ? AND name >= ? + """ ) def snapshot_count_branches_from_name( self, snapshot_id: Sha1Git, from_: bytes, *, statement @@ -640,9 +664,11 @@ return {None: nb_none, **counts} @_prepared_statement( - "SELECT ascii_bins_count(target_type) AS counts " - "FROM snapshot_branch " - "WHERE snapshot_id = ? AND name < ?" + f""" + SELECT ascii_bins_count(target_type) AS counts + FROM {SnapshotBranchRow.TABLE} + WHERE snapshot_id = ? AND name < ? + """ ) def snapshot_count_branches_before_name( self, snapshot_id: Sha1Git, before: bytes, *, statement, @@ -746,8 +772,11 @@ return self.origin_get_by_sha1(hash_url(url)) @_prepared_statement( - f'SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} ' - f"FROM origin WHERE token(sha1) >= ? LIMIT ?" + f""" + SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} + FROM {OriginRow.TABLE} + WHERE token(sha1) >= ? LIMIT ? + """ ) def origin_list( self, start_token: int, limit: int, *, statement @@ -762,14 +791,18 @@ def origin_iter_all(self, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [])) - @_prepared_statement("SELECT next_visit_id FROM origin WHERE sha1 = ?") + @_prepared_statement(f"SELECT next_visit_id FROM {OriginRow.TABLE} WHERE sha1 = ?") def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0]["next_visit_id"] @_prepared_statement( - "UPDATE origin SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=?" + f""" + UPDATE {OriginRow.TABLE} + SET next_visit_id=? + WHERE sha1 = ? IF next_visit_id=? + """ ) def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) @@ -1127,19 +1160,18 @@ # 'extid_by_target' table ########################## - @_prepared_statement( - "INSERT INTO extid_by_target (target_type, target, target_token) " - "VALUES (?, ?, ?)" - ) - def extid_index_add_one(self, extid: ExtIDRow, token: int, *, statement) -> None: + @_prepared_insert_statement(ExtIDByTargetRow) + def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: """Adds a row mapping extid[target_type, target] to the token of the ExtID in the main 'extid' table.""" - self._execute_with_retries(statement, [extid.target_type, extid.target, token]) + self._add_one(statement, row) @_prepared_statement( - "SELECT target_token " - "FROM extid_by_target " - "WHERE target_type = ? AND target = ?" + f""" + SELECT target_token + FROM {ExtIDByTargetRow.TABLE} + WHERE target_type = ? AND target = ? + """ ) def _extid_get_tokens_from_target( self, target_type: str, target: bytes, *, statement diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py --- a/swh/storage/cassandra/model.py +++ b/swh/storage/cassandra/model.py @@ -35,6 +35,19 @@ T = TypeVar("T", bound="BaseRow") +def content_index_table_name(algo: str, skipped_content: bool) -> str: + """Given an algorithm name, returns the name of one of the 'content_by_*' + and 'skipped_content_by_*' tables that serve as index for the 'content' + and 'skipped_content' tables based on this algorithm's hashes. + + For now it is a simple substitution, but future versions may append a version + number to it, if needed for schema updates.""" + if skipped_content: + return f"skipped_content_by_{algo}" + else: + return f"content_by_{algo}" + + class BaseRow: TABLE: ClassVar[str] PARTITION_KEY: ClassVar[Tuple[str, ...]] @@ -292,3 +305,14 @@ extid: bytes target_type: str target: bytes + + +@dataclasses.dataclass +class ExtIDByTargetRow(BaseRow): + TABLE = "extid_by_target" + PARTITION_KEY = ("target_type", "target") + CLUSTERING_KEY = ("target_token",) + + target_type: str + target: bytes + target_token: int diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -69,6 +69,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, @@ -1354,14 +1355,19 @@ inserted = 0 for extid in extids: + target_type = extid.target.object_type.value + target = extid.target.object_id extidrow = ExtIDRow( extid_type=extid.extid_type, extid=extid.extid, - target_type=extid.target.object_type.value, - target=extid.target.object_id, + target_type=target_type, + target=target, ) (token, insertion_finalizer) = self._cql_runner.extid_add_prepare(extidrow) - self._cql_runner.extid_index_add_one(extidrow, token) + indexrow = ExtIDByTargetRow( + target_type=target_type, target=target, target_token=token, + ) + self._cql_runner.extid_index_add_one(indexrow) insertion_finalizer() inserted += 1 return {"extid:add": inserted} diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -29,6 +29,7 @@ ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, @@ -638,7 +639,7 @@ finalizer = functools.partial(self._extid_add_finalize, extid) return (self._extid.token(self._extid.partition_key(extid)), finalizer) - def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: + def extid_index_add_one(self, row: ExtIDByTargetRow) -> None: pass def extid_get_from_pk(