diff --git a/docs/winery.rst b/docs/winery.rst index 56dbec7..016fbe6 100644 --- a/docs/winery.rst +++ b/docs/winery.rst @@ -1,40 +1,49 @@ .. _swh-objstorage-winery: Winery backend ============== The Winery backend implements the `Ceph based object storage architecture `__. +IO Throttling +-------------- + +Ceph (Pacific) implements IO QoS in librbd, but it is only effective within a single process, not cluster-wide. The preliminary benchmarks showed that accumulated read and write throughput must be throttled client side to prevent performance degradation (slower throughput and increased latency). + +Tables are created in a PostgreSQL database dedicated to throttling, so independent processes performing I/O against the Ceph cluster can synchronize with each other and control their accumulated throughput for reads and writes. Workers create a row in the read and write tables and update them every minute with their current read and write throughput, in bytes per second. They also query all rows to figure out the current accumulated bandwidth. + +If the current accumulated bandwidth is above the maximum desired speed for N active workers, the process will reduce its throughput to use a maximum of 1/N of the maximum desired speed. For instance, if the current accumulated usage is above 100MB/s and there are 10 workers, the process will reduce its own speed to 10MB/s. After the 10 workers independently do the same, each of them will use 1/10 of the bandwidth. + Implementation notes -------------------- The `sharedstorage.py` file contains the global index implementation that associates every object id to the shard it contains. A list of shard (either writable or readonly) is stored in a table, with a numeric id to save space. The name of the shard is used to create a database (for write shards) or a RBD image (for read shards). The `roshard.py` file contain the lookup function for a read shard and is a thin layer on top of swh-perfect hash. 
The `rwshard.py` file contains the logic to read, write and enumerate the objects of a write shard using SQL statements on the database dedicated to it. The `obstorage.py` file contains the backend implementation in the `WineryObjStorage` class. It is a thin layer that delegates writes to a `WineryWriter` instance and reads to a `WineryReader` instance. Although they are currently tightly coupled, they are likely to eventually run in different workers if performance and security requires it. A `WineryReader` must be able to read an object from both Read Shards and Write Shards. It will first determine the kind of shard the object belongs to by looking it up in the global index. If it is a Read Shard, it will lookup the object using the `ROShard` class from `roshard.py`, ultimately using a Ceph RBD image. If it is a Write Shard, it will lookup the object using the `RWShard` class from `rwshard.py`, ultimately using a PostgreSQL database. All `WineryWriter` operations are idempotent so they can be resumed in case they fail. When a `WineryWriter` is instantiated, it will either: * Find a Write Shard (i.e. a database) that is not locked by another instance by looking up the list of shards or, * Create a new Write Shard by creating a new database and it will lock the Write Shard and own it so no other instance tries to write to it. A PostgreSQL session lock is used to lock the shard so that it is released when the `WineryWrite` process dies unexpectedly and another process can pick it up. When a new object is added to the Write Shard, a new row is added to the global index to record that it is owned by this Write Shard and is in flight. Such an object cannot be read because it is not yet complete. If a request to write the same object is sent to another `WineryWriter` instance, it will fail to add it to the global index because it already exists. 
Since the object is in flight, the `WineryWriter` will check if the shard associated to the object is: * its name, which means it owns the object and must resume writing the object * not its name, which means another `WineryWriter` owns it and nothing needs to be done After the content of the object is successfully added to the Write Shard, the state of the record in the global index is modified to no longer be in flight. The client is notified that the operation was successful and the object can be read from the Write Shard from that point on. When the size of the database associated with a Write Shard exceeds a threshold, it is set to be in the `packing` state. All objects it contains can be read from it by any `WineryReader` but no new object will be added to it. A process is spawned and is tasked to transform it into a Read Shard using the `Packer` class. Should it fail for any reason, a cron job will restart it when it finds Write Shards that are both in the `packing` state and not locked by any process. Packing is done by enumerating all the records from the Write Shard database and writing them into a Read Shard by the same name. Incomplete Read Shards will never be used by `WineryReader` because the global index will direct it to use the Write Shard instead. Once the packing completes, the state of the shard is modified to be readonly and from that point on the `WineryReader` will only use the Read Shard to find the objects it contains. The database containing the Write Shard is then destroyed because it is no longer useful and the process terminates on success. 
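The in-flight protocol described above can be sketched with an in-memory stand-in for the global index. This is a minimal illustration under stated assumptions, not the code of this change: `GlobalIndex` and `Writer` are hypothetical names, a dict replaces the PostgreSQL `signature2shard` table, and shard locking and packing are left out. It only shows why a second writer backs off and why the owning writer can resume an interrupted add.

```python
class GlobalIndex:
    """Hypothetical in-memory stand-in for the global index table."""

    def __init__(self):
        # obj_id -> [owning shard name, in-flight flag]
        self.rows = {}

    def add_phase_1(self, obj_id, shard):
        """Register obj_id as in flight for `shard`.

        Returns the name of the shard owning the in-flight object,
        or None when the object is already complete.
        """
        row = self.rows.get(obj_id)
        if row is None:
            self.rows[obj_id] = [shard, True]
            return shard
        owner, inflight = row
        return owner if inflight else None

    def add_phase_2(self, obj_id, shard):
        """Mark obj_id as complete, only if `shard` owns it."""
        row = self.rows[obj_id]
        if row[0] == shard:
            row[1] = False

    def readable(self, obj_id):
        """An object is readable once it is no longer in flight."""
        row = self.rows.get(obj_id)
        return row is not None and not row[1]


class Writer:
    """Hypothetical writer owning one Write Shard (a dict here)."""

    def __init__(self, name, index):
        self.name = name
        self.index = index
        self.objects = {}

    def add(self, obj_id, content):
        owner = self.index.add_phase_1(obj_id, self.name)
        if owner != self.name:
            # Another writer owns the object, or it is already complete.
            return False
        # Writing the same content again is harmless, so resuming is safe.
        self.objects[obj_id] = content
        self.index.add_phase_2(obj_id, self.name)
        return True
```

In this sketch, a writer that crashed between the two phases leaves the row in flight under its own name, so calling `add` again with the same object completes the write instead of failing.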
Benchmarks ---------- Follow the instructions at winery-test-environment/README.md diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py index e8c9c1d..25d6ec1 100644 --- a/swh/objstorage/backends/winery/database.py +++ b/swh/objstorage/backends/winery/database.py @@ -1,89 +1,124 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import abc from contextlib import contextmanager import logging import time import psycopg2 logger = logging.getLogger(__name__) -class Database: - def __init__(self, dsn): +class DatabaseAdmin: + def __init__(self, dsn, dbname=None): self.dsn = dsn + self.dbname = dbname @contextmanager def admin_cursor(self): db = psycopg2.connect(dsn=self.dsn, dbname="postgres") # https://wiki.postgresql.org/wiki/Psycopg2_Tutorial # If you want to drop the database you would need to # change the isolation level of the database. 
db.set_isolation_level(0) db.autocommit = True c = db.cursor() try: yield c finally: c.close() - def create_database(self, database): + def create_database(self): with self.admin_cursor() as c: c.execute( "SELECT datname FROM pg_catalog.pg_database " - f"WHERE datname = '{database}'" + f"WHERE datname = '{self.dbname}'" ) if c.rowcount == 0: try: - c.execute(f"CREATE DATABASE {database}") + c.execute(f"CREATE DATABASE {self.dbname}") except psycopg2.errors.UniqueViolation: # someone else created the database, it is fine pass - def drop_database(self, database): + def drop_database(self): with self.admin_cursor() as c: c.execute( "SELECT pg_terminate_backend(pg_stat_activity.pid)" "FROM pg_stat_activity " "WHERE pg_stat_activity.datname = %s;", - (database,), + (self.dbname,), ) # # Dropping the database may fail because the server takes time # to notice a connection was dropped and/or a named cursor is # in the process of being deleted. It can happen here or even # when deleting all database with the psql cli # and there are no process active. # # ERROR: database "i606428a5a6274d1ab09eecc4d019fef7" is being # accessed by other users DETAIL: There is 1 other session # using the database. # # See: # https://stackoverflow.com/questions/5108876/kill-a-postgresql-session-connection # # https://www.postgresql.org/docs/current/sql-dropdatabase.html # # WITH (FORCE) added in postgresql 13 but may also fail because the # named cursor may not be handled as a client. 
# for i in range(60): try: - c.execute(f"DROP DATABASE IF EXISTS {database}") + c.execute(f"DROP DATABASE IF EXISTS {self.dbname}") return except psycopg2.errors.ObjectInUse: - logger.warning(f"{database} database drop fails, waiting 10s") + logger.warning(f"{self.dbname} database drop fails, waiting 10s") time.sleep(10) continue - raise Exception(f"database drop failed on {database}") + raise Exception(f"database drop failed on {self.dbname}") def list_databases(self): with self.admin_cursor() as c: c.execute( "SELECT datname FROM pg_database " "WHERE datistemplate = false and datname != 'postgres'" ) return [r[0] for r in c.fetchall()] + + +class Database(abc.ABC): + def __init__(self, dsn, dbname): + self.dsn = dsn + self.dbname = dbname + + @property + @abc.abstractmethod + def lock(self): + "Return an arbitrary unique number for pg_advisory_lock when creating tables" + raise NotImplementedError("Database.lock") + + @property + @abc.abstractmethod + def database_tables(self): + "Return the list of CREATE TABLE statements for all tables in the database" + raise NotImplementedError("Database.database_tables") + + def create_tables(self): + db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname) + db.autocommit = True + c = db.cursor() + c.execute("SELECT pg_advisory_lock(%s)", (self.lock,)) + for table in self.database_tables: + c.execute(table) + c.close() + db.close() # so the pg_advisory_lock is released + + def connect_database(self): + db = psycopg2.connect(dsn=self.dsn, dbname=self.dbname) + db.autocommit = True + return db diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py index c60f3f3..58b064e 100644 --- a/swh/objstorage/backends/winery/objstorage.py +++ b/swh/objstorage/backends/winery/objstorage.py @@ -1,168 +1,168 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # 
License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Process from swh.objstorage import exc from swh.objstorage.objstorage import ObjStorage, compute_hash from .roshard import ROShard from .rwshard import RWShard from .sharedbase import SharedBase logger = logging.getLogger(__name__) class WineryObjStorage(ObjStorage): def __init__(self, **kwargs): super().__init__(**kwargs) if kwargs.get("readonly"): self.winery = WineryReader(**kwargs) else: self.winery = WineryWriter(**kwargs) def uninit(self): self.winery.uninit() def get(self, obj_id): return self.winery.get(obj_id) def check_config(self, *, check_write): return True def __contains__(self, obj_id): return obj_id in self.winery def add(self, content, obj_id=None, check_presence=True): return self.winery.add(content, obj_id, check_presence) def check(self, obj_id): return self.winery.check(obj_id) def delete(self, obj_id): raise PermissionError("Delete is not allowed.") class WineryBase: def __init__(self, **kwargs): self.args = kwargs self.init() def init(self): self.base = SharedBase(**self.args) def uninit(self): self.base.uninit() def __contains__(self, obj_id): return self.base.contains(obj_id) class WineryReader(WineryBase): def roshard(self, name): shard = ROShard(name, **self.args) shard.load() return shard def get(self, obj_id): shard_info = self.base.get(obj_id) if shard_info is None: raise exc.ObjNotFoundError(obj_id) name, readonly = shard_info if readonly: shard = self.roshard(name) content = shard.get(obj_id) del shard else: shard = RWShard(name, **self.args) content = shard.get(obj_id) if content is None: raise exc.ObjNotFoundError(obj_id) return content def pack(shard, **kwargs): return Packer(shard, **kwargs).run() class Packer: def __init__(self, shard, **kwargs): self.args = kwargs self.shard = shard self.init() def init(self): self.rw = RWShard(self.shard, **self.args) self.ro = 
ROShard(self.shard, **self.args) def uninit(self): del self.ro self.rw.uninit() def run(self): - shard = self.ro.create(self.rw.count()) + self.ro.create(self.rw.count()) for obj_id, content in self.rw.all(): - shard.write(obj_id, content) - shard.save() + self.ro.add(content, obj_id) + self.ro.save() base = SharedBase(**self.args) base.shard_packing_ends(self.shard) base.uninit() self.rw.uninit() self.rw.drop() return True class WineryWriter(WineryReader): def __init__(self, **kwargs): super().__init__(**kwargs) self.packers = [] self.init() def init(self): super().init() self.shard = RWShard(self.base.whoami, **self.args) def uninit(self): self.shard.uninit() super().uninit() def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id shard = self.base.add_phase_1(obj_id) if shard != self.base.id: # this object is the responsibility of another shard return obj_id self.shard.add(obj_id, content) self.base.add_phase_2(obj_id) if self.shard.is_full(): self.pack() return obj_id def check(self, obj_id): # load all shards packing == True and not locked (i.e. 
packer # was interrupted for whatever reason) run pack for each of them pass def pack(self): self.base.shard_packing_starts() p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) self.uninit() p.start() self.packers.append(p) self.init() def __del__(self): for p in self.packers: p.kill() p.join() diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py index 56310bb..cee9a01 100644 --- a/swh/objstorage/backends/winery/roshard.py +++ b/swh/objstorage/backends/winery/roshard.py @@ -1,74 +1,83 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import sh from swh.perfecthash import Shard +from .throttler import Throttler + logger = logging.getLogger(__name__) class Pool(object): name = "shards" def __init__(self, **kwargs): self.args = kwargs self.rbd = sh.sudo.bake("rbd", f"--pool={self.name}") self.ceph = sh.sudo.bake("ceph") self.image_size = self.args["shard_max_size"] * 2 def image_list(self): try: self.rbd.ls() except sh.ErrorReturnCode_2 as e: if "No such file or directory" in e.args[0]: return [] else: raise return [image.strip() for image in self.rbd.ls()] def image_path(self, image): return f"/dev/rbd/{self.name}/{image}" def image_create(self, image): logger.info(f"rdb --pool {self.name} create --size={self.image_size} {image}") self.rbd.create( f"--size={self.image_size}", f"--data-pool={self.name}-data", image ) self.rbd.feature.disable( f"{self.name}/{image}", "object-map", "fast-diff", "deep-flatten" ) self.image_map(image, "rw") def image_map(self, image, options): self.rbd.device("map", "-o", options, image) sh.sudo("chmod", "777", self.image_path(image)) def image_remap_ro(self, image): self.image_unmap(image) 
self.image_map(image, "ro") def image_unmap(self, image): self.rbd.device.unmap(f"{self.name}/{image}", _ok_code=(0, 22)) class ROShard: def __init__(self, name, **kwargs): self.pool = Pool(shard_max_size=kwargs["shard_max_size"]) + self.throttler = Throttler(**kwargs) self.name = name def create(self, count): self.pool.image_create(self.name) self.shard = Shard(self.pool.image_path(self.name)) return self.shard.create(count) def load(self): self.shard = Shard(self.pool.image_path(self.name)) return self.shard.load() == self.shard def get(self, key): - return self.shard.lookup(key) + return self.throttler.throttle_get(self.shard.lookup, key) + + def add(self, content, obj_id): + return self.throttler.throttle_add(self.shard.write, obj_id, content) + + def save(self): + return self.shard.save() diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py index f7fab7a..c77fb80 100644 --- a/swh/objstorage/backends/winery/rwshard.py +++ b/swh/objstorage/backends/winery/rwshard.py @@ -1,89 +1,90 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 -from .database import Database +from .database import Database, DatabaseAdmin class RWShard(Database): def __init__(self, name, **kwargs): - super().__init__(kwargs["shard_dsn"]) self._name = name - self.create_database(self.name) - self.db = self.create_table(f"{self.dsn}/{self.name}") + DatabaseAdmin(kwargs["base_dsn"], self.name).create_database() + super().__init__(kwargs["shard_dsn"], self.name) + self.create_tables() + self.db = self.connect_database() self.size = self.total_size() self.limit = kwargs["shard_max_size"] def uninit(self): if hasattr(self, "db"): self.db.close() del self.db + @property + def lock(self): + return 452343 # an abitrary unique number + @property 
def name(self): return self._name def is_full(self): return self.size > self.limit def drop(self): - self.drop_database(self.name) + DatabaseAdmin(self.dsn, self.dbname).drop_database() - def create_table(self, dsn): - db = psycopg2.connect(dsn) - db.autocommit = True - c = db.cursor() - c.execute( + @property + def database_tables(self): + return [ """ CREATE TABLE IF NOT EXISTS objects( key BYTEA PRIMARY KEY, content BYTEA ) - """ - ) - c.close() - return db + """, + ] def total_size(self): with self.db.cursor() as c: c.execute("SELECT SUM(LENGTH(content)) FROM objects") size = c.fetchone()[0] if size is None: return 0 else: return size def add(self, obj_id, content): try: with self.db.cursor() as c: c.execute( "INSERT INTO objects (key, content) VALUES (%s, %s)", (obj_id, content), ) self.db.commit() self.size += len(content) except psycopg2.errors.UniqueViolation: pass def get(self, obj_id): with self.db.cursor() as c: c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) if c.rowcount == 0: return None else: return c.fetchone()[0].tobytes() def all(self): with self.db.cursor() as c: c.execute("SELECT key,content FROM objects") for row in c: yield row[0].tobytes(), row[1].tobytes() def count(self): with self.db.cursor() as c: c.execute("SELECT COUNT(*) FROM objects") return c.fetchone()[0] diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py index abad163..f51f148 100644 --- a/swh/objstorage/backends/winery/sharedbase.py +++ b/swh/objstorage/backends/winery/sharedbase.py @@ -1,193 +1,186 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import uuid import psycopg2 -from .database import Database +from .database import Database, DatabaseAdmin class SharedBase(Database): def __init__(self, **kwargs): - 
super().__init__(kwargs["base_dsn"]) - database = "sharedbase" - self.create_database(database) - self.db = self.create_table(f"{self.dsn}/{database}") + DatabaseAdmin(kwargs["base_dsn"], "sharedbase").create_database() + super().__init__(kwargs["base_dsn"], "sharedbase") + self.create_tables() + self.db = self.connect_database() self._whoami = None def uninit(self): self.db.close() del self.db - def create_table(self, dsn): - db = psycopg2.connect(dsn) - db.autocommit = True - c = db.cursor() - lock = 314116 # an abitrary unique number - c.execute("SELECT pg_advisory_lock(%s)", (lock,)) - c.execute( + @property + def lock(self): + return 314116 # an abitrary unique number + + @property + def database_tables(self): + return [ """ CREATE TABLE IF NOT EXISTS shards( id SERIAL PRIMARY KEY, readonly BOOLEAN NOT NULL, packing BOOLEAN NOT NULL, name CHAR(32) NOT NULL UNIQUE ) - """ - ) - c.execute( + """, """ CREATE TABLE IF NOT EXISTS signature2shard( signature BYTEA PRIMARY KEY, inflight BOOLEAN NOT NULL, shard INTEGER NOT NULL ) - """ - ) - c.close() - db.close() # so the pg_advisory_lock is released - db = psycopg2.connect(dsn) - db.autocommit = True - return db + """, + ] @property def whoami(self): self.set_whoami() return self._whoami @property def id(self): self.set_whoami() return self._whoami_id def set_whoami(self): if self._whoami is not None: return while True: self._whoami, self._whoami_id = self.lock_a_shard() if self._whoami is not None: return self._whoami self.create_shard() def lock_a_shard(self): with self.db.cursor() as c: c.execute( "SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE " "LIMIT 1 FOR UPDATE SKIP LOCKED" ) if c.rowcount == 0: return None, None name = c.fetchone()[0] return self.lock_shard(name) def lock_shard(self, name): self.whoami_lock = self.db.cursor() try: self.whoami_lock.execute( "SELECT name, id FROM shards " "WHERE readonly = FALSE AND packing = FALSE AND name = %s " "FOR UPDATE NOWAIT", (name,), ) return 
self.whoami_lock.fetchone() except psycopg2.Error: return None def unlock_shard(self): del self.whoami_lock def create_shard(self): name = uuid.uuid4().hex # # ensure the first character is not a number so it can be used as a # database name. # name = "i" + name[1:] with self.db.cursor() as c: c.execute( "INSERT INTO shards (name, readonly, packing) " "VALUES (%s, FALSE, FALSE)", (name,), ) self.db.commit() def shard_packing_starts(self): with self.db.cursor() as c: c.execute( "UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,) ) self.unlock_shard() def shard_packing_ends(self, name): with self.db.cursor() as c: c.execute( "UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s", (name,), ) def get_shard_info(self, id): with self.db.cursor() as c: c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) if c.rowcount == 0: return None else: return c.fetchone() def list_shards(self): with self.db.cursor() as c: c.execute("SELECT name, readonly, packing FROM shards") for row in c: yield row[0], row[1], row[2] def contains(self, obj_id): with self.db.cursor() as c: c.execute( "SELECT shard FROM signature2shard WHERE " "signature = %s AND inflight = FALSE", (obj_id,), ) if c.rowcount == 0: return None else: return c.fetchone()[0] def get(self, obj_id): id = self.contains(obj_id) if id is None: return None return self.get_shard_info(id) def add_phase_1(self, obj_id): try: with self.db.cursor() as c: c.execute( "INSERT INTO signature2shard (signature, shard, inflight) " "VALUES (%s, %s, TRUE)", (obj_id, self.id), ) self.db.commit() return self.id except psycopg2.errors.UniqueViolation: with self.db.cursor() as c: c.execute( "SELECT shard FROM signature2shard WHERE " "signature = %s AND inflight = TRUE", (obj_id,), ) if c.rowcount == 0: return None else: return c.fetchone()[0] def add_phase_2(self, obj_id): with self.db.cursor() as c: c.execute( "UPDATE signature2shard SET inflight = FALSE " "WHERE signature = %s AND shard = %s", 
(obj_id, self.id), ) self.db.commit() diff --git a/swh/objstorage/backends/winery/throttler.py b/swh/objstorage/backends/winery/throttler.py new file mode 100644 index 0000000..e6e2eaf --- /dev/null +++ b/swh/objstorage/backends/winery/throttler.py @@ -0,0 +1,203 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import collections +import datetime +import logging +import time + +from .database import Database, DatabaseAdmin + +logger = logging.getLogger(__name__) + + +class LeakyBucket: + """ + Leaky bucket that can contain at most `total` and leaks it within a second. + If adding (with the add method) more than `total` per second, it will sleep + until the bucket can be filled without overflowing. + + The capacity of the bucket can be changed dynamically with the reset method. + If the new capacity is lower than it previously was, the overflow is ignored. + """ + + def __init__(self, total): + self.updated = 0.0 + self.current = 0.0 + self.reset(total) + + def reset(self, total): + self.total = total + self.current = min(self.total, self.current) + self._tick() + + def add(self, count): + self._tick() + if count > self.current: + time.sleep((count - self.current) / self.total) + self._tick() + self.current -= min(self.total, count) + + def _tick(self): + now = time.monotonic() + self.current += self.total * (now - self.updated) + self.current = int(min(self.total, self.current)) + self.updated = now + + +class BandwidthCalculator: + """Keeps a histogram (of length `duration`, defaults to 60) where + each element is the number of bytes read or written within a + second. + + Only the last `duration` seconds are represented in the histogram: + after each second the oldest element is discarded. + + The `add` method is called to add a value to the current second. 
+ + The `get` method retrieves the current bandwidth usage which is + the average of all values in the histogram. + """ + + def __init__(self): + self.duration = 60 + self.history = collections.deque([0] * (self.duration - 1), self.duration - 1) + self.current = 0 + self.current_second = 0 + + def add(self, count): + current_second = int(time.monotonic()) + if current_second > self.current_second: + self.history.append(self.current) + self.history.extend( + [0] * min(self.duration, current_second - self.current_second - 1) + ) + self.current_second = current_second + self.current = 0 + self.current += count + + def get(self): + return (sum(self.history) + self.current) / self.duration + + +class IOThrottler(Database): + """Throttle IO (either read or write, depending on the `name` + argument). The maximum speed in bytes is from the throttle_`name` + argument and controlled by a LeakyBucket that guarantees it won't + go any faster. + + Every `sync_interval` seconds the current bandwidth reported by + the BandwidthCalculator instance is written into a row in a table + shared with other instances of IOThrottler. The cumulated + bandwidth of all other instances is retrieved from the same table. + If it exceeds `max_speed`, the LeakyBucket instance is reset to + only allow max_speed/(number of instances) so that the total + bandwidth is shared equally between instances. 
+ """ + + def __init__(self, name, **kwargs): + super().__init__(kwargs["base_dsn"], "throttler") + self.name = name + self.init_db() + self.last_sync = 0 + self.max_speed = kwargs["throttle_" + name] + self.bucket = LeakyBucket(self.max_speed) + self.bandwidth = BandwidthCalculator() + + def init_db(self): + self.create_tables() + self.db = self.connect_database() + self.cursor = self.db.cursor() + self.cursor.execute( + f"INSERT INTO t_{self.name} (updated, bytes) VALUES (%s, %s) RETURNING id", + (datetime.datetime.now(), 0), + ) + self.rowid = self.cursor.fetchone()[0] + self.cursor.execute( + f"SELECT * FROM t_{self.name} WHERE id = %s FOR UPDATE", (self.rowid,) + ) + self.cursor.execute( + f"DELETE FROM t_{self.name} WHERE id IN (" + f"SELECT id FROM t_{self.name} WHERE updated < NOW() - INTERVAL '30 days' " + " FOR UPDATE SKIP LOCKED)" + ) + self.db.commit() + self.sync_interval = 60 + + @property + def lock(self): + return 9485433 # an abitrary unique number + + @property + def database_tables(self): + return [ + f""" + CREATE TABLE IF NOT EXISTS t_{self.name}( + id SERIAL PRIMARY KEY, + updated TIMESTAMP NOT NULL, + bytes INTEGER NOT NULL + ) + """, + ] + + def download_info(self): + self.cursor.execute( + f"SELECT COUNT(*), SUM(bytes) FROM t_{self.name} " + "WHERE bytes > 0 AND updated > NOW() - INTERVAL '5 minutes'" + ) + return self.cursor.fetchone() + + def upload_info(self): + bytes = int(self.bandwidth.get()) + logger.debug("%d: upload %s/s", self.rowid, bytes) + self.cursor.execute( + f"UPDATE t_{self.name} SET updated = %s, bytes = %s WHERE id = %s", + (datetime.datetime.now(), bytes, self.rowid), + ) + self.db.commit() + + def add(self, count): + self.bucket.add(count) + self.bandwidth.add(count) + self.maybe_sync() + + def sync(self): + self.upload_info() + (others_count, total_usage) = self.download_info() + logger.debug( + "%d: sync others_count=%s total_usage=%s", + self.rowid, + others_count, + total_usage, + ) + if others_count > 0 and 
total_usage > self.max_speed: + self.bucket.reset(self.max_speed / others_count) + + def maybe_sync(self): + now = time.monotonic() + if now - self.last_sync > self.sync_interval: + self.sync() + self.last_sync = now + + +class Throttler: + """Throttle reads and writes to not exceed limits imposed by the + `throttle_read` and `throttle_write` arguments, as measured by the + cumulated bandwidth reported by each Throttler instance. + """ + + def __init__(self, **kwargs): + DatabaseAdmin(kwargs["base_dsn"], "throttler").create_database() + self.read = IOThrottler("read", **kwargs) + self.write = IOThrottler("write", **kwargs) + + def throttle_get(self, fun, key): + content = fun(key) + self.read.add(len(content)) + return content + + def throttle_add(self, fun, obj_id, content): + self.write.add(len(obj_id) + len(content)) + return fun(obj_id, content) diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py index d3cda00..a8f858e 100644 --- a/swh/objstorage/tests/conftest.py +++ b/swh/objstorage/tests/conftest.py @@ -1,35 +1,47 @@ def pytest_configure(config): config.addinivalue_line("markers", "shard_max_size: winery backend") def pytest_addoption(parser): parser.addoption( "--winery-bench-rw-workers", type=int, help="Number of Read/Write workers", default=1, ) parser.addoption( "--winery-bench-ro-workers", type=int, help="Number of Readonly workers", default=1, ) parser.addoption( "--winery-bench-duration", type=int, help="Duration of the benchmarks in seconds", default=1, ) parser.addoption( "--winery-shard-max-size", type=int, help="Size of the shard in bytes", default=10 * 1024 * 1024, ) parser.addoption( "--winery-bench-ro-worker-max-request", type=int, help="Number of requests a ro worker performs", default=1, ) + parser.addoption( + "--winery-bench-throttle-read", + type=int, + help="Maximum number of bytes per second read", + default=100 * 1024 * 1024, + ) + parser.addoption( + "--winery-bench-throttle-write", + type=int, + 
help="Maximum number of bytes per second write", + default=100 * 1024 * 1024, + ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py index 1577236..dbc5441 100644 --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -1,224 +1,374 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time import pytest import sh from swh.objstorage import exc -from swh.objstorage.backends.winery.database import Database +from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.backends.winery.throttler import ( + BandwidthCalculator, + IOThrottler, + LeakyBucket, + Throttler, +) from swh.objstorage.factory import get_objstorage from .winery_benchmark import Bench, work from .winery_testing_helpers import PoolHelper, SharedBaseHelper @pytest.fixture def needs_ceph(): try: sh.ceph("--version") except sh.CommandNotFound: pytest.skip("the ceph CLI was not found") @pytest.fixture def ceph_pool(needs_ceph): pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() yield pool pool.images_clobber() pool.clobber() @pytest.fixture def storage(request, postgresql): marker = request.node.get_closest_marker("shard_max_size") if marker is None: shard_max_size = 1024 else: shard_max_size = marker.args[0] dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( - cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size + cls="winery", + base_dsn=dsn, + shard_dsn=dsn, + shard_max_size=shard_max_size, + throttle_write=200 * 1024 * 1024, + throttle_read=100 * 1024 * 1024, ) yield storage 
storage.winery.uninit() # # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # - d = Database(dsn) + d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": - d.drop_database(database) + DatabaseAdmin(dsn, database).drop_database() @pytest.fixture def winery(storage): return storage.winery def test_winery_sharedbase(winery): base = winery.base shard1 = base.whoami assert shard1 is not None assert shard1 == base.whoami id1 = base.id assert id1 is not None assert id1 == base.id def test_winery_add_get(winery): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" assert winery.add(content=content, obj_id=obj_id) == obj_id assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id assert winery.base.whoami == shard assert winery.get(obj_id) == content with pytest.raises(exc.ObjNotFoundError): winery.get(b"unknown") winery.shard.drop() @pytest.mark.shard_max_size(1) def test_winery_add_and_pack(winery, mocker): mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" assert winery.base.whoami != shard assert len(winery.packers) == 1 packer = winery.packers[0] packer.join() assert packer.exitcode == 0 def test_winery_delete(storage): with pytest.raises(PermissionError): storage.delete(None) def test_winery_get_shard_info(winery): assert winery.base.get_shard_info(1234) is None assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_packer(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" winery.add(content=content) 
winery.base.shard_packing_starts() packer = Packer(shard, **winery.args) try: assert packer.run() is True finally: packer.uninit() readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) assert readonly is True assert packing is False @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_get_object(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) winery.base.shard_packing_starts() assert pack(shard, **winery.args) is True assert winery.get(obj_id) == content def test_winery_ceph_pool(needs_ceph): name = "IMAGE" pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() pool.image_create(name) p = pool.image_path(name) assert p.endswith(name) something = "SOMETHING" open(p, "w").write(something) assert open(p).read(len(something)) == something assert pool.image_list() == [name] pool.image_remap_ro(name) pool.images_clobber() assert pool.image_list() == [name] pool.clobber() assert pool.image_list() == [] @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_bench_work(winery, ceph_pool, tmpdir): # # rw worker creates a shard # whoami = winery.base.whoami shards_info = list(winery.base.list_shards()) assert len(shards_info) == 1 shard, readonly, packing = shards_info[0] assert (readonly, packing) == (False, False) winery.args["dir"] = str(tmpdir) assert work("rw", winery.args) == "rw" shards_info = { name: (readonly, packing) for name, readonly, packing in winery.base.list_shards() } assert len(shards_info) == 2 assert shards_info[whoami] == (True, False) # # ro worker reads a shard # winery.args["ro_worker_max_request"] = 1 assert work("ro", winery.args) == "ro" @pytest.mark.asyncio async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": 
pytestconfig.getoption("--winery-bench-ro-workers"), "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), "ro_worker_max_request": pytestconfig.getoption( "--winery-bench-ro-worker-max-request" ), "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, + "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), + "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] @pytest.mark.asyncio async def test_winery_bench_fake(pytestconfig, mocker): kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "duration": pytestconfig.getoption("--winery-bench-duration"), } def run(kind): time.sleep(kwargs["duration"] * 2) return kind mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] + + +def test_winery_leaky_bucket_tick(mocker): + total = 100 + half = 50 + b = LeakyBucket(total) + sleep = mocker.spy(time, "sleep") + assert b.current == b.total + sleep.assert_not_called() + # + # Bucket is at 100, add(50) => drops to 50 + # + b.add(half) + assert b.current == half + sleep.assert_not_called() + # + # Bucket is at 50, add(50) => drops to 0 + # + b.add(half) + assert b.current == 0 + sleep.assert_not_called() + # + # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0 + # + b.add(half) + assert b.current == 0 + sleep.assert_called_once() + # + # Sleep more than one second, bucket is full again, i.e. 
at 100 + # + time.sleep(2) + mocker.resetall() + b.add(0) + assert b.current == total + sleep.assert_not_called() + # + # Bucket is full at 100 and waits when requesting 150 which is + # more than it can contain + # + b.add(total + half) + assert b.current == 0 + sleep.assert_called_once() + mocker.resetall() + # + # Bucket is empty and waits when requesting 150 which is more + # than it can contain + # + b.add(total + half) + assert b.current == 0 + sleep.assert_called_once() + mocker.resetall() + + +def test_winery_leaky_bucket_reset(): + b = LeakyBucket(100) + assert b.total == 100 + assert b.current == b.total + b.reset(50) + assert b.total == 50 + assert b.current == b.total + b.reset(100) + assert b.total == 100 + assert b.current == 50 + + +def test_winery_bandwidth_calculator(mocker): + now = 1 + + def monotonic(): + return now + + mocker.patch("time.monotonic", side_effect=monotonic) + b = BandwidthCalculator() + assert b.get() == 0 + count = 100 * 1024 * 1024 + going_up = [] + for t in range(b.duration): + now += 1 + b.add(count) + going_up.append(b.get()) + assert b.get() == count + going_down = [] + for t in range(b.duration - 1): + now += 1 + b.add(0) + going_down.append(b.get()) + going_down.reverse() + assert going_up[:-1] == going_down + assert len(b.history) == b.duration - 1 + + +def test_winery_io_throttler(postgresql, mocker): + dsn = ( + f"postgres://{postgresql.info.user}" + f":@{postgresql.info.host}:{postgresql.info.port}" + ) + DatabaseAdmin(dsn, "throttler").create_database() + sleep = mocker.spy(time, "sleep") + speed = 100 + i = IOThrottler("read", base_dsn=dsn, throttle_read=100) + count = speed + i.add(count) + sleep.assert_not_called() + i.add(count) + sleep.assert_called_once() + # + # Force slow down + # + mocker.resetall() + i.sync_interval = 0 + i.max_speed = 1 + assert i.max_speed != i.bucket.total + i.add(2) + assert i.max_speed == i.bucket.total + sleep.assert_called_once() + + +def test_winery_throttler(postgresql): +
dsn = ( + f"postgres://{postgresql.info.user}" + f":@{postgresql.info.host}:{postgresql.info.port}" + ) + t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100) + + base = {} + key = "KEY" + content = "CONTENT" + + def reader(k): + return base[k] + + def writer(k, v): + base[k] = v + return True + + assert t.throttle_add(writer, key, content) is True + assert t.throttle_get(reader, key) == content diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py index 419cbb9..36ccc35 100644 --- a/swh/objstorage/tests/winery_benchmark.py +++ b/swh/objstorage/tests/winery_benchmark.py @@ -1,145 +1,149 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import concurrent.futures import logging import os import random import time from swh.objstorage.factory import get_objstorage logger = logging.getLogger(__name__) def work(kind, args): return Worker(args).run(kind) class Worker(object): def __init__(self, args): self.args = args def run(self, kind): getattr(self, kind)() return kind def ro(self): self.storage = get_objstorage( cls="winery", readonly=True, base_dsn=self.args["base_dsn"], shard_dsn=self.args["shard_dsn"], shard_max_size=self.args["shard_max_size"], + throttle_read=self.args["throttle_read"], + throttle_write=self.args["throttle_write"], ) with self.storage.winery.base.db.cursor() as c: while True: c.execute( "SELECT signature FROM signature2shard WHERE inflight = FALSE " "ORDER BY random() LIMIT %s", (self.args["ro_worker_max_request"],), ) if c.rowcount > 0: break logger.info(f"Worker(ro, {os.getpid()}): empty, waiting") time.sleep(1) logger.info(f"Worker(ro, {os.getpid()}): requesting {c.rowcount} objects") start = time.time() for row in c: obj_id = row[0].tobytes() assert self.storage.get(obj_id) is 
not None elapsed = time.time() - start logger.info(f"Worker(ro, {os.getpid()}): finished ({elapsed:.2f}s)") def payloads_define(self): self.payloads = [ 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 10 * 1024 + 1, 13 * 1024 + 1, 16 * 1024 + 1, 70 * 1024 + 1, 80 * 1024 + 1, ] def rw(self): self.storage = get_objstorage( cls="winery", base_dsn=self.args["base_dsn"], shard_dsn=self.args["shard_dsn"], shard_max_size=self.args["shard_max_size"], + throttle_read=self.args["throttle_read"], + throttle_write=self.args["throttle_write"], ) self.payloads_define() random_content = open("/dev/urandom", "rb") logger.info(f"Worker(rw, {os.getpid()}): start") start = time.time() count = 0 while len(self.storage.winery.packers) == 0: content = random_content.read(random.choice(self.payloads)) self.storage.add(content=content) count += 1 logger.info(f"Worker(rw, {os.getpid()}): packing {count} objects") packer = self.storage.winery.packers[0] packer.join() assert packer.exitcode == 0 elapsed = time.time() - start logger.info(f"Worker(rw, {os.getpid()}): finished ({elapsed:.2f}s)") class Bench(object): def __init__(self, args): self.args = args def timer_start(self): self.start = time.time() def timeout(self): return time.time() - self.start > self.args["duration"] async def run(self): self.timer_start() loop = asyncio.get_running_loop() workers_count = self.args["rw_workers"] + self.args["ro_workers"] with concurrent.futures.ProcessPoolExecutor( max_workers=workers_count ) as executor: logger.info("Bench.run: running") self.count = 0 workers = set() def create_worker(kind): self.count += 1 logger.info(f"Bench.run: launched {kind} worker number {self.count}") return loop.run_in_executor(executor, work, kind, self.args) for kind in ["rw"] * self.args["rw_workers"] + ["ro"] * self.args[ "ro_workers" ]: workers.add(create_worker(kind)) while len(workers) > 0: logger.info(f"Bench.run: waiting for {len(workers)} workers") current = workers done, pending = await 
asyncio.wait( current, return_when=asyncio.FIRST_COMPLETED ) workers = pending for task in done: kind = task.result() logger.info(f"Bench.run: worker {kind} complete") if not self.timeout(): workers.add(create_worker(kind)) logger.info("Bench.run: finished") return self.count diff --git a/swh/objstorage/tests/winery_testing_helpers.py b/swh/objstorage/tests/winery_testing_helpers.py index 3fceace..4f28240 100644 --- a/swh/objstorage/tests/winery_testing_helpers.py +++ b/swh/objstorage/tests/winery_testing_helpers.py @@ -1,64 +1,64 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.objstorage.backends.winery.roshard import Pool logger = logging.getLogger(__name__) class SharedBaseHelper: def __init__(self, sharedbase): self.sharedbase = sharedbase def get_shard_info_by_name(self, name): with self.sharedbase.db.cursor() as c: c.execute("SELECT readonly, packing FROM shards WHERE name = %s", (name,)) if c.rowcount == 0: return None else: return c.fetchone() class PoolHelper(Pool): def image_delete(self, image): self.image_unmap(image) logger.info(f"rdb --pool {self.name} remove {image}") self.rbd.remove(image) def images_clobber(self): for image in self.image_list(): image = image.strip() self.image_unmap(image) def clobber(self): self.images_clobber() self.pool_clobber() def pool_clobber(self): logger.info(f"ceph osd pool delete {self.name}") self.ceph.osd.pool.delete(self.name, self.name, "--yes-i-really-really-mean-it") data = f"{self.name}-data" logger.info(f"ceph osd pool delete {data}") self.ceph.osd.pool.delete(data, data, "--yes-i-really-really-mean-it") def pool_create(self): data = f"{self.name}-data" logger.info(f"ceph osd pool create {data}") self.ceph.osd( "erasure-code-profile", "set", "--force", data, "k=4", "m=2", 
"crush-failure-domain=host", ) - self.ceph.osd.pool.create(data, "200", "erasure", data) + self.ceph.osd.pool.create(data, "100", "erasure", data) self.ceph.osd.pool.set(data, "allow_ec_overwrites", "true") self.ceph.osd.pool.set(data, "pg_autoscale_mode", "off") logger.info(f"ceph osd pool create {self.name}") self.ceph.osd.pool.create(self.name)