
diff --git a/docs/winery.rst b/docs/winery.rst
index 56dbec7..016fbe6 100644
--- a/docs/winery.rst
+++ b/docs/winery.rst
@@ -1,40 +1,49 @@
.. _swh-objstorage-winery:
Winery backend
==============
The Winery backend implements the `Ceph based object storage architecture <https://wiki.softwareheritage.org/wiki/A_practical_approach_to_efficiently_store_100_billions_small_objects_in_Ceph>`__.
+IO Throttling
+--------------
+
+Ceph (Pacific) implements IO QoS in librbd, but it is only effective within a single process, not cluster-wide. Preliminary benchmarks showed that the accumulated read and write throughput must be throttled client-side to prevent performance degradation (slower throughput and increased latency).
+
+Tables are created in a PostgreSQL database dedicated to throttling, so that independent processes performing I/O against the Ceph cluster can synchronize with each other and control their accumulated read and write throughput. Each worker creates a row in the read and write tables, updates it every minute with its current read and write throughput in bytes per second, and queries all rows to determine the current accumulated bandwidth.
+
+If the current accumulated bandwidth is above the maximum desired speed and there are N active workers, each process reduces its throughput to at most 1/N of the maximum desired speed. For instance, if the maximum desired speed is 100MB/s, the accumulated usage is above 100MB/s and there are 10 workers, each process reduces its own speed to 10MB/s. After the 10 workers independently do the same, each of them gets 1/10 of the bandwidth, as illustrated by the sketch below.
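+
+As a minimal sketch (the `allowed_speed` helper below is hypothetical; the
+real logic lives in the `IOThrottler.sync` method added by this diff), the
+per-worker budget is computed as follows:
+
+.. code-block:: python
+
+    def allowed_speed(max_speed: int, total_usage: int, workers: int) -> float:
+        """Per-worker budget once the accumulated usage exceeds the limit."""
+        if workers > 0 and total_usage > max_speed:
+            # e.g. max_speed of 100MB/s and 10 workers -> 10MB/s each
+            return max_speed / workers
+        return max_speed  # under the limit: no extra restriction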
+
Implementation notes
--------------------
The `sharedbase.py` file contains the global index implementation that associates every object id with the shard that contains it. A list of shards (either writable or readonly) is stored in a table, with a numeric id to save space. The name of the shard is used to create a database (for write shards) or an RBD image (for read shards).
The `roshard.py` file contains the lookup function for a read shard and is a thin layer on top of swh-perfecthash.
The `rwshard.py` file contains the logic to read, write and enumerate the objects of a write shard using SQL statements on the database dedicated to it.
The `objstorage.py` file contains the backend implementation in the `WineryObjStorage` class. It is a thin layer that delegates writes to a `WineryWriter` instance and reads to a `WineryReader` instance. Although they are currently tightly coupled, they are likely to eventually run in different workers if performance and security require it.
A `WineryReader` must be able to read an object from both Read Shards and Write Shards. It first determines the kind of shard the object belongs to by looking it up in the global index. If it is a Read Shard, it looks up the object using the `ROShard` class from `roshard.py`, ultimately using a Ceph RBD image. If it is a Write Shard, it looks up the object using the `RWShard` class from `rwshard.py`, ultimately using a PostgreSQL database. This dispatch is condensed below.
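
This read path condenses to the following, excerpted (slightly simplified)
from the `WineryReader.get` method in `objstorage.py` below:

.. code-block:: python

    def get(self, obj_id):
        shard_info = self.base.get(obj_id)  # global index lookup
        if shard_info is None:
            raise exc.ObjNotFoundError(obj_id)
        name, readonly = shard_info
        if readonly:  # Read Shard: Ceph RBD image
            content = self.roshard(name).get(obj_id)
        else:  # Write Shard: PostgreSQL database
            content = RWShard(name, **self.args).get(obj_id)
        if content is None:
            raise exc.ObjNotFoundError(obj_id)
        return content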
All `WineryWriter` operations are idempotent so they can be resumed in case they fail. When a `WineryWriter` is instantiated, it will either:
* Find a Write Shard (i.e. a database) that is not locked by another instance by looking up the list of shards or,
* Create a new Write Shard by creating a new database
and it will lock the Write Shard and own it so that no other instance tries to write to it, as sketched below. A PostgreSQL session lock is used to lock the shard so that the lock is released when the `WineryWriter` process dies unexpectedly and another process can pick the shard up.
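
A condensed sketch of the locking step, mirroring `SharedBase.lock_a_shard`
and `SharedBase.lock_shard` from `sharedbase.py` below (the cursor holding
the lock must stay open; the lock goes away when the cursor is closed or the
process dies):

.. code-block:: python

    def lock_a_shard(self):
        with self.db.cursor() as c:
            # SKIP LOCKED ignores shards already locked by other instances
            c.execute(
                "SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE "
                "LIMIT 1 FOR UPDATE SKIP LOCKED"
            )
            if c.rowcount == 0:
                return None, None  # no free shard: the caller creates one and retries
            return self.lock_shard(c.fetchone()[0])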
When a new object is added to the Write Shard, a new row is added to the global index to record that it is owned by this Write Shard and is in flight. Such an object cannot be read because it is not yet complete. If a request to write the same object is sent to another `WineryWriter` instance, it will fail to add it to the global index because it already exists. Since the object is in flight, the `WineryWriter` will check whether the shard associated with the object is:
* its name, which means it owns the object and must resume writing the object
* not its name, which means another `WineryWriter` owns it and nothing needs to be done
After the content of the object is successfully added to the Write Shard, the record in the global index is updated so the object is no longer in flight. The client is notified that the operation was successful, and from that point on the object can be read from the Write Shard. The two phases are condensed below.
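
Condensed from the `WineryWriter.add` method in this diff (presence check and
hash computation omitted), the two phases look like this:

.. code-block:: python

    def add(self, content, obj_id):
        shard = self.base.add_phase_1(obj_id)  # INSERT with inflight = TRUE
        if shard != self.base.id:
            # the object is the responsibility of another WineryWriter
            return obj_id
        self.shard.add(obj_id, content)  # idempotent write to the shard database
        self.base.add_phase_2(obj_id)  # UPDATE inflight = FALSE: now readable
        return obj_id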
When the size of the database associated with a Write Shard exceeds a threshold, the shard is set to the `packing` state. All objects it contains can still be read from it by any `WineryReader`, but no new object will be added to it. A process is spawned and tasked with transforming it into a Read Shard using the `Packer` class. Should it fail for any reason, a cron job will restart it when it finds Write Shards that are both in the `packing` state and not locked by any process. Packing is done by enumerating all the records from the Write Shard database and writing them into a Read Shard of the same name, as condensed below. Incomplete Read Shards will never be used by a `WineryReader` because the global index will direct it to use the Write Shard instead. Once the packing completes, the state of the shard is changed to readonly, and from that point on the `WineryReader` will only use the Read Shard to find the objects it contains. The database containing the Write Shard is then destroyed because it is no longer useful, and the process terminates on success.
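
The packing loop condenses to the following, excerpted (slightly simplified)
from the `Packer.run` method in `objstorage.py` below:

.. code-block:: python

    def run(self):
        self.ro.create(self.rw.count())  # size the Read Shard perfect hash
        for obj_id, content in self.rw.all():  # enumerate the Write Shard
            self.ro.add(content, obj_id)
        self.ro.save()
        base = SharedBase(**self.args)
        base.shard_packing_ends(self.shard)  # mark readonly in the global index
        self.rw.drop()  # the Write Shard database is no longer useful
        return True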
Benchmarks
----------
Follow the instructions at winery-test-environment/README.md
diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py
index e8c9c1d..25d6ec1 100644
--- a/swh/objstorage/backends/winery/database.py
+++ b/swh/objstorage/backends/winery/database.py
@@ -1,89 +1,124 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import abc
from contextlib import contextmanager
import logging
import time
import psycopg2
logger = logging.getLogger(__name__)
-class Database:
- def __init__(self, dsn):
+class DatabaseAdmin:
+ def __init__(self, dsn, dbname=None):
self.dsn = dsn
+ self.dbname = dbname
@contextmanager
def admin_cursor(self):
db = psycopg2.connect(dsn=self.dsn, dbname="postgres")
# https://wiki.postgresql.org/wiki/Psycopg2_Tutorial
# If you want to drop the database you would need to
# change the isolation level of the database.
db.set_isolation_level(0)
db.autocommit = True
c = db.cursor()
try:
yield c
finally:
c.close()
- def create_database(self, database):
+ def create_database(self):
with self.admin_cursor() as c:
c.execute(
"SELECT datname FROM pg_catalog.pg_database "
- f"WHERE datname = '{database}'"
+ f"WHERE datname = '{self.dbname}'"
)
if c.rowcount == 0:
try:
- c.execute(f"CREATE DATABASE {database}")
+ c.execute(f"CREATE DATABASE {self.dbname}")
except psycopg2.errors.UniqueViolation:
# someone else created the database, it is fine
pass
- def drop_database(self, database):
+ def drop_database(self):
with self.admin_cursor() as c:
c.execute(
"SELECT pg_terminate_backend(pg_stat_activity.pid)"
"FROM pg_stat_activity "
"WHERE pg_stat_activity.datname = %s;",
- (database,),
+ (self.dbname,),
)
#
# Dropping the database may fail because the server takes time
# to notice a connection was dropped and/or a named cursor is
# in the process of being deleted. It can happen here or even
# when deleting all databases with the psql CLI while no
# processes are active.
#
# ERROR: database "i606428a5a6274d1ab09eecc4d019fef7" is being
# accessed by other users DETAIL: There is 1 other session
# using the database.
#
# See:
# https://stackoverflow.com/questions/5108876/kill-a-postgresql-session-connection
#
# https://www.postgresql.org/docs/current/sql-dropdatabase.html
#
# WITH (FORCE) was added in PostgreSQL 13 but may also fail because the
# named cursor may not be handled as a client.
#
for i in range(60):
try:
- c.execute(f"DROP DATABASE IF EXISTS {database}")
+ c.execute(f"DROP DATABASE IF EXISTS {self.dbname}")
return
except psycopg2.errors.ObjectInUse:
- logger.warning(f"{database} database drop fails, waiting 10s")
+ logger.warning(f"{self.dbname} database drop fails, waiting 10s")
time.sleep(10)
continue
- raise Exception(f"database drop failed on {database}")
+ raise Exception(f"database drop failed on {self.dbname}")
def list_databases(self):
with self.admin_cursor() as c:
c.execute(
"SELECT datname FROM pg_database "
"WHERE datistemplate = false and datname != 'postgres'"
)
return [r[0] for r in c.fetchall()]
+
+
+class Database(abc.ABC):
+ def __init__(self, dsn, dbname):
+ self.dsn = dsn
+ self.dbname = dbname
+
+ @property
+ @abc.abstractmethod
+ def lock(self):
+ "Return an arbitrary unique number for pg_advisory_lock when creating tables"
+ raise NotImplementedError("Database.lock")
+
+ @property
+ @abc.abstractmethod
+ def database_tables(self):
+ "Return the list of CREATE TABLE statements for all tables in the database"
+ raise NotImplementedError("Database.database_tables")
+
+ def create_tables(self):
+ db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+ db.autocommit = True
+ c = db.cursor()
+ c.execute("SELECT pg_advisory_lock(%s)", (self.lock,))
+ for table in self.database_tables:
+ c.execute(table)
+ c.close()
+ db.close() # so the pg_advisory_lock is released
+
+ def connect_database(self):
+ db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+ db.autocommit = True
+ return db
diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py
index c60f3f3..58b064e 100644
--- a/swh/objstorage/backends/winery/objstorage.py
+++ b/swh/objstorage/backends/winery/objstorage.py
@@ -1,168 +1,168 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from multiprocessing import Process
from swh.objstorage import exc
from swh.objstorage.objstorage import ObjStorage, compute_hash
from .roshard import ROShard
from .rwshard import RWShard
from .sharedbase import SharedBase
logger = logging.getLogger(__name__)
class WineryObjStorage(ObjStorage):
def __init__(self, **kwargs):
super().__init__(**kwargs)
if kwargs.get("readonly"):
self.winery = WineryReader(**kwargs)
else:
self.winery = WineryWriter(**kwargs)
def uninit(self):
self.winery.uninit()
def get(self, obj_id):
return self.winery.get(obj_id)
def check_config(self, *, check_write):
return True
def __contains__(self, obj_id):
return obj_id in self.winery
def add(self, content, obj_id=None, check_presence=True):
return self.winery.add(content, obj_id, check_presence)
def check(self, obj_id):
return self.winery.check(obj_id)
def delete(self, obj_id):
raise PermissionError("Delete is not allowed.")
class WineryBase:
def __init__(self, **kwargs):
self.args = kwargs
self.init()
def init(self):
self.base = SharedBase(**self.args)
def uninit(self):
self.base.uninit()
def __contains__(self, obj_id):
return self.base.contains(obj_id)
class WineryReader(WineryBase):
def roshard(self, name):
shard = ROShard(name, **self.args)
shard.load()
return shard
def get(self, obj_id):
shard_info = self.base.get(obj_id)
if shard_info is None:
raise exc.ObjNotFoundError(obj_id)
name, readonly = shard_info
if readonly:
shard = self.roshard(name)
content = shard.get(obj_id)
del shard
else:
shard = RWShard(name, **self.args)
content = shard.get(obj_id)
if content is None:
raise exc.ObjNotFoundError(obj_id)
return content
def pack(shard, **kwargs):
return Packer(shard, **kwargs).run()
class Packer:
def __init__(self, shard, **kwargs):
self.args = kwargs
self.shard = shard
self.init()
def init(self):
self.rw = RWShard(self.shard, **self.args)
self.ro = ROShard(self.shard, **self.args)
def uninit(self):
del self.ro
self.rw.uninit()
def run(self):
- shard = self.ro.create(self.rw.count())
+ self.ro.create(self.rw.count())
for obj_id, content in self.rw.all():
- shard.write(obj_id, content)
- shard.save()
+ self.ro.add(content, obj_id)
+ self.ro.save()
base = SharedBase(**self.args)
base.shard_packing_ends(self.shard)
base.uninit()
self.rw.uninit()
self.rw.drop()
return True
class WineryWriter(WineryReader):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.packers = []
self.init()
def init(self):
super().init()
self.shard = RWShard(self.base.whoami, **self.args)
def uninit(self):
self.shard.uninit()
super().uninit()
def add(self, content, obj_id=None, check_presence=True):
if obj_id is None:
obj_id = compute_hash(content)
if check_presence and obj_id in self:
return obj_id
shard = self.base.add_phase_1(obj_id)
if shard != self.base.id:
# this object is the responsibility of another shard
return obj_id
self.shard.add(obj_id, content)
self.base.add_phase_2(obj_id)
if self.shard.is_full():
self.pack()
return obj_id
def check(self, obj_id):
# TODO: load all shards with packing == TRUE that are not locked (i.e. the
# packer was interrupted for whatever reason) and run pack for each of them
pass
def pack(self):
self.base.shard_packing_starts()
p = Process(target=pack, args=(self.shard.name,), kwargs=self.args)
self.uninit()
p.start()
self.packers.append(p)
self.init()
def __del__(self):
for p in self.packers:
p.kill()
p.join()
diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py
index 56310bb..cee9a01 100644
--- a/swh/objstorage/backends/winery/roshard.py
+++ b/swh/objstorage/backends/winery/roshard.py
@@ -1,74 +1,83 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import sh
from swh.perfecthash import Shard
+from .throttler import Throttler
+
logger = logging.getLogger(__name__)
class Pool(object):
name = "shards"
def __init__(self, **kwargs):
self.args = kwargs
self.rbd = sh.sudo.bake("rbd", f"--pool={self.name}")
self.ceph = sh.sudo.bake("ceph")
self.image_size = self.args["shard_max_size"] * 2
def image_list(self):
try:
self.rbd.ls()
except sh.ErrorReturnCode_2 as e:
if "No such file or directory" in e.args[0]:
return []
else:
raise
return [image.strip() for image in self.rbd.ls()]
def image_path(self, image):
return f"/dev/rbd/{self.name}/{image}"
def image_create(self, image):
logger.info(f"rdb --pool {self.name} create --size={self.image_size} {image}")
self.rbd.create(
f"--size={self.image_size}", f"--data-pool={self.name}-data", image
)
self.rbd.feature.disable(
f"{self.name}/{image}", "object-map", "fast-diff", "deep-flatten"
)
self.image_map(image, "rw")
def image_map(self, image, options):
self.rbd.device("map", "-o", options, image)
sh.sudo("chmod", "777", self.image_path(image))
def image_remap_ro(self, image):
self.image_unmap(image)
self.image_map(image, "ro")
def image_unmap(self, image):
self.rbd.device.unmap(f"{self.name}/{image}", _ok_code=(0, 22))
class ROShard:
def __init__(self, name, **kwargs):
self.pool = Pool(shard_max_size=kwargs["shard_max_size"])
+ self.throttler = Throttler(**kwargs)
self.name = name
def create(self, count):
self.pool.image_create(self.name)
self.shard = Shard(self.pool.image_path(self.name))
return self.shard.create(count)
def load(self):
self.shard = Shard(self.pool.image_path(self.name))
return self.shard.load() == self.shard
def get(self, key):
- return self.shard.lookup(key)
+ return self.throttler.throttle_get(self.shard.lookup, key)
+
+ def add(self, content, obj_id):
+ return self.throttler.throttle_add(self.shard.write, obj_id, content)
+
+ def save(self):
+ return self.shard.save()
diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py
index f7fab7a..c77fb80 100644
--- a/swh/objstorage/backends/winery/rwshard.py
+++ b/swh/objstorage/backends/winery/rwshard.py
@@ -1,89 +1,90 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import psycopg2
-from .database import Database
+from .database import Database, DatabaseAdmin
class RWShard(Database):
def __init__(self, name, **kwargs):
- super().__init__(kwargs["shard_dsn"])
self._name = name
- self.create_database(self.name)
- self.db = self.create_table(f"{self.dsn}/{self.name}")
+ DatabaseAdmin(kwargs["base_dsn"], self.name).create_database()
+ super().__init__(kwargs["shard_dsn"], self.name)
+ self.create_tables()
+ self.db = self.connect_database()
self.size = self.total_size()
self.limit = kwargs["shard_max_size"]
def uninit(self):
if hasattr(self, "db"):
self.db.close()
del self.db
+ @property
+ def lock(self):
+ return 452343 # an arbitrary unique number
+
@property
def name(self):
return self._name
def is_full(self):
return self.size > self.limit
def drop(self):
- self.drop_database(self.name)
+ DatabaseAdmin(self.dsn, self.dbname).drop_database()
- def create_table(self, dsn):
- db = psycopg2.connect(dsn)
- db.autocommit = True
- c = db.cursor()
- c.execute(
+ @property
+ def database_tables(self):
+ return [
"""
CREATE TABLE IF NOT EXISTS objects(
key BYTEA PRIMARY KEY,
content BYTEA
)
- """
- )
- c.close()
- return db
+ """,
+ ]
def total_size(self):
with self.db.cursor() as c:
c.execute("SELECT SUM(LENGTH(content)) FROM objects")
size = c.fetchone()[0]
if size is None:
return 0
else:
return size
def add(self, obj_id, content):
try:
with self.db.cursor() as c:
c.execute(
"INSERT INTO objects (key, content) VALUES (%s, %s)",
(obj_id, content),
)
self.db.commit()
self.size += len(content)
except psycopg2.errors.UniqueViolation:
pass
def get(self, obj_id):
with self.db.cursor() as c:
c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,))
if c.rowcount == 0:
return None
else:
return c.fetchone()[0].tobytes()
def all(self):
with self.db.cursor() as c:
c.execute("SELECT key,content FROM objects")
for row in c:
yield row[0].tobytes(), row[1].tobytes()
def count(self):
with self.db.cursor() as c:
c.execute("SELECT COUNT(*) FROM objects")
return c.fetchone()[0]
diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py
index abad163..f51f148 100644
--- a/swh/objstorage/backends/winery/sharedbase.py
+++ b/swh/objstorage/backends/winery/sharedbase.py
@@ -1,193 +1,186 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import uuid
import psycopg2
-from .database import Database
+from .database import Database, DatabaseAdmin
class SharedBase(Database):
def __init__(self, **kwargs):
- super().__init__(kwargs["base_dsn"])
- database = "sharedbase"
- self.create_database(database)
- self.db = self.create_table(f"{self.dsn}/{database}")
+ DatabaseAdmin(kwargs["base_dsn"], "sharedbase").create_database()
+ super().__init__(kwargs["base_dsn"], "sharedbase")
+ self.create_tables()
+ self.db = self.connect_database()
self._whoami = None
def uninit(self):
self.db.close()
del self.db
- def create_table(self, dsn):
- db = psycopg2.connect(dsn)
- db.autocommit = True
- c = db.cursor()
- lock = 314116 # an abitrary unique number
- c.execute("SELECT pg_advisory_lock(%s)", (lock,))
- c.execute(
+ @property
+ def lock(self):
+ return 314116 # an arbitrary unique number
+
+ @property
+ def database_tables(self):
+ return [
"""
CREATE TABLE IF NOT EXISTS shards(
id SERIAL PRIMARY KEY,
readonly BOOLEAN NOT NULL,
packing BOOLEAN NOT NULL,
name CHAR(32) NOT NULL UNIQUE
)
- """
- )
- c.execute(
+ """,
"""
CREATE TABLE IF NOT EXISTS signature2shard(
signature BYTEA PRIMARY KEY,
inflight BOOLEAN NOT NULL,
shard INTEGER NOT NULL
)
- """
- )
- c.close()
- db.close() # so the pg_advisory_lock is released
- db = psycopg2.connect(dsn)
- db.autocommit = True
- return db
+ """,
+ ]
@property
def whoami(self):
self.set_whoami()
return self._whoami
@property
def id(self):
self.set_whoami()
return self._whoami_id
def set_whoami(self):
if self._whoami is not None:
return
while True:
self._whoami, self._whoami_id = self.lock_a_shard()
if self._whoami is not None:
return self._whoami
self.create_shard()
def lock_a_shard(self):
with self.db.cursor() as c:
c.execute(
"SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE "
"LIMIT 1 FOR UPDATE SKIP LOCKED"
)
if c.rowcount == 0:
return None, None
name = c.fetchone()[0]
return self.lock_shard(name)
def lock_shard(self, name):
self.whoami_lock = self.db.cursor()
try:
self.whoami_lock.execute(
"SELECT name, id FROM shards "
"WHERE readonly = FALSE AND packing = FALSE AND name = %s "
"FOR UPDATE NOWAIT",
(name,),
)
return self.whoami_lock.fetchone()
except psycopg2.Error:
return None
def unlock_shard(self):
del self.whoami_lock
def create_shard(self):
name = uuid.uuid4().hex
#
# ensure the first character is not a number so it can be used as a
# database name.
#
name = "i" + name[1:]
with self.db.cursor() as c:
c.execute(
"INSERT INTO shards (name, readonly, packing) "
"VALUES (%s, FALSE, FALSE)",
(name,),
)
self.db.commit()
def shard_packing_starts(self):
with self.db.cursor() as c:
c.execute(
"UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,)
)
self.unlock_shard()
def shard_packing_ends(self, name):
with self.db.cursor() as c:
c.execute(
"UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s",
(name,),
)
def get_shard_info(self, id):
with self.db.cursor() as c:
c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,))
if c.rowcount == 0:
return None
else:
return c.fetchone()
def list_shards(self):
with self.db.cursor() as c:
c.execute("SELECT name, readonly, packing FROM shards")
for row in c:
yield row[0], row[1], row[2]
def contains(self, obj_id):
with self.db.cursor() as c:
c.execute(
"SELECT shard FROM signature2shard WHERE "
"signature = %s AND inflight = FALSE",
(obj_id,),
)
if c.rowcount == 0:
return None
else:
return c.fetchone()[0]
def get(self, obj_id):
id = self.contains(obj_id)
if id is None:
return None
return self.get_shard_info(id)
def add_phase_1(self, obj_id):
try:
with self.db.cursor() as c:
c.execute(
"INSERT INTO signature2shard (signature, shard, inflight) "
"VALUES (%s, %s, TRUE)",
(obj_id, self.id),
)
self.db.commit()
return self.id
except psycopg2.errors.UniqueViolation:
with self.db.cursor() as c:
c.execute(
"SELECT shard FROM signature2shard WHERE "
"signature = %s AND inflight = TRUE",
(obj_id,),
)
if c.rowcount == 0:
return None
else:
return c.fetchone()[0]
def add_phase_2(self, obj_id):
with self.db.cursor() as c:
c.execute(
"UPDATE signature2shard SET inflight = FALSE "
"WHERE signature = %s AND shard = %s",
(obj_id, self.id),
)
self.db.commit()
diff --git a/swh/objstorage/backends/winery/throttler.py b/swh/objstorage/backends/winery/throttler.py
new file mode 100644
index 0000000..e6e2eaf
--- /dev/null
+++ b/swh/objstorage/backends/winery/throttler.py
@@ -0,0 +1,203 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import collections
+import datetime
+import logging
+import time
+
+from .database import Database, DatabaseAdmin
+
+logger = logging.getLogger(__name__)
+
+
+class LeakyBucket:
+ """
+ Leaky bucket that can hold at most `total` units and drains them within a
+ second. If more than `total` per second is added (with the `add` method),
+ it sleeps until the bucket has drained enough to accept the new count
+ without overflowing.
+
+ The capacity of the bucket can be changed dynamically with the `reset` method.
+ If the new capacity is lower than it previously was, the excess content is
+ discarded.
+ """
+
+ def __init__(self, total):
+ self.updated = 0.0
+ self.current = 0.0
+ self.reset(total)
+
+ def reset(self, total):
+ self.total = total
+ self.current = min(self.total, self.current)
+ self._tick()
+
+ def add(self, count):
+ self._tick()
+ if count > self.current:
+ time.sleep((count - self.current) / self.total)
+ self._tick()
+ self.current -= min(self.total, count)
+
+ def _tick(self):
+ now = time.monotonic()
+ self.current += self.total * (now - self.updated)
+ self.current = int(min(self.total, self.current))
+ self.updated = now
+
+
+class BandwidthCalculator:
+ """Keeps a histogram (of length `duration`, defaults to 60) where
+ each element is the number of bytes read or written within a
+ second.
+
+ Only the last `duration` seconds are represented in the histogram:
+ after each second the oldest element is discarded.
+
+ The `add` method is called to add a value to the current second.
+
+ The `get` method retrieves the current bandwidth usage which is
+ the average of all values in the histogram.
+ """
+
+ def __init__(self):
+ self.duration = 60
+ self.history = collections.deque([0] * (self.duration - 1), self.duration - 1)
+ self.current = 0
+ self.current_second = 0
+
+ def add(self, count):
+ current_second = int(time.monotonic())
+ if current_second > self.current_second:
+ self.history.append(self.current)
+ self.history.extend(
+ [0] * min(self.duration, current_second - self.current_second - 1)
+ )
+ self.current_second = current_second
+ self.current = 0
+ self.current += count
+
+ def get(self):
+ return (sum(self.history) + self.current) / self.duration
+
+
+class IOThrottler(Database):
+ """Throttle IO (either read or write, depending on the `name`
+ argument). The maximum speed in bytes per second comes from the
+ throttle_`name` argument and is enforced by a LeakyBucket that
+ guarantees it won't go any faster.
+
+ Every `sync_interval` seconds the current bandwidth reported by
+ the BandwidthCalculator instance is written into a row in a table
+ shared with other instances of IOThrottler. The cumulated
+ bandwidth of all other instances is retrieved from the same table.
+ If it exceeds `max_speed`, the LeakyBucket instance is reset to
+ only allow max_speed/(number of instances) so that the total
+ bandwidth is shared equally between instances.
+ """
+
+ def __init__(self, name, **kwargs):
+ super().__init__(kwargs["base_dsn"], "throttler")
+ self.name = name
+ self.init_db()
+ self.last_sync = 0
+ self.max_speed = kwargs["throttle_" + name]
+ self.bucket = LeakyBucket(self.max_speed)
+ self.bandwidth = BandwidthCalculator()
+
+ def init_db(self):
+ self.create_tables()
+ self.db = self.connect_database()
+ self.cursor = self.db.cursor()
+ self.cursor.execute(
+ f"INSERT INTO t_{self.name} (updated, bytes) VALUES (%s, %s) RETURNING id",
+ (datetime.datetime.now(), 0),
+ )
+ self.rowid = self.cursor.fetchone()[0]
+ self.cursor.execute(
+ f"SELECT * FROM t_{self.name} WHERE id = %s FOR UPDATE", (self.rowid,)
+ )
+ self.cursor.execute(
+ f"DELETE FROM t_{self.name} WHERE id IN ("
+ f"SELECT id FROM t_{self.name} WHERE updated < NOW() - INTERVAL '30 days' "
+ " FOR UPDATE SKIP LOCKED)"
+ )
+ self.db.commit()
+ self.sync_interval = 60
+
+ @property
+ def lock(self):
+ return 9485433 # an arbitrary unique number
+
+ @property
+ def database_tables(self):
+ return [
+ f"""
+ CREATE TABLE IF NOT EXISTS t_{self.name}(
+ id SERIAL PRIMARY KEY,
+ updated TIMESTAMP NOT NULL,
+ bytes INTEGER NOT NULL
+ )
+ """,
+ ]
+
+ def download_info(self):
+ self.cursor.execute(
+ f"SELECT COUNT(*), SUM(bytes) FROM t_{self.name} "
+ "WHERE bytes > 0 AND updated > NOW() - INTERVAL '5 minutes'"
+ )
+ return self.cursor.fetchone()
+
+ def upload_info(self):
+ bytes = int(self.bandwidth.get())
+ logger.debug("%d: upload %s/s", self.rowid, bytes)
+ self.cursor.execute(
+ f"UPDATE t_{self.name} SET updated = %s, bytes = %s WHERE id = %s",
+ (datetime.datetime.now(), bytes, self.rowid),
+ )
+ self.db.commit()
+
+ def add(self, count):
+ self.bucket.add(count)
+ self.bandwidth.add(count)
+ self.maybe_sync()
+
+ def sync(self):
+ self.upload_info()
+ (others_count, total_usage) = self.download_info()
+ logger.debug(
+ "%d: sync others_count=%s total_usage=%s",
+ self.rowid,
+ others_count,
+ total_usage,
+ )
+ if others_count > 0 and total_usage > self.max_speed:
+ self.bucket.reset(self.max_speed / others_count)
+
+ def maybe_sync(self):
+ now = time.monotonic()
+ if now - self.last_sync > self.sync_interval:
+ self.sync()
+ self.last_sync = now
+
+
+class Throttler:
+ """Throttle reads and writes to not exceed limits imposed by the
+ `throttle_read` and `throttle_write` arguments, as measured by the
+ cumulated bandwidth reported by each Throttler instance.
+ """
+
+ def __init__(self, **kwargs):
+ DatabaseAdmin(kwargs["base_dsn"], "throttler").create_database()
+ self.read = IOThrottler("read", **kwargs)
+ self.write = IOThrottler("write", **kwargs)
+
+ def throttle_get(self, fun, key):
+ content = fun(key)
+ self.read.add(len(content))
+ return content
+
+ def throttle_add(self, fun, obj_id, content):
+ self.write.add(len(obj_id) + len(content))
+ return fun(obj_id, content)
diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py
index d3cda00..a8f858e 100644
--- a/swh/objstorage/tests/conftest.py
+++ b/swh/objstorage/tests/conftest.py
@@ -1,35 +1,47 @@
def pytest_configure(config):
config.addinivalue_line("markers", "shard_max_size: winery backend")
def pytest_addoption(parser):
parser.addoption(
"--winery-bench-rw-workers",
type=int,
help="Number of Read/Write workers",
default=1,
)
parser.addoption(
"--winery-bench-ro-workers",
type=int,
help="Number of Readonly workers",
default=1,
)
parser.addoption(
"--winery-bench-duration",
type=int,
help="Duration of the benchmarks in seconds",
default=1,
)
parser.addoption(
"--winery-shard-max-size",
type=int,
help="Size of the shard in bytes",
default=10 * 1024 * 1024,
)
parser.addoption(
"--winery-bench-ro-worker-max-request",
type=int,
help="Number of requests a ro worker performs",
default=1,
)
+ parser.addoption(
+ "--winery-bench-throttle-read",
+ type=int,
+ help="Maximum number of bytes per second read",
+ default=100 * 1024 * 1024,
+ )
+ parser.addoption(
+ "--winery-bench-throttle-write",
+ type=int,
+ help="Maximum number of bytes per second write",
+ default=100 * 1024 * 1024,
+ )
diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py
index 1577236..dbc5441 100644
--- a/swh/objstorage/tests/test_objstorage_winery.py
+++ b/swh/objstorage/tests/test_objstorage_winery.py
@@ -1,224 +1,374 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import time
import pytest
import sh
from swh.objstorage import exc
-from swh.objstorage.backends.winery.database import Database
+from swh.objstorage.backends.winery.database import DatabaseAdmin
from swh.objstorage.backends.winery.objstorage import Packer, pack
+from swh.objstorage.backends.winery.throttler import (
+ BandwidthCalculator,
+ IOThrottler,
+ LeakyBucket,
+ Throttler,
+)
from swh.objstorage.factory import get_objstorage
from .winery_benchmark import Bench, work
from .winery_testing_helpers import PoolHelper, SharedBaseHelper
@pytest.fixture
def needs_ceph():
try:
sh.ceph("--version")
except sh.CommandNotFound:
pytest.skip("the ceph CLI was not found")
@pytest.fixture
def ceph_pool(needs_ceph):
pool = PoolHelper(shard_max_size=10 * 1024 * 1024)
pool.clobber()
pool.pool_create()
yield pool
pool.images_clobber()
pool.clobber()
@pytest.fixture
def storage(request, postgresql):
marker = request.node.get_closest_marker("shard_max_size")
if marker is None:
shard_max_size = 1024
else:
shard_max_size = marker.args[0]
dsn = (
f"postgres://{postgresql.info.user}"
f":@{postgresql.info.host}:{postgresql.info.port}"
)
storage = get_objstorage(
- cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size
+ cls="winery",
+ base_dsn=dsn,
+ shard_dsn=dsn,
+ shard_max_size=shard_max_size,
+ throttle_write=200 * 1024 * 1024,
+ throttle_read=100 * 1024 * 1024,
)
yield storage
storage.winery.uninit()
#
# pytest-postgresql will not remove databases that it did not
# create between tests (only at the very end).
#
- d = Database(dsn)
+ d = DatabaseAdmin(dsn)
for database in d.list_databases():
if database != postgresql.info.dbname and database != "tests_tmpl":
- d.drop_database(database)
+ DatabaseAdmin(dsn, database).drop_database()
@pytest.fixture
def winery(storage):
return storage.winery
def test_winery_sharedbase(winery):
base = winery.base
shard1 = base.whoami
assert shard1 is not None
assert shard1 == base.whoami
id1 = base.id
assert id1 is not None
assert id1 == base.id
def test_winery_add_get(winery):
shard = winery.base.whoami
content = b"SOMETHING"
obj_id = winery.add(content=content)
assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579"
assert winery.add(content=content, obj_id=obj_id) == obj_id
assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id
assert winery.base.whoami == shard
assert winery.get(obj_id) == content
with pytest.raises(exc.ObjNotFoundError):
winery.get(b"unknown")
winery.shard.drop()
@pytest.mark.shard_max_size(1)
def test_winery_add_and_pack(winery, mocker):
mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True)
shard = winery.base.whoami
content = b"SOMETHING"
obj_id = winery.add(content=content)
assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579"
assert winery.base.whoami != shard
assert len(winery.packers) == 1
packer = winery.packers[0]
packer.join()
assert packer.exitcode == 0
def test_winery_delete(storage):
with pytest.raises(PermissionError):
storage.delete(None)
def test_winery_get_shard_info(winery):
assert winery.base.get_shard_info(1234) is None
assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None
@pytest.mark.shard_max_size(10 * 1024 * 1024)
def test_winery_packer(winery, ceph_pool):
shard = winery.base.whoami
content = b"SOMETHING"
winery.add(content=content)
winery.base.shard_packing_starts()
packer = Packer(shard, **winery.args)
try:
assert packer.run() is True
finally:
packer.uninit()
readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard)
assert readonly is True
assert packing is False
@pytest.mark.shard_max_size(10 * 1024 * 1024)
def test_winery_get_object(winery, ceph_pool):
shard = winery.base.whoami
content = b"SOMETHING"
obj_id = winery.add(content=content)
winery.base.shard_packing_starts()
assert pack(shard, **winery.args) is True
assert winery.get(obj_id) == content
def test_winery_ceph_pool(needs_ceph):
name = "IMAGE"
pool = PoolHelper(shard_max_size=10 * 1024 * 1024)
pool.clobber()
pool.pool_create()
pool.image_create(name)
p = pool.image_path(name)
assert p.endswith(name)
something = "SOMETHING"
open(p, "w").write(something)
assert open(p).read(len(something)) == something
assert pool.image_list() == [name]
pool.image_remap_ro(name)
pool.images_clobber()
assert pool.image_list() == [name]
pool.clobber()
assert pool.image_list() == []
@pytest.mark.shard_max_size(10 * 1024 * 1024)
def test_winery_bench_work(winery, ceph_pool, tmpdir):
#
# rw worker creates a shard
#
whoami = winery.base.whoami
shards_info = list(winery.base.list_shards())
assert len(shards_info) == 1
shard, readonly, packing = shards_info[0]
assert (readonly, packing) == (False, False)
winery.args["dir"] = str(tmpdir)
assert work("rw", winery.args) == "rw"
shards_info = {
name: (readonly, packing)
for name, readonly, packing in winery.base.list_shards()
}
assert len(shards_info) == 2
assert shards_info[whoami] == (True, False)
#
# ro worker reads a shard
#
winery.args["ro_worker_max_request"] = 1
assert work("ro", winery.args) == "ro"
@pytest.mark.asyncio
async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool):
dsn = (
f"postgres://{postgresql.info.user}"
f":@{postgresql.info.host}:{postgresql.info.port}"
)
kwargs = {
"rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"),
"ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"),
"shard_max_size": pytestconfig.getoption("--winery-shard-max-size"),
"ro_worker_max_request": pytestconfig.getoption(
"--winery-bench-ro-worker-max-request"
),
"duration": pytestconfig.getoption("--winery-bench-duration"),
"base_dsn": dsn,
"shard_dsn": dsn,
+ "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"),
+ "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"),
}
assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
@pytest.mark.asyncio
async def test_winery_bench_fake(pytestconfig, mocker):
kwargs = {
"rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"),
"ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"),
"duration": pytestconfig.getoption("--winery-bench-duration"),
}
def run(kind):
time.sleep(kwargs["duration"] * 2)
return kind
mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run)
assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
+
+
+def test_winery_leaky_bucket_tick(mocker):
+ total = 100
+ half = 50
+ b = LeakyBucket(total)
+ sleep = mocker.spy(time, "sleep")
+ assert b.current == b.total
+ sleep.assert_not_called()
+ #
+ # Bucket is at 100, add(50) => drops to 50
+ #
+ b.add(half)
+ assert b.current == half
+ sleep.assert_not_called()
+ #
+ # Bucket is at 50, add(50) => drops to 0
+ #
+ b.add(half)
+ assert b.current == 0
+ sleep.assert_not_called()
+ #
+ # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0
+ #
+ b.add(half)
+ assert b.current == 0
+ sleep.assert_called_once()
+ #
+ # Sleep more than one second, bucket is full again, i.e. at 100
+ #
+ time.sleep(2)
+ mocker.resetall()
+ b.add(0)
+ assert b.current == total
+ sleep.assert_not_called()
+ #
+ # Bucket is full at 100 and waits when requesting 150 which is
+ # more than it can contain
+ #
+ b.add(total + half)
+ assert b.current == 0
+ sleep.assert_called_once()
+ mocker.resetall()
+ #
+ # Bucket is empty and waits when requesting 150 which is more
+ # than it can contain
+ #
+ b.add(total + half)
+ assert b.current == 0
+ sleep.assert_called_once()
+ mocker.resetall()
+
+
+def test_winery_leaky_bucket_reset():
+ b = LeakyBucket(100)
+ assert b.total == 100
+ assert b.current == b.total
+ b.reset(50)
+ assert b.total == 50
+ assert b.current == b.total
+ b.reset(100)
+ assert b.total == 100
+ assert b.current == 50
+
+
+def test_winery_bandwidth_calculator(mocker):
+ now = 1
+
+ def monotonic():
+ return now
+
+ mocker.patch("time.monotonic", side_effect=monotonic)
+ b = BandwidthCalculator()
+ assert b.get() == 0
+ count = 100 * 1024 * 1024
+ going_up = []
+ for t in range(b.duration):
+ now += 1
+ b.add(count)
+ going_up.append(b.get())
+ assert b.get() == count
+ going_down = []
+ for t in range(b.duration - 1):
+ now += 1
+ b.add(0)
+ going_down.append(b.get())
+ going_down.reverse()
+ assert going_up[:-1] == going_down
+ assert len(b.history) == b.duration - 1
+
+
+def test_winery_io_throttler(postgresql, mocker):
+ dsn = (
+ f"postgres://{postgresql.info.user}"
+ f":@{postgresql.info.host}:{postgresql.info.port}"
+ )
+ DatabaseAdmin(dsn, "throttler").create_database()
+ sleep = mocker.spy(time, "sleep")
+ speed = 100
+ i = IOThrottler("read", base_dsn=dsn, throttle_read=100)
+ count = speed
+ i.add(count)
+ sleep.assert_not_called()
+ i.add(count)
+ sleep.assert_called_once()
+ #
+ # Force slow down
+ #
+ mocker.resetall()
+ i.sync_interval = 0
+ i.max_speed = 1
+ assert i.max_speed != i.bucket.total
+ i.add(2)
+ assert i.max_speed == i.bucket.total
+ sleep.assert_called_once()
+
+
+def test_winery_throttler(postgresql):
+ dsn = (
+ f"postgres://{postgresql.info.user}"
+ f":@{postgresql.info.host}:{postgresql.info.port}"
+ )
+ t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100)
+
+ base = {}
+ key = "KEY"
+ content = "CONTENT"
+
+ def reader(k):
+ return base[k]
+
+ def writer(k, v):
+ base[k] = v
+ return True
+
+ assert t.throttle_add(writer, key, content) is True
+ assert t.throttle_get(reader, key) == content
diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py
index 419cbb9..36ccc35 100644
--- a/swh/objstorage/tests/winery_benchmark.py
+++ b/swh/objstorage/tests/winery_benchmark.py
@@ -1,145 +1,149 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import concurrent.futures
import logging
import os
import random
import time
from swh.objstorage.factory import get_objstorage
logger = logging.getLogger(__name__)
def work(kind, args):
return Worker(args).run(kind)
class Worker(object):
def __init__(self, args):
self.args = args
def run(self, kind):
getattr(self, kind)()
return kind
def ro(self):
self.storage = get_objstorage(
cls="winery",
readonly=True,
base_dsn=self.args["base_dsn"],
shard_dsn=self.args["shard_dsn"],
shard_max_size=self.args["shard_max_size"],
+ throttle_read=self.args["throttle_read"],
+ throttle_write=self.args["throttle_write"],
)
with self.storage.winery.base.db.cursor() as c:
while True:
c.execute(
"SELECT signature FROM signature2shard WHERE inflight = FALSE "
"ORDER BY random() LIMIT %s",
(self.args["ro_worker_max_request"],),
)
if c.rowcount > 0:
break
logger.info(f"Worker(ro, {os.getpid()}): empty, waiting")
time.sleep(1)
logger.info(f"Worker(ro, {os.getpid()}): requesting {c.rowcount} objects")
start = time.time()
for row in c:
obj_id = row[0].tobytes()
assert self.storage.get(obj_id) is not None
elapsed = time.time() - start
logger.info(f"Worker(ro, {os.getpid()}): finished ({elapsed:.2f}s)")
def payloads_define(self):
self.payloads = [
3 * 1024 + 1,
3 * 1024 + 1,
3 * 1024 + 1,
3 * 1024 + 1,
3 * 1024 + 1,
10 * 1024 + 1,
13 * 1024 + 1,
16 * 1024 + 1,
70 * 1024 + 1,
80 * 1024 + 1,
]
def rw(self):
self.storage = get_objstorage(
cls="winery",
base_dsn=self.args["base_dsn"],
shard_dsn=self.args["shard_dsn"],
shard_max_size=self.args["shard_max_size"],
+ throttle_read=self.args["throttle_read"],
+ throttle_write=self.args["throttle_write"],
)
self.payloads_define()
random_content = open("/dev/urandom", "rb")
logger.info(f"Worker(rw, {os.getpid()}): start")
start = time.time()
count = 0
while len(self.storage.winery.packers) == 0:
content = random_content.read(random.choice(self.payloads))
self.storage.add(content=content)
count += 1
logger.info(f"Worker(rw, {os.getpid()}): packing {count} objects")
packer = self.storage.winery.packers[0]
packer.join()
assert packer.exitcode == 0
elapsed = time.time() - start
logger.info(f"Worker(rw, {os.getpid()}): finished ({elapsed:.2f}s)")
class Bench(object):
def __init__(self, args):
self.args = args
def timer_start(self):
self.start = time.time()
def timeout(self):
return time.time() - self.start > self.args["duration"]
async def run(self):
self.timer_start()
loop = asyncio.get_running_loop()
workers_count = self.args["rw_workers"] + self.args["ro_workers"]
with concurrent.futures.ProcessPoolExecutor(
max_workers=workers_count
) as executor:
logger.info("Bench.run: running")
self.count = 0
workers = set()
def create_worker(kind):
self.count += 1
logger.info(f"Bench.run: launched {kind} worker number {self.count}")
return loop.run_in_executor(executor, work, kind, self.args)
for kind in ["rw"] * self.args["rw_workers"] + ["ro"] * self.args[
"ro_workers"
]:
workers.add(create_worker(kind))
while len(workers) > 0:
logger.info(f"Bench.run: waiting for {len(workers)} workers")
current = workers
done, pending = await asyncio.wait(
current, return_when=asyncio.FIRST_COMPLETED
)
workers = pending
for task in done:
kind = task.result()
logger.info(f"Bench.run: worker {kind} complete")
if not self.timeout():
workers.add(create_worker(kind))
logger.info("Bench.run: finished")
return self.count
diff --git a/swh/objstorage/tests/winery_testing_helpers.py b/swh/objstorage/tests/winery_testing_helpers.py
index 3fceace..4f28240 100644
--- a/swh/objstorage/tests/winery_testing_helpers.py
+++ b/swh/objstorage/tests/winery_testing_helpers.py
@@ -1,64 +1,64 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from swh.objstorage.backends.winery.roshard import Pool
logger = logging.getLogger(__name__)
class SharedBaseHelper:
def __init__(self, sharedbase):
self.sharedbase = sharedbase
def get_shard_info_by_name(self, name):
with self.sharedbase.db.cursor() as c:
c.execute("SELECT readonly, packing FROM shards WHERE name = %s", (name,))
if c.rowcount == 0:
return None
else:
return c.fetchone()
class PoolHelper(Pool):
def image_delete(self, image):
self.image_unmap(image)
logger.info(f"rdb --pool {self.name} remove {image}")
self.rbd.remove(image)
def images_clobber(self):
for image in self.image_list():
image = image.strip()
self.image_unmap(image)
def clobber(self):
self.images_clobber()
self.pool_clobber()
def pool_clobber(self):
logger.info(f"ceph osd pool delete {self.name}")
self.ceph.osd.pool.delete(self.name, self.name, "--yes-i-really-really-mean-it")
data = f"{self.name}-data"
logger.info(f"ceph osd pool delete {data}")
self.ceph.osd.pool.delete(data, data, "--yes-i-really-really-mean-it")
def pool_create(self):
data = f"{self.name}-data"
logger.info(f"ceph osd pool create {data}")
self.ceph.osd(
"erasure-code-profile",
"set",
"--force",
data,
"k=4",
"m=2",
"crush-failure-domain=host",
)
- self.ceph.osd.pool.create(data, "200", "erasure", data)
+ self.ceph.osd.pool.create(data, "100", "erasure", data)
self.ceph.osd.pool.set(data, "allow_ec_overwrites", "true")
self.ceph.osd.pool.set(data, "pg_autoscale_mode", "off")
logger.info(f"ceph osd pool create {self.name}")
self.ceph.osd.pool.create(self.name)
