diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -162,6 +162,17 @@ return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") +def _prepared_select_statement( + row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, +) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: + if cols is None: + cols = row_class.cols() + + return _prepared_statement( + f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" + ) + + class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" @@ -266,9 +277,8 @@ return (token, finalizer) - @_prepared_statement( - "SELECT * FROM content WHERE " - + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS)) + @_prepared_select_statement( + ContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}" ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement @@ -284,14 +294,14 @@ else: return None - @_prepared_statement( - "SELECT * FROM content WHERE token(" + ", ".join(_content_pk) + ") = ?" + @_prepared_select_statement( + ContentRow, f"WHERE token({', '.join(_content_pk)}) = ?" ) def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) - @_prepared_statement( - "SELECT * FROM content WHERE token(%s) > ? LIMIT 1" % ", ".join(_content_pk) + @_prepared_select_statement( + ContentRow, f"WHERE token({', '.join(_content_pk)}) > ? LIMIT 1" ) def content_get_random(self, *, statement) -> Optional[ContentRow]: return self._get_random_row(ContentRow, statement) @@ -391,9 +401,9 @@ return (token, finalizer) - @_prepared_statement( - "SELECT * FROM skipped_content WHERE " - + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS)) + @_prepared_select_statement( + SkippedContentRow, + f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement @@ -449,13 +459,13 @@ row["id"] for row in self._execute_with_retries(statement, [revision_ids]) ) - @_prepared_statement("SELECT * FROM revision WHERE id IN ?") + @_prepared_select_statement(RevisionRow, "WHERE id IN ?") def revision_get(self, revision_ids, *, statement) -> Iterable[RevisionRow]: return map( RevisionRow.from_dict, self._execute_with_retries(statement, [revision_ids]) ) - @_prepared_statement("SELECT * FROM revision WHERE token(id) > ? LIMIT 1") + @_prepared_select_statement(RevisionRow, "WHERE token(id) > ? LIMIT 1") def revision_get_random(self, *, statement) -> Optional[RevisionRow]: return self._get_random_row(RevisionRow, statement) @@ -490,13 +500,13 @@ def release_add_one(self, release: ReleaseRow, *, statement) -> None: self._add_one(statement, release) - @_prepared_statement("SELECT * FROM release WHERE id in ?") + @_prepared_select_statement(ReleaseRow, "WHERE id in ?") def release_get(self, release_ids: List[str], *, statement) -> Iterable[ReleaseRow]: return map( ReleaseRow.from_dict, self._execute_with_retries(statement, [release_ids]) ) - @_prepared_statement("SELECT * FROM release WHERE token(id) > ? LIMIT 1") + @_prepared_select_statement(ReleaseRow, "WHERE token(id) > ? LIMIT 1") def release_get_random(self, *, statement) -> Optional[ReleaseRow]: return self._get_random_row(ReleaseRow, statement) @@ -514,7 +524,7 @@ commit/finalize the directory.""" self._add_one(statement, directory) - @_prepared_statement("SELECT * FROM directory WHERE token(id) > ? LIMIT 1") + @_prepared_select_statement(DirectoryRow, "WHERE token(id) > ? LIMIT 1") def directory_get_random(self, *, statement) -> Optional[DirectoryRow]: return self._get_random_row(DirectoryRow, statement) @@ -526,7 +536,7 @@ def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: self._add_one(statement, entry) - @_prepared_statement("SELECT * FROM directory_entry WHERE directory_id IN ?") + @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") def directory_entry_get( self, directory_ids, *, statement ) -> Iterable[DirectoryEntryRow]: @@ -547,13 +557,13 @@ def snapshot_add_one(self, snapshot: SnapshotRow, *, statement) -> None: self._add_one(statement, snapshot) - @_prepared_statement("SELECT * FROM snapshot WHERE id = ?") + @_prepared_select_statement(SnapshotRow, "WHERE id = ?") def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> ResultSet: return map( SnapshotRow.from_dict, self._execute_with_retries(statement, [snapshot_id]) ) - @_prepared_statement("SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1") + @_prepared_select_statement(SnapshotRow, "WHERE token(id) > ? LIMIT 1") def snapshot_get_random(self, *, statement) -> Optional[SnapshotRow]: return self._get_random_row(SnapshotRow, statement) @@ -579,8 +589,8 @@ (nb_none, counts) = row["counts"] return {None: nb_none, **counts} - @_prepared_statement( - "SELECT * FROM snapshot_branch WHERE snapshot_id = ? AND name >= ? LIMIT ?" + @_prepared_select_statement( + SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? LIMIT ?" ) def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement @@ -598,7 +608,7 @@ def origin_add_one(self, origin: OriginRow, *, statement) -> None: self._add_one(statement, origin) - @_prepared_statement("SELECT * FROM origin WHERE sha1 = ?") + @_prepared_select_statement(OriginRow, "WHERE sha1 = ?") def origin_get_by_sha1(self, sha1: bytes, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [sha1])) @@ -618,7 +628,7 @@ for row in self._execute_with_retries(statement, [start_token, limit]) ) - @_prepared_statement("SELECT * FROM origin") + @_prepared_select_statement(OriginRow) def origin_iter_all(self, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [])) @@ -655,70 +665,60 @@ # 'origin_visit' table ########################## - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? AND visit > ? " - "ORDER BY visit ASC" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? AND visit > ? ORDER BY visit ASC" ) def _origin_visit_get_pagination_asc_no_limit( self, origin_url: str, last_visit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? AND visit > ? " - "ORDER BY visit ASC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" ) def _origin_visit_get_pagination_asc_limit( self, origin_url: str, last_visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? AND visit < ? " - "ORDER BY visit DESC" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? AND visit < ? ORDER BY visit DESC" ) def _origin_visit_get_pagination_desc_no_limit( self, origin_url: str, last_visit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? AND visit < ? " - "ORDER BY visit DESC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" ) def _origin_visit_get_pagination_desc_limit( self, origin_url: str, last_visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC LIMIT ?" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? ORDER BY visit ASC LIMIT ?" ) def _origin_visit_get_no_pagination_asc_limit( self, origin_url: str, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC " - ) + @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? ORDER BY visit ASC ") def _origin_visit_get_no_pagination_asc_no_limit( self, origin_url: str, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC" - ) + @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? ORDER BY visit DESC") def _origin_visit_get_no_pagination_desc_no_limit( self, origin_url: str, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url]) - @_prepared_statement( - "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC LIMIT ?" + @_prepared_select_statement( + OriginVisitRow, "WHERE origin = ? ORDER BY visit DESC LIMIT ?" ) def _origin_visit_get_no_pagination_desc_limit( self, origin_url: str, limit: int, *, statement @@ -750,11 +750,9 @@ origin_visit_get_method = getattr(self, method_name) return map(OriginVisitRow.from_dict, origin_visit_get_method(*args)) - @_prepared_statement( - "SELECT * FROM origin_visit_status WHERE origin = ? " - "AND visit = ? AND date >= ? " - "ORDER BY date ASC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitStatusRow, + "WHERE origin = ? AND visit = ? AND date >= ? ORDER BY date ASC LIMIT ?", ) def _origin_visit_status_get_with_date_asc_limit( self, @@ -767,11 +765,9 @@ ) -> ResultSet: return self._execute_with_retries(statement, [origin, visit, date_from, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit_status WHERE origin = ? " - "AND visit = ? AND date <= ? " - "ORDER BY visit DESC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitStatusRow, + "WHERE origin = ? AND visit = ? AND date <= ? ORDER BY visit DESC LIMIT ?", ) def _origin_visit_status_get_with_date_desc_limit( self, @@ -784,20 +780,18 @@ ) -> ResultSet: return self._execute_with_retries(statement, [origin, visit, date_from, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? " - "ORDER BY visit ASC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitStatusRow, + "WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?", ) def _origin_visit_status_get_with_no_date_asc_limit( self, origin: str, visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin, visit, limit]) - @_prepared_statement( - "SELECT * FROM origin_visit_status WHERE origin = ? AND visit = ? " - "ORDER BY visit DESC " - "LIMIT ?" + @_prepared_select_statement( + OriginVisitStatusRow, + "WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?", ) def _origin_visit_status_get_with_no_date_desc_limit( self, origin: str, visit: int, limit: int, *, statement @@ -846,10 +840,8 @@ """ return next(self.origin_visit_status_get(origin, visit), None) - @_prepared_statement( - "SELECT * FROM origin_visit_status " - "WHERE origin = ? AND visit = ? " - "ORDER BY date DESC" + @_prepared_select_statement( + OriginVisitStatusRow, "WHERE origin = ? AND visit = ? ORDER BY date DESC" ) def origin_visit_status_get( self, @@ -868,7 +860,7 @@ self._execute_with_retries(statement, [origin, visit]), ) - @_prepared_statement("SELECT * FROM origin_visit WHERE origin = ? AND visit = ?") + @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement ) -> Optional[OriginVisitRow]: @@ -879,7 +871,7 @@ else: return None - @_prepared_statement("SELECT * FROM origin_visit WHERE origin = ?") + @_prepared_select_statement(OriginVisitRow, "WHERE origin = ?") def origin_visit_get_all( self, origin_url: str, *, statement ) -> Iterable[OriginVisitRow]: @@ -888,7 +880,7 @@ self._execute_with_retries(statement, [origin_url]), ) - @_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) >= ?") + @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) >= ?") def _origin_visit_iter_from( self, min_token: int, *, statement ) -> Iterable[OriginVisitRow]: @@ -896,7 +888,7 @@ OriginVisitRow.from_dict, self._execute_with_retries(statement, [min_token]) ) - @_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) < ?") + @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) < ?") def _origin_visit_iter_to( self, max_token: int, *, statement ) -> Iterable[OriginVisitRow]: @@ -918,7 +910,7 @@ def metadata_authority_add(self, authority: MetadataAuthorityRow, *, statement): self._add_one(statement, authority) - @_prepared_statement("SELECT * from metadata_authority WHERE type = ? AND url = ?") + @_prepared_select_statement(MetadataAuthorityRow, "WHERE type = ? AND url = ?") def metadata_authority_get( self, type, url, *, statement ) -> Optional[MetadataAuthorityRow]: @@ -936,9 +928,7 @@ def metadata_fetcher_add(self, fetcher, *, statement): self._add_one(statement, fetcher) - @_prepared_statement( - "SELECT * from metadata_fetcher WHERE name = ? AND version = ?" - ) + @_prepared_select_statement(MetadataFetcherRow, "WHERE name = ? AND version = ?") def metadata_fetcher_get( self, name, version, *, statement ) -> Optional[MetadataFetcherRow]: @@ -956,9 +946,9 @@ def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata, *, statement): self._add_one(statement, raw_extrinsic_metadata) - @_prepared_statement( - "SELECT * from raw_extrinsic_metadata " - "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?" + @_prepared_select_statement( + RawExtrinsicMetadataRow, + "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?", ) def raw_extrinsic_metadata_get_after_date( self, @@ -976,10 +966,10 @@ ), ) - @_prepared_statement( - "SELECT * from raw_extrinsic_metadata " + @_prepared_select_statement( + RawExtrinsicMetadataRow, "WHERE id=? AND authority_type=? AND authority_url=? " - "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)" + "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)", ) def raw_extrinsic_metadata_get_after_date_and_fetcher( self, @@ -1007,9 +997,8 @@ ), ) - @_prepared_statement( - "SELECT * from raw_extrinsic_metadata " - "WHERE id=? AND authority_url=? AND authority_type=?" + @_prepared_select_statement( + RawExtrinsicMetadataRow, "WHERE id=? AND authority_url=? AND authority_type=?" ) def raw_extrinsic_metadata_get( self, id: str, authority_type: str, authority_url: str, *, statement @@ -1027,6 +1016,6 @@ def check_read(self, *, statement): self._execute_with_retries(statement, []) - @_prepared_statement("SELECT * FROM object_count WHERE partition_key=0") + @_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0") def stat_counters(self, *, statement) -> ResultSet: return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, [])) diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py --- a/swh/storage/cassandra/model.py +++ b/swh/storage/cassandra/model.py @@ -223,6 +223,8 @@ @dataclasses.dataclass class ObjectCountRow(BaseRow): + TABLE = "object_count" + partition_key: int object_type: str count: int