diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3e2bc1f..d35e042 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,41 +1,42 @@ exclude: winery-test-environment/mitogen-strategy repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.3.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - repo: https://github.com/pycqa/flake8 rev: 5.0.4 hooks: - id: flake8 additional_dependencies: [flake8-bugbear==22.9.23] - repo: https://github.com/codespell-project/codespell rev: v2.2.2 hooks: - id: codespell name: Check source code spelling + args: [-L inflight] stages: [commit] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort - repo: https://github.com/python/black rev: 22.10.0 hooks: - id: black diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py index c77fb80..e6e4865 100644 --- a/swh/objstorage/backends/winery/rwshard.py +++ b/swh/objstorage/backends/winery/rwshard.py @@ -1,90 +1,90 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 from .database import Database, DatabaseAdmin class RWShard(Database): def __init__(self, name, **kwargs): self._name = name DatabaseAdmin(kwargs["base_dsn"], self.name).create_database() super().__init__(kwargs["shard_dsn"], self.name) self.create_tables() self.db = self.connect_database() self.size = self.total_size() self.limit = kwargs["shard_max_size"] def uninit(self): if hasattr(self, "db"): self.db.close() del self.db @property def lock(self): - return 452343 # an abitrary unique number + return 452343 # an arbitrary unique number @property def name(self): return 
self._name def is_full(self): return self.size > self.limit def drop(self): DatabaseAdmin(self.dsn, self.dbname).drop_database() @property def database_tables(self): return [ """ CREATE TABLE IF NOT EXISTS objects( key BYTEA PRIMARY KEY, content BYTEA ) """, ] def total_size(self): with self.db.cursor() as c: c.execute("SELECT SUM(LENGTH(content)) FROM objects") size = c.fetchone()[0] if size is None: return 0 else: return size def add(self, obj_id, content): try: with self.db.cursor() as c: c.execute( "INSERT INTO objects (key, content) VALUES (%s, %s)", (obj_id, content), ) self.db.commit() self.size += len(content) except psycopg2.errors.UniqueViolation: pass def get(self, obj_id): with self.db.cursor() as c: c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) if c.rowcount == 0: return None else: return c.fetchone()[0].tobytes() def all(self): with self.db.cursor() as c: c.execute("SELECT key,content FROM objects") for row in c: yield row[0].tobytes(), row[1].tobytes() def count(self): with self.db.cursor() as c: c.execute("SELECT COUNT(*) FROM objects") return c.fetchone()[0] diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py index f51f148..3bddccf 100644 --- a/swh/objstorage/backends/winery/sharedbase.py +++ b/swh/objstorage/backends/winery/sharedbase.py @@ -1,186 +1,186 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import uuid import psycopg2 from .database import Database, DatabaseAdmin class SharedBase(Database): def __init__(self, **kwargs): DatabaseAdmin(kwargs["base_dsn"], "sharedbase").create_database() super().__init__(kwargs["base_dsn"], "sharedbase") self.create_tables() self.db = self.connect_database() self._whoami = None def uninit(self): self.db.close() del self.db @property def 
lock(self): - return 314116 # an abitrary unique number + return 314116 # an arbitrary unique number @property def database_tables(self): return [ """ CREATE TABLE IF NOT EXISTS shards( id SERIAL PRIMARY KEY, readonly BOOLEAN NOT NULL, packing BOOLEAN NOT NULL, name CHAR(32) NOT NULL UNIQUE ) """, """ CREATE TABLE IF NOT EXISTS signature2shard( signature BYTEA PRIMARY KEY, inflight BOOLEAN NOT NULL, shard INTEGER NOT NULL ) """, ] @property def whoami(self): self.set_whoami() return self._whoami @property def id(self): self.set_whoami() return self._whoami_id def set_whoami(self): if self._whoami is not None: return while True: self._whoami, self._whoami_id = self.lock_a_shard() if self._whoami is not None: return self._whoami self.create_shard() def lock_a_shard(self): with self.db.cursor() as c: c.execute( "SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE " "LIMIT 1 FOR UPDATE SKIP LOCKED" ) if c.rowcount == 0: return None, None name = c.fetchone()[0] return self.lock_shard(name) def lock_shard(self, name): self.whoami_lock = self.db.cursor() try: self.whoami_lock.execute( "SELECT name, id FROM shards " "WHERE readonly = FALSE AND packing = FALSE AND name = %s " "FOR UPDATE NOWAIT", (name,), ) return self.whoami_lock.fetchone() except psycopg2.Error: return None def unlock_shard(self): del self.whoami_lock def create_shard(self): name = uuid.uuid4().hex # # ensure the first character is not a number so it can be used as a # database name. 
# name = "i" + name[1:] with self.db.cursor() as c: c.execute( "INSERT INTO shards (name, readonly, packing) " "VALUES (%s, FALSE, FALSE)", (name,), ) self.db.commit() def shard_packing_starts(self): with self.db.cursor() as c: c.execute( "UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,) ) self.unlock_shard() def shard_packing_ends(self, name): with self.db.cursor() as c: c.execute( "UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s", (name,), ) def get_shard_info(self, id): with self.db.cursor() as c: c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) if c.rowcount == 0: return None else: return c.fetchone() def list_shards(self): with self.db.cursor() as c: c.execute("SELECT name, readonly, packing FROM shards") for row in c: yield row[0], row[1], row[2] def contains(self, obj_id): with self.db.cursor() as c: c.execute( "SELECT shard FROM signature2shard WHERE " "signature = %s AND inflight = FALSE", (obj_id,), ) if c.rowcount == 0: return None else: return c.fetchone()[0] def get(self, obj_id): id = self.contains(obj_id) if id is None: return None return self.get_shard_info(id) def add_phase_1(self, obj_id): try: with self.db.cursor() as c: c.execute( "INSERT INTO signature2shard (signature, shard, inflight) " "VALUES (%s, %s, TRUE)", (obj_id, self.id), ) self.db.commit() return self.id except psycopg2.errors.UniqueViolation: with self.db.cursor() as c: c.execute( "SELECT shard FROM signature2shard WHERE " "signature = %s AND inflight = TRUE", (obj_id,), ) if c.rowcount == 0: return None else: return c.fetchone()[0] def add_phase_2(self, obj_id): with self.db.cursor() as c: c.execute( "UPDATE signature2shard SET inflight = FALSE " "WHERE signature = %s AND shard = %s", (obj_id, self.id), ) self.db.commit() diff --git a/swh/objstorage/backends/winery/throttler.py b/swh/objstorage/backends/winery/throttler.py index e6e2eaf..7fc4220 100644 --- a/swh/objstorage/backends/winery/throttler.py +++ 
b/swh/objstorage/backends/winery/throttler.py @@ -1,203 +1,203 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import datetime import logging import time from .database import Database, DatabaseAdmin logger = logging.getLogger(__name__) class LeakyBucket: """ Leaky bucket that can contain at most `total` and leaks it within a second. If adding (with the add method) more than `total` per second, it will sleep until the bucket can be filled without overflowing. The capacity of the bucket can be changed dynamically with the reset method. If the new capacity is lower than it previously was, the overflow is ignored. """ def __init__(self, total): self.updated = 0.0 self.current = 0.0 self.reset(total) def reset(self, total): self.total = total self.current = min(self.total, self.current) self._tick() def add(self, count): self._tick() if count > self.current: time.sleep((count - self.current) / self.total) self._tick() self.current -= min(self.total, count) def _tick(self): now = time.monotonic() self.current += self.total * (now - self.updated) self.current = int(min(self.total, self.current)) self.updated = now class BandwidthCalculator: """Keeps a histogram (of length `duration`, defaults to 60) where each element is the number of bytes read or written within a second. Only the last `duration` seconds are represented in the histogram: after each second the oldest element is discarded. The `add` method is called to add a value to the current second. The `get` method retrieves the current bandwidth usage which is the average of all values in the histogram. 
""" def __init__(self): self.duration = 60 self.history = collections.deque([0] * (self.duration - 1), self.duration - 1) self.current = 0 self.current_second = 0 def add(self, count): current_second = int(time.monotonic()) if current_second > self.current_second: self.history.append(self.current) self.history.extend( [0] * min(self.duration, current_second - self.current_second - 1) ) self.current_second = current_second self.current = 0 self.current += count def get(self): return (sum(self.history) + self.current) / self.duration class IOThrottler(Database): """Throttle IO (either read or write, depending on the `name` argument). The maximum speed in bytes is from the throttle_`name` argument and controlled by a LeakyBucket that guarantees it won't go any faster. Every `sync_interval` seconds the current bandwidth reported by the BandwidthCalculator instance is written into a row in a table shared with other instances of IOThrottler. The cumulated bandwidth of all other instances is retrieved from the same table. If it exceeds `max_speed`, the LeakyBucket instance is reset to only allow max_speed/(number of instances) so that the total bandwidth is shared equally between instances. 
""" def __init__(self, name, **kwargs): super().__init__(kwargs["base_dsn"], "throttler") self.name = name self.init_db() self.last_sync = 0 self.max_speed = kwargs["throttle_" + name] self.bucket = LeakyBucket(self.max_speed) self.bandwidth = BandwidthCalculator() def init_db(self): self.create_tables() self.db = self.connect_database() self.cursor = self.db.cursor() self.cursor.execute( f"INSERT INTO t_{self.name} (updated, bytes) VALUES (%s, %s) RETURNING id", (datetime.datetime.now(), 0), ) self.rowid = self.cursor.fetchone()[0] self.cursor.execute( f"SELECT * FROM t_{self.name} WHERE id = %s FOR UPDATE", (self.rowid,) ) self.cursor.execute( f"DELETE FROM t_{self.name} WHERE id IN (" f"SELECT id FROM t_{self.name} WHERE updated < NOW() - INTERVAL '30 days' " " FOR UPDATE SKIP LOCKED)" ) self.db.commit() self.sync_interval = 60 @property def lock(self): - return 9485433 # an abitrary unique number + return 9485433 # an arbitrary unique number @property def database_tables(self): return [ f""" CREATE TABLE IF NOT EXISTS t_{self.name}( id SERIAL PRIMARY KEY, updated TIMESTAMP NOT NULL, bytes INTEGER NOT NULL ) """, ] def download_info(self): self.cursor.execute( f"SELECT COUNT(*), SUM(bytes) FROM t_{self.name} " "WHERE bytes > 0 AND updated > NOW() - INTERVAL '5 minutes'" ) return self.cursor.fetchone() def upload_info(self): bytes = int(self.bandwidth.get()) logger.debug("%d: upload %s/s", self.rowid, bytes) self.cursor.execute( f"UPDATE t_{self.name} SET updated = %s, bytes = %s WHERE id = %s", (datetime.datetime.now(), bytes, self.rowid), ) self.db.commit() def add(self, count): self.bucket.add(count) self.bandwidth.add(count) self.maybe_sync() def sync(self): self.upload_info() (others_count, total_usage) = self.download_info() logger.debug( "%d: sync others_count=%s total_usage=%s", self.rowid, others_count, total_usage, ) if others_count > 0 and total_usage > self.max_speed: self.bucket.reset(self.max_speed / others_count) def maybe_sync(self): now = 
time.monotonic() if now - self.last_sync > self.sync_interval: self.sync() self.last_sync = now class Throttler: """Throttle reads and writes to not exceed limits imposed by the `throttle_read` and `throttle_write` arguments, as measured by the cumulated bandwidth reported by each Throttler instance. """ def __init__(self, **kwargs): DatabaseAdmin(kwargs["base_dsn"], "throttler").create_database() self.read = IOThrottler("read", **kwargs) self.write = IOThrottler("write", **kwargs) def throttle_get(self, fun, key): content = fun(key) self.read.add(len(content)) return content def throttle_add(self, fun, obj_id, content): self.write.add(len(obj_id) + len(content)) return fun(obj_id, content)