diff --git a/docs/winery.rst b/docs/winery.rst
--- a/docs/winery.rst
+++ b/docs/winery.rst
@@ -5,6 +5,15 @@
 The Winery backend implements the `Ceph based object storage architecture
 `__.
 
+IO Throttling
+--------------
+
+Ceph (Pacific) implements IO QoS in librbd, but it is only effective within a single process, not cluster wide. Preliminary benchmarks showed that the accumulated read and write throughput must be throttled client side to prevent performance degradation (lower throughput and higher latency).
+
+Tables are created in a PostgreSQL database dedicated to throttling, so that independent processes performing I/O against the Ceph cluster can synchronize with each other and control their accumulated read and write throughput. Each worker creates a row in the read and write tables and updates it every minute with its current read and write throughput, in bytes per second. It also queries all rows to figure out the current accumulated bandwidth.
+
+If the current accumulated bandwidth is above the maximum desired speed and there are N active workers, each process reduces its throughput to at most 1/N of the maximum desired speed. For instance, if the accumulated usage is above 100MB/s and there are 10 workers, a process reduces its own speed to 10MB/s. After the 10 workers independently do the same, each of them gets 1/10 of the bandwidth.
+
 Implementation notes
 --------------------
 
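The sharing rule above reduces to a few lines of arithmetic. A minimal standalone sketch, using hypothetical numbers and mirroring the logic of ``IOThrottler.sync()`` in the new throttler module below::

    # Each worker caps itself at 1/N of the configured maximum once the
    # accumulated usage reported by all active workers exceeds that maximum.
    # All values here are illustrative; the real ones are read from PostgreSQL.
    max_speed = 100 * 1024 * 1024    # maximum desired speed: 100 MB/s
    active_workers = 10              # rows recently updated by other processes
    total_usage = 120 * 1024 * 1024  # accumulated throughput, in bytes/s

    if active_workers > 0 and total_usage > max_speed:
        own_budget = max_speed / active_workers  # 10 MB/s for this process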
diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py
--- a/swh/objstorage/backends/winery/database.py
+++ b/swh/objstorage/backends/winery/database.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import abc
 from contextlib import contextmanager
 import logging
 import time
@@ -12,9 +13,10 @@
 logger = logging.getLogger(__name__)
 
 
-class Database:
-    def __init__(self, dsn):
+class DatabaseAdmin:
+    def __init__(self, dsn, dbname=None):
         self.dsn = dsn
+        self.dbname = dbname
 
     @contextmanager
     def admin_cursor(self):
@@ -30,26 +32,26 @@
         finally:
             c.close()
 
-    def create_database(self, database):
+    def create_database(self):
         with self.admin_cursor() as c:
             c.execute(
                 "SELECT datname FROM pg_catalog.pg_database "
-                f"WHERE datname = '{database}'"
+                f"WHERE datname = '{self.dbname}'"
             )
             if c.rowcount == 0:
                 try:
-                    c.execute(f"CREATE DATABASE {database}")
+                    c.execute(f"CREATE DATABASE {self.dbname}")
                 except psycopg2.errors.UniqueViolation:
                     # someone else created the database, it is fine
                     pass
 
-    def drop_database(self, database):
+    def drop_database(self):
         with self.admin_cursor() as c:
             c.execute(
                 "SELECT pg_terminate_backend(pg_stat_activity.pid)"
                 "FROM pg_stat_activity "
                 "WHERE pg_stat_activity.datname = %s;",
-                (database,),
+                (self.dbname,),
             )
             #
            # Dropping the database may fail because the server takes time
@@ -72,13 +74,13 @@
             #
             for i in range(60):
                 try:
-                    c.execute(f"DROP DATABASE IF EXISTS {database}")
+                    c.execute(f"DROP DATABASE IF EXISTS {self.dbname}")
                     return
                 except psycopg2.errors.ObjectInUse:
-                    logger.warning(f"{database} database drop fails, waiting 10s")
+                    logger.warning(f"{self.dbname} database drop fails, waiting 10s")
                     time.sleep(10)
                     continue
-            raise Exception(f"database drop failed on {database}")
+            raise Exception(f"database drop failed on {self.dbname}")
 
     def list_databases(self):
         with self.admin_cursor() as c:
@@ -87,3 +89,30 @@
                 "WHERE datistemplate = false and datname != 'postgres'"
             )
             return [r[0] for r in c.fetchall()]
+
+
+class Database(abc.ABC):
+    def __init__(self, dsn, dbname):
+        self.dsn = dsn
+        self.dbname = dbname
+
+    @property
+    @abc.abstractmethod
+    def lock(self):
+        "Return an arbitrary unique number for pg_advisory_lock when creating tables"
+        ...
+
+    def create_tables(self, tables):
+        db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+        db.autocommit = True
+        c = db.cursor()
+        c.execute("SELECT pg_advisory_lock(%s)", (self.lock,))
+        for table in tables:
+            c.execute(table)
+        c.close()
+        db.close()  # so the pg_advisory_lock is released
+
+    def connect_database(self):
+        db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname)
+        db.autocommit = True
+        return db
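For reference, a minimal sketch of the contract a ``Database`` subclass fulfils; the class name, table and lock number here are hypothetical, the real subclasses follow in the next hunks::

    # Hypothetical subclass: expose a unique advisory lock number and feed the
    # DDL through create_tables(), which runs it under pg_advisory_lock so
    # concurrent workers do not race on CREATE TABLE IF NOT EXISTS.
    from swh.objstorage.backends.winery.database import Database, DatabaseAdmin


    class ExampleDB(Database):
        @property
        def lock(self):
            return 123456  # an arbitrary number, unique among subclasses

        def create_tables(self):
            super().create_tables(
                ["CREATE TABLE IF NOT EXISTS example(id SERIAL PRIMARY KEY)"]
            )


    dsn = "postgres://localhost"  # placeholder for a reachable PostgreSQL
    DatabaseAdmin(dsn, "example").create_database()  # CREATE DATABASE if needed
    db = ExampleDB(dsn, "example")
    db.create_tables()
    connection = db.connect_database()  # autocommit connection to "example"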
diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py
--- a/swh/objstorage/backends/winery/objstorage.py
+++ b/swh/objstorage/backends/winery/objstorage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -103,10 +103,10 @@
         self.rw.uninit()
 
     def run(self):
-        shard = self.ro.create(self.rw.count())
+        self.ro.create(self.rw.count())
         for obj_id, content in self.rw.all():
-            shard.write(obj_id, content)
-        shard.save()
+            self.ro.add(content, obj_id)
+        self.ro.save()
         base = SharedBase(**self.args)
         base.shard_packing_ends(self.shard)
         base.uninit()
diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py
--- a/swh/objstorage/backends/winery/roshard.py
+++ b/swh/objstorage/backends/winery/roshard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,6 +9,8 @@
 
 from swh.perfecthash import Shard
 
+from .throttler import Throttler
+
 logger = logging.getLogger(__name__)
 
 
@@ -59,6 +61,7 @@
 class ROShard:
     def __init__(self, name, **kwargs):
         self.pool = Pool(shard_max_size=kwargs["shard_max_size"])
+        self.throttler = Throttler(**kwargs)
         self.name = name
 
     def create(self, count):
@@ -71,4 +74,10 @@
         return self.shard.load() == self.shard
 
     def get(self, key):
-        return self.shard.lookup(key)
+        return self.throttler.throttle_get(self.shard.lookup, key)
+
+    def add(self, content, obj_id):
+        return self.throttler.throttle_add(self.shard.write, obj_id, content)
+
+    def save(self):
+        return self.shard.save()
diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py
--- a/swh/objstorage/backends/winery/rwshard.py
+++ b/swh/objstorage/backends/winery/rwshard.py
@@ -6,15 +6,16 @@
 
 import psycopg2
 
-from .database import Database
+from .database import Database, DatabaseAdmin
 
 
 class RWShard(Database):
     def __init__(self, name, **kwargs):
-        super().__init__(kwargs["shard_dsn"])
         self._name = name
-        self.create_database(self.name)
-        self.db = self.create_table(f"{self.dsn}/{self.name}")
+        DatabaseAdmin(kwargs["base_dsn"], self.name).create_database()
+        super().__init__(kwargs["shard_dsn"], self.name)
+        self.create_tables()
+        self.db = self.connect_database()
         self.size = self.total_size()
         self.limit = kwargs["shard_max_size"]
 
@@ -23,6 +24,10 @@
         self.db.close()
         del self.db
 
+    @property
+    def lock(self):
+        return 452343  # an arbitrary unique number
+
     @property
     def name(self):
         return self._name
@@ -31,22 +36,18 @@
         return self.size > self.limit
 
     def drop(self):
-        self.drop_database(self.name)
+        DatabaseAdmin(self.dsn, self.dbname).drop_database()
 
-    def create_table(self, dsn):
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        c = db.cursor()
-        c.execute(
+    def create_tables(self):
+        tables = [
             """
             CREATE TABLE IF NOT EXISTS objects(
                 key BYTEA PRIMARY KEY,
                 content BYTEA
             )
-            """
-        )
-        c.close()
-        return db
+            """,
+        ]
+        super().create_tables(tables)
 
     def total_size(self):
         with self.db.cursor() as c:
diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py
--- a/swh/objstorage/backends/winery/sharedbase.py
+++ b/swh/objstorage/backends/winery/sharedbase.py
@@ -7,28 +7,27 @@
 
 import psycopg2
 
-from .database import Database
+from .database import Database, DatabaseAdmin
 
 
 class SharedBase(Database):
     def __init__(self, **kwargs):
-        super().__init__(kwargs["base_dsn"])
-        database = "sharedbase"
-        self.create_database(database)
-        self.db = self.create_table(f"{self.dsn}/{database}")
+        DatabaseAdmin(kwargs["base_dsn"], "sharedbase").create_database()
+        super().__init__(kwargs["base_dsn"], "sharedbase")
+        self.create_tables()
+        self.db = self.connect_database()
         self._whoami = None
 
     def uninit(self):
         self.db.close()
         del self.db
 
-    def create_table(self, dsn):
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        c = db.cursor()
-        lock = 314116  # an abitrary unique number
-        c.execute("SELECT pg_advisory_lock(%s)", (lock,))
-        c.execute(
+    @property
+    def lock(self):
+        return 314116  # an arbitrary unique number
+
+    def create_tables(self):
+        tables = [
             """
             CREATE TABLE IF NOT EXISTS shards(
                 id SERIAL PRIMARY KEY,
@@ -36,22 +35,24 @@
                 packing BOOLEAN NOT NULL,
                 name CHAR(32) NOT NULL UNIQUE
             )
-            """
-        )
-        c.execute(
+            """,
             """
             CREATE TABLE IF NOT EXISTS signature2shard(
                 signature BYTEA PRIMARY KEY,
                 inflight BOOLEAN NOT NULL,
                 shard INTEGER NOT NULL
             )
-            """
+            """,
+            """
+            CREATE TABLE IF NOT EXISTS iothrottling(
+                id SERIAL PRIMARY KEY,
+                updated TIMESTAMP NOT NULL,
+                bytesread INTEGER NOT NULL,
+                byteswritten INTEGER NOT NULL
             )
-        c.close()
-        db.close()  # so the pg_advisory_lock is released
-        db = psycopg2.connect(dsn)
-        db.autocommit = True
-        return db
+            """,
+        ]
+        super().create_tables(tables)
 
     @property
     def whoami(self):
diff --git a/swh/objstorage/backends/winery/throttler.py b/swh/objstorage/backends/winery/throttler.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/throttler.py
@@ -0,0 +1,159 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import collections
+import datetime
+import logging
+import time
+
+from .database import Database, DatabaseAdmin
+
+logger = logging.getLogger(__name__)
+
+
+class LeakyBucket:
+    def __init__(self, total):
+        self.updated = 0.0
+        self.current = 0.0
+        self.reset(total)
+
+    def reset(self, total):
+        self.total = total
+        self.current = min(self.total, self.current)
+        self.tick()
+
+    def tick(self):
+        now = time.monotonic()
+        self.current += self.total * (now - self.updated)
+        self.current = int(min(self.total, self.current))
+        self.updated = now
+
+    def add(self, count):
+        self.tick()
+        if count > self.current:
+            time.sleep((count - self.current) / self.total)
+            self.tick()
+        self.current -= min(self.total, count)
+
+
+class BandwidthCalculator:
+    def __init__(self):
+        self.duration = 60
+        self.history = collections.deque([0] * (self.duration - 1), self.duration - 1)
+        self.current = 0
+        self.current_second = 0
+
+    def add(self, count):
+        current_second = int(time.monotonic())
+        if current_second > self.current_second:
+            self.history.append(self.current)
+            self.history.extend(
+                [0] * min(self.duration, current_second - self.current_second - 1)
+            )
+            self.current_second = current_second
+            self.current = 0
+        self.current += count
+
+    def get(self):
+        return (sum(self.history) + self.current) / self.duration
+
+
+class IOThrottler(Database):
+    def __init__(self, name, **kwargs):
+        super().__init__(kwargs["base_dsn"], "throttler")
+        self.name = name
+        self.init_db()
+        self.last_sync = 0
+        self.max_speed = kwargs["throttle_" + name]
+        self.bucket = LeakyBucket(self.max_speed)
+        self.bandwidth = BandwidthCalculator()
+
+    def init_db(self):
+        self.create_tables()
+        self.db = self.connect_database()
+        self.cursor = self.db.cursor()
+        self.cursor.execute(
+            f"INSERT INTO t_{self.name} (updated, bytes) VALUES (%s, %s) RETURNING id",
+            (datetime.datetime.now(), 0),
+        )
+        self.rowid = self.cursor.fetchone()[0]
+        self.cursor.execute(
+            f"SELECT * FROM t_{self.name} WHERE id = %s FOR UPDATE", (self.rowid,)
+        )
+        self.cursor.execute(
+            f"DELETE FROM t_{self.name} WHERE id IN ("
+            f"SELECT id FROM t_{self.name} WHERE updated < NOW() - INTERVAL '30 days' "
+            " FOR UPDATE SKIP LOCKED)"
+        )
+        self.db.commit()
+        self.sync_interval = 60
+
+    @property
+    def lock(self):
+        return 9485433  # an arbitrary unique number
+
+    def create_tables(self):
+        tables = [
+            f"""
+            CREATE TABLE IF NOT EXISTS t_{self.name}(
+                id SERIAL PRIMARY KEY,
+                updated TIMESTAMP NOT NULL,
+                bytes INTEGER NOT NULL
+            )
+            """,
+        ]
+        super().create_tables(tables)
+
+    def download_info(self):
+        self.cursor.execute(
+            f"SELECT COUNT(*), SUM(bytes) FROM t_{self.name} "
+            "WHERE bytes > 0 AND updated > NOW() - INTERVAL '5 minutes'"
+        )
+        return self.cursor.fetchone()
+
+    def upload_info(self):
+        bytes = int(self.bandwidth.get())
+        logger.debug(f"{self.rowid}: upload {bytes}/s")
+        self.cursor.execute(
+            f"UPDATE t_{self.name} SET updated = %s, bytes = %s WHERE id = %s",
+            (datetime.datetime.now(), bytes, self.rowid),
+        )
+        self.db.commit()
+
+    def add(self, count):
+        self.bucket.add(count)
+        self.bandwidth.add(count)
+        self.maybe_sync()
+
+    def sync(self):
+        self.upload_info()
+        (others_count, total_usage) = self.download_info()
+        logger.debug(
+            f"{self.rowid}: sync others_count={others_count} total_usage={total_usage}"
+        )
+        if others_count > 0 and total_usage > self.max_speed:
+            self.bucket.reset(self.max_speed / others_count)
+
+    def maybe_sync(self):
+        now = time.monotonic()
+        if now - self.last_sync > self.sync_interval:
+            self.sync()
+            self.last_sync = now
+
+
+class Throttler:
+    def __init__(self, **kwargs):
+        DatabaseAdmin(kwargs["base_dsn"], "throttler").create_database()
+        self.read = IOThrottler("read", **kwargs)
+        self.write = IOThrottler("write", **kwargs)
+
+    def throttle_get(self, fun, key):
+        content = fun(key)
+        self.read.add(len(content))
+        return content
+
+    def throttle_add(self, fun, obj_id, content):
+        self.write.add(len(obj_id) + len(content))
+        return fun(obj_id, content)
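A quick illustration of the leaky bucket defined above (standalone usage, values chosen for illustration)::

    from swh.objstorage.backends.winery.throttler import LeakyBucket

    bucket = LeakyBucket(100)  # budget of 100 bytes per second, starts full
    bucket.add(100)            # drains the bucket, returns immediately
    bucket.add(50)             # bucket empty: sleeps ~0.5s while 50 units leak back in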
"--winery-bench-throttle-read", + type=int, + help="Maximum number of bytes per second read", + default=100 * 1024 * 1024, + ) + parser.addoption( + "--winery-bench-throttle-write", + type=int, + help="Maximum number of bytes per second write", + default=100 * 1024 * 1024, + ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -9,8 +9,14 @@ import sh from swh.objstorage import exc -from swh.objstorage.backends.winery.database import Database +from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.backends.winery.throttler import ( + BandwidthCalculator, + IOThrottler, + LeakyBucket, + Throttler, +) from swh.objstorage.factory import get_objstorage from .winery_benchmark import Bench, work @@ -49,7 +55,12 @@ f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( - cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size + cls="winery", + base_dsn=dsn, + shard_dsn=dsn, + shard_max_size=shard_max_size, + throttle_write=200 * 1024 * 1024, + throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() @@ -57,10 +68,10 @@ # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # - d = Database(dsn) + d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": - d.drop_database(database) + DatabaseAdmin(dsn, database).drop_database() @pytest.fixture @@ -204,6 +215,8 @@ "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, + "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), + "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] @@ -222,3 +235,138 @@ mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] + + +def test_winery_leaky_bucket_tick(mocker): + total = 100 + half = 50 + b = LeakyBucket(total) + sleep = mocker.spy(time, "sleep") + assert b.current == b.total + sleep.assert_not_called() + # + # Bucket is at 100 and drops to 50 + # + b.add(half) + assert b.current == half + sleep.assert_not_called() + # + # Bucket is at 50 and drops to 0 + # + b.add(half) + assert b.current == 0 + sleep.assert_not_called() + # + # Bucket is at 0, waits until it is at 50 and drops to 0 + # + b.add(half) + assert b.current == 0 + sleep.assert_called() + # + # Sleep more than one second, bucket is full again + # + time.sleep(2) + mocker.resetall() + b.add(0) + assert b.current == total + sleep.assert_not_called() + # + # Bucket is full and and waits when requesting more than it can contain + # + b.add(total + half) + assert b.current == 0 + sleep.assert_called() + mocker.resetall() + # + # Bucket is empty and and waits when requesting more than it can contain + # + b.add(total + half) + assert b.current == 0 + sleep.assert_called() + mocker.resetall() + + +def test_winery_leaky_bucket_reset(): + b = LeakyBucket(100) + assert b.total == 100 + assert b.current == b.total + b.reset(50) + assert b.total == 50 + assert b.current == b.total + b.reset(100) + assert b.total == 100 + assert b.current == 50 + + +def 
diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py
--- a/swh/objstorage/tests/test_objstorage_winery.py
+++ b/swh/objstorage/tests/test_objstorage_winery.py
@@ -9,8 +9,14 @@
 import sh
 
 from swh.objstorage import exc
-from swh.objstorage.backends.winery.database import Database
+from swh.objstorage.backends.winery.database import DatabaseAdmin
 from swh.objstorage.backends.winery.objstorage import Packer, pack
+from swh.objstorage.backends.winery.throttler import (
+    BandwidthCalculator,
+    IOThrottler,
+    LeakyBucket,
+    Throttler,
+)
 from swh.objstorage.factory import get_objstorage
 
 from .winery_benchmark import Bench, work
@@ -49,7 +55,12 @@
         f":@{postgresql.info.host}:{postgresql.info.port}"
     )
     storage = get_objstorage(
-        cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size
+        cls="winery",
+        base_dsn=dsn,
+        shard_dsn=dsn,
+        shard_max_size=shard_max_size,
+        throttle_write=200 * 1024 * 1024,
+        throttle_read=100 * 1024 * 1024,
     )
     yield storage
     storage.winery.uninit()
@@ -57,10 +68,10 @@
     # pytest-postgresql will not remove databases that it did not
     # create between tests (only at the very end).
     #
-    d = Database(dsn)
+    d = DatabaseAdmin(dsn)
     for database in d.list_databases():
         if database != postgresql.info.dbname and database != "tests_tmpl":
-            d.drop_database(database)
+            DatabaseAdmin(dsn, database).drop_database()
 
 
 @pytest.fixture
@@ -204,6 +215,8 @@
         "duration": pytestconfig.getoption("--winery-bench-duration"),
         "base_dsn": dsn,
         "shard_dsn": dsn,
+        "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"),
+        "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"),
     }
 
     assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
@@ -222,3 +235,138 @@
     mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run)
 
     assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"]
+
+
+def test_winery_leaky_bucket_tick(mocker):
+    total = 100
+    half = 50
+    b = LeakyBucket(total)
+    sleep = mocker.spy(time, "sleep")
+    assert b.current == b.total
+    sleep.assert_not_called()
+    #
+    # Bucket is at 100 and drops to 50
+    #
+    b.add(half)
+    assert b.current == half
+    sleep.assert_not_called()
+    #
+    # Bucket is at 50 and drops to 0
+    #
+    b.add(half)
+    assert b.current == 0
+    sleep.assert_not_called()
+    #
+    # Bucket is at 0, waits until it is at 50 and drops to 0
+    #
+    b.add(half)
+    assert b.current == 0
+    sleep.assert_called()
+    #
+    # Sleep more than one second, bucket is full again
+    #
+    time.sleep(2)
+    mocker.resetall()
+    b.add(0)
+    assert b.current == total
+    sleep.assert_not_called()
+    #
+    # Bucket is full and waits when requesting more than it can contain
+    #
+    b.add(total + half)
+    assert b.current == 0
+    sleep.assert_called()
+    mocker.resetall()
+    #
+    # Bucket is empty and waits when requesting more than it can contain
+    #
+    b.add(total + half)
+    assert b.current == 0
+    sleep.assert_called()
+    mocker.resetall()
+
+
+def test_winery_leaky_bucket_reset():
+    b = LeakyBucket(100)
+    assert b.total == 100
+    assert b.current == b.total
+    b.reset(50)
+    assert b.total == 50
+    assert b.current == b.total
+    b.reset(100)
+    assert b.total == 100
+    assert b.current == 50
+
+
+def test_winery_bandwidth_calculator(mocker):
+    now = 1
+
+    def monotonic():
+        return now
+
+    mocker.patch("time.monotonic", side_effect=monotonic)
+    b = BandwidthCalculator()
+    assert b.get() == 0
+    count = 100 * 1024 * 1024
+    going_up = []
+    for t in range(b.duration):
+        now += 1
+        b.add(count)
+        going_up.append(b.get())
+    assert b.get() == count
+    going_down = []
+    for t in range(b.duration - 1):
+        now += 1
+        b.add(0)
+        going_down.append(b.get())
+    going_down.reverse()
+    assert going_up[:-1] == going_down
+    assert len(b.history) == b.duration - 1
+
+
+def test_winery_io_throttler(postgresql, mocker):
+    dsn = (
+        f"postgres://{postgresql.info.user}"
+        f":@{postgresql.info.host}:{postgresql.info.port}"
+    )
+    DatabaseAdmin(dsn, "throttler").create_database()
+    sleep = mocker.spy(time, "sleep")
+    speed = 100
+    i = IOThrottler("read", base_dsn=dsn, throttle_read=100)
+    count = speed
+    i.add(count)
+    sleep.assert_not_called()
+    i.add(count)
+    sleep.assert_called()
+    #
+    # Force slow down
+    #
+    mocker.resetall()
+    i.sync_interval = 0
+    i.max_speed = 1
+    assert i.max_speed != i.bucket.total
+    i.add(2)
+    assert i.max_speed == i.bucket.total
+    sleep.assert_called()
+
+
+def test_winery_throttler(postgresql):
+    dsn = (
+        f"postgres://{postgresql.info.user}"
+        f":@{postgresql.info.host}:{postgresql.info.port}"
+    )
+    t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100)
+
+    base = {}
+    key = "KEY"
+    content = "CONTENT"
+
+    def reader(k):
+        return base[k]
+
+    def writer(k, v):
+        base[k] = v
+        return True
+
+    assert t.throttle_add(writer, key, content) is True
+    assert t.throttle_get(reader, key) == content
diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py
--- a/swh/objstorage/tests/winery_benchmark.py
+++ b/swh/objstorage/tests/winery_benchmark.py
@@ -34,6 +34,8 @@
             base_dsn=self.args["base_dsn"],
             shard_dsn=self.args["shard_dsn"],
             shard_max_size=self.args["shard_max_size"],
+            throttle_read=self.args["throttle_read"],
+            throttle_write=self.args["throttle_write"],
         )
         with self.storage.winery.base.db.cursor() as c:
             while True:
@@ -74,6 +76,8 @@
             base_dsn=self.args["base_dsn"],
             shard_dsn=self.args["shard_dsn"],
             shard_max_size=self.args["shard_max_size"],
+            throttle_read=self.args["throttle_read"],
+            throttle_write=self.args["throttle_write"],
         )
         self.payloads_define()
         random_content = open("/dev/urandom", "rb")
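For completeness, here is how the throttle parameters reach the backend in practice: a minimal sketch mirroring the test fixture above, with a placeholder DSN and illustrative sizes::

    from swh.objstorage.factory import get_objstorage

    storage = get_objstorage(
        cls="winery",
        base_dsn="postgres://user@localhost:5432",   # placeholder DSN
        shard_dsn="postgres://user@localhost:5432",  # placeholder DSN
        shard_max_size=10 * 1024 * 1024,
        throttle_read=100 * 1024 * 1024,   # bytes per second, cluster-wide
        throttle_write=200 * 1024 * 1024,  # bytes per second, cluster-wide
    )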