Make the Cassandra backend and its test fixture usable with ScyllaDB.

* cql.py: add 'visit DESC' to the ORDER BY of origin_visit_status_get, and
  rewrite the filter of raw_extrinsic_metadata_get_after_date_and_id as two
  tuple comparisons (so authority_type/authority_url are bound twice).
* schema.py: pick the user-defined function/aggregate statements according
  to UDF_LANGUAGE, and add an explicit clustering order to the table keyed
  by ((origin), visit, date).
* test_cassandra.py: when SWH_CASSANDRA_BIN contains "scylla", write
  conf/scylla.yaml and launch the binary with --developer-mode=1.

Review changes folded in:
* an unknown UDF_LANGUAGE raises ValueError instead of relying on 'assert',
  which is skipped under 'python -O';
* the Cassandra aggregate no longer names FINALFUNC
  ascii_bins_count_finalfunc, which this patch never creates;
* the NOFILE soft limit is raised only up to the current hard limit, since
  an unprivileged process may not raise its hard limit;
* SCYLLA_HOME is passed to the child process as a plain string.

NOTE(review): blank lines and the indentation of context lines were
reconstructed from the hunk line counts, and the generic type parameters
(tuple<int, map<ascii, int>>, Map<String, Integer>) were restored after
being lost in extraction; confirm both against the tree. If the patch does
not apply cleanly, try 'git apply --ignore-whitespace'.

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -980,7 +980,9 @@
         return next(self.origin_visit_status_get(origin, visit), None)
 
     @_prepared_select_statement(
-        OriginVisitStatusRow, "WHERE origin = ? AND visit = ? ORDER BY date DESC"
+        OriginVisitStatusRow,
+        # 'visit DESC,' is optional with Cassandra 4, but ScyllaDB needs it
+        "WHERE origin = ? AND visit = ? ORDER BY visit DESC, date DESC",
     )
     def origin_visit_status_get(
         self, origin: str, visit: int, *, statement,
@@ -1059,8 +1061,12 @@
 
     @_prepared_select_statement(
         RawExtrinsicMetadataRow,
-        "WHERE target=? AND authority_type=? AND authority_url=? "
-        "AND (discovery_date, id) > (?, ?)",
+        # This is equivalent to:
+        #   WHERE target=? AND authority_type = ? AND authority_url = ?
+        #   AND (discovery_date, id) > (?, ?)
+        # but it needs to be written this way to work with ScyllaDB.
+        "WHERE target=? AND (authority_type, authority_url) <= (?, ?) "
+        "AND (authority_type, authority_url, discovery_date, id) > (?, ?, ?, ?)",
     )
     def raw_extrinsic_metadata_get_after_date_and_id(
         self,
@@ -1076,7 +1082,15 @@
             RawExtrinsicMetadataRow.from_dict,
             self._execute_with_retries(
                 statement,
-                [target, authority_type, authority_url, after_date, after_id,],
+                [
+                    target,
+                    authority_type,
+                    authority_url,
+                    authority_type,
+                    authority_url,
+                    after_date,
+                    after_id,
+                ],
             ),
         )
 
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -3,39 +3,52 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+UDF_LANGUAGE = "lua"
+
+if UDF_LANGUAGE == "java":
+    # For Cassandra
+    CREATE_TABLES_QUERIES = [
+        """
+    CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc (
+        state tuple<int, map<ascii, int>>, -- (nb_none, map<target_type, nb>)
+        bin_name ascii
+    )
+    CALLED ON NULL INPUT
+    RETURNS tuple<int, map<ascii, int>>
+    LANGUAGE java AS
+    $$
+        if (bin_name == null) {
+            state.setInt(0, state.getInt(0) + 1);
+        }
+        else {
+            Map<String, Integer> counters = state.getMap(
+                1, String.class, Integer.class);
+            Integer nb = counters.get(bin_name);
+            if (nb == null) {
+                nb = 0;
+            }
+            counters.put(bin_name, nb + 1);
+            state.setMap(1, counters, String.class, Integer.class);
+        }
+        return state;
+    $$;""",
+        """
+    CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii )
+    SFUNC ascii_bins_count_sfunc
+    STYPE tuple<int, map<ascii, int>>
+    INITCOND (0, {})
+    ;""",
+    ]
+elif UDF_LANGUAGE == "lua":
+    # For ScyllaDB
+    # TODO: this is not implementable yet, because ScyllaDB does not support
+    # user-defined aggregates. https://github.com/scylladb/scylla/issues/7201
+    CREATE_TABLES_QUERIES = []
+else:
+    raise ValueError(f"{UDF_LANGUAGE} must be 'lua' or 'java'")
 
 CREATE_TABLES_QUERIES = [
-    """
-CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc (
-    state tuple<int, map<ascii, int>>, -- (nb_none, map<target_type, nb>)
-    bin_name ascii
-)
-CALLED ON NULL INPUT
-RETURNS tuple<int, map<ascii, int>>
-LANGUAGE java AS
-$$
-    if (bin_name == null) {
-        state.setInt(0, state.getInt(0) + 1);
-    }
-    else {
-        Map<String, Integer> counters = state.getMap(
-            1, String.class, Integer.class);
-        Integer nb = counters.get(bin_name);
-        if (nb == null) {
-            nb = 0;
-        }
-        counters.put(bin_name, nb + 1);
-        state.setMap(1, counters, String.class, Integer.class);
-    }
-    return state;
-$$
-;""",
-    """
-CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii )
-SFUNC ascii_bins_count_sfunc
-STYPE tuple<int, map<ascii, int>>
-INITCOND (0, {})
-;""",
+    *CREATE_TABLES_QUERIES,
     """
 CREATE TYPE IF NOT EXISTS microtimestamp (
     seconds bigint,
@@ -162,7 +175,9 @@
     metadata text,
     snapshot blob,
     PRIMARY KEY ((origin), visit, date)
-);""",
+)
+WITH CLUSTERING ORDER BY (visit DESC, date DESC)
+;""",  # 'WITH CLUSTERING ORDER BY' is optional with Cassandra 4, but ScyllaDB needs it
     """
 CREATE TABLE IF NOT EXISTS origin (
     sha1 blob PRIMARY KEY,
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -6,6 +6,7 @@
 import datetime
 import itertools
 import os
+import resource
 import signal
 import socket
 import subprocess
@@ -56,6 +57,14 @@
 row_cache_save_period: 0
 trickle_fsync: false
 commitlog_sync_period_in_ms: 100000
+
+# Used only by ScyllaDB:
+experimental_features:
+    - udf
+view_hints_directory: {data_dir}/view_hints
+prometheus_port: 0 # disable prometheus server
+start_rpc: false # disable thrift server
+api_port: {api_port}
 """
 
 
@@ -68,7 +77,7 @@
 
 
 def wait_for_peer(addr, port):
-    wait_until = time.time() + 20
+    wait_until = time.time() + 60
     while time.time() < wait_until:
         try:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -89,13 +98,25 @@
     native_transport_port = free_port()
     storage_port = free_port()
     jmx_port = free_port()
+    api_port = free_port()
+
+    cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra")
+
+    scylla = "scylla" in cassandra_bin.lower()
+
+    if scylla:
+        os.makedirs(cassandra_conf.join("conf"))
+        config_path = cassandra_conf.join("conf/scylla.yaml")
+    else:
+        config_path = cassandra_conf.join("cassandra.yaml")
 
-    with open(str(cassandra_conf.join("cassandra.yaml")), "w") as fd:
+    with open(str(config_path), "w") as fd:
         fd.write(
             CONFIG_TEMPLATE.format(
                 data_dir=str(cassandra_data),
                 storage_port=storage_port,
                 native_transport_port=native_transport_port,
+                api_port=api_port,
             )
         )
 
@@ -104,7 +125,6 @@
     else:
         stdout = stderr = subprocess.DEVNULL
 
-    cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra")
     env = {
         "MAX_HEAP_SIZE": "300M",
         "HEAP_NEWSIZE": "50M",
@@ -113,19 +133,43 @@
     if "JAVA_HOME" in os.environ:
         env["JAVA_HOME"] = os.environ["JAVA_HOME"]
 
-    proc = subprocess.Popen(
-        [
-            cassandra_bin,
-            "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf,
-            "-Dcassandra.logdir=%s" % cassandra_log,
-            "-Dcassandra.jmx.local.port=%d" % jmx_port,
-            "-Dcassandra-foreground=yes",
-        ],
-        start_new_session=True,
-        env=env,
-        stdout=stdout,
-        stderr=stderr,
-    )
+    if scylla:
+        env = {
+            **env,
+            "SCYLLA_HOME": str(cassandra_conf),
+        }
+        # prevent "NOFILE rlimit too low (recommended setting 200000,
+        # minimum setting 10000; refusing to start."
+        # Only the soft limit is raised, and never past the hard limit:
+        # an unprivileged process gets ValueError when raising its hard limit.
+        _, nofile_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+        if nofile_hard == resource.RLIM_INFINITY:
+            nofile_soft = 200000
+        else:
+            nofile_soft = min(200000, nofile_hard)
+        resource.setrlimit(resource.RLIMIT_NOFILE, (nofile_soft, nofile_hard))
+
+        proc = subprocess.Popen(
+            [cassandra_bin, "--developer-mode=1",],
+            start_new_session=True,
+            env=env,
+            stdout=stdout,
+            stderr=stderr,
+        )
+    else:
+        proc = subprocess.Popen(
+            [
+                cassandra_bin,
+                "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf,
+                "-Dcassandra.logdir=%s" % cassandra_log,
+                "-Dcassandra.jmx.local.port=%d" % jmx_port,
+                "-Dcassandra-foreground=yes",
+            ],
+            start_new_session=True,
+            env=env,
+            stdout=stdout,
+            stderr=stderr,
+        )
 
     listening = wait_for_peer("127.0.0.1", native_transport_port)
 