diff --git a/docs/winery.rst b/docs/winery.rst
--- a/docs/winery.rst
+++ b/docs/winery.rst
@@ -5,6 +5,15 @@
 The Winery backend implements the `Ceph based object storage architecture `__.
 
+IO Throttling
+-------------
+
+Ceph (Pacific) implements IO QoS in librbd, but it is only effective within a single process, not cluster wide. Preliminary benchmarks showed that the accumulated read and write throughput must be throttled client side to prevent performance degradation (lower throughput and increased latency).
+
+Tables are created in a PostgreSQL database dedicated to throttling, so that independent processes performing I/O against the Ceph cluster can synchronize with each other and control their accumulated throughput for reads and writes. Each worker creates a row in the read and write tables and updates it every minute with its current read and write throughput, in bytes per second. It also queries all rows to figure out the current accumulated bandwidth.
+
+If the current accumulated bandwidth is above the maximum desired speed for N active workers, the process reduces its throughput to use at most 1/N of the maximum desired speed. For instance, if the accumulated usage is above 100MB/s and there are 10 workers, the process reduces its own speed to 10MB/s. After the 10 workers independently do the same, each of them gets 1/10 of the bandwidth.
+
 Implementation notes
 --------------------
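
As a back-of-the-envelope illustration of the sharing rule above, a minimal
sketch (``allowed_speed`` and its arguments are hypothetical names, not part
of this patch)::

    def allowed_speed(max_speed, accumulated_speed, workers):
        # Bandwidth a single worker may use, given cluster-wide usage.
        if workers > 0 and accumulated_speed > max_speed:
            # Over the limit: share the maximum desired speed equally.
            return max_speed // workers
        return max_speed

    # 10 workers together exceed 100MB/s: each falls back to 10MB/s.
    assert allowed_speed(100, 110, 10) == 10
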
diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py
--- a/swh/objstorage/backends/winery/database.py
+++ b/swh/objstorage/backends/winery/database.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import abc
 from contextlib import contextmanager
 import logging
 import time
@@ -12,9 +13,10 @@
 logger = logging.getLogger(__name__)
 
 
-class Database:
-    def __init__(self, dsn):
+class DatabaseAdmin:
+    def __init__(self, dsn, dbname=None):
         self.dsn = dsn
+        self.dbname = dbname
 
     @contextmanager
     def admin_cursor(self):
@@ -30,26 +32,26 @@
         finally:
             c.close()
 
-    def create_database(self, database):
+    def create_database(self):
         with self.admin_cursor() as c:
             c.execute(
                 "SELECT datname FROM pg_catalog.pg_database "
-                f"WHERE datname = '{database}'"
+                f"WHERE datname = '{self.dbname}'"
             )
             if c.rowcount == 0:
                 try:
-                    c.execute(f"CREATE DATABASE {database}")
+                    c.execute(f"CREATE DATABASE {self.dbname}")
                 except psycopg2.errors.UniqueViolation:
                     # someone else created the database, it is fine
                     pass
 
-    def drop_database(self, database):
+    def drop_database(self):
         with self.admin_cursor() as c:
             c.execute(
                 "SELECT pg_terminate_backend(pg_stat_activity.pid)"
                 "FROM pg_stat_activity "
                 "WHERE pg_stat_activity.datname = %s;",
-                (database,),
+                (self.dbname,),
             )
             #
             # Dropping the database may fail because the server takes time
@@ -72,13 +74,13 @@
             #
             for i in range(60):
                 try:
-                    c.execute(f"DROP DATABASE IF EXISTS {database}")
+                    c.execute(f"DROP DATABASE IF EXISTS {self.dbname}")
                     return
                 except psycopg2.errors.ObjectInUse:
-                    logger.warning(f"{database} database drop fails, waiting 10s")
+                    logger.warning(f"{self.dbname} database drop fails, waiting 10s")
                     time.sleep(10)
                     continue
-            raise Exception(f"database drop failed on {database}")
+            raise Exception(f"database drop failed on {self.dbname}")
 
     def list_databases(self):
         with self.admin_cursor() as c:
@@ -87,3 +89,36 @@
                 "WHERE datistemplate = false and datname != 'postgres'"
             )
             return [r[0] for r in c.fetchall()]
+
+
+class Database(abc.ABC):
+    def __init__(self, dsn, dbname):
+        self.dsn = dsn
+        self.dbname = dbname
+
+    @property
+    @abc.abstractmethod
+    def lock(self):
+        "Return an arbitrary unique number for pg_advisory_lock when creating tables"
+        raise NotImplementedError("Database.lock")
+
+    @property
+    @abc.abstractmethod
+    def database_tables(self):
+        "Return the list of CREATE TABLE statements for all tables in the database"
+        raise NotImplementedError("Database.database_tables")
+
+    def create_tables(self):
+        db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+        db.autocommit = True
+        c = db.cursor()
+        c.execute("SELECT pg_advisory_lock(%s)", (self.lock,))
+        for table in self.database_tables:
+            c.execute(table)
+        c.close()
+        db.close()  # so the pg_advisory_lock is released
+
+    def connect_database(self):
+        db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+        db.autocommit = True
+        return db
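
A usage sketch of the new split between DatabaseAdmin (database lifecycle)
and Database (table lifecycle); ``ExampleDatabase`` and the DSN are
hypothetical, only the contract is real::

    from swh.objstorage.backends.winery.database import Database, DatabaseAdmin

    class ExampleDatabase(Database):
        @property
        def lock(self):
            return 123456  # an arbitrary unique number for pg_advisory_lock

        @property
        def database_tables(self):
            return ["CREATE TABLE IF NOT EXISTS example(id SERIAL PRIMARY KEY)"]

    dsn = "postgres://localhost"  # hypothetical DSN
    DatabaseAdmin(dsn, "example").create_database()  # idempotent
    db = ExampleDatabase(dsn, "example")
    db.create_tables()  # serialized across processes by pg_advisory_lock
    connection = db.connect_database()
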
diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py
--- a/swh/objstorage/backends/winery/objstorage.py
+++ b/swh/objstorage/backends/winery/objstorage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -103,10 +103,10 @@
         self.rw.uninit()
 
     def run(self):
-        shard = self.ro.create(self.rw.count())
+        self.ro.create(self.rw.count())
         for obj_id, content in self.rw.all():
-            shard.write(obj_id, content)
-        shard.save()
+            self.ro.add(content, obj_id)
+        self.ro.save()
         base = SharedBase(**self.args)
         base.shard_packing_ends(self.shard)
         base.uninit()
diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py
--- a/swh/objstorage/backends/winery/roshard.py
+++ b/swh/objstorage/backends/winery/roshard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,6 +9,8 @@
 
 from swh.perfecthash import Shard
 
+from .throttler import Throttler
+
 logger = logging.getLogger(__name__)
 
 
@@ -59,6 +61,7 @@
 class ROShard:
     def __init__(self, name, **kwargs):
         self.pool = Pool(shard_max_size=kwargs["shard_max_size"])
+        self.throttler = Throttler(**kwargs)
         self.name = name
 
     def create(self, count):
@@ -71,4 +74,10 @@
         return self.shard.load() == self.shard
 
     def get(self, key):
-        return self.shard.lookup(key)
+        return self.throttler.throttle_get(self.shard.lookup, key)
+
+    def add(self, content, obj_id):
+        return self.throttler.throttle_add(self.shard.write, obj_id, content)
+
+    def save(self):
+        return self.shard.save()
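
Both ROShard IO paths now go through the throttler; the composition can be
sketched as follows, where ``shard``, ``key`` and ``obj_id`` are hypothetical
stand-ins for a swh.perfecthash Shard and its identifiers::

    throttler = Throttler(
        base_dsn="postgres://localhost",  # hypothetical DSN
        throttle_read=100 * 1024 * 1024,
        throttle_write=100 * 1024 * 1024,
    )
    # Accounts len(content) bytes of read bandwidth, sleeping if over budget.
    content = throttler.throttle_get(shard.lookup, key)
    # Accounts len(obj_id) + len(content) bytes of write bandwidth.
    throttler.throttle_add(shard.write, obj_id, content)
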
diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py
--- a/swh/objstorage/backends/winery/rwshard.py
+++ b/swh/objstorage/backends/winery/rwshard.py
@@ -6,15 +6,16 @@
 
 import psycopg2
 
-from .database import Database
+from .database import Database, DatabaseAdmin
 
 
 class RWShard(Database):
     def __init__(self, name, **kwargs):
-        super().__init__(kwargs["shard_dsn"])
         self._name = name
-        self.create_database(self.name)
-        self.db = self.create_table(f"{self.dsn}/{self.name}")
+        DatabaseAdmin(kwargs["base_dsn"], self.name).create_database()
+        super().__init__(kwargs["shard_dsn"], self.name)
+        self.create_tables()
+        self.db = self.connect_database()
         self.size = self.total_size()
         self.limit = kwargs["shard_max_size"]
 
@@ -23,6 +24,10 @@
         self.db.close()
         del self.db
 
+    @property
+    def lock(self):
+        return 452343  # an arbitrary unique number
+
     @property
     def name(self):
         return self._name
@@ -31,22 +36,18 @@
         return self.size > self.limit
 
     def drop(self):
-        self.drop_database(self.name)
+        DatabaseAdmin(self.dsn, self.dbname).drop_database()
 
-    def create_table(self, dsn):
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        c = db.cursor()
-        c.execute(
+    @property
+    def database_tables(self):
+        return [
             """
             CREATE TABLE IF NOT EXISTS objects(
                 key BYTEA PRIMARY KEY,
                 content BYTEA
             )
-            """
-        )
-        c.close()
-        return db
+            """,
+        ]
 
     def total_size(self):
         with self.db.cursor() as c:
diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py
--- a/swh/objstorage/backends/winery/sharedbase.py
+++ b/swh/objstorage/backends/winery/sharedbase.py
@@ -7,28 +7,28 @@
 
 import psycopg2
 
-from .database import Database
+from .database import Database, DatabaseAdmin
 
 
 class SharedBase(Database):
     def __init__(self, **kwargs):
-        super().__init__(kwargs["base_dsn"])
-        database = "sharedbase"
-        self.create_database(database)
-        self.db = self.create_table(f"{self.dsn}/{database}")
+        DatabaseAdmin(kwargs["base_dsn"], "sharedbase").create_database()
+        super().__init__(kwargs["base_dsn"], "sharedbase")
+        self.create_tables()
+        self.db = self.connect_database()
         self._whoami = None
 
     def uninit(self):
         self.db.close()
         del self.db
 
-    def create_table(self, dsn):
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        c = db.cursor()
-        lock = 314116  # an abitrary unique number
-        c.execute("SELECT pg_advisory_lock(%s)", (lock,))
-        c.execute(
+    @property
+    def lock(self):
+        return 314116  # an arbitrary unique number
+
+    @property
+    def database_tables(self):
+        return [
             """
             CREATE TABLE IF NOT EXISTS shards(
                 id SERIAL PRIMARY KEY,
@@ -36,22 +36,23 @@
                 packing BOOLEAN NOT NULL,
                 name CHAR(32) NOT NULL UNIQUE
             )
-            """
-        )
-        c.execute(
+            """,
             """
             CREATE TABLE IF NOT EXISTS signature2shard(
                 signature BYTEA PRIMARY KEY,
                 inflight BOOLEAN NOT NULL,
                 shard INTEGER NOT NULL
             )
-            """
+            """,
+            """
+            CREATE TABLE IF NOT EXISTS iothrottling(
+                id SERIAL PRIMARY KEY,
+                updated TIMESTAMP NOT NULL,
+                bytesread INTEGER NOT NULL,
+                byteswritten INTEGER NOT NULL
+            )
-        )
-        c.close()
-        db.close()  # so the pg_advisory_lock is released
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        return db
+            """,
+        ]
 
     @property
     def whoami(self):
+ """ + + def __init__(self, name, **kwargs): + super().__init__(kwargs["base_dsn"], "throttler") + self.name = name + self.init_db() + self.last_sync = 0 + self.max_speed = kwargs["throttle_" + name] + self.bucket = LeakyBucket(self.max_speed) + self.bandwidth = BandwidthCalculator() + + def init_db(self): + self.create_tables() + self.db = self.connect_database() + self.cursor = self.db.cursor() + self.cursor.execute( + f"INSERT INTO t_{self.name} (updated, bytes) VALUES (%s, %s) RETURNING id", + (datetime.datetime.now(), 0), + ) + self.rowid = self.cursor.fetchone()[0] + self.cursor.execute( + f"SELECT * FROM t_{self.name} WHERE id = %s FOR UPDATE", (self.rowid,) + ) + self.cursor.execute( + f"DELETE FROM t_{self.name} WHERE id IN (" + f"SELECT id FROM t_{self.name} WHERE updated < NOW() - INTERVAL '30 days' " + " FOR UPDATE SKIP LOCKED)" + ) + self.db.commit() + self.sync_interval = 60 + + @property + def lock(self): + return 9485433 # an abitrary unique number + + @property + def database_tables(self): + return [ + f""" + CREATE TABLE IF NOT EXISTS t_{self.name}( + id SERIAL PRIMARY KEY, + updated TIMESTAMP NOT NULL, + bytes INTEGER NOT NULL + ) + """, + ] + + def download_info(self): + self.cursor.execute( + f"SELECT COUNT(*), SUM(bytes) FROM t_{self.name} " + "WHERE bytes > 0 AND updated > NOW() - INTERVAL '5 minutes'" + ) + return self.cursor.fetchone() + + def upload_info(self): + bytes = int(self.bandwidth.get()) + logger.debug("%d: upload %s/s", self.rowid, bytes) + self.cursor.execute( + f"UPDATE t_{self.name} SET updated = %s, bytes = %s WHERE id = %s", + (datetime.datetime.now(), bytes, self.rowid), + ) + self.db.commit() + + def add(self, count): + self.bucket.add(count) + self.bandwidth.add(count) + self.maybe_sync() + + def sync(self): + self.upload_info() + (others_count, total_usage) = self.download_info() + logger.debug( + "%d: sync others_count=%s total_usage=%s", + self.rowid, + others_count, + total_usage, + ) + if others_count > 0 and total_usage > self.max_speed: + self.bucket.reset(self.max_speed / others_count) + + def maybe_sync(self): + now = time.monotonic() + if now - self.last_sync > self.sync_interval: + self.sync() + self.last_sync = now + + +class Throttler: + """Throttle reads and writes to not exceed limits imposed by the + `thottle_read` and `throttle_write` arguments, as measured by the + cumulated bandwidth reported by each Throttler instance. 
+ """ + + def __init__(self, **kwargs): + DatabaseAdmin(kwargs["base_dsn"], "throttler").create_database() + self.read = IOThrottler("read", **kwargs) + self.write = IOThrottler("write", **kwargs) + + def throttle_get(self, fun, key): + content = fun(key) + self.read.add(len(content)) + return content + + def throttle_add(self, fun, obj_id, content): + self.write.add(len(obj_id) + len(content)) + return fun(obj_id, content) diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py --- a/swh/objstorage/tests/conftest.py +++ b/swh/objstorage/tests/conftest.py @@ -33,3 +33,15 @@ help="Number of requests a ro worker performs", default=1, ) + parser.addoption( + "--winery-bench-throttle-read", + type=int, + help="Maximum number of bytes per second read", + default=100 * 1024 * 1024, + ) + parser.addoption( + "--winery-bench-throttle-write", + type=int, + help="Maximum number of bytes per second write", + default=100 * 1024 * 1024, + ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -9,8 +9,14 @@ import sh from swh.objstorage import exc -from swh.objstorage.backends.winery.database import Database +from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.backends.winery.throttler import ( + BandwidthCalculator, + IOThrottler, + LeakyBucket, + Throttler, +) from swh.objstorage.factory import get_objstorage from .winery_benchmark import Bench, work @@ -49,7 +55,12 @@ f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( - cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size + cls="winery", + base_dsn=dsn, + shard_dsn=dsn, + shard_max_size=shard_max_size, + throttle_write=200 * 1024 * 1024, + throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() @@ -57,10 +68,10 @@ # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). 
diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py
--- a/swh/objstorage/tests/conftest.py
+++ b/swh/objstorage/tests/conftest.py
@@ -33,3 +33,15 @@
         help="Number of requests a ro worker performs",
         default=1,
     )
+    parser.addoption(
+        "--winery-bench-throttle-read",
+        type=int,
+        help="Maximum number of bytes per second read",
+        default=100 * 1024 * 1024,
+    )
+    parser.addoption(
+        "--winery-bench-throttle-write",
+        type=int,
+        help="Maximum number of bytes per second write",
+        default=100 * 1024 * 1024,
+    )
diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py
--- a/swh/objstorage/tests/test_objstorage_winery.py
+++ b/swh/objstorage/tests/test_objstorage_winery.py
@@ -9,8 +9,14 @@
 import sh
 
 from swh.objstorage import exc
-from swh.objstorage.backends.winery.database import Database
+from swh.objstorage.backends.winery.database import DatabaseAdmin
 from swh.objstorage.backends.winery.objstorage import Packer, pack
+from swh.objstorage.backends.winery.throttler import (
+    BandwidthCalculator,
+    IOThrottler,
+    LeakyBucket,
+    Throttler,
+)
 from swh.objstorage.factory import get_objstorage
 
 from .winery_benchmark import Bench, work
@@ -49,7 +55,12 @@
         f":@{postgresql.info.host}:{postgresql.info.port}"
     )
     storage = get_objstorage(
-        cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size
+        cls="winery",
+        base_dsn=dsn,
+        shard_dsn=dsn,
+        shard_max_size=shard_max_size,
+        throttle_write=200 * 1024 * 1024,
+        throttle_read=100 * 1024 * 1024,
     )
     yield storage
     storage.winery.uninit()
@@ -57,10 +68,10 @@
     # pytest-postgresql will not remove databases that it did not
     # create between tests (only at the very end).
     #
-    d = Database(dsn)
+    d = DatabaseAdmin(dsn)
     for database in d.list_databases():
         if database != postgresql.info.dbname and database != "tests_tmpl":
-            d.drop_database(database)
+            DatabaseAdmin(dsn, database).drop_database()
 
 
 @pytest.fixture
@@ -204,6 +215,8 @@
         "duration": pytestconfig.getoption("--winery-bench-duration"),
         "base_dsn": dsn,
         "shard_dsn": dsn,
+        "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"),
+        "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"),
     }
 
     assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
@@ -222,3 +235,140 @@
     mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run)
 
     assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
+
+
+def test_winery_leaky_bucket_tick(mocker):
+    total = 100
+    half = 50
+    b = LeakyBucket(total)
+    sleep = mocker.spy(time, "sleep")
+    assert b.current == b.total
+    sleep.assert_not_called()
+    #
+    # Bucket is at 100, add(50) => drops to 50
+    #
+    b.add(half)
+    assert b.current == half
+    sleep.assert_not_called()
+    #
+    # Bucket is at 50, add(50) => drops to 0
+    #
+    b.add(half)
+    assert b.current == 0
+    sleep.assert_not_called()
+    #
+    # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0
+    #
+    b.add(half)
+    assert b.current == 0
+    sleep.assert_called_once()
+    #
+    # Sleep more than one second, bucket is full again, i.e. at 100
+    #
+    time.sleep(2)
+    mocker.resetall()
+    b.add(0)
+    assert b.current == total
+    sleep.assert_not_called()
+    #
+    # Bucket is full at 100 and waits when requesting 150 which is
+    # more than it can contain
+    #
+    b.add(total + half)
+    assert b.current == 0
+    sleep.assert_called_once()
+    mocker.resetall()
+    #
+    # Bucket is empty and waits when requesting 150 which is more
+    # than it can contain
+    #
+    b.add(total + half)
+    assert b.current == 0
+    sleep.assert_called_once()
+    mocker.resetall()
+
+
+def test_winery_leaky_bucket_reset():
+    b = LeakyBucket(100)
+    assert b.total == 100
+    assert b.current == b.total
+    b.reset(50)
+    assert b.total == 50
+    assert b.current == b.total
+    b.reset(100)
+    assert b.total == 100
+    assert b.current == 50
+
+
+def test_winery_bandwidth_calculator(mocker):
+    now = 1
+
+    def monotonic():
+        return now
+
+    mocker.patch("time.monotonic", side_effect=monotonic)
+    b = BandwidthCalculator()
+    assert b.get() == 0
+    count = 100 * 1024 * 1024
+    going_up = []
+    for t in range(b.duration):
+        now += 1
+        b.add(count)
+        going_up.append(b.get())
+    assert b.get() == count
+    going_down = []
+    for t in range(b.duration - 1):
+        now += 1
+        b.add(0)
+        going_down.append(b.get())
+    going_down.reverse()
+    assert going_up[:-1] == going_down
+    assert len(b.history) == b.duration - 1
+
+
+def test_winery_io_throttler(postgresql, mocker):
+    dsn = (
+        f"postgres://{postgresql.info.user}"
+        f":@{postgresql.info.host}:{postgresql.info.port}"
+    )
+    DatabaseAdmin(dsn, "throttler").create_database()
+    sleep = mocker.spy(time, "sleep")
+    speed = 100
+    i = IOThrottler("read", base_dsn=dsn, throttle_read=100)
+    count = speed
+    i.add(count)
+    sleep.assert_not_called()
+    i.add(count)
+    sleep.assert_called_once()
+    #
+    # Force slow down
+    #
+    mocker.resetall()
+    i.sync_interval = 0
+    i.max_speed = 1
+    assert i.max_speed != i.bucket.total
+    i.add(2)
+    assert i.max_speed == i.bucket.total
+    sleep.assert_called_once()
+
+
+def test_winery_throttler(postgresql):
+    dsn = (
+        f"postgres://{postgresql.info.user}"
+        f":@{postgresql.info.host}:{postgresql.info.port}"
+    )
+    t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100)
+
+    base = {}
+    key = "KEY"
+    content = "CONTENT"
+
+    def reader(k):
+        return base[k]
+
+    def writer(k, v):
+        base[k] = v
+        return True
+
+    assert t.throttle_add(writer, key, content) is True
+    assert t.throttle_get(reader, key) == content
diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py
--- a/swh/objstorage/tests/winery_benchmark.py
+++ b/swh/objstorage/tests/winery_benchmark.py
@@ -34,6 +34,8 @@
             base_dsn=self.args["base_dsn"],
             shard_dsn=self.args["shard_dsn"],
             shard_max_size=self.args["shard_max_size"],
+            throttle_read=self.args["throttle_read"],
+            throttle_write=self.args["throttle_write"],
         )
         with self.storage.winery.base.db.cursor() as c:
             while True:
@@ -74,6 +76,8 @@
             base_dsn=self.args["base_dsn"],
             shard_dsn=self.args["shard_dsn"],
             shard_max_size=self.args["shard_max_size"],
+            throttle_read=self.args["throttle_read"],
+            throttle_write=self.args["throttle_write"],
         )
         self.payloads_define()
         random_content = open("/dev/urandom", "rb")
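
For reference, the new knobs surface through the public factory like the
other winery arguments, as exercised by the tests above (the DSN and sizes
are hypothetical)::

    from swh.objstorage.factory import get_objstorage

    storage = get_objstorage(
        cls="winery",
        base_dsn="postgres://localhost",
        shard_dsn="postgres://localhost",
        shard_max_size=10 * 1024 * 1024,
        throttle_write=200 * 1024 * 1024,
        throttle_read=100 * 1024 * 1024,
    )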