diff --git a/README.md b/README.md
index 08b52b67..25b65f1b 100644
--- a/README.md
+++ b/README.md
@@ -1,192 +1,194 @@
swh-storage
===========

Abstraction layer over the archive, allowing access to all stored source code
artifacts as well as their metadata.

See the
[documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html)
for more details.

## Quick start

### Dependencies

Python tests for this module include tests that cannot be run without a local
Postgresql database, so you need the Postgresql server executable on your
machine (no need to have a running Postgresql server). They also expect a
cassandra server.

#### Debian-like host

```
$ sudo apt install libpq-dev postgresql-11 cassandra
```

#### Non Debian-like host

-The tests expects `/usr/sbin/cassandra` to exist.
+The tests expect the `cassandra` executable to be at `/usr/sbin/cassandra` by
+default; this can be overridden by pointing the `SWH_CASSANDRA_BIN`
+environment variable at another binary.

Optionally, you can skip the cassandra tests:

```
(swh) :~/swh-storage$ tox -- -m 'not cassandra'
```

### Installation

It is strongly recommended to use a virtualenv. In the following, we assume
you work in a virtualenv named `swh`. See the
[developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup)
for more details on how to set up a working environment.

You can install the package directly from
[PyPI](https://pypi.org/p/swh.storage):

```
(swh) :~$ pip install swh.storage
[...]
```

Or from sources:

```
(swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git
[...]
(swh) :~$ cd swh-storage
(swh) :~/swh-storage$ pip install .
[...]
```

Then you can check it is properly installed:

```
(swh) :~$ swh storage --help
Usage: swh storage [OPTIONS] COMMAND [ARGS]...

  Software Heritage Storage tools.

Options:
  -h, --help  Show this message and exit.

Commands:
  rpc-serve  Software Heritage Storage RPC server.
```

## Tests

The best way to run the Python tests for this module is to use
[tox](https://tox.readthedocs.io/).

```
(swh) :~$ pip install tox
```

### tox

From the sources directory, simply use tox:

```
(swh) :~/swh-storage$ tox
[...]
========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ==========
_______________________________ summary ________________________________
  flake8: commands succeeded
  py3: commands succeeded
  congratulations :)
```

## Development

The storage server can be started locally. It requires a configuration file
and a running Postgresql database.

### Sample configuration

A typical configuration `storage.yml` file is:

```
storage:
  cls: local
  args:
    db: "dbname=softwareheritage-dev user= password="
    objstorage:
      cls: pathslicing
      args:
        root: /tmp/swh-storage/
        slicing: 0:2/2:4/4:6
```

This configuration uses:
- a local storage instance whose db connection targets the local
  `softwareheritage-dev` database,
- a local objstorage instance whose:
  - `root` path is `/tmp/swh-storage`,
  - slicing scheme is `0:2/2:4/4:6`.

This means that a content identifier (sha1) is stored on disk under three
directory levels: the first level is named after the first 2 hex characters
of the hash, the second level after the next 2, and the third level after the
next 2; the file holding the raw content is named after the complete hash.

For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at
00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c

Note that the `root` path should exist on disk before starting the server.
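To make the slicing scheme concrete, here is a minimal Python sketch of the
path computation described above (an illustration only; `sliced_path` is a
hypothetical helper, not the actual swh-objstorage implementation):

```
import os

def sliced_path(root, hex_id, slicing="0:2/2:4/4:6"):
    """Map a hex identifier to its on-disk path under `root`.

    Each "start:end" component of `slicing` names one directory level.
    """
    bounds = (part.split(":") for part in slicing.split("/"))
    levels = [hex_id[int(start):int(end)] for start, end in bounds]
    # The file itself is named after the complete hash.
    return os.path.join(root, *levels, hex_id)

assert (
    sliced_path("/tmp/swh-storage", "00062f8bd330715c4f819373653d97b3cd34394c")
    == "/tmp/swh-storage/00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c"
)
```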
### Starting the storage server

If the Python package has been properly installed (e.g. in a virtualenv), you
should be able to use the command:

```
(swh) :~/swh-storage$ swh storage rpc-serve storage.yml
```

This runs a local swh-storage API on port 5002.

```
(swh) :~/swh-storage$ curl http://127.0.0.1:5002
Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

```

### And then what?

In your upper layer
([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/),
[loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/),
etc.), you can define a remote storage with this snippet of YAML
configuration.

```
storage:
  cls: remote
  args:
    url: http://localhost:5002/
```

You could directly define a local storage with the following snippet:

```
storage:
  cls: local
  args:
    db: service=swh-dev
    objstorage:
      cls: pathslicing
      args:
        root: /home/storage/swh-storage/
        slicing: 0:2/2:4/4:6
```

diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
index 181af743..f6d1c9ef 100644
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -1,391 +1,395 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import namedtuple
import datetime
import os
import signal
import socket
import subprocess
import time

import pytest

from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
from swh.storage.cassandra.schema import TABLES, HASH_ALGORITHMS
from swh.storage.tests.test_storage import TestStorage as _TestStorage
from swh.storage.tests.test_storage import (
    TestStorageGeneratedData as _TestStorageGeneratedData,
)

from .storage_data import data


CONFIG_TEMPLATE = """
data_file_directories:
    - {data_dir}/data
commitlog_directory: {data_dir}/commitlog
hints_directory: {data_dir}/hints
saved_caches_directory: {data_dir}/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 1000000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
endpoint_snitch: SimpleSnitch
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"
storage_port: {storage_port}
native_transport_port: {native_transport_port}
start_native_transport: true
listen_address: 127.0.0.1
enable_user_defined_functions: true

# speed up by disabling periodic saving to disk
key_cache_save_period: 0
row_cache_save_period: 0
trickle_fsync: false
commitlog_sync_period_in_ms: 100000
"""


def free_port():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port


def wait_for_peer(addr, port):
    wait_until = time.time() + 20
    while time.time() < wait_until:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((addr, port))
        except ConnectionRefusedError:
            time.sleep(0.1)
        else:
            sock.close()
            return True
    return False


@pytest.fixture(scope="session")
def cassandra_cluster(tmpdir_factory):
    cassandra_conf = tmpdir_factory.mktemp("cassandra_conf")
    cassandra_data = tmpdir_factory.mktemp("cassandra_data")
    cassandra_log = tmpdir_factory.mktemp("cassandra_log")
    native_transport_port = free_port()
    storage_port = free_port()
    jmx_port = free_port()

    with open(str(cassandra_conf.join("cassandra.yaml")), "w") as fd:
        fd.write(
            CONFIG_TEMPLATE.format(
                data_dir=str(cassandra_data),
                storage_port=storage_port,
                native_transport_port=native_transport_port,
            )
        )

-    if os.environ.get("LOG_CASSANDRA"):
+    if os.environ.get("SWH_CASSANDRA_LOG"):
        stdout = stderr = None
    else:
        stdout = stderr = subprocess.DEVNULL
+
+    cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra")
    proc = subprocess.Popen(
        [
-            "/usr/sbin/cassandra",
+            cassandra_bin,
"-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf, "-Dcassandra.logdir=%s" % cassandra_log, "-Dcassandra.jmx.local.port=%d" % jmx_port, "-Dcassandra-foreground=yes", ], start_new_session=True, env={ "MAX_HEAP_SIZE": "300M", "HEAP_NEWSIZE": "50M", "JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log, }, stdout=stdout, stderr=stderr, ) running = wait_for_peer("127.0.0.1", native_transport_port) if running: yield (["127.0.0.1"], native_transport_port) - if not running or os.environ.get("LOG_CASSANDRA"): - with open(str(cassandra_log.join("debug.log"))) as fd: - print(fd.read()) + if not running or os.environ.get("SWH_CASSANDRA_LOG"): + debug_log_path = str(cassandra_log.join("debug.log")) + if os.path.exists(debug_log_path): + with open(debug_log_path) as fd: + print(fd.read()) if not running: raise Exception("cassandra process stopped unexpectedly.") pgrp = os.getpgid(proc.pid) os.killpg(pgrp, signal.SIGKILL) class RequestHandler: def on_request(self, rf): if hasattr(rf.message, "query"): print() print(rf.message.query) @pytest.fixture(scope="session") def keyspace(cassandra_cluster): (hosts, port) = cassandra_cluster keyspace = os.urandom(10).hex() create_keyspace(hosts, keyspace, port) return keyspace # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture def swh_storage_backend_config(cassandra_cluster, keyspace): (hosts, port) = cassandra_cluster storage_config = dict( cls="cassandra", hosts=hosts, port=port, keyspace=keyspace, journal_writer={"cls": "memory",}, objstorage={"cls": "memory", "args": {},}, ) yield storage_config storage = get_storage(**storage_config) for table in TABLES: storage._cql_runner._session.execute('TRUNCATE TABLE "%s"' % table) storage._cql_runner._cluster.shutdown() @pytest.mark.cassandra class TestCassandraStorage(_TestStorage): def test_content_add_murmur3_collision(self, swh_storage, mocker): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collision. """ called = 0 # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return data.cont Row = namedtuple("Row", HASH_ALGORITHMS) def mock_cgft(token): nonlocal called called += 1 return [Row(**{algo: data.cont[algo] for algo in HASH_ALGORITHMS})] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_from_token", mock_cgft ) actual_result = swh_storage.content_add([data.cont2]) assert called == 4 assert actual_result == { "content:add": 1, "content:add:bytes": data.cont2["length"], } def test_content_get_metadata_murmur3_collision(self, swh_storage, mocker): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collision. 
""" called = 0 # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return data.cont and data.cont2 cols = list(set(data.cont) - {"data"}) Row = namedtuple("Row", cols + ["ctime"]) def mock_cgft(token): nonlocal called called += 1 return [ Row(ctime=42, **{col: cont[col] for col in cols}) for cont in [data.cont, data.cont2] ] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_from_token", mock_cgft ) expected_cont = data.cont.copy() del expected_cont["data"] actual_result = swh_storage.content_get_metadata([data.cont["sha1"]]) assert called == 2 # but data.cont2 should be filtered out assert actual_result == {data.cont["sha1"]: [expected_cont]} def test_content_find_murmur3_collision(self, swh_storage, mocker): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collision. """ called = 0 # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return data.cont and data.cont2 cols = list(set(data.cont) - {"data"}) Row = namedtuple("Row", cols + ["ctime"]) def mock_cgft(token): nonlocal called called += 1 return [ Row(ctime=datetime.datetime.now(), **{col: cont[col] for col in cols}) for cont in [data.cont, data.cont2] ] mocker.patch.object( swh_storage.storage._cql_runner, "content_get_from_token", mock_cgft ) expected_cont = data.cont.copy() del expected_cont["data"] actual_result = swh_storage.content_find({"sha1": data.cont["sha1"]}) assert called == 2 # but data.cont2 should be filtered out del actual_result[0]["ctime"] assert actual_result == [expected_cont] @pytest.mark.skip("content_update is not yet implemented for Cassandra") def test_content_update(self): pass @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." ) def test_person_fullname_unicity(self): pass @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." 
    )
    def test_person_get(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_count(self):
        pass


@pytest.mark.cassandra
class TestCassandraStorageGeneratedData(_TestStorageGeneratedData):
    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_count(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_get_range(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_get_range_from_zero(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range_limit(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range_no_limit(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range_empty(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range_limit_none(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_generate_content_get_range_full(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_count_with_visit_no_visits(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_count_with_visit_with_visits_and_snapshot(self):
        pass

    @pytest.mark.skip("Not supported by Cassandra")
    def test_origin_count_with_visit_with_visits_no_snapshot(self):
        pass

diff --git a/tox.ini b/tox.ini
index e82bc0ab..cf5cbabd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,41 +1,42 @@
[tox]
envlist=black,flake8,mypy,py3

[testenv]
extras =
  testing
deps =
  pytest-cov
  dev: ipdb
passenv =
-  LOG_CASSANDRA
+  SWH_CASSANDRA_BIN
+  SWH_CASSANDRA_LOG
commands =
  pytest \
    !slow: --hypothesis-profile=fast \
    slow: --hypothesis-profile=slow \
    --cov={envsitepackagesdir}/swh/storage \
    {envsitepackagesdir}/swh/storage \
    --doctest-modules \
    --cov-branch {posargs}

[testenv:black]
skip_install = true
deps =
  black
commands =
  {envpython} -m black --check swh

[testenv:flake8]
skip_install = true
deps =
  flake8
commands =
  {envpython} -m flake8

[testenv:mypy]
extras = testing
deps =
  mypy
commands =
  mypy swh
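As a side note on the murmur3-collision tests in `test_cassandra.py` above:
the behaviour they check boils down to comparing full hashes after the token
lookup. Here is a minimal, hypothetical sketch of that filtering step (an
illustration only, not the swh-storage implementation; `filter_token_collisions`
is an invented name):

```
def filter_token_collisions(rows, wanted_hashes):
    """Keep only rows whose hash columns all match the queried hashes.

    `rows` are objects with one attribute per hash algorithm (e.g. the
    namedtuple rows mocked in the tests above); `wanted_hashes` maps an
    algorithm name to the expected digest.
    """
    return [
        row
        for row in rows
        if all(getattr(row, algo) == digest for algo, digest in wanted_hashes.items())
    ]
```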