Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 413 Lines • ▼ Show 20 Lines | ) -> Iterator[Dict[str, bytes]]: | ||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
assert content.get(algo) is not None, ( | assert content.get(algo) is not None, ( | ||||
"content_missing_from_all_hashes must not be called with " | "content_missing_from_all_hashes must not be called with " | ||||
"partial hashes." | "partial hashes." | ||||
) | ) | ||||
if tuple(content[algo] for algo in HASH_ALGORITHMS) not in present: | if tuple(content[algo] for algo in HASH_ALGORITHMS) not in present: | ||||
yield content | yield content | ||||
@_prepared_statement( | @_prepared_select_statement(ContentRow, "WHERE sha256 IN ?", HASH_ALGORITHMS) | ||||
f"SELECT {', '.join(HASH_ALGORITHMS)} FROM content WHERE sha256 IN ?" | |||||
) | |||||
def _content_get_hashes_from_sha256( | def _content_get_hashes_from_sha256( | ||||
self, ids: List[bytes], *, statement | self, ids: List[bytes], *, statement | ||||
) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: | ) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: | ||||
for row in self._execute_with_retries(statement, [ids]): | for row in self._execute_with_retries(statement, [ids]): | ||||
yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore | yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore | ||||
@_prepared_select_statement( | @_prepared_select_statement( | ||||
ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ||||
▲ Show 20 Lines • Show All 169 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
@_prepared_exists_statement("revision") | @_prepared_exists_statement("revision") | ||||
def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: | def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: | ||||
return self._missing(statement, ids) | return self._missing(statement, ids) | ||||
@_prepared_insert_statement(RevisionRow) | @_prepared_insert_statement(RevisionRow) | ||||
def revision_add_one(self, revision: RevisionRow, *, statement) -> None: | def revision_add_one(self, revision: RevisionRow, *, statement) -> None: | ||||
self._add_one(statement, revision) | self._add_one(statement, revision) | ||||
@_prepared_statement(f"SELECT id FROM {RevisionRow.TABLE} WHERE id IN ?") | @_prepared_select_statement(RevisionRow, "WHERE id IN ?", ["id"]) | ||||
def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: | def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: | ||||
return ( | return ( | ||||
row["id"] for row in self._execute_with_retries(statement, [revision_ids]) | row["id"] for row in self._execute_with_retries(statement, [revision_ids]) | ||||
) | ) | ||||
@_prepared_select_statement(RevisionRow, "WHERE id IN ?") | @_prepared_select_statement(RevisionRow, "WHERE id IN ?") | ||||
def revision_get( | def revision_get( | ||||
self, revision_ids: List[Sha1Git], *, statement | self, revision_ids: List[Sha1Git], *, statement | ||||
Show All 11 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
@_prepared_insert_statement(RevisionParentRow) | @_prepared_insert_statement(RevisionParentRow) | ||||
def revision_parent_add_one( | def revision_parent_add_one( | ||||
self, revision_parent: RevisionParentRow, *, statement | self, revision_parent: RevisionParentRow, *, statement | ||||
) -> None: | ) -> None: | ||||
self._add_one(statement, revision_parent) | self._add_one(statement, revision_parent) | ||||
@_prepared_statement( | @_prepared_select_statement(RevisionParentRow, "WHERE id = ?", ["parent_id"]) | ||||
f"SELECT parent_id FROM {RevisionParentRow.TABLE} WHERE id = ?" | |||||
) | |||||
def revision_parent_get( | def revision_parent_get( | ||||
self, revision_id: Sha1Git, *, statement | self, revision_id: Sha1Git, *, statement | ||||
) -> Iterable[bytes]: | ) -> Iterable[bytes]: | ||||
return ( | return ( | ||||
row["parent_id"] | row["parent_id"] | ||||
for row in self._execute_with_retries(statement, [revision_id]) | for row in self._execute_with_retries(statement, [revision_id]) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 302 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
) | ) | ||||
def origin_bump_next_visit_id( | def origin_bump_next_visit_id( | ||||
self, origin_url: str, visit_id: int, *, statement | self, origin_url: str, visit_id: int, *, statement | ||||
) -> None: | ) -> None: | ||||
origin_sha1 = hash_url(origin_url) | origin_sha1 = hash_url(origin_url) | ||||
next_id = visit_id + 1 | next_id = visit_id + 1 | ||||
self._execute_with_retries(statement, [next_id, origin_sha1, next_id]) | self._execute_with_retries(statement, [next_id, origin_sha1, next_id]) | ||||
@_prepared_statement(f"SELECT next_visit_id FROM {OriginRow.TABLE} WHERE sha1 = ?") | @_prepared_select_statement(OriginRow, "WHERE sha1 = ?", ["next_visit_id"]) | ||||
def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: | def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: | ||||
rows = list(self._execute_with_retries(statement, [origin_sha1])) | rows = list(self._execute_with_retries(statement, [origin_sha1])) | ||||
assert len(rows) == 1 # TODO: error handling | assert len(rows) == 1 # TODO: error handling | ||||
return rows[0]["next_visit_id"] | return rows[0]["next_visit_id"] | ||||
@_prepared_statement( | @_prepared_statement( | ||||
f""" | f""" | ||||
UPDATE {OriginRow.TABLE} | UPDATE {OriginRow.TABLE} | ||||
▲ Show 20 Lines • Show All 199 Lines • ▼ Show 20 Lines | def origin_visit_status_get( | ||||
statement, | statement, | ||||
) -> Iterator[OriginVisitStatusRow]: | ) -> Iterator[OriginVisitStatusRow]: | ||||
"""Return all origin visit statuses for a given visit""" | """Return all origin visit statuses for a given visit""" | ||||
return map( | return map( | ||||
OriginVisitStatusRow.from_dict, | OriginVisitStatusRow.from_dict, | ||||
self._execute_with_retries(statement, [origin, visit]), | self._execute_with_retries(statement, [origin, visit]), | ||||
) | ) | ||||
@_prepared_statement("SELECT snapshot FROM origin_visit_status WHERE origin = ?") | @_prepared_select_statement(OriginVisitStatusRow, "WHERE origin = ?", ["snapshot"]) | ||||
def origin_snapshot_get_all(self, origin: str, *, statement) -> Iterable[Sha1Git]: | def origin_snapshot_get_all(self, origin: str, *, statement) -> Iterable[Sha1Git]: | ||||
ardumont: Are you sure about this one?
It used to query the snapshot table. | |||||
Done (inline action) — vlorentz: it used to query the `snapshot` field of the `origin_visit_status` table, and still does. | |||||
yield from { | yield from { | ||||
d["snapshot"] | d["snapshot"] | ||||
for d in self._execute_with_retries(statement, [origin]) | for d in self._execute_with_retries(statement, [origin]) | ||||
if d["snapshot"] is not None | if d["snapshot"] is not None | ||||
} | } | ||||
########################## | ########################## | ||||
# 'metadata_authority' table | # 'metadata_authority' table | ||||
▲ Show 20 Lines • Show All 120 Lines • ▼ Show 20 Lines | |||||
) -> Iterable[RawExtrinsicMetadataRow]: | ) -> Iterable[RawExtrinsicMetadataRow]: | ||||
return map( | return map( | ||||
RawExtrinsicMetadataRow.from_dict, | RawExtrinsicMetadataRow.from_dict, | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
statement, [target, authority_url, authority_type] | statement, [target, authority_url, authority_type] | ||||
), | ), | ||||
) | ) | ||||
@_prepared_statement( | @_prepared_select_statement(RawExtrinsicMetadataRow, "WHERE target = ?") | ||||
"SELECT authority_type, authority_url FROM raw_extrinsic_metadata " | |||||
"WHERE target = ?" | |||||
) | |||||
def raw_extrinsic_metadata_get_authorities( | def raw_extrinsic_metadata_get_authorities( | ||||
self, target: str, *, statement | self, target: str, *, statement | ||||
) -> Iterable[Tuple[str, str]]: | ) -> Iterable[Tuple[str, str]]: | ||||
return ( | return ( | ||||
(entry["authority_type"], entry["authority_url"]) | (entry["authority_type"], entry["authority_url"]) | ||||
for entry in self._execute_with_retries(statement, [target]) | for entry in self._execute_with_retries(statement, [target]) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | |||||
########################## | ########################## | ||||
@_prepared_insert_statement(ExtIDByTargetRow) | @_prepared_insert_statement(ExtIDByTargetRow) | ||||
def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: | def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: | ||||
"""Adds a row mapping extid[target_type, target] to the token of the ExtID in | """Adds a row mapping extid[target_type, target] to the token of the ExtID in | ||||
the main 'extid' table.""" | the main 'extid' table.""" | ||||
self._add_one(statement, row) | self._add_one(statement, row) | ||||
@_prepared_statement( | @_prepared_select_statement( | ||||
f""" | ExtIDByTargetRow, "WHERE target_type = ? AND target = ?" | ||||
SELECT target_token | |||||
FROM {ExtIDByTargetRow.TABLE} | |||||
WHERE target_type = ? AND target = ? | |||||
""" | |||||
) | ) | ||||
def _extid_get_tokens_from_target( | def _extid_get_tokens_from_target( | ||||
self, target_type: str, target: bytes, *, statement | self, target_type: str, target: bytes, *, statement | ||||
) -> Iterable[int]: | ) -> Iterable[int]: | ||||
return ( | return ( | ||||
row["target_token"] | row["target_token"] | ||||
for row in self._execute_with_retries(statement, [target_type, target]) | for row in self._execute_with_retries(statement, [target_type, target]) | ||||
) | ) | ||||
Show All 13 Lines |
Are you sure about this one?
It used to query the snapshot table.