diff --git a/.gitignore b/.gitignore index 55cc78f..26dc573 100644 --- a/.gitignore +++ b/.gitignore @@ -1,13 +1,16 @@ *.pyc *.sw? *~ /.coverage /.coverage.* .eggs/ __pycache__ *.egg-info/ build dist version.txt .tox .mypy_cache/ +winery-test-environment/context + + diff --git a/MANIFEST.in b/MANIFEST.in index 3148ae2..6a9b252 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include Makefile include requirements*.txt include version.txt recursive-include swh py.typed +exclude winery-test-environment diff --git a/docs/index.rst b/docs/index.rst index 14e2628..c526d90 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,16 +1,17 @@ .. _swh-objstorage: Software Heritage - Object storage ================================== Content-addressable object storage. Reference Documentation ----------------------- .. toctree:: :maxdepth: 2 cli + winery /apidoc/swh.objstorage diff --git a/docs/winery.rst b/docs/winery.rst new file mode 100644 index 0000000..56dbec7 --- /dev/null +++ b/docs/winery.rst @@ -0,0 +1,40 @@ +.. _swh-objstorage-winery: + +Winery backend +============== + +The Winery backend implements the `Ceph based object storage architecture `__. + +Implementation notes +-------------------- + +The `sharedbase.py` file contains the global index implementation that associates every object id with the shard that contains it. A list of shards (either writable or readonly) is stored in a table, with a numeric id to save space. The name of the shard is used to create a database (for write shards) or an RBD image (for read shards). + +The `roshard.py` file contains the lookup function for a read shard and is a thin layer on top of swh-perfecthash. + +The `rwshard.py` file contains the logic to read, write and enumerate the objects of a write shard using SQL statements on the database dedicated to it. + +The `objstorage.py` file contains the backend implementation in the `WineryObjStorage` class. 
It is a thin layer that delegates writes to a `WineryWriter` instance and reads to a `WineryReader` instance. Although they are currently tightly coupled, they are likely to eventually run in different workers if performance and security require it. + +A `WineryReader` must be able to read an object from both Read Shards and Write Shards. It will first determine the kind of shard the object belongs to by looking it up in the global index. If it is a Read Shard, it will look up the object using the `ROShard` class from `roshard.py`, ultimately using a Ceph RBD image. If it is a Write Shard, it will look up the object using the `RWShard` class from `rwshard.py`, ultimately using a PostgreSQL database. + +All `WineryWriter` operations are idempotent so they can be resumed in case they fail. When a `WineryWriter` is instantiated, it will either: + +* Find a Write Shard (i.e. a database) that is not locked by another instance by looking up the list of shards or, +* Create a new Write Shard by creating a new database + +and it will lock the Write Shard and own it so no other instance tries to write to it. A PostgreSQL session lock is used to lock the shard so that it is released when the `WineryWriter` process dies unexpectedly and another process can pick it up. + +When a new object is added to the Write Shard, a new row is added to the global index to record that it is owned by this Write Shard and is in flight. Such an object cannot be read because it is not yet complete. If a request to write the same object is sent to another `WineryWriter` instance, it will fail to add it to the global index because it already exists. 
Since the object is in flight, the `WineryWriter` will check if the shard associated with the object is: + +* its name, which means it owns the object and must resume writing the object +* not its name, which means another `WineryWriter` owns it and nothing needs to be done + +After the content of the object is successfully added to the Write Shard, the state of the record in the global index is modified to no longer be in flight. The client is notified that the operation was successful and the object can be read from the Write Shard from that point on. + +When the size of the database associated with a Write Shard exceeds a threshold, it is set to be in the `packing` state. All objects it contains can be read from it by any `WineryReader` but no new object will be added to it. A process is spawned and is tasked to transform it into a Read Shard using the `Packer` class. Should it fail for any reason, a cron job will restart it when it finds Write Shards that are both in the `packing` state and not locked by any process. Packing is done by enumerating all the records from the Write Shard database and writing them into a Read Shard by the same name. Incomplete Read Shards will never be used by `WineryReader` because the global index will direct it to use the Write Shard instead. Once the packing completes, the state of the shard is modified to be readonly and from that point on the `WineryReader` will only use the Read Shard to find the objects it contains. The database containing the Write Shard is then destroyed because it is no longer useful and the process terminates on success. 
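The two-phase, in-flight write protocol described in the implementation notes can be illustrated with a minimal in-memory sketch. It is not the code from this patch: `GlobalIndexSketch` and the free function `add` are hypothetical stand-ins, a Python dict replaces the `signature2shard` table, and a plain dict replaces the Write Shard database. Only the method names `add_phase_1` and `add_phase_2` follow the patch.

```python
from typing import Dict, Optional, Tuple


class GlobalIndexSketch:
    """Hypothetical in-memory stand-in for the global index table."""

    def __init__(self) -> None:
        # obj_id -> (id of the owning shard, in-flight flag)
        self.rows: Dict[bytes, Tuple[int, bool]] = {}

    def add_phase_1(self, obj_id: bytes, shard_id: int) -> Optional[int]:
        """Claim obj_id for shard_id, or report who owns it while in flight."""
        if obj_id not in self.rows:
            self.rows[obj_id] = (shard_id, True)
            return shard_id
        owner, inflight = self.rows[obj_id]
        # A completed object has no in-flight owner: nothing is left to write.
        return owner if inflight else None

    def add_phase_2(self, obj_id: bytes, shard_id: int) -> None:
        """Clear the in-flight flag once the content is stored."""
        owner, _ = self.rows[obj_id]
        if owner == shard_id:
            self.rows[obj_id] = (owner, False)

    def contains(self, obj_id: bytes) -> bool:
        """Readers only see objects that are no longer in flight."""
        row = self.rows.get(obj_id)
        return row is not None and not row[1]


def add(index: GlobalIndexSketch, shard_id: int, store: Dict[bytes, bytes],
        obj_id: bytes, content: bytes) -> bytes:
    """Idempotent add: write only when this shard owns the in-flight record."""
    owner = index.add_phase_1(obj_id, shard_id)
    if owner != shard_id:
        # Another writer owns the object, or it is already complete.
        return obj_id
    store[obj_id] = content  # stands in for the INSERT into the Write Shard
    index.add_phase_2(obj_id, shard_id)
    return obj_id
```

Calling `add` a second time from the owning shard after a failure between the two phases resumes the write, while a call from any other shard returns without storing anything, which is the idempotence property the notes rely on.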
+ +Benchmarks +---------- + +Follow the instructions at winery-test-environment/README.md diff --git a/mypy.ini b/mypy.ini index 92ecb16..256b044 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,21 +1,30 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-azure.*] ignore_missing_imports = True [mypy-libcloud.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-requests_toolbelt.*] ignore_missing_imports = True + +[mypy-psycopg2.*] +ignore_missing_imports = True + +[mypy-swh.perfecthash.*] +ignore_missing_imports = True + +[mypy-sh.*] +ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt index 544f1ac..02a6eb4 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ swh.core[http] >= 0.3 swh.model >= 0.0.27 +swh.perfecthash diff --git a/requirements-test.txt b/requirements-test.txt index 20c3661..c4d7c8e 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,7 +1,11 @@ apache-libcloud azure-storage-blob >= 12.0, != 12.9.0 # version 12.9.0 breaks mypy https://github.com/Azure/azure-sdk-for-python/pull/20891 pytest +pytest-asyncio +pytest-mock requests_mock[fixture] >= 1.9 requests_toolbelt types-pyyaml types-requests +pytest-postgresql < 4.0.0 # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400 + diff --git a/requirements.txt b/requirements.txt index e546330..25b6273 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,14 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html # remote storage API server aiohttp >= 3 click requests +psycopg2 +sh # optional dependencies # apache-libcloud # azure-storage-blob >= 12.0 diff --git a/swh/objstorage/backends/winery/__init__.py b/swh/objstorage/backends/winery/__init__.py new file mode 100644 index 0000000..3b164b7 --- /dev/null +++ b/swh/objstorage/backends/winery/__init__.py @@ -0,0 +1 @@ +from .objstorage import WineryObjStorage # noqa: F401 diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py new file mode 100644 index 0000000..e8c9c1d --- /dev/null +++ b/swh/objstorage/backends/winery/database.py @@ -0,0 +1,89 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import contextmanager +import logging +import time + +import psycopg2 + +logger = logging.getLogger(__name__) + + +class Database: + def __init__(self, dsn): + self.dsn = dsn + + @contextmanager + def admin_cursor(self): + db = psycopg2.connect(dsn=self.dsn, dbname="postgres") + # https://wiki.postgresql.org/wiki/Psycopg2_Tutorial + # If you want to drop the database you would need to + # change the isolation level of the database. 
+ db.set_isolation_level(0) + db.autocommit = True + c = db.cursor() + try: + yield c + finally: + c.close() + + def create_database(self, database): + with self.admin_cursor() as c: + c.execute( + "SELECT datname FROM pg_catalog.pg_database " + f"WHERE datname = '{database}'" + ) + if c.rowcount == 0: + try: + c.execute(f"CREATE DATABASE {database}") + except psycopg2.errors.UniqueViolation: + # someone else created the database, it is fine + pass + + def drop_database(self, database): + with self.admin_cursor() as c: + c.execute( + "SELECT pg_terminate_backend(pg_stat_activity.pid)" + "FROM pg_stat_activity " + "WHERE pg_stat_activity.datname = %s;", + (database,), + ) + # + # Dropping the database may fail because the server takes time + # to notice a connection was dropped and/or a named cursor is + # in the process of being deleted. It can happen here or even + # when deleting all database with the psql cli + # and there are no process active. + # + # ERROR: database "i606428a5a6274d1ab09eecc4d019fef7" is being + # accessed by other users DETAIL: There is 1 other session + # using the database. + # + # See: + # https://stackoverflow.com/questions/5108876/kill-a-postgresql-session-connection + # + # https://www.postgresql.org/docs/current/sql-dropdatabase.html + # + # WITH (FORCE) added in postgresql 13 but may also fail because the + # named cursor may not be handled as a client. 
+ # + for i in range(60): + try: + c.execute(f"DROP DATABASE IF EXISTS {database}") + return + except psycopg2.errors.ObjectInUse: + logger.warning(f"{database} database drop fails, waiting 10s") + time.sleep(10) + continue + raise Exception(f"database drop failed on {database}") + + def list_databases(self): + with self.admin_cursor() as c: + c.execute( + "SELECT datname FROM pg_database " + "WHERE datistemplate = false and datname != 'postgres'" + ) + return [r[0] for r in c.fetchall()] diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py new file mode 100644 index 0000000..c60f3f3 --- /dev/null +++ b/swh/objstorage/backends/winery/objstorage.py @@ -0,0 +1,168 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from multiprocessing import Process + +from swh.objstorage import exc +from swh.objstorage.objstorage import ObjStorage, compute_hash + +from .roshard import ROShard +from .rwshard import RWShard +from .sharedbase import SharedBase + +logger = logging.getLogger(__name__) + + +class WineryObjStorage(ObjStorage): + def __init__(self, **kwargs): + super().__init__(**kwargs) + if kwargs.get("readonly"): + self.winery = WineryReader(**kwargs) + else: + self.winery = WineryWriter(**kwargs) + + def uninit(self): + self.winery.uninit() + + def get(self, obj_id): + return self.winery.get(obj_id) + + def check_config(self, *, check_write): + return True + + def __contains__(self, obj_id): + return obj_id in self.winery + + def add(self, content, obj_id=None, check_presence=True): + return self.winery.add(content, obj_id, check_presence) + + def check(self, obj_id): + return self.winery.check(obj_id) + + def delete(self, obj_id): + raise PermissionError("Delete is not allowed.") + + +class WineryBase: 
+ def __init__(self, **kwargs): + self.args = kwargs + self.init() + + def init(self): + self.base = SharedBase(**self.args) + + def uninit(self): + self.base.uninit() + + def __contains__(self, obj_id): + return self.base.contains(obj_id) + + +class WineryReader(WineryBase): + def roshard(self, name): + shard = ROShard(name, **self.args) + shard.load() + return shard + + def get(self, obj_id): + shard_info = self.base.get(obj_id) + if shard_info is None: + raise exc.ObjNotFoundError(obj_id) + name, readonly = shard_info + if readonly: + shard = self.roshard(name) + content = shard.get(obj_id) + del shard + else: + shard = RWShard(name, **self.args) + content = shard.get(obj_id) + if content is None: + raise exc.ObjNotFoundError(obj_id) + return content + + +def pack(shard, **kwargs): + return Packer(shard, **kwargs).run() + + +class Packer: + def __init__(self, shard, **kwargs): + self.args = kwargs + self.shard = shard + self.init() + + def init(self): + self.rw = RWShard(self.shard, **self.args) + self.ro = ROShard(self.shard, **self.args) + + def uninit(self): + del self.ro + self.rw.uninit() + + def run(self): + shard = self.ro.create(self.rw.count()) + for obj_id, content in self.rw.all(): + shard.write(obj_id, content) + shard.save() + base = SharedBase(**self.args) + base.shard_packing_ends(self.shard) + base.uninit() + self.rw.uninit() + self.rw.drop() + return True + + +class WineryWriter(WineryReader): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.packers = [] + self.init() + + def init(self): + super().init() + self.shard = RWShard(self.base.whoami, **self.args) + + def uninit(self): + self.shard.uninit() + super().uninit() + + def add(self, content, obj_id=None, check_presence=True): + if obj_id is None: + obj_id = compute_hash(content) + + if check_presence and obj_id in self: + return obj_id + + shard = self.base.add_phase_1(obj_id) + if shard != self.base.id: + # this object is the responsibility of another shard + return 
obj_id + + self.shard.add(obj_id, content) + self.base.add_phase_2(obj_id) + + if self.shard.is_full(): + self.pack() + + return obj_id + + def check(self, obj_id): + # load all shards packing == True and not locked (i.e. packer + # was interrupted for whatever reason) run pack for each of them + pass + + def pack(self): + self.base.shard_packing_starts() + p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) + self.uninit() + p.start() + self.packers.append(p) + self.init() + + def __del__(self): + for p in self.packers: + p.kill() + p.join() diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py new file mode 100644 index 0000000..56310bb --- /dev/null +++ b/swh/objstorage/backends/winery/roshard.py @@ -0,0 +1,74 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +import sh + +from swh.perfecthash import Shard + +logger = logging.getLogger(__name__) + + +class Pool(object): + name = "shards" + + def __init__(self, **kwargs): + self.args = kwargs + self.rbd = sh.sudo.bake("rbd", f"--pool={self.name}") + self.ceph = sh.sudo.bake("ceph") + self.image_size = self.args["shard_max_size"] * 2 + + def image_list(self): + try: + self.rbd.ls() + except sh.ErrorReturnCode_2 as e: + if "No such file or directory" in e.args[0]: + return [] + else: + raise + return [image.strip() for image in self.rbd.ls()] + + def image_path(self, image): + return f"/dev/rbd/{self.name}/{image}" + + def image_create(self, image): + logger.info(f"rbd --pool {self.name} create --size={self.image_size} {image}") + self.rbd.create( + f"--size={self.image_size}", f"--data-pool={self.name}-data", image + ) + self.rbd.feature.disable( + f"{self.name}/{image}", "object-map", "fast-diff", "deep-flatten" + ) + 
self.image_map(image, "rw") + + def image_map(self, image, options): + self.rbd.device("map", "-o", options, image) + sh.sudo("chmod", "777", self.image_path(image)) + + def image_remap_ro(self, image): + self.image_unmap(image) + self.image_map(image, "ro") + + def image_unmap(self, image): + self.rbd.device.unmap(f"{self.name}/{image}", _ok_code=(0, 22)) + + +class ROShard: + def __init__(self, name, **kwargs): + self.pool = Pool(shard_max_size=kwargs["shard_max_size"]) + self.name = name + + def create(self, count): + self.pool.image_create(self.name) + self.shard = Shard(self.pool.image_path(self.name)) + return self.shard.create(count) + + def load(self): + self.shard = Shard(self.pool.image_path(self.name)) + return self.shard.load() == self.shard + + def get(self, key): + return self.shard.lookup(key) diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py new file mode 100644 index 0000000..f7fab7a --- /dev/null +++ b/swh/objstorage/backends/winery/rwshard.py @@ -0,0 +1,89 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import psycopg2 + +from .database import Database + + +class RWShard(Database): + def __init__(self, name, **kwargs): + super().__init__(kwargs["shard_dsn"]) + self._name = name + self.create_database(self.name) + self.db = self.create_table(f"{self.dsn}/{self.name}") + self.size = self.total_size() + self.limit = kwargs["shard_max_size"] + + def uninit(self): + if hasattr(self, "db"): + self.db.close() + del self.db + + @property + def name(self): + return self._name + + def is_full(self): + return self.size > self.limit + + def drop(self): + self.drop_database(self.name) + + def create_table(self, dsn): + db = psycopg2.connect(dsn) + db.autocommit = True + c = db.cursor() + c.execute( + """ + 
CREATE TABLE IF NOT EXISTS objects( + key BYTEA PRIMARY KEY, + content BYTEA + ) + """ + ) + c.close() + return db + + def total_size(self): + with self.db.cursor() as c: + c.execute("SELECT SUM(LENGTH(content)) FROM objects") + size = c.fetchone()[0] + if size is None: + return 0 + else: + return size + + def add(self, obj_id, content): + try: + with self.db.cursor() as c: + c.execute( + "INSERT INTO objects (key, content) VALUES (%s, %s)", + (obj_id, content), + ) + self.db.commit() + self.size += len(content) + except psycopg2.errors.UniqueViolation: + pass + + def get(self, obj_id): + with self.db.cursor() as c: + c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) + if c.rowcount == 0: + return None + else: + return c.fetchone()[0].tobytes() + + def all(self): + with self.db.cursor() as c: + c.execute("SELECT key,content FROM objects") + for row in c: + yield row[0].tobytes(), row[1].tobytes() + + def count(self): + with self.db.cursor() as c: + c.execute("SELECT COUNT(*) FROM objects") + return c.fetchone()[0] diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py new file mode 100644 index 0000000..abad163 --- /dev/null +++ b/swh/objstorage/backends/winery/sharedbase.py @@ -0,0 +1,193 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import uuid + +import psycopg2 + +from .database import Database + + +class SharedBase(Database): + def __init__(self, **kwargs): + super().__init__(kwargs["base_dsn"]) + database = "sharedbase" + self.create_database(database) + self.db = self.create_table(f"{self.dsn}/{database}") + self._whoami = None + + def uninit(self): + self.db.close() + del self.db + + def create_table(self, dsn): + db = psycopg2.connect(dsn) + db.autocommit = True + c = db.cursor() + 
lock = 314116 # an arbitrary unique number + c.execute("SELECT pg_advisory_lock(%s)", (lock,)) + c.execute( + """ + CREATE TABLE IF NOT EXISTS shards( + id SERIAL PRIMARY KEY, + readonly BOOLEAN NOT NULL, + packing BOOLEAN NOT NULL, + name CHAR(32) NOT NULL UNIQUE + ) + """ + ) + c.execute( + """ + CREATE TABLE IF NOT EXISTS signature2shard( + signature BYTEA PRIMARY KEY, + inflight BOOLEAN NOT NULL, + shard INTEGER NOT NULL + ) + """ + ) + c.close() + db.close() # so the pg_advisory_lock is released + db = psycopg2.connect(dsn) + db.autocommit = True + return db + + @property + def whoami(self): + self.set_whoami() + return self._whoami + + @property + def id(self): + self.set_whoami() + return self._whoami_id + + def set_whoami(self): + if self._whoami is not None: + return + + while True: + self._whoami, self._whoami_id = self.lock_a_shard() + if self._whoami is not None: + return self._whoami + self.create_shard() + + def lock_a_shard(self): + with self.db.cursor() as c: + c.execute( + "SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE " + "LIMIT 1 FOR UPDATE SKIP LOCKED" + ) + if c.rowcount == 0: + return None, None + name = c.fetchone()[0] + return self.lock_shard(name) + + def lock_shard(self, name): + self.whoami_lock = self.db.cursor() + try: + self.whoami_lock.execute( + "SELECT name, id FROM shards " + "WHERE readonly = FALSE AND packing = FALSE AND name = %s " + "FOR UPDATE NOWAIT", + (name,), + ) + return self.whoami_lock.fetchone() + except psycopg2.Error: + return None, None + + def unlock_shard(self): + del self.whoami_lock + + def create_shard(self): + name = uuid.uuid4().hex + # + # ensure the first character is not a number so it can be used as a + # database name. 
+ # + name = "i" + name[1:] + with self.db.cursor() as c: + c.execute( + "INSERT INTO shards (name, readonly, packing) " + "VALUES (%s, FALSE, FALSE)", + (name,), + ) + self.db.commit() + + def shard_packing_starts(self): + with self.db.cursor() as c: + c.execute( + "UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,) + ) + self.unlock_shard() + + def shard_packing_ends(self, name): + with self.db.cursor() as c: + c.execute( + "UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s", + (name,), + ) + + def get_shard_info(self, id): + with self.db.cursor() as c: + c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) + if c.rowcount == 0: + return None + else: + return c.fetchone() + + def list_shards(self): + with self.db.cursor() as c: + c.execute("SELECT name, readonly, packing FROM shards") + for row in c: + yield row[0], row[1], row[2] + + def contains(self, obj_id): + with self.db.cursor() as c: + c.execute( + "SELECT shard FROM signature2shard WHERE " + "signature = %s AND inflight = FALSE", + (obj_id,), + ) + if c.rowcount == 0: + return None + else: + return c.fetchone()[0] + + def get(self, obj_id): + id = self.contains(obj_id) + if id is None: + return None + return self.get_shard_info(id) + + def add_phase_1(self, obj_id): + try: + with self.db.cursor() as c: + c.execute( + "INSERT INTO signature2shard (signature, shard, inflight) " + "VALUES (%s, %s, TRUE)", + (obj_id, self.id), + ) + self.db.commit() + return self.id + except psycopg2.errors.UniqueViolation: + with self.db.cursor() as c: + c.execute( + "SELECT shard FROM signature2shard WHERE " + "signature = %s AND inflight = TRUE", + (obj_id,), + ) + if c.rowcount == 0: + return None + else: + return c.fetchone()[0] + + def add_phase_2(self, obj_id): + with self.db.cursor() as c: + c.execute( + "UPDATE signature2shard SET inflight = FALSE " + "WHERE signature = %s AND shard = %s", + (obj_id, self.id), + ) + self.db.commit() diff --git 
a/swh/objstorage/factory.py b/swh/objstorage/factory.py index d720397..faf0a79 100644 --- a/swh/objstorage/factory.py +++ b/swh/objstorage/factory.py @@ -1,122 +1,124 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Callable, Dict, Union import warnings from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.backends.generator import RandomGeneratorObjStorage from swh.objstorage.backends.http import HTTPReadOnlyObjStorage from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.objstorage.backends.noop import NoopObjStorage from swh.objstorage.backends.pathslicing import PathSlicingObjStorage from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage +from swh.objstorage.backends.winery import WineryObjStorage from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage from swh.objstorage.multiplexer.filter import add_filters from swh.objstorage.objstorage import ID_HASH_LENGTH, ObjStorage # noqa __all__ = ["get_objstorage", "ObjStorage"] _STORAGE_CLASSES: Dict[str, Union[type, Callable[..., type]]] = { "pathslicing": PathSlicingObjStorage, "remote": RemoteObjStorage, "memory": InMemoryObjStorage, "seaweedfs": SeaweedFilerObjStorage, "random": RandomGeneratorObjStorage, "http": HTTPReadOnlyObjStorage, + "winery": WineryObjStorage, "noop": NoopObjStorage, } _STORAGE_CLASSES_MISSING = {} _STORAGE_CLASSES_DEPRECATED = {"weed": "seaweedfs"} try: from swh.objstorage.backends.azure import ( AzureCloudObjStorage, PrefixedAzureCloudObjStorage, ) _STORAGE_CLASSES["azure"] = AzureCloudObjStorage _STORAGE_CLASSES["azure-prefixed"] = PrefixedAzureCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING["azure"] = e.args[0] _STORAGE_CLASSES_MISSING["azure-prefixed"] = e.args[0] try: 
from swh.objstorage.backends.libcloud import ( AwsCloudObjStorage, OpenStackCloudObjStorage, ) _STORAGE_CLASSES["s3"] = AwsCloudObjStorage _STORAGE_CLASSES["swift"] = OpenStackCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING["s3"] = e.args[0] _STORAGE_CLASSES_MISSING["swift"] = e.args[0] def get_objstorage(cls: str, args=None, **kwargs): """ Create an ObjStorage using the given implementation class. Args: cls: objstorage class unique key contained in the _STORAGE_CLASSES dict. kwargs: arguments for the required class of objstorage that must match exactly the one in the `__init__` method of the class. Returns: subclass of ObjStorage that match the given `storage_class` argument. Raises: ValueError: if the given storage class is not a valid objstorage key. """ if cls in _STORAGE_CLASSES_DEPRECATED: warnings.warn( f"{cls} objstorage class is deprecated, " f"use {_STORAGE_CLASSES_DEPRECATED[cls]} class instead.", DeprecationWarning, ) cls = _STORAGE_CLASSES_DEPRECATED[cls] if cls in _STORAGE_CLASSES: if args is not None: warnings.warn( 'Explicit "args" key is deprecated for objstorage initialization, ' "use class arguments keys directly instead.", DeprecationWarning, ) # TODO: when removing this, drop the "args" backwards compatibility # from swh.objstorage.api.server configuration checker kwargs = args return _STORAGE_CLASSES[cls](**kwargs) else: raise ValueError( "Storage class {} is not available: {}".format( cls, _STORAGE_CLASSES_MISSING.get(cls, "unknown name") ) ) def _construct_filtered_objstorage(storage_conf, filters_conf): return add_filters(get_objstorage(**storage_conf), filters_conf) _STORAGE_CLASSES["filtered"] = _construct_filtered_objstorage def _construct_multiplexer_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return MultiplexerObjStorage(storages) _STORAGE_CLASSES["multiplexer"] = _construct_multiplexer_objstorage def _construct_striping_objstorage(objstorages): storages = 
[get_objstorage(**conf) for conf in objstorages] return StripingObjStorage(storages) _STORAGE_CLASSES["striping"] = _construct_striping_objstorage diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py new file mode 100644 index 0000000..d3cda00 --- /dev/null +++ b/swh/objstorage/tests/conftest.py @@ -0,0 +1,35 @@ +def pytest_configure(config): + config.addinivalue_line("markers", "shard_max_size: winery backend") + + +def pytest_addoption(parser): + parser.addoption( + "--winery-bench-rw-workers", + type=int, + help="Number of Read/Write workers", + default=1, + ) + parser.addoption( + "--winery-bench-ro-workers", + type=int, + help="Number of Readonly workers", + default=1, + ) + parser.addoption( + "--winery-bench-duration", + type=int, + help="Duration of the benchmarks in seconds", + default=1, + ) + parser.addoption( + "--winery-shard-max-size", + type=int, + help="Size of the shard in bytes", + default=10 * 1024 * 1024, + ) + parser.addoption( + "--winery-bench-ro-worker-max-request", + type=int, + help="Number of requests a ro worker performs", + default=1, + ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py new file mode 100644 index 0000000..1577236 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -0,0 +1,224 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import time + +import pytest +import sh + +from swh.objstorage import exc +from swh.objstorage.backends.winery.database import Database +from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.factory import get_objstorage + +from .winery_benchmark import Bench, work +from .winery_testing_helpers import PoolHelper, SharedBaseHelper + + +@pytest.fixture +def 
needs_ceph(): + try: + sh.ceph("--version") + except sh.CommandNotFound: + pytest.skip("the ceph CLI was not found") + + +@pytest.fixture +def ceph_pool(needs_ceph): + pool = PoolHelper(shard_max_size=10 * 1024 * 1024) + pool.clobber() + pool.pool_create() + + yield pool + + pool.images_clobber() + pool.clobber() + + +@pytest.fixture +def storage(request, postgresql): + marker = request.node.get_closest_marker("shard_max_size") + if marker is None: + shard_max_size = 1024 + else: + shard_max_size = marker.args[0] + dsn = ( + f"postgres://{postgresql.info.user}" + f":@{postgresql.info.host}:{postgresql.info.port}" + ) + storage = get_objstorage( + cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size + ) + yield storage + storage.winery.uninit() + # + # pytest-postgresql will not remove databases that it did not + # create between tests (only at the very end). + # + d = Database(dsn) + for database in d.list_databases(): + if database != postgresql.info.dbname and database != "tests_tmpl": + d.drop_database(database) + + +@pytest.fixture +def winery(storage): + return storage.winery + + +def test_winery_sharedbase(winery): + base = winery.base + shard1 = base.whoami + assert shard1 is not None + assert shard1 == base.whoami + + id1 = base.id + assert id1 is not None + assert id1 == base.id + + +def test_winery_add_get(winery): + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert winery.add(content=content, obj_id=obj_id) == obj_id + assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id + assert winery.base.whoami == shard + assert winery.get(obj_id) == content + with pytest.raises(exc.ObjNotFoundError): + winery.get(b"unknown") + winery.shard.drop() + + +@pytest.mark.shard_max_size(1) +def test_winery_add_and_pack(winery, mocker): + mocker.patch("swh.objstorage.backends.winery.objstorage.pack", 
return_value=True) + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert winery.base.whoami != shard + assert len(winery.packers) == 1 + packer = winery.packers[0] + packer.join() + assert packer.exitcode == 0 + + +def test_winery_delete(storage): + with pytest.raises(PermissionError): + storage.delete(None) + + +def test_winery_get_shard_info(winery): + assert winery.base.get_shard_info(1234) is None + assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None + + +@pytest.mark.shard_max_size(10 * 1024 * 1024) +def test_winery_packer(winery, ceph_pool): + shard = winery.base.whoami + content = b"SOMETHING" + winery.add(content=content) + winery.base.shard_packing_starts() + packer = Packer(shard, **winery.args) + try: + assert packer.run() is True + finally: + packer.uninit() + + readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) + assert readonly is True + assert packing is False + + +@pytest.mark.shard_max_size(10 * 1024 * 1024) +def test_winery_get_object(winery, ceph_pool): + shard = winery.base.whoami + content = b"SOMETHING" + obj_id = winery.add(content=content) + winery.base.shard_packing_starts() + assert pack(shard, **winery.args) is True + assert winery.get(obj_id) == content + + +def test_winery_ceph_pool(needs_ceph): + name = "IMAGE" + pool = PoolHelper(shard_max_size=10 * 1024 * 1024) + pool.clobber() + pool.pool_create() + pool.image_create(name) + p = pool.image_path(name) + assert p.endswith(name) + something = "SOMETHING" + open(p, "w").write(something) + assert open(p).read(len(something)) == something + assert pool.image_list() == [name] + pool.image_remap_ro(name) + pool.images_clobber() + assert pool.image_list() == [name] + pool.clobber() + assert pool.image_list() == [] + + +@pytest.mark.shard_max_size(10 * 1024 * 1024) +def test_winery_bench_work(winery, ceph_pool, tmpdir): + # 
+ # rw worker creates a shard + # + whoami = winery.base.whoami + shards_info = list(winery.base.list_shards()) + assert len(shards_info) == 1 + shard, readonly, packing = shards_info[0] + assert (readonly, packing) == (False, False) + winery.args["dir"] = str(tmpdir) + assert work("rw", winery.args) == "rw" + shards_info = { + name: (readonly, packing) + for name, readonly, packing in winery.base.list_shards() + } + assert len(shards_info) == 2 + assert shards_info[whoami] == (True, False) + # + # ro worker reads a shard + # + winery.args["ro_worker_max_request"] = 1 + assert work("ro", winery.args) == "ro" + + +@pytest.mark.asyncio +async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): + dsn = ( + f"postgres://{postgresql.info.user}" + f":@{postgresql.info.host}:{postgresql.info.port}" + ) + kwargs = { + "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), + "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), + "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), + "ro_worker_max_request": pytestconfig.getoption( + "--winery-bench-ro-worker-max-request" + ), + "duration": pytestconfig.getoption("--winery-bench-duration"), + "base_dsn": dsn, + "shard_dsn": dsn, + } + assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] + + +@pytest.mark.asyncio +async def test_winery_bench_fake(pytestconfig, mocker): + kwargs = { + "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), + "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), + "duration": pytestconfig.getoption("--winery-bench-duration"), + } + + def run(kind): + time.sleep(kwargs["duration"] * 2) + return kind + + mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) + assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py new file mode 100644 index 
0000000..419cbb9 --- /dev/null +++ b/swh/objstorage/tests/winery_benchmark.py @@ -0,0 +1,145 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import asyncio +import concurrent.futures +import logging +import os +import random +import time + +from swh.objstorage.factory import get_objstorage + +logger = logging.getLogger(__name__) + + +def work(kind, args): + return Worker(args).run(kind) + + +class Worker(object): + def __init__(self, args): + self.args = args + + def run(self, kind): + getattr(self, kind)() + return kind + + def ro(self): + self.storage = get_objstorage( + cls="winery", + readonly=True, + base_dsn=self.args["base_dsn"], + shard_dsn=self.args["shard_dsn"], + shard_max_size=self.args["shard_max_size"], + ) + with self.storage.winery.base.db.cursor() as c: + while True: + c.execute( + "SELECT signature FROM signature2shard WHERE inflight = FALSE " + "ORDER BY random() LIMIT %s", + (self.args["ro_worker_max_request"],), + ) + if c.rowcount > 0: + break + logger.info(f"Worker(ro, {os.getpid()}): empty, waiting") + time.sleep(1) + logger.info(f"Worker(ro, {os.getpid()}): requesting {c.rowcount} objects") + start = time.time() + for row in c: + obj_id = row[0].tobytes() + assert self.storage.get(obj_id) is not None + elapsed = time.time() - start + logger.info(f"Worker(ro, {os.getpid()}): finished ({elapsed:.2f}s)") + + def payloads_define(self): + self.payloads = [ + 3 * 1024 + 1, + 3 * 1024 + 1, + 3 * 1024 + 1, + 3 * 1024 + 1, + 3 * 1024 + 1, + 10 * 1024 + 1, + 13 * 1024 + 1, + 16 * 1024 + 1, + 70 * 1024 + 1, + 80 * 1024 + 1, + ] + + def rw(self): + self.storage = get_objstorage( + cls="winery", + base_dsn=self.args["base_dsn"], + shard_dsn=self.args["shard_dsn"], + shard_max_size=self.args["shard_max_size"], + ) + self.payloads_define() + 
random_content = open("/dev/urandom", "rb") + logger.info(f"Worker(rw, {os.getpid()}): start") + start = time.time() + count = 0 + while len(self.storage.winery.packers) == 0: + content = random_content.read(random.choice(self.payloads)) + self.storage.add(content=content) + count += 1 + logger.info(f"Worker(rw, {os.getpid()}): packing {count} objects") + packer = self.storage.winery.packers[0] + packer.join() + assert packer.exitcode == 0 + elapsed = time.time() - start + logger.info(f"Worker(rw, {os.getpid()}): finished ({elapsed:.2f}s)") + + +class Bench(object): + def __init__(self, args): + self.args = args + + def timer_start(self): + self.start = time.time() + + def timeout(self): + return time.time() - self.start > self.args["duration"] + + async def run(self): + self.timer_start() + + loop = asyncio.get_running_loop() + + workers_count = self.args["rw_workers"] + self.args["ro_workers"] + with concurrent.futures.ProcessPoolExecutor( + max_workers=workers_count + ) as executor: + + logger.info("Bench.run: running") + + self.count = 0 + workers = set() + + def create_worker(kind): + self.count += 1 + logger.info(f"Bench.run: launched {kind} worker number {self.count}") + return loop.run_in_executor(executor, work, kind, self.args) + + for kind in ["rw"] * self.args["rw_workers"] + ["ro"] * self.args[ + "ro_workers" + ]: + workers.add(create_worker(kind)) + + while len(workers) > 0: + logger.info(f"Bench.run: waiting for {len(workers)} workers") + current = workers + done, pending = await asyncio.wait( + current, return_when=asyncio.FIRST_COMPLETED + ) + workers = pending + for task in done: + kind = task.result() + logger.info(f"Bench.run: worker {kind} complete") + if not self.timeout(): + workers.add(create_worker(kind)) + + logger.info("Bench.run: finished") + + return self.count diff --git a/swh/objstorage/tests/winery_testing_helpers.py b/swh/objstorage/tests/winery_testing_helpers.py new file mode 100644 index 0000000..3fceace --- /dev/null +++ 
b/swh/objstorage/tests/winery_testing_helpers.py @@ -0,0 +1,64 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +from swh.objstorage.backends.winery.roshard import Pool + +logger = logging.getLogger(__name__) + + +class SharedBaseHelper: + def __init__(self, sharedbase): + self.sharedbase = sharedbase + + def get_shard_info_by_name(self, name): + with self.sharedbase.db.cursor() as c: + c.execute("SELECT readonly, packing FROM shards WHERE name = %s", (name,)) + if c.rowcount == 0: + return None + else: + return c.fetchone() + + +class PoolHelper(Pool): + def image_delete(self, image): + self.image_unmap(image) + logger.info(f"rbd --pool {self.name} remove {image}") + self.rbd.remove(image) + + def images_clobber(self): + for image in self.image_list(): + image = image.strip() + self.image_unmap(image) + + def clobber(self): + self.images_clobber() + self.pool_clobber() + + def pool_clobber(self): + logger.info(f"ceph osd pool delete {self.name}") + self.ceph.osd.pool.delete(self.name, self.name, "--yes-i-really-really-mean-it") + data = f"{self.name}-data" + logger.info(f"ceph osd pool delete {data}") + self.ceph.osd.pool.delete(data, data, "--yes-i-really-really-mean-it") + + def pool_create(self): + data = f"{self.name}-data" + logger.info(f"ceph osd pool create {data}") + self.ceph.osd( + "erasure-code-profile", + "set", + "--force", + data, + "k=4", + "m=2", + "crush-failure-domain=host", + ) + self.ceph.osd.pool.create(data, "200", "erasure", data) + self.ceph.osd.pool.set(data, "allow_ec_overwrites", "true") + self.ceph.osd.pool.set(data, "pg_autoscale_mode", "off") + logger.info(f"ceph osd pool create {self.name}") + self.ceph.osd.pool.create(self.name) diff --git a/tox.ini b/tox.ini index da8a6ed..5ac7b32 100644 --- a/tox.ini +++ 
b/tox.ini @@ -1,73 +1,78 @@ [tox] -envlist=flake8,py3,mypy +envlist=flake8,py3,winery,mypy [testenv] extras = testing deps = pytest-cov dev: pdbpp commands = pytest --cov={envsitepackagesdir}/swh/objstorage \ {envsitepackagesdir}/swh/objstorage \ - --cov-branch {posargs} + --cov-branch --cov-report term-missing {posargs} + +[testenv:winery] +allowlist_externals = bash +commands = + bash {toxinidir}/winery-test-environment/remote-tox.sh {posargs} [testenv:black] skip_install = true deps = black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.920 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs diff --git a/winery-test-environment/README.md b/winery-test-environment/README.md new file mode 100644 index 0000000..fb7964a --- /dev/null +++ b/winery-test-environment/README.md @@ -0,0 +1,76 @@ +The purpose of these instructions is to run `tox -e py3` in an +environment that has access to a ceph cluster. It enables tests that +would otherwise be skipped and increases code coverage. 
+ +The environment is composed of eight machines named ceph1 to ceph8. + +# Installation + +* pip install -r requirements.txt +* ansible-galaxy install geerlingguy.docker + +# Create the machines + +## libvirt + +* ensure virsh is available +* ./build-vms.sh + +If the internet connection is slow it may take a while before the OSDs show up +because they require downloading large docker images. + +## fed4fire + +### Create a base rspec specification. + +* /opt/jFed/jFed-Experimenter +* In the General Tab +* Create an experiment (New) +* Add one Physical Node by dragging it +* Right click on the node and choose "Configure Node" +* Select testbed: Grid 5000 +* Node => Specific hardware type: dahu-grenoble +* Disk image => Bullseye base +* Save under sample.rspec +* Manually edit to duplicate the nodes + +### Run the experiment. + +* /opt/jFed/jFed-Experimenter +* In the General Tab +* Open Local and load winery-test-environment/fed4fire.rspec +* Edit ceph1 node to check if the Specific hardware type is dahu-grenoble +* Click on Topology Viewer +* Run +* Give a unique name to the experiment +* Start experiment +* Once the provisioning is complete (Testing connectivity to resources on Grid5000) click "Export As" +* Choose "Export Configuration Management Settings" +* Save under /tmp/test.zip +* fed4fire.sh /tmp/test.zip + +# Install the machines + +* ansible-playbook -i inventory context/setup.yml ceph.yml bootstrap.yml osd.yml tests.yml + +# Run the tests + +It copies the content of the repository to ceph1 and runs "ssh ceph1 tox -e py3" + +* tox -e winery + +# Log in to a machine + +For each host found in context/ssh-config + +* ssh -i context/cluster_key -F context/ssh-config ceph1 + +# Destroy + +## libvirt + +* ./build-vms.sh stop $(seq 1 8) + +## fed4fire + +It will expire on its own diff --git a/winery-test-environment/ansible.cfg b/winery-test-environment/ansible.cfg new file mode 100644 index 0000000..4aa7c8b --- /dev/null +++ b/winery-test-environment/ansible.cfg @@ -0,0 +1,7 @@ 
+[defaults] +private_key_file = ./context/cluster_key +host_key_checking = false + +[ssh_connection] +ssh_args = -F context/ssh-config +scp_if_ssh = True diff --git a/winery-test-environment/bootstrap.yml b/winery-test-environment/bootstrap.yml new file mode 100644 index 0000000..530f6c5 --- /dev/null +++ b/winery-test-environment/bootstrap.yml @@ -0,0 +1,31 @@ +- hosts: mon + gather_facts: no + become: true + + tasks: + + - name: scp context/ceph_key.* + copy: + src: "context/{{ item }}" + dest: "{{ item }}" + loop: + - ceph_key + - ceph_key.pub + + - name: cephadm bootstrap + shell: | + set -ex + cephadm bootstrap --mon-ip {{ hostvars[groups['mon'][0]]['ansible_default_ipv4']['address'] }} + cephadm shell ceph cephadm clear-key + ceph config-key set mgr/cephadm/ssh_identity_key -i ceph_key + ceph config-key set mgr/cephadm/ssh_identity_pub -i ceph_key.pub + ceph orch apply osd --all-available-devices + args: + creates: /etc/ceph/ceph.pub + + - name: cephadm shell ceph mgr fail + shell: | + set -ex + ceph config set mon mon_allow_pool_delete true + # does not work for some reason: must be done manually + cephadm shell ceph mgr fail # required for mgr/cephadm/ssh_identity* to be refreshed diff --git a/winery-test-environment/build-vms.sh b/winery-test-environment/build-vms.sh new file mode 100755 index 0000000..7ba37e3 --- /dev/null +++ b/winery-test-environment/build-vms.sh @@ -0,0 +1,134 @@ +#!/bin/bash + +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +set -e + +: ${LIBVIRT_URI:=qemu:///system} +VIRSH="virsh --connect $LIBVIRT_URI" +VIRT_INSTALL="virt-install --connect $LIBVIRT_URI" + +function ssh_key() { + if ! 
test -f cluster_key; then + ssh-keygen -f cluster_key -N '' -t rsa + fi +} + +function stop() { + local ids="$@" + + for id in $ids ; do + $VIRSH destroy ceph$id >& /dev/null || true + $VIRSH undefine ceph$id >& /dev/null || true + rm -f ceph$id.qcow2 + rm -f disk$id*.img + done + $VIRSH net-destroy ceph >& /dev/null || true + $VIRSH net-undefine ceph >& /dev/null || true +} + +function start() { + local ids="$@" + + ssh_key + > ssh-config + + if ! test -f debian-11.qcow2 ; then + sudo virt-builder debian-11 --output debian-11.qcow2 --size 10G --format qcow2 --install sudo --run-command 'dpkg-reconfigure --frontend=noninteractive openssh-server' --run-command 'useradd -s /bin/bash -m debian || true ; echo "debian ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/90-debian' --ssh-inject debian:file:cluster_key.pub --edit '/etc/network/interfaces: s/ens2/enp1s0/' + fi + + if ! $VIRSH net-list --name | grep ceph ; then + cat > ceph-net.xml < + ceph + + + + + + + + + + + + + + + + + +EOF + $VIRSH net-define ceph-net.xml + $VIRSH net-start ceph + fi + + + for id in $ids ; do + $VIRSH destroy ceph$id >& /dev/null || true + $VIRSH undefine ceph$id >& /dev/null || true + rm -f ceph$id.qcow2 + cp --sparse=always debian-11.qcow2 ceph$id.qcow2 + sudo virt-sysprep -a ceph$id.qcow2 --enable customize --hostname ceph$id + $VIRT_INSTALL --network network=ceph,mac=52:54:00:00:00:0$id --boot hd --name ceph$id --memory 2048 --vcpus 1 --cpu host --disk path=$(pwd)/ceph$id.qcow2,bus=virtio,format=qcow2 --os-type=linux --os-variant=debian10 --graphics none --noautoconsole + case $id in + 1) + ;; + 2) + $VIRSH detach-device ceph$id ../rng.xml --live + for drive in b c ; do + # + # Without the sleep it fails with: + # + # error: Failed to attach disk + # error: internal error: No more available PCI slots + # + sleep 10 + rm -f disk$id$drive.img + qemu-img create -f raw disk$id$drive.img 20G + sudo chown libvirt-qemu disk$id$drive.img + $VIRSH attach-disk ceph$id --source $(pwd)/disk$id$drive.img 
--target vd$drive --persistent + done + ;; + *) + rm -f disk$id.img + qemu-img create -f raw disk$id.img 20G + sudo chown libvirt-qemu disk$id.img + $VIRSH attach-disk ceph$id --source $(pwd)/disk$id.img --target vdb --persistent + ;; + esac + cat >> ssh-config < /etc/ceph/ceph.conf +# ceph auth get-or-create client.admin > /etc/ceph/ceph.keyring +# +- hosts: localhost + gather_facts: false + + pre_tasks: + + - name: keygen ceph_key + shell: | + mkdir -p context + ssh-keygen -f context/ceph_key -N '' -t rsa + args: + creates: context/ceph_key + +- hosts: all + become: true + + pre_tasks: + + - name: mkdir /root/.ssh + file: + path: /root/.ssh + state: directory + mode: 0700 + + - name: touch /root/.ssh/authorized_keys + file: + path: /root/.ssh/authorized_keys + state: touch + + - name: add context/ceph_key.pub to /root/.ssh/authorized_keys + lineinfile: + path: /root/.ssh/authorized_keys + line: "{{ lookup('file', 'context/ceph_key.pub') }}" + + - name: apt install + apt: + name: + - htop + - iotop + - iftop + - iperf + +- hosts: ceph + become: true + + pre_tasks: + + - name: apt install lvm2 curl gnupg2 + apt: + name: + - lvm2 + - curl + - gnupg2 + + - name: apt-key https://download.ceph.com/keys/release.asc + apt_key: + url: https://download.ceph.com/keys/release.asc + + - name: add repository + apt_repository: + repo: "deb https://download.ceph.com/debian-pacific/ bullseye main" + filename: ceph + + - name: apt install cephadm ceph-common + apt: + name: + - cephadm + - ceph-common + + roles: + - geerlingguy.docker + +- hosts: all + become: true + # so that lineinfile does not race against itself + serial: 1 + + tasks: + + - name: "add {{ inventory_hostname }} to /etc/hosts" + lineinfile: + path: /etc/hosts + line: "{{ hostvars[inventory_hostname]['ansible_default_ipv4']['address'] }} {{ inventory_hostname }}" + delegate_to: ceph1 + + - name: set hostname + hostname: + name: "{{ inventory_hostname }}" diff --git a/winery-test-environment/fed4fire.rspec 
b/winery-test-environment/fed4fire.rspec new file mode 100644 index 0000000..7758d18 --- /dev/null +++ b/winery-test-environment/fed4fire.rspec @@ -0,0 +1,59 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/winery-test-environment/fed4fire.sh b/winery-test-environment/fed4fire.sh new file mode 100755 index 0000000..a6a2792 --- /dev/null +++ b/winery-test-environment/fed4fire.sh @@ -0,0 +1,34 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +set -e + +function context() { + local fed4fire=$1 + + if ! test "$fed4fire" ; then + return + fi + + rm -fr ./context/fed4fire + mkdir -p ./context/fed4fire + cp $fed4fire ./context/fed4fire/fed4fire.zip + local here=$(pwd) + ( + cd ./context/fed4fire + unzip fed4fire.zip + sed -i \ + -e 's|IdentityFile ./id_rsa$|IdentityFile '"${here}"'/context/cluster_key|' \ + -e "s|-F ssh-config|-F ${here}/context/ssh-config|" \ + ssh-config + cp ssh-config .. 
+ mv id_rsa ../cluster_key + mv id_rsa.pub ../cluster_key.pub + ) +} + +ln -sf $(pwd)/grid5000.yml context/setup.yml + +context "$@" diff --git a/winery-test-environment/grid5000.yml b/winery-test-environment/grid5000.yml new file mode 100644 index 0000000..b34f5e0 --- /dev/null +++ b/winery-test-environment/grid5000.yml @@ -0,0 +1,79 @@ +# https://www.grid5000.fr/w/Docker#Using_docker-cache.grid5000.fr + +- hosts: mon + gather_facts: no + become: true + + tasks: + + - name: Add the user 'debian' + user: + name: debian + group: debian + + - name: Allow 'debian' group to have passwordless sudo + lineinfile: + dest: /etc/sudoers + state: present + regexp: '^%debian' + line: '%debian ALL=(ALL) NOPASSWD: ALL' + validate: visudo -cf %s + + - name: mkdir /home/debian/.ssh + file: + path: /home/debian/.ssh + state: directory + mode: 0700 + owner: debian + group: debian + + + - name: copy authorized_keys to /home/debian + shell: | + cp /root/.ssh/authorized_keys /home/debian/.ssh/authorized_keys + chown debian:debian /home/debian/.ssh/authorized_keys + chmod 0600 /home/debian/.ssh/authorized_keys + +- hosts: osd + become: true + + tasks: + + # do that before lvm gets a chance to investigate and get the wrong idea + # about /dev/sdc on grid5000 because there surely will be leftovers from + # whoever used the machine last + - name: zap /dev/sdc + shell: | + dd if=/dev/zero of=/dev/sdc count=100 bs=1024k + +- hosts: all + become: true + + pre_tasks: + + - name: mkdir /etc/docker + file: + path: /etc/docker + state: directory + mode: 0755 + + roles: + - geerlingguy.docker + + tasks: + + - name: docker cache + copy: + content: | + { + "registry-mirrors": [ + "http://docker-cache.grid5000.fr" + ], + "bip": "192.168.42.1/24" + } + dest: /etc/docker/daemon.json + + - name: systemctl restart docker + service: + name: docker + state: restarted diff --git a/winery-test-environment/inventory/group_vars/all/rw.yml b/winery-test-environment/inventory/group_vars/all/rw.yml new file mode 
100644 index 0000000..e2f6a0d --- /dev/null +++ b/winery-test-environment/inventory/group_vars/all/rw.yml @@ -0,0 +1,5 @@ +--- +rw_disk1: /dev/vdb +rw_disk2: /dev/vdc +postgres_shared_buffers: 512MB +postgres_effective_cache_size: 1GB diff --git a/winery-test-environment/inventory/groups.yml b/winery-test-environment/inventory/groups.yml new file mode 100644 index 0000000..ca15a19 --- /dev/null +++ b/winery-test-environment/inventory/groups.yml @@ -0,0 +1,13 @@ +--- +ceph: + children: + mon: + osd: + +mon: + hosts: + ceph1: + +rw: + hosts: + ceph2: diff --git a/winery-test-environment/inventory/hosts.yml b/winery-test-environment/inventory/hosts.yml new file mode 100644 index 0000000..a28f861 --- /dev/null +++ b/winery-test-environment/inventory/hosts.yml @@ -0,0 +1,10 @@ +all: + hosts: + ceph1: {ansible_host: ceph1, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph2: {ansible_host: ceph2, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph3: {ansible_host: ceph3, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph4: {ansible_host: ceph4, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph5: {ansible_host: ceph5, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph6: {ansible_host: ceph6, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph7: {ansible_host: ceph7, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} + ceph8: {ansible_host: ceph8, ansible_port: '22', ansible_python_interpreter: '/usr/bin/python3'} diff --git a/winery-test-environment/inventory/osd.yml b/winery-test-environment/inventory/osd.yml new file mode 100644 index 0000000..7b8348c --- /dev/null +++ b/winery-test-environment/inventory/osd.yml @@ -0,0 +1,9 @@ +--- +osd: + hosts: + ceph3: + ceph4: + ceph5: + ceph6: + ceph7: + ceph8: diff --git a/winery-test-environment/libvirt.yml b/winery-test-environment/libvirt.yml new file mode 100644 index 
0000000..8a17f1b --- /dev/null +++ b/winery-test-environment/libvirt.yml @@ -0,0 +1,5 @@ +# libvirt specific actions + +- hosts: mon + gather_facts: no + become: true diff --git a/winery-test-environment/osd.yml b/winery-test-environment/osd.yml new file mode 100644 index 0000000..44081a3 --- /dev/null +++ b/winery-test-environment/osd.yml @@ -0,0 +1,40 @@ +--- +- hosts: osd + gather_facts: no + become: true + + tasks: + + - name: add host + shell: | + ceph orch host add {{ inventory_hostname }} + delegate_to: ceph1 + +- hosts: osd + gather_facts: no + become: true + + tasks: + + - name: wait for host + shell: | + ceph orch host ls | grep '^{{ inventory_hostname }} ' + delegate_to: ceph1 + register: host + until: host is success + retries: 30 + delay: 5 + +- hosts: osd + gather_facts: no + become: true + + tasks: + + # the desired side effect here is twofold + # * device zap blocks until the osd daemon is ready on the target host + # * on grid5000 /dev/sdc needs to be applied + - name: zap /dev/sdc + shell: | + ceph orch device zap {{ inventory_hostname }} /dev/sdc --force || true + delegate_to: ceph1 diff --git a/winery-test-environment/remote-tox.sh b/winery-test-environment/remote-tox.sh new file mode 100755 index 0000000..f393499 --- /dev/null +++ b/winery-test-environment/remote-tox.sh @@ -0,0 +1,31 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +set -ex + +DIR=winery-test-environment +SSH="ssh -i ${DIR}/context/cluster_key -F ${DIR}/context/ssh-config" + +function sanity_check() { + if ! test -f ${DIR}/context/cluster_key ; then + echo "${DIR}/context/cluster_key does not exist" + echo "check ${DIR}/README.md for instructions." 
+ return 1 + fi +} + +function copy() { + RSYNC_RSH="$SSH" rsync -av --exclude=.mypy_cache --exclude=.coverage --exclude=.eggs --exclude=swh.objstorage.egg-info --exclude=winery-test-environment/context --exclude=.tox --exclude='*~' --exclude=__pycache__ --exclude='*.py[co]' $(git rev-parse --show-toplevel)/ debian@ceph1:/home/debian/swh-objstorage/ +} + +function run() { + sanity_check || return 1 + + copy || return 1 + + $SSH -t debian@ceph1 bash -c "'cd swh-objstorage ; ../venv/bin/tox -e py3 -- -k test_winery $*'" || return 1 +} + +run "$@" diff --git a/winery-test-environment/requirements.txt b/winery-test-environment/requirements.txt new file mode 100644 index 0000000..90d4055 --- /dev/null +++ b/winery-test-environment/requirements.txt @@ -0,0 +1 @@ +ansible diff --git a/winery-test-environment/rng.xml b/winery-test-environment/rng.xml new file mode 100644 index 0000000..6ee1641 --- /dev/null +++ b/winery-test-environment/rng.xml @@ -0,0 +1,5 @@ + + /dev/urandom + +
+ diff --git a/winery-test-environment/rw.yml b/winery-test-environment/rw.yml new file mode 100644 index 0000000..6d1a093 --- /dev/null +++ b/winery-test-environment/rw.yml @@ -0,0 +1,110 @@ +--- +- name: install and configure Read Write Storage + hosts: rw + become: true + + pre_tasks: + + - name: zap attached disks + shell: | + for disk in {{ rw_disk1 }} {{ rw_disk2 }} ; do + dd if=/dev/zero of=$disk count=100 bs=1024k + done + touch /etc/zapped.done + args: + creates: /etc/zapped.done + + - name: apt install lvm2 + apt: + name: + - lvm2 + + - name: vgcreate pg + lvg: + vg: pg + pvs: "{{ rw_disk1 }},{{ rw_disk2 }}" + + - name: lvcreate pg + lvol: + vg: pg + lv: pg + size: +100%FREE + + - name: mkfs /dev/mapper/pg-pg + filesystem: + fstype: ext4 +# force: yes + dev: /dev/mapper/pg-pg + + - name: mkdir /var/lib/postgresql + file: + path: /var/lib/postgresql + state: directory + mode: 0755 + + - name: mount /var/lib/postgresql + mount: + path: /var/lib/postgresql + src: /dev/mapper/pg-pg + fstype: ext4 + state: mounted + + - name: apt install postgres + apt: + name: + - postgresql + - postgresql-contrib + - libpq-dev + - python3-psycopg2 + - acl + + - name: postgresql.conf max_connections = 1000 + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '^max_connections' + line: "max_connections = 1000" + + # + # https://wiki.postgresql.org/wiki/Tuning_Your_PostgreSQL_Server + # + - name: postgresql.conf shared_buffers + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '^shared_buffers' + # 1/4 RAM + line: "shared_buffers = {{ postgres_shared_buffers }}" + + - name: postgresql.conf effective_cache_size + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '.*effective_cache_size' + # 1/2 RAM + line: "effective_cache_size = {{ postgres_effective_cache_size }}" + + - name: postgresql.conf random_page_cost + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + regexp: '.*random_page_cost' + line: 
"random_page_cost = 2.0" + + - name: listen on * + lineinfile: + path: /etc/postgresql/13/main/postgresql.conf + line: "listen_addresses = '*'" + + - name: allow all connections + lineinfile: + path: /etc/postgresql/13/main/pg_hba.conf + line: "host all all 0.0.0.0/0 trust" + + - name: systemctl restart postgresql + service: + name: postgresql + state: restarted + + - name: pg user testuser/testpassword + postgresql_user: + name: testuser + password: testpassword + role_attr_flags: SUPERUSER + become_user: postgres diff --git a/winery-test-environment/tests.yml b/winery-test-environment/tests.yml new file mode 100644 index 0000000..67e634f --- /dev/null +++ b/winery-test-environment/tests.yml @@ -0,0 +1,32 @@ +- name: install test environment + gather_facts: no + hosts: mon + + pre_tasks: + + - name: apt install + apt: + name: + - emacs-nox + - gcc + - libcap-dev + - libcmph-dev + - libpq-dev + - postgresql-client-common + - postgresql-13 + - python3-pip + - python3-rbd + - rsync + - tmux + - virtualenv + become: true + + - name: configure venv + shell: | + virtualenv venv + venv/bin/pip3 install tox + args: + creates: venv + chdir: /home/debian + become: true + become_user: debian
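
Note on `Bench.run` in `swh/objstorage/tests/winery_benchmark.py`: the respawn loop it implements can be hard to follow in diff form, so here is a minimal standalone sketch of the same pattern. It is not the code from this change. It assumes a thread pool in place of the `ProcessPoolExecutor`, and `fake_work`, `run_bench` and `bench_count` are hypothetical names introduced only for illustration.

```python
import asyncio
import concurrent.futures
import time


def fake_work(kind):
    # Hypothetical stand-in for work(kind, args): sleep briefly, return the kind.
    time.sleep(0.01)
    return kind


async def run_bench(kinds, duration):
    """Launch one worker per entry of kinds and relaunch each worker that
    completes, as long as duration seconds have not elapsed.
    Returns the total number of workers launched."""
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    launched = 0
    # Assumption: threads instead of processes, to keep the sketch self-contained.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(kinds)) as executor:

        def create_worker(kind):
            nonlocal launched
            launched += 1
            return loop.run_in_executor(executor, fake_work, kind)

        workers = {create_worker(kind) for kind in kinds}
        while workers:
            # Wake up as soon as any worker finishes; the rest stay pending.
            done, workers = await asyncio.wait(
                workers, return_when=asyncio.FIRST_COMPLETED
            )
            for future in done:
                kind = future.result()
                timed_out = time.monotonic() - start > duration
                if not timed_out:
                    workers.add(create_worker(kind))
    return launched


def bench_count(kinds, duration):
    # Synchronous entry point for the sketch.
    return asyncio.run(run_bench(kinds, duration))
```

When every worker outlasts the time budget, no worker is relaunched and the count equals the number of initial workers, which is the property `test_winery_bench_fake` checks by making each mocked worker sleep for twice the duration.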