diff --git a/.gitignore b/.gitignore --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ version.txt .tox .mypy_cache/ +ceph_key* +*.qcow2 +*.img diff --git a/MANIFEST.in b/MANIFEST.in --- a/MANIFEST.in +++ b/MANIFEST.in @@ -2,3 +2,4 @@ include requirements*.txt include version.txt recursive-include swh py.typed +exclude swh/objstorage/tests/winery diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -19,3 +19,12 @@ [mypy-requests_toolbelt.*] ignore_missing_imports = True + +[mypy-psycopg2.*] +ignore_missing_imports = True + +[mypy-swh.perfecthash.*] +ignore_missing_imports = True + +[mypy-sh.*] +ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ swh.core[http] >= 0.3 swh.model >= 0.0.27 +swh.perfecthash diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,7 +1,10 @@ apache-libcloud azure-storage-blob >= 12.0, != 12.9.0 # version 12.9.0 breaks mypy https://github.com/Azure/azure-sdk-for-python/pull/20891 pytest +pytest-mock requests_mock[fixture] >= 1.9 requests_toolbelt types-pyyaml types-requests +pytest-postgresql < 4.0.0 # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400 + diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,8 @@ aiohttp >= 3 click requests +psycopg2 +sh # optional dependencies # apache-libcloud diff --git a/swh/objstorage/backends/winery/__init__.py b/swh/objstorage/backends/winery/__init__.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/__init__.py @@ -0,0 +1 @@ +from .objstorage import WineryObjStorage # noqa: F401 diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/database.py @@ -0,0 +1,75 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import time + +import psycopg2 + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.ERROR) + + +class Database: + def __init__(self, dsn): + self.dsn = dsn + + def create_database(self, database): + db = psycopg2.connect(f"{self.dsn}/postgres") + db.autocommit = True + c = db.cursor() + c.execute( + f"SELECT datname FROM pg_catalog.pg_database WHERE datname = '{database}'" + ) + if c.rowcount == 0: + c.execute(f"CREATE DATABASE {database}") + c.close() + + def drop_database(self, database): + db = psycopg2.connect(f"{self.dsn}/postgres") + # https://wiki.postgresql.org/wiki/Psycopg2_Tutorial + # If you want to drop the database you would need to + # change the isolation level of the database. + db.set_isolation_level(0) + db.autocommit = True + c = db.cursor() + # + # Dropping the database may fail because the server takes time + # to notice a connection was dropped and/or a named cursor is + # in the process of being deleted. It can happen here or even + # when deleting all database with the psql cli + # and there are no bench process active. + # + # ERROR: database "i606428a5a6274d1ab09eecc4d019fef7" is being + # accessed by other users DETAIL: There is 1 other session + # using the database. + # + # See: + # https://stackoverflow.com/questions/5108876/kill-a-postgresql-session-connection + # + # https://www.postgresql.org/docs/current/sql-dropdatabase.html + # + # WITH (FORCE) added in postgresql 13 but may also fail because the + # named cursor may not be handled as a client. + # + for i in range(60): + try: + c.execute(f"DROP DATABASE IF EXISTS {database}") + break + except psycopg2.errors.ObjectInUse: + LOGGER.warning(f"{database} database drop fails, waiting") + time.sleep(10) + continue + raise Exception("database drop fails {database}") + c.close() + + def list_databases(self): + db = psycopg2.connect(f"{self.dsn}/postgres") + with db.cursor() as c: + c.execute( + "SELECT datname FROM pg_database " + "WHERE datistemplate = false and datname != 'postgres'" + ) + return [r[0] for r in c.fetchall()] diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/objstorage.py @@ -0,0 +1,126 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from multiprocessing import Process + +from swh.objstorage import exc +from swh.objstorage.objstorage import ObjStorage, compute_hash + +from .roshard import ROShard +from .rwshard import RWShard +from .sharedbase import SharedBase + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.ERROR) + + +def pack(shard, **kwargs): + return Packer(shard, **kwargs).run() + + +class Packer: + def __init__(self, shard, **kwargs): + self.args = kwargs + self.shard = shard + self.init() + + def init(self): + self.rw = RWShard(self.shard, **self.args) + self.ro = ROShard(self.shard, **self.args) + + def uninit(self): + del self.ro + del self.rw + + def run(self): + shard = self.ro.create(self.rw.count()) + for obj_id, content in self.rw.all(): + shard.write(obj_id, content) + shard.save() + base = SharedBase(**self.args) + base.shard_packing_ends(self.shard) + del base + return True + + +class WineryObjStorage(ObjStorage): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.args = kwargs + self.packers = [] + self.init() + + def init(self): + self.base = SharedBase(**self.args) + self.shard = RWShard(self.base.whoami, **self.args) + + def uninit(self): + del self.shard + del self.base + + def check_config(self, *, check_write): + return True + + def __contains__(self, obj_id): + return self.base.contains(obj_id) + + def add(self, content, obj_id=None, check_presence=True): + if obj_id is None: + obj_id = compute_hash(content) + + if check_presence and obj_id in self: + return obj_id + + shard = self.base.add_phase_1(obj_id) + if shard != self.base.id: + # this object is the responsibility of another shard + return obj_id + + self.shard.add(obj_id, content) + self.base.add_phase_2(obj_id) + + if self.shard.is_full(): + self.pack() + + return obj_id + + def get(self, obj_id): + shard_info = self.base.get(obj_id) + if shard_info is None: + raise exc.ObjNotFoundError(obj_id) + name, readonly = shard_info + if readonly: + shard = ROShard(name, **self.args) + shard.load() + content = shard.get(obj_id) + del shard + else: + shard = RWShard(name, **self.args) + content = shard.get(obj_id) + if content is None: + raise exc.ObjNotFoundError(obj_id) + return content + + def check(self, obj_id): + # load all shards readonly == NULL and not locked (i.e. packer + # was interrupted for whatever reason) run pack for each of them + pass + + def delete(self, obj_id): + raise PermissionError("Delete is not allowed.") + + def pack(self): + self.base.shard_packing_starts() + p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) + p.start() + self.packers.append(p) + self.uninit() + self.init() + + def __del__(self): + for p in self.packers: + p.kill() + p.join() diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/roshard.py @@ -0,0 +1,120 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +import sh + +from swh.perfecthash import Shard + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.ERROR) + + +class Pool(object): + name = "shards" + + def __init__(self, **kwargs): + self.args = kwargs + + def init(self): + self.rbd = sh.sudo.bake("rbd", f"--pool={self.name}") + self.ceph = sh.sudo.bake("ceph") + self.image_size = self.args["shard_max_size"] * 2 + + def uninit(self): + pass + + def image_list(self): + try: + self.rbd.ls() + except sh.ErrorReturnCode_2 as e: + if "No such file or directory" in e.args[0]: + return [] + else: + raise + return [image.strip() for image in self.rbd.ls()] + + def image_path(self, image): + return f"/dev/rbd/{self.name}/{image}" + + def image_create(self, image): + LOGGER.info(f"rdb --pool {self.name} create --size={self.image_size} {image}") + self.rbd.create( + f"--size={self.image_size}", f"--data-pool={self.name}-data", image + ) + self.rbd.feature.disable( + f"{self.name}/{image}", "object-map", "fast-diff", "deep-flatten" + ) + self.image_map(image, "rw") + + def image_map(self, image, options): + self.rbd.device("map", "-o", options, image) + sh.sudo("chmod", "777", self.image_path(image)) + + def image_remap_ro(self, image): + self.image_unmap(image) + self.image_map(image, "ro") + + def image_unmap(self, image): + self.rbd.device.unmap(f"{self.name}/{image}", _ok_code=(0, 22)) + + def image_delete(self, image): + self.image_unmap(image) + LOGGER.info(f"rdb --pool {self.name} remove {image}") + self.rbd.remove(image) + + def images_clobber(self): + for image in self.image_list(): + image = image.strip() + self.image_unmap(image) + + def clobber(self): + self.images_clobber() + self.pool_clobber() + + def pool_clobber(self): + LOGGER.info(f"ceph osd pool delete {self.name}") + self.ceph.osd.pool.delete(self.name, self.name, "--yes-i-really-really-mean-it") + data = f"{self.name}-data" + LOGGER.info(f"ceph osd pool delete {data}") + self.ceph.osd.pool.delete(data, data, "--yes-i-really-really-mean-it") + + def pool_create(self): + data = f"{self.name}-data" + LOGGER.info(f"ceph osd pool create {data}") + self.ceph.osd( + "erasure-code-profile", + "set", + "--force", + data, + "k=4", + "m=2", + "crush-failure-domain=host", + ) + self.ceph.osd.pool.create(data, "200", "erasure", data) + self.ceph.osd.pool.set(data, "allow_ec_overwrites", "true") + self.ceph.osd.pool.set(data, "pg_autoscale_mode", "off") + LOGGER.info(f"ceph osd pool create {self.name}") + self.ceph.osd.pool.create(self.name) + + +class ROShard: + def __init__(self, name, **kwargs): + self.pool = Pool(shard_max_size=kwargs["shard_max_size"]) + self.pool.init() + self.name = name + + def create(self, count): + self.pool.image_create(self.name) + self.shard = Shard(self.pool.image_path(self.name)) + return self.shard.create(count) + + def load(self): + self.shard = Shard(self.pool.image_path(self.name)) + return self.shard.load() == self.shard + + def get(self, key): + return self.shard.lookup(key) diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/rwshard.py @@ -0,0 +1,79 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import psycopg2 + +from .database import Database + + +class RWShard(Database): + def __init__(self, name, **kwargs): + super().__init__(kwargs["shard_dsn"]) + self._name = name + self.create_database(self.name) + self.db = self.create_table(f"{self.dsn}/{self.name}") + self.size = self.total_size() + self.limit = kwargs["shard_max_size"] + + @property + def name(self): + return self._name + + def is_full(self): + return self.size > self.limit + + def create_table(self, dsn): + db = psycopg2.connect(dsn) + db.autocommit = True + c = db.cursor() + c.execute( + "CREATE TABLE IF NOT EXISTS objects(" + "key BYTEA PRIMARY KEY, " + "content BYTEA " + ")" + ) + c.close() + return db + + def total_size(self): + with self.db.cursor() as c: + c.execute("SELECT SUM(LENGTH(content)) FROM objects") + size = c.fetchone()[0] + if size is None: + return 0 + else: + return size + + def add(self, obj_id, content): + try: + with self.db.cursor() as c: + c.execute( + "INSERT INTO objects (key, content) VALUES (%s, %s)", + (obj_id, content), + ) + self.db.commit() + self.size += len(content) + except psycopg2.errors.UniqueViolation: + pass + + def get(self, obj_id): + with self.db.cursor() as c: + c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) + if c.rowcount == 0: + return None + else: + return c.fetchone()[0].tobytes() + + def all(self): + with self.db.cursor() as c: + c.execute("SELECT key,content FROM objects") + for row in c: + yield row[0].tobytes(), row[1].tobytes() + + def count(self): + with self.db.cursor() as c: + c.execute("SELECT COUNT(*) FROM objects") + return c.fetchone()[0] diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/backends/winery/sharedbase.py @@ -0,0 +1,173 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import uuid + +import psycopg2 + +from .database import Database + + +class SharedBase(Database): + def __init__(self, **kwargs): + super().__init__(kwargs["base_dsn"]) + database = "sharedbase" + self.create_database(database) + self.db = self.create_table(f"{self.dsn}/{database}") + self._whoami = None + + def create_table(self, dsn): + db = psycopg2.connect(dsn) + db.autocommit = True + c = db.cursor() + c.execute( + "CREATE TABLE IF NOT EXISTS shards(" + "id SERIAL PRIMARY KEY, " + "readonly BOOLEAN, " + "packing BOOLEAN, " + "name CHAR(32) NOT NULL " + ")" + ) + c.execute( + "CREATE TABLE IF NOT EXISTS signature2shard(" + "signature BYTEA PRIMARY KEY, " + "inflight BOOLEAN NOT NULL, " + "shard INTEGER NOT NULL" + ")" + ) + c.close() + return db + + @property + def whoami(self): + self.set_whoami() + return self._whoami + + @property + def id(self): + self.set_whoami() + return self._whoami_id + + def set_whoami(self): + if self._whoami is not None: + return + + while True: + self._whoami, self._whoami_id = self.lock_a_shard() + if self._whoami is not None: + return self._whoami + self.create_shard() + + def lock_a_shard(self): + with self.db.cursor() as c: + c.execute( + "SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE " + "LIMIT 1 FOR UPDATE SKIP LOCKED" + ) + if c.rowcount == 0: + return None, None + name = c.fetchone()[0] + return self.lock_shard(name) + + def lock_shard(self, name): + self.whoami_lock = self.db.cursor() + try: + self.whoami_lock.execute( + "SELECT name, id FROM shards " + "WHERE readonly = FALSE AND packing = FALSE AND name = %s " + "FOR UPDATE NOWAIT", + (name,), + ) + return self.whoami_lock.fetchone() + except Exception: + return None + + def unlock_shard(self): + del self.whoami_lock + + def create_shard(self): + name = uuid.uuid4().hex + # + # ensure the first character is not a number so it can be used as a + # database name. + # + name = "i" + name[1:] + with self.db.cursor() as c: + c.execute( + "INSERT INTO shards (name, readonly, packing) " + "VALUES (%s, FALSE, FALSE)", + (name,), + ) + self.db.commit() + + def shard_packing_starts(self): + with self.db.cursor() as c: + c.execute( + "UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,) + ) + self.unlock_shard() + + def shard_packing_ends(self, name): + with self.db.cursor() as c: + c.execute( + "UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s", + (name,), + ) + + def get_shard_info(self, id): + with self.db.cursor() as c: + c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) + if c.rowcount == 0: + return None + else: + return c.fetchone() + + def get_shard_info_by_name(self, name): + with self.db.cursor() as c: + c.execute("SELECT readonly, packing FROM shards WHERE name = %s", (name,)) + if c.rowcount == 0: + return None + else: + return c.fetchone() + + def contains(self, obj_id): + with self.db.cursor() as c: + c.execute( + "SELECT shard FROM signature2shard WHERE " + "signature = %s AND inflight = FALSE", + (obj_id,), + ) + if c.rowcount == 0: + return None + else: + return c.fetchone()[0] + + def get(self, obj_id): + id = self.contains(obj_id) + if id is None: + return None + return self.get_shard_info(id) + + def add_phase_1(self, obj_id): + try: + with self.db.cursor() as c: + c.execute( + "INSERT INTO signature2shard (signature, shard, inflight) " + "VALUES (%s, %s, TRUE)", + (obj_id, self.id), + ) + self.db.commit() + return self.id + except psycopg2.errors.UniqueViolation: + return self.contains(obj_id) + + def add_phase_2(self, obj_id): + with self.db.cursor() as c: + c.execute( + "UPDATE signature2shard SET inflight = FALSE " + "WHERE signature = %s AND shard = %s", + (obj_id, self.id), + ) + self.db.commit() diff --git a/swh/objstorage/factory.py b/swh/objstorage/factory.py --- a/swh/objstorage/factory.py +++ b/swh/objstorage/factory.py @@ -13,6 +13,7 @@ from swh.objstorage.backends.noop import NoopObjStorage from swh.objstorage.backends.pathslicing import PathSlicingObjStorage from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage +from swh.objstorage.backends.winery import WineryObjStorage from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage from swh.objstorage.multiplexer.filter import add_filters from swh.objstorage.objstorage import ID_HASH_LENGTH, ObjStorage # noqa @@ -27,6 +28,7 @@ "seaweedfs": SeaweedFilerObjStorage, "random": RandomGeneratorObjStorage, "http": HTTPReadOnlyObjStorage, + "winery": WineryObjStorage, "noop": NoopObjStorage, } diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/conftest.py @@ -0,0 +1,2 @@ +def pytest_configure(config): + config.addinivalue_line("markers", "shard_max_size: winery backend") diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -0,0 +1,175 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# +# Read carefully if the test run fails and hang forever. +# +# pytest-postgresql will hang forever if there is a connexion open +# when it cleans up and tries to "drop database". It can be verified +# with strace -s 255 on the pytest process. This never happens when +# the tests succeed but is very likely to happen whenever a test +# fails. To prevent that, all tests *must*: +# +# * explicitly close the database connections and not leave that to +# the garbage collector because the object may be reclaimed *after* +# the pytest-posgresql cleanup +# +# * not include a function call that accesses the database in an +# assert because it may be instrumented by pytest in a way that will +# leak the database connexion (when cursors "with" context have a +# return statement in their body, for instance). +# + +import pytest +import sh + +from swh.objstorage import exc +from swh.objstorage.backends.winery.database import Database +from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.backends.winery.roshard import Pool +from swh.objstorage.factory import get_objstorage + + +@pytest.fixture +def needs_ceph(): + try: + sh.ceph("--version") + except sh.CommandNotFound: + pytest.skip("the ceph CLI was not found") + + +@pytest.fixture +def ceph_pool(needs_ceph): + pool = Pool(shard_max_size=10 * 1024 * 1024) + pool.init() + pool.clobber() + pool.pool_create() + + yield pool + + pool.images_clobber() + pool.clobber() + + +@pytest.fixture +def winery(request, postgresql): + marker = request.node.get_closest_marker("shard_max_size") + if marker is None: + shard_max_size = 1024 + else: + shard_max_size = marker.args[0] + dsn = ( + f"postgres://{postgresql.info.user}" + f":@{postgresql.info.host}:{postgresql.info.port}" + ) + winery = get_objstorage( + cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size + ) + yield winery + winery.uninit() + # + # pytest-postgresql will not remove databases that it did not + # create between tests (only at the very end). + # + d = Database(dsn) + for database in d.list_databases(): + if database != postgresql.info.dbname and database != "tests_tmpl": + d.drop_database(database) + + +def test_winery_sharedbase(winery): + base = winery.base + shard1 = base.whoami + assert shard1 is not None + assert shard1 == base.whoami + + id1 = base.id + assert id1 is not None + assert id1 == base.id + + +def test_winery_add_get(winery): + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert winery.add(content=content, obj_id=obj_id) == obj_id + assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id + assert winery.base.whoami == shard + assert winery.get(obj_id) == content + with pytest.raises(exc.ObjNotFoundError): + winery.get(b"unknown") + + +@pytest.mark.shard_max_size(1) +def test_winery_add_and_pack(winery, mocker): + mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert winery.base.whoami != shard + assert len(winery.packers) == 1 + packer = winery.packers[0] + packer.join() + assert packer.exitcode == 0 + + +def test_winery_delete(winery): + with pytest.raises(PermissionError): + winery.delete(None) + + +def test_winery_get_shard_info(winery): + assert winery.base.get_shard_info(1234) is None + assert winery.base.get_shard_info_by_name("nothing") is None + + +@pytest.mark.shard_max_size(10 * 1024 * 1024) +def test_winery_packer(winery, ceph_pool): + shard = winery.base.whoami + content = b"SOMETHING" + winery.add(content=content) + winery.base.shard_packing_starts() + packer = Packer(shard, **winery.args) + try: + assert packer.run() is True + finally: + packer.uninit() + + readonly, packing = winery.base.get_shard_info_by_name(shard) + assert readonly is True + assert packing is False + + +@pytest.mark.shard_max_size(10 * 1024 * 1024) +def test_winery_get_object(winery, ceph_pool): + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + winery.base.shard_packing_starts() + assert pack(shard, **winery.args) is True + winery.base.get_shard_info_by_name(shard) + assert winery.get(obj_id) == content + + +def test_winery_ceph_pool(needs_ceph): + name = "IMAGE" + pool = Pool(shard_max_size=10 * 1024 * 1024) + pool.init() + pool.clobber() + pool.pool_create() + pool.image_create(name) + p = pool.image_path(name) + assert p.endswith(name) + something = "SOMETHING" + open(p, "w").write(something) + assert open(p).read(len(something)) == something + assert pool.image_list() == [name] + pool.image_remap_ro(name) + pool.images_clobber() + assert pool.image_list() == [name] + pool.clobber() + assert pool.image_list() == [] diff --git a/swh/objstorage/tests/winery/README.md b/swh/objstorage/tests/winery/README.md new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/README.md @@ -0,0 +1,20 @@ +# Installation + +* ensure virsh is available +* pip install -r requirements.txt + +# Create + +* ansible-galaxy install geerlingguy.docker +* ./build-vms.sh +* runner=debian@10.11.12.211 +* idx=debian@10.11.12.213 +* ansible-playbook -i inventory ceph.yml osd.yml rw.yml tests.yml + +# Running + +* rsync -av --exclude=.mypy_cache --exclude=.coverage --exclude=.eggs --exclude=swh.objstorage.egg-info --exclude=*.img --exclude=*.qcow2 --exclude=.tox --exclude='*~' --exclude=__pycache__ --exclude='*.py[co]' $(git rev-parse --show-toplevel)/ $runner:swh-objstorage/ ; ssh -t $runner bash -c "'cd swh-objstorage ; ../venv/bin/tox -e py3 -- -k test_winery'" + +# Destroy + +* ./build-vms.sh stop $(seq 1 8) diff --git a/swh/objstorage/tests/winery/build-vms.sh b/swh/objstorage/tests/winery/build-vms.sh new file mode 100755 --- /dev/null +++ b/swh/objstorage/tests/winery/build-vms.sh @@ -0,0 +1,99 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +function stop() { + local ids="$@" + + for id in $ids ; do + virsh destroy ceph$id + virsh undefine ceph$id + rm -f ceph$id.qcow2 + done + virsh net-destroy ceph + virsh net-undefine ceph + rm -f ceph*.img +} + +function start() { + local ids="$@" + + if ! test -f debian-11.qcow2 ; then + sudo virt-builder debian-11 --output debian-11.qcow2 --size 10G --format qcow2 --install sudo --run-command 'dpkg-reconfigure --frontend=noninteractive openssh-server' --run-command 'useradd -s /bin/bash -m debian || true ; echo "debian ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/90-debian' --ssh-inject debian:file:$HOME/.ssh/id_rsa.pub --edit '/etc/network/interfaces: s/ens2/enp1s0/' + fi + + if ! virsh net-list --name | grep ceph ; then + cat > /tmp/ceph-net.xml < + ceph + + + + + + + + + + + + + + + + + +EOF + virsh net-define /tmp/ceph-net.xml + virsh net-start ceph + fi + + + for id in $ids ; do + virsh destroy ceph$id + virsh undefine ceph$id + rm -f ceph$id.qcow2 + cp --sparse=always debian-11.qcow2 ceph$id.qcow2 + sudo virt-sysprep -a ceph$id.qcow2 --enable customize --hostname ceph$id + virt-install --network network=ceph,mac=52:54:00:00:00:0$id --boot hd --name ceph$id --memory 2048 --vcpus 1 --cpu host --disk path=$(pwd)/ceph$id.qcow2,bus=virtio,format=qcow2 --os-type=linux --os-variant=debian10 --graphics none --noautoconsole + case $id in + 1) + ;; + 2) + virsh detach-device ceph$id rng.xml --live + for drive in b c ; do + # + # Without the sleep it fails with: + # + # error: Failed to attach disk + # error: internal error: No more available PCI slots + # + sleep 10 + rm -f disk$id$drive.img + qemu-img create -f raw disk$id$drive.img 20G + sudo chown libvirt-qemu disk$id$drive.img + virsh attach-disk ceph$id --source $(pwd)/disk$id$drive.img --target vd$drive --persistent + done + ;; + *) + rm -f disk$id.img + qemu-img create -f raw disk$id.img 20G + sudo chown libvirt-qemu disk$id.img + virsh attach-disk ceph$id --source $(pwd)/disk$id.img --target vdb --persistent + ;; + esac + done +} + +function restart() { + local ids="$@" + stop $ids + start $ids +} + +if test "$1" ; then + "$@" +else + restart 1 2 3 5 4 6 7 8 +fi diff --git a/swh/objstorage/tests/winery/ceph.yml b/swh/objstorage/tests/winery/ceph.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/ceph.yml @@ -0,0 +1,121 @@ +# +# notes to install a client +# https://docs.ceph.com/en/latest/cephadm/client-setup/ +# ceph config generate-minimal-conf > /etc/ceph/ceph.conf +# ceph auth get-or-create client.admin > /etc/ceph/ceph.keyring +# +- hosts: localhost + gather_facts: false + + pre_tasks: + + - name: keygen ceph_key + shell: | + ssh-keygen -f ceph_key -N '' -t rsa + args: + creates: ceph_key + +- hosts: all + become: true + + pre_tasks: + + - name: mkdir /root/.ssh + file: + path: /root/.ssh + state: directory + mode: 700 + + - name: touch /root/.ssh/authorized_keys + file: + path: /root/.ssh/authorized_keys + state: touch + + - name: add ceph_key.pub to /root/.ssh/authorized_keys + lineinfile: + path: /root/.ssh/authorized_keys + line: "{{ lookup('file', 'ceph_key.pub') }}" + + - name: apt install + apt: + name: + - htop + - iotop + - iftop + - iperf + +- hosts: ceph + become: true + + pre_tasks: + + - name: apt install lvm2 curl gnupg2 + apt: + name: + - lvm2 + - curl + - gnupg2 + + - name: apt-key https://download.ceph.com/keys/release.asc + apt_key: + url: https://download.ceph.com/keys/release.asc + + - name: add repository + apt_repository: + repo: "deb https://download.ceph.com/debian-pacific/ bullseye main" + filename: ceph + + - name: apt install cephadm ceph-common + apt: + name: + - cephadm + - ceph-common + + roles: + - geerlingguy.docker + +- hosts: all + become: true + + tasks: + + - name: "add {{ inventory_hostname }} to /etc/hosts" + lineinfile: + path: /etc/hosts + line: "{{ hostvars[inventory_hostname]['ansible_default_ipv4']['address'] }} {{ inventory_hostname }}" + delegate_to: ceph1 + + - name: set hostname + hostname: + name: "{{ inventory_hostname }}" + +- hosts: mon + become: true + + tasks: + + - name: scp ceph_key.* + copy: + src: "{{ item }}" + dest: "{{ item }}" + loop: + - ceph_key + - ceph_key.pub + + - name: cephadm bootstrap + shell: | + set -ex + cephadm bootstrap --mon-ip {{ hostvars[groups['mon'][0]]['ansible_default_ipv4']['address'] }} + cephadm shell ceph cephadm clear-key + ceph config-key set mgr/cephadm/ssh_identity_key -i ceph_key + ceph config-key set mgr/cephadm/ssh_identity_pub -i ceph_key.pub + ceph orch apply osd --all-available-devices + args: + creates: /etc/ceph/ceph.pub + + - name: ceph configuration + shell: | + set -ex + ceph config set mon mon_allow_pool_delete true + # does not work for some reason: must be done manually + cephadm shell ceph mgr fail # required for mgr/cephadm/ssh_identity* to be refreshed diff --git a/swh/objstorage/tests/winery/inventory/group_vars/all/rw.yml b/swh/objstorage/tests/winery/inventory/group_vars/all/rw.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/inventory/group_vars/all/rw.yml @@ -0,0 +1,5 @@ +--- +rw_disk1: /dev/vdb +rw_disk2: /dev/vdc +postgres_shared_buffers: 512MB +postgres_effective_cache_size: 1GB diff --git a/swh/objstorage/tests/winery/inventory/groups.yml b/swh/objstorage/tests/winery/inventory/groups.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/inventory/groups.yml @@ -0,0 +1,13 @@ +--- +ceph: + children: + mon: + osd: + +mon: + hosts: + ceph1: + +rw: + hosts: + ceph2: diff --git a/swh/objstorage/tests/winery/inventory/hosts.yml b/swh/objstorage/tests/winery/inventory/hosts.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/inventory/hosts.yml @@ -0,0 +1,10 @@ +all: + hosts: + ceph1: {ansible_host: 10.11.12.211, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph2: {ansible_host: 10.11.12.212, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph3: {ansible_host: 10.11.12.213, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph4: {ansible_host: 10.11.12.214, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph5: {ansible_host: 10.11.12.215, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph6: {ansible_host: 10.11.12.216, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph7: {ansible_host: 10.11.12.217, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} + ceph8: {ansible_host: 10.11.12.218, ansible_port: '22', ansible_user: debian, ansible_python_interpreter: '/usr/bin/python3'} diff --git a/swh/objstorage/tests/winery/inventory/osd.yml b/swh/objstorage/tests/winery/inventory/osd.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/inventory/osd.yml @@ -0,0 +1,9 @@ +--- +osd: + hosts: + ceph3: + ceph4: + ceph5: + ceph6: + ceph7: + ceph8: diff --git a/swh/objstorage/tests/winery/osd.yml b/swh/objstorage/tests/winery/osd.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/osd.yml @@ -0,0 +1,38 @@ +--- + +- hosts: osd + become: true + + tasks: + + - name: add host + shell: | + ceph orch host add {{ inventory_hostname }} + delegate_to: ceph1 + +- hosts: osd + become: true + + tasks: + + - name: wait for host + shell: | + ceph orch host ls | grep '^{{ inventory_hostname }} ' + delegate_to: ceph1 + register: host + until: host is success + retries: 30 + delay: 5 + +- hosts: osd + become: true + + tasks: + + # the desired side effect here is twofold + # * device zap blocks until the osd daemon is ready on the target host + # * on grid5000 /dev/sdc needs to be applied + - name: zap /dev/sdc + shell: | + ceph orch device zap {{ inventory_hostname }} /dev/sdc --force || true + delegate_to: ceph1 diff --git a/swh/objstorage/tests/winery/requirements.txt b/swh/objstorage/tests/winery/requirements.txt new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/requirements.txt @@ -0,0 +1 @@ +ansible diff --git a/swh/objstorage/tests/winery/rng.xml b/swh/objstorage/tests/winery/rng.xml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/rng.xml @@ -0,0 +1,5 @@ + + /dev/urandom + +
+ diff --git a/swh/objstorage/tests/winery/rw.yml b/swh/objstorage/tests/winery/rw.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/rw.yml @@ -0,0 +1,110 @@ +--- +- name: install and configure Read Write Storage + hosts: rw + become: true + + pre_tasks: + + - name: zap attached disks + shell: | + for disk in {{ rw_disk1 }} {{ rw_disk2 }} ; do + dd if=/dev/zero of=$disk count=100 bs=1024k + done + touch /etc/zapped.done + args: + creates: /etc/zapped.done + + - name: apt install lvm2 + apt: + name: + - lvm2 + + - name: vgcreate pg + lvg: + vg: pg + pvs: "{{ rw_disk1 }},{{ rw_disk2 }}" + + - name: lvcreate pg + lvol: + vg: pg + lv: pg + size: +100%FREE + + - name: mkfs /dev/mapper/pg-pg + filesystem: + fstype: ext4 +# force: yes + dev: /dev/mapper/pg-pg + + - name: mkdir /var/lib/postgresql + file: + path: /var/lib/postgresql + state: directory + mode: 755 + + - name: mount /var/lib/postgresql + mount: + path: /var/lib/postgresql + src: /dev/mapper/pg-pg + fstype: ext4 + state: mounted + + - name: apt install postgres + apt: + name: + - postgresql + - postgresql-contrib + - libpq-dev + - python3-psycopg2 + - acl + + - name: postgresql.conf max_connections = 1000 + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '^max_connections' + line: "max_connections = 1000" + + # + # https://wiki.postgresql.org/wiki/Tuning_Your_PostgreSQL_Server + # + - name: postgresql.conf shared_buffers + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '^shared_buffers' + # 1/4 RAM + line: "shared_buffers = {{ postgres_shared_buffers }}" + + - name: postgresql.conf effective_cache_size + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '.*effective_cache_size' + # 1/2 RAM + line: "effective_cache_size = {{ postgres_effective_cache_size }}" + + - name: postgresql.conf random_page_cost + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '.*random_page_cost' + line: "random_page_cost = 2.0" + + - name: listen on * + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + line: "listen_addresses = '*'" + + - name: allow all connexions + lineinfile: + path: /etc/postgresql/13/main/pg_hba.conf + line: "host all all 0.0.0.0/0 trust" + + - name: systemctl restart postgresql + service: + name: postgresql + state: restarted + + - name: pg user testuser/testpassword + postgresql_user: + name: testuser + password: testpassword + role_attr_flags: SUPERUSER + become_user: postgres diff --git a/swh/objstorage/tests/winery/tests.yml b/swh/objstorage/tests/winery/tests.yml new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/winery/tests.yml @@ -0,0 +1,28 @@ +- name: install test environment + hosts: mon + + pre_tasks: + + - name: apt install + apt: + name: + - emacs-nox + - gcc + - libcap-dev + - libcmph-dev + - libpq-dev + - postgresql-client-common + - postgresql-13 + - python3-pip + - python3-rbd + - rsync + - tmux + - virtualenv + become: true + + - name: configure venv + shell: | + virtualenv venv + venv/bin/pip3 install tox + args: + creates: venv diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ commands = pytest --cov={envsitepackagesdir}/swh/objstorage \ {envsitepackagesdir}/swh/objstorage \ - --cov-branch {posargs} + --cov-branch --cov-report term-missing {posargs} [testenv:black] skip_install = true