diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -154,9 +154,11 @@ def decorator(f): @functools.wraps(f) - def newf(self, *args, **kwargs) -> TRet: + def newf(self: "CqlRunner", *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: - statement: PreparedStatement = self._session.prepare(query) + statement: PreparedStatement = self._session.prepare( + query.format(keyspace=self.keyspace) + ) self._prepared_statements[f.__name__] = statement return f( self, *args, **kwargs, statement=self._prepared_statements[f.__name__] @@ -171,10 +173,10 @@ TSelf = TypeVar("TSelf") -def _insert_query(row_class): +def _insert_query(row_class: Type[BaseRow]) -> str: columns = row_class.cols() return ( - f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) " + f"INSERT INTO {{keyspace}}.{row_class.TABLE} ({', '.join(columns)}) " f"VALUES ({', '.join('?' for _ in columns)})" ) @@ -198,7 +200,7 @@ ]: """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" - return _prepared_statement(f"SELECT id FROM {table_name} WHERE id = ?") + return _prepared_statement(f"SELECT id FROM {{keyspace}}.{table_name} WHERE id = ?") def _prepared_select_statement( @@ -210,7 +212,7 @@ cols = row_class.cols() return _prepared_statement( - f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" + f"SELECT {', '.join(cols)} FROM {{keyspace}}.{row_class.TABLE} {clauses}" ) @@ -222,14 +224,21 @@ and passes a dict of prepared statements to the decorated method""" cols = row_class.cols() - statement_start = f"SELECT {', '.join(cols)} FROM {row_class.TABLE} " + statement_template = "SELECT {cols} FROM {keyspace}.{table} {rest}" def decorator(f): @functools.wraps(f) - def newf(self, *args, **kwargs) -> TRet: + def newf(self: "CqlRunner", *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: self._prepared_statements[f.__name__] = { - key: self._session.prepare(statement_start + query) + key: self._session.prepare( + statement_template.format( + cols=", ".join(cols), + keyspace=self.keyspace, + table=row_class.TABLE, + rest=query, + ) + ) for (key, query) in queries.items() } return f( @@ -266,7 +275,8 @@ port=port, execution_profiles=get_execution_profiles(consistency_level), ) - self._session = self._cluster.connect(keyspace) + self.keyspace = keyspace + self._session = self._cluster.connect() self._cluster.register_user_type( keyspace, "microtimestamp_with_timezone", TimestampWithTimezone ) @@ -443,7 +453,7 @@ @_prepared_statement( """ - SELECT token({pk}) AS tok, {cols} FROM {table} + SELECT token({pk}) AS tok, {cols} FROM {{keyspace}}.{table} WHERE token({pk}) >= ? AND token({pk}) <= ? LIMIT ? """.format( pk=", ".join(ContentRow.PARTITION_KEY), @@ -467,10 +477,9 @@ def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" + table = content_index_table_name(algo, skipped_content=False) query = f""" - INSERT INTO {content_index_table_name(algo, skipped_content=False)} - ({algo}, target_token) - VALUES (%s, %s) + INSERT INTO {self.keyspace}.{table} ({algo}, target_token) VALUES (%s, %s) """ self._execute_with_retries(query, [content.get_hash(algo), token]) @@ -478,11 +487,8 @@ self, algo: str, hashes: List[bytes] ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f""" - SELECT target_token - FROM {content_index_table_name(algo, skipped_content=False)} - WHERE {algo} = %s - """ + table = content_index_table_name(algo, skipped_content=False) + query = f"SELECT target_token FROM {self.keyspace}.{table} WHERE {algo} = %s" return ( row["target_token"] # type: ignore for row in self._execute_many_with_retries( @@ -573,7 +579,7 @@ """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( - f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " + f"INSERT INTO {self.keyspace}.skipped_content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries( @@ -584,11 +590,8 @@ self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f""" - SELECT target_token - FROM {content_index_table_name(algo, skipped_content=True)} - WHERE {algo} = %s - """ + table = content_index_table_name(algo, skipped_content=True) + query = f"SELECT target_token FROM {self.keyspace}.{table} WHERE {algo} = %s" return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) @@ -793,7 +796,7 @@ @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts - FROM {SnapshotBranchRow.TABLE} + FROM {{keyspace}}.{SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name >= ? """ ) @@ -807,7 +810,7 @@ @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts - FROM {SnapshotBranchRow.TABLE} + FROM {{keyspace}}.{SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name < ? """ ) @@ -921,7 +924,7 @@ @_prepared_statement( f""" SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} - FROM {OriginRow.TABLE} + FROM {{keyspace}}.{OriginRow.TABLE} WHERE token(sha1) >= ? LIMIT ? """ ) @@ -940,7 +943,7 @@ @_prepared_statement( f""" - UPDATE {OriginRow.TABLE} + UPDATE {{keyspace}}.{OriginRow.TABLE} SET next_visit_id=? WHERE sha1 = ? IF next_visit_id