Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import json | import json | ||||
import logging | import logging | ||||
import random | import random | ||||
from typing import ( | from typing import ( | ||||
Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar | Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar | ||||
) | ) | ||||
from cassandra import CoordinationFailure | |||||
from cassandra.cluster import ( | from cassandra.cluster import ( | ||||
Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement | from cassandra.query import PreparedStatement | ||||
from tenacity import retry, stop_after_attempt, wait_random_exponential | from tenacity import ( | ||||
retry, stop_after_attempt, wait_random_exponential, | |||||
retry_if_exception_type, | |||||
) | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | ||||
OriginVisit | |||||
) | ) | ||||
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# Common utility functions | # Common utility functions | ||||
########################## | ########################## | ||||
MAX_RETRIES = 3 | MAX_RETRIES = 3 | ||||
@retry(wait=wait_random_exponential(multiplier=1, max=10), | @retry(wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES)) | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure)) | |||||
def _execute_with_retries(self, statement, args) -> ResultSet: | def _execute_with_retries(self, statement, args) -> ResultSet: | ||||
return self._session.execute(statement, args, timeout=1000.) | return self._session.execute(statement, args, timeout=1000.) | ||||
@_prepared_statement('UPDATE object_count SET count = count + ? ' | @_prepared_statement('UPDATE object_count SET count = count + ? ' | ||||
'WHERE partition_key = 0 AND object_type = ?') | 'WHERE partition_key = 0 AND object_type = ?') | ||||
def _increment_counter( | def _increment_counter( | ||||
self, object_type: str, nb: int, *, statement: PreparedStatement | self, object_type: str, nb: int, *, statement: PreparedStatement | ||||
) -> None: | ) -> None: | ||||
▲ Show 20 Lines • Show All 391 Lines • ▼ Show 20 Lines | def origin_visit_update( | ||||
query = ('UPDATE origin_visit SET ' + ', '.join(set_parts) + | query = ('UPDATE origin_visit SET ' + ', '.join(set_parts) + | ||||
' WHERE origin = %s AND visit = %s') | ' WHERE origin = %s AND visit = %s') | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
query, args + [origin_url, visit_id]) | query, args + [origin_url, visit_id]) | ||||
@_prepared_insert_statement('origin_visit', _origin_visit_keys) | @_prepared_insert_statement('origin_visit', _origin_visit_keys) | ||||
def origin_visit_add_one( | def origin_visit_add_one( | ||||
self, visit: Dict[str, Any], *, statement) -> None: | self, visit: OriginVisit, *, statement) -> None: | ||||
self._execute_with_retries( | self._add_one(statement, 'origin_visit', visit, | ||||
statement, [visit[key] for key in self._origin_visit_keys]) | self._origin_visit_keys) | ||||
self._increment_counter('origin_visit', 1) | |||||
@_prepared_statement( | @_prepared_statement( | ||||
'UPDATE origin_visit SET ' + | 'UPDATE origin_visit SET ' + | ||||
', '.join('%s = ?' % key for key in _origin_visit_update_keys) + | ', '.join('%s = ?' % key for key in _origin_visit_update_keys) + | ||||
' WHERE origin = ? AND visit = ?') | ' WHERE origin = ? AND visit = ?') | ||||
def origin_visit_upsert( | def origin_visit_upsert( | ||||
self, visit: Dict[str, Any], *, statement) -> None: | self, visit: Dict[str, Any], *, statement) -> None: | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
▲ Show 20 Lines • Show All 108 Lines • Show Last 20 Lines |