diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py index dc7e4249..34e683ae 100644 --- a/swh/storage/cassandra/schema.py +++ b/swh/storage/cassandra/schema.py @@ -1,196 +1,202 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information CREATE_TABLES_QUERIES = ''' CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc ( state tuple<int, map<ascii, int>>, -- (nb_none, map<target_type, nb>) bin_name ascii ) CALLED ON NULL INPUT RETURNS tuple<int, map<ascii, int>> LANGUAGE java AS $$ if (bin_name == null) { state.setInt(0, state.getInt(0) + 1); } else { Map<String, Integer> counters = state.getMap( 1, String.class, Integer.class); Integer nb = counters.get(bin_name); if (nb == null) { nb = 0; } counters.put(bin_name, nb + 1); state.setMap(1, counters, String.class, Integer.class); } return state; $$ ; CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii ) SFUNC ascii_bins_count_sfunc STYPE tuple<int, map<ascii, int>> INITCOND (0, {}) ; CREATE TYPE IF NOT EXISTS microtimestamp ( seconds bigint, microseconds int ); CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone ( timestamp frozen<microtimestamp>, offset smallint, negative_utc boolean ); CREATE TYPE IF NOT EXISTS person ( fullname blob, name blob, email blob ); CREATE TABLE IF NOT EXISTS content ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, length bigint, ctime timestamp, -- creation time, i.e. 
time of (first) injection into the storage status ascii, PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) ); CREATE TABLE IF NOT EXISTS revision ( id blob PRIMARY KEY, date microtimestamp_with_timezone, committer_date microtimestamp_with_timezone, type ascii, directory blob, -- source code "root" directory message blob, author person, committer person, synthetic boolean, -- true iff revision has been created by Software Heritage metadata text -- extra metadata as JSON(tarball checksums, -- extra commit information, etc...) ); CREATE TABLE IF NOT EXISTS revision_parent ( id blob, parent_rank int, -- parent position in merge commits, 0-based parent_id blob, PRIMARY KEY ((id), parent_rank) ); CREATE TABLE IF NOT EXISTS release ( id blob PRIMARY KEY, target_type ascii, target blob, date microtimestamp_with_timezone, name blob, message blob, author person, synthetic boolean, -- true iff release has been created by Software Heritage ); CREATE TABLE IF NOT EXISTS directory ( id blob PRIMARY KEY, ); CREATE TABLE IF NOT EXISTS directory_entry ( directory_id blob, name blob, -- path name, relative to containing dir target blob, perms int, -- unix-like permissions type ascii, -- target type PRIMARY KEY ((directory_id), name) ); CREATE TABLE IF NOT EXISTS snapshot ( id blob PRIMARY KEY, ); -- For a given snapshot_id, branches are sorted by their name, -- allowing easy pagination. 
CREATE TABLE IF NOT EXISTS snapshot_branch ( snapshot_id blob, name blob, target_type ascii, target blob, PRIMARY KEY ((snapshot_id), name) ); CREATE TABLE IF NOT EXISTS origin_visit ( origin text, visit bigint, date timestamp, type text, status ascii, metadata text, snapshot blob, PRIMARY KEY ((origin), visit) ); CREATE TABLE IF NOT EXISTS origin ( sha1 blob PRIMARY KEY, url text, type text, next_visit_id int, -- We need integer visit ids for compatibility with the pgsql -- storage, so we're using lightweight transactions with this trick: -- https://stackoverflow.com/a/29391877/539465 ); CREATE TABLE IF NOT EXISTS tool_by_uuid ( id timeuuid PRIMARY KEY, name ascii, version ascii, configuration blob, ); CREATE TABLE IF NOT EXISTS tool ( id timeuuid, name ascii, version ascii, configuration blob, PRIMARY KEY ((name, version, configuration)) ) CREATE TABLE IF NOT EXISTS object_count ( partition_key smallint, -- Constant, must always be 0 object_type ascii, count counter, PRIMARY KEY ((partition_key), object_type) ); '''.split('\n\n') CONTENT_INDEX_TEMPLATE = ''' CREATE TABLE IF NOT EXISTS content_by_{main_algo} ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, PRIMARY KEY (({main_algo}), {other_algos}) );''' +TABLES = ('content revision revision_parent release directory ' + 'directory_entry snapshot snapshot_branch origin_visit ' + 'origin tool_by_uuid tool object_count').split() + HASH_ALGORITHMS = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] for main_algo in HASH_ALGORITHMS: CREATE_TABLES_QUERIES.append(CONTENT_INDEX_TEMPLATE.format( main_algo=main_algo, other_algos=', '.join( [algo for algo in HASH_ALGORITHMS if algo != main_algo]) )) + + TABLES.append('content_by_%s' % main_algo) diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py index 565881bd..bb9e04c6 100644 --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -1,260 +1,268 @@ # Copyright (C) 2018-2019 The Software Heritage 
developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import signal import socket import subprocess import time import pytest from swh.storage import get_storage from swh.storage.cassandra import create_keyspace +from swh.storage.cassandra.schema import TABLES from swh.storage.tests.test_storage import TestStorage as _TestStorage from swh.storage.tests.test_storage import TestStorageGeneratedData \ as _TestStorageGeneratedData CONFIG_TEMPLATE = ''' data_file_directories: - {data_dir}/data commitlog_directory: {data_dir}/commitlog hints_directory: {data_dir}/hints saved_caches_directory: {data_dir}/saved_caches commitlog_sync: periodic commitlog_sync_period_in_ms: 1000000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner endpoint_snitch: SimpleSnitch seed_provider: - class_name: org.apache.cassandra.locator.SimpleSeedProvider parameters: - seeds: "127.0.0.1" storage_port: {storage_port} native_transport_port: {native_transport_port} start_native_transport: true listen_address: 127.0.0.1 enable_user_defined_functions: true ''' def free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 0)) port = sock.getsockname()[1] sock.close() return port def wait_for_peer(addr, port): wait_until = time.time() + 20 while time.time() < wait_until: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((addr, port)) except ConnectionRefusedError: time.sleep(0.1) else: sock.close() return True return False @pytest.fixture(scope='session') def cassandra_cluster(tmpdir_factory): cassandra_conf = tmpdir_factory.mktemp('cassandra_conf') cassandra_data = tmpdir_factory.mktemp('cassandra_data') cassandra_log = tmpdir_factory.mktemp('cassandra_log') native_transport_port = free_port() storage_port = free_port() jmx_port = free_port() with 
open(str(cassandra_conf.join('cassandra.yaml')), 'w') as fd: fd.write(CONFIG_TEMPLATE.format( data_dir=str(cassandra_data), storage_port=storage_port, native_transport_port=native_transport_port, )) if os.environ.get('LOG_CASSANDRA'): stdout = stderr = None else: stdout = stderr = subprocess.DEVNULL proc = subprocess.Popen( [ '/usr/sbin/cassandra', '-Dcassandra.config=file://%s/cassandra.yaml' % cassandra_conf, '-Dcassandra.logdir=%s' % cassandra_log, '-Dcassandra.jmx.local.port=%d' % jmx_port, '-Dcassandra-foreground=yes', ], start_new_session=True, env={ 'MAX_HEAP_SIZE': '300M', 'HEAP_NEWSIZE': '50M', 'JVM_OPTS': '-Xlog:gc=error:file=%s/gc.log' % cassandra_log, }, stdout=stdout, stderr=stderr, ) running = wait_for_peer('127.0.0.1', native_transport_port) if running: yield (['127.0.0.1'], native_transport_port) if not running or os.environ.get('LOG_CASSANDRA'): with open(str(cassandra_log.join('debug.log'))) as fd: print(fd.read()) if not running: raise Exception('cassandra process stopped unexpectedly.') pgrp = os.getpgid(proc.pid) os.killpg(pgrp, signal.SIGKILL) class RequestHandler: def on_request(self, rf): if hasattr(rf.message, 'query'): print() print(rf.message.query) +@pytest.fixture(scope='session') +def keyspace(cassandra_cluster): + (hosts, port) = cassandra_cluster + keyspace = os.urandom(10).hex() + + create_keyspace(hosts, keyspace, port) + + return keyspace + + # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture -def swh_storage(cassandra_cluster): +def swh_storage(cassandra_cluster, keyspace): (hosts, port) = cassandra_cluster - keyspace = os.urandom(10).hex() - - create_keyspace(hosts, keyspace, port) storage = get_storage( 'cassandra', hosts=hosts, port=port, keyspace=keyspace, journal_writer={ 'cls': 'memory', }, objstorage={ 'cls': 'memory', 'args': {}, }, ) yield storage - storage._cql_runner._session.execute( - 'DROP KEYSPACE "%s"' % keyspace) + 
for table in TABLES: + storage._cql_runner._session.execute('TRUNCATE TABLE "%s"' % table) @pytest.mark.cassandra class TestCassandraStorage(_TestStorage): @pytest.mark.skip('content_update is not yet implemented for Cassandra') def test_content_update(self): pass @pytest.mark.skip( 'not implemented, see https://forge.softwareheritage.org/T1633') def test_skipped_content_add(self): pass @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' 'supported by the cassandra backend.') def test_person_fullname_unicity(self): pass @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' 'supported by the cassandra backend.') def test_person_get(self): pass @pytest.mark.skip('Not yet implemented') def test_metadata_provider_add(self): pass @pytest.mark.skip('Not yet implemented') def test_metadata_provider_get(self): pass @pytest.mark.skip('Not yet implemented') def test_metadata_provider_get_by(self): pass @pytest.mark.skip('Not yet implemented') def test_origin_metadata_add(self): pass @pytest.mark.skip('Not yet implemented') def test_origin_metadata_get(self): pass @pytest.mark.skip('Not yet implemented') def test_origin_metadata_get_by_provider_type(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_count(self): pass @pytest.mark.cassandra class TestCassandraStorageGeneratedData(_TestStorageGeneratedData): @pytest.mark.skip('Not supported by Cassandra') def test_origin_count(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_get_range(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_get_range_from_zero(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_generate_content_get_range_limit(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_generate_content_get_range_no_limit(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_generate_content_get_range(self): pass @pytest.mark.skip('Not supported 
by Cassandra') def test_generate_content_get_range_empty(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_generate_content_get_range_limit_none(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_generate_content_get_range_full(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_count_with_visit_no_visits(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_count_with_visit_with_visits_and_snapshot(self): pass @pytest.mark.skip('Not supported by Cassandra') def test_origin_count_with_visit_with_visits_no_snapshot(self): pass