diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py index 58b064e..d362625 100644 --- a/swh/objstorage/backends/winery/objstorage.py +++ b/swh/objstorage/backends/winery/objstorage.py @@ -1,168 +1,173 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Process from swh.objstorage import exc from swh.objstorage.objstorage import ObjStorage, compute_hash from .roshard import ROShard from .rwshard import RWShard from .sharedbase import SharedBase +from .stats import Stats logger = logging.getLogger(__name__) class WineryObjStorage(ObjStorage): def __init__(self, **kwargs): super().__init__(**kwargs) if kwargs.get("readonly"): self.winery = WineryReader(**kwargs) else: self.winery = WineryWriter(**kwargs) def uninit(self): self.winery.uninit() def get(self, obj_id): return self.winery.get(obj_id) def check_config(self, *, check_write): return True def __contains__(self, obj_id): return obj_id in self.winery def add(self, content, obj_id=None, check_presence=True): return self.winery.add(content, obj_id, check_presence) def check(self, obj_id): return self.winery.check(obj_id) def delete(self, obj_id): raise PermissionError("Delete is not allowed.") class WineryBase: def __init__(self, **kwargs): self.args = kwargs self.init() def init(self): self.base = SharedBase(**self.args) def uninit(self): self.base.uninit() def __contains__(self, obj_id): return self.base.contains(obj_id) class WineryReader(WineryBase): def roshard(self, name): shard = ROShard(name, **self.args) shard.load() return shard def get(self, obj_id): shard_info = self.base.get(obj_id) if shard_info is None: raise exc.ObjNotFoundError(obj_id) name, readonly = shard_info if readonly: shard = self.roshard(name) content = shard.get(obj_id) del shard else: shard = RWShard(name, **self.args) content = shard.get(obj_id) if content is None: raise exc.ObjNotFoundError(obj_id) return content def pack(shard, **kwargs): return Packer(shard, **kwargs).run() class Packer: def __init__(self, shard, **kwargs): + self.stats = Stats(kwargs.get("output_dir")) self.args = kwargs self.shard = shard self.init() def init(self): self.rw = RWShard(self.shard, **self.args) self.ro = ROShard(self.shard, **self.args) def uninit(self): del self.ro self.rw.uninit() def run(self): self.ro.create(self.rw.count()) for obj_id, content in self.rw.all(): self.ro.add(content, obj_id) + if self.stats.stats_active: + self.stats.stats_read(obj_id, content) + self.stats.stats_write(obj_id, content) self.ro.save() base = SharedBase(**self.args) base.shard_packing_ends(self.shard) base.uninit() self.rw.uninit() self.rw.drop() return True class WineryWriter(WineryReader): def __init__(self, **kwargs): super().__init__(**kwargs) self.packers = [] self.init() def init(self): super().init() self.shard = RWShard(self.base.whoami, **self.args) def uninit(self): self.shard.uninit() super().uninit() def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id shard = self.base.add_phase_1(obj_id) if shard != self.base.id: # this object is the responsibility of another shard return obj_id self.shard.add(obj_id, content) self.base.add_phase_2(obj_id) if self.shard.is_full(): self.pack() return 
obj_id def check(self, obj_id): # load all shards packing == True and not locked (i.e. packer # was interrupted for whatever reason) run pack for each of them pass def pack(self): self.base.shard_packing_starts() p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) self.uninit() p.start() self.packers.append(p) self.init() def __del__(self): for p in self.packers: p.kill() p.join() diff --git a/swh/objstorage/backends/winery/roshard.py b/swh/objstorage/backends/winery/roshard.py index cee9a01..07d96da 100644 --- a/swh/objstorage/backends/winery/roshard.py +++ b/swh/objstorage/backends/winery/roshard.py @@ -1,83 +1,83 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import sh from swh.perfecthash import Shard from .throttler import Throttler logger = logging.getLogger(__name__) class Pool(object): name = "shards" def __init__(self, **kwargs): self.args = kwargs self.rbd = sh.sudo.bake("rbd", f"--pool={self.name}") self.ceph = sh.sudo.bake("ceph") - self.image_size = self.args["shard_max_size"] * 2 + self.image_size = int((self.args["shard_max_size"] * 2) / (1024 * 1024)) def image_list(self): try: self.rbd.ls() except sh.ErrorReturnCode_2 as e: if "No such file or directory" in e.args[0]: return [] else: raise return [image.strip() for image in self.rbd.ls()] def image_path(self, image): return f"/dev/rbd/{self.name}/{image}" def image_create(self, image): logger.info(f"rdb --pool {self.name} create --size={self.image_size} {image}") self.rbd.create( f"--size={self.image_size}", f"--data-pool={self.name}-data", image ) self.rbd.feature.disable( f"{self.name}/{image}", "object-map", "fast-diff", "deep-flatten" ) self.image_map(image, "rw") def image_map(self, image, options): self.rbd.device("map", "-o", options, image) sh.sudo("chmod", "777", self.image_path(image)) def image_remap_ro(self, image): self.image_unmap(image) self.image_map(image, "ro") def image_unmap(self, image): self.rbd.device.unmap(f"{self.name}/{image}", _ok_code=(0, 22)) class ROShard: def __init__(self, name, **kwargs): self.pool = Pool(shard_max_size=kwargs["shard_max_size"]) self.throttler = Throttler(**kwargs) self.name = name def create(self, count): self.pool.image_create(self.name) self.shard = Shard(self.pool.image_path(self.name)) return self.shard.create(count) def load(self): self.shard = Shard(self.pool.image_path(self.name)) return self.shard.load() == self.shard def get(self, key): return self.throttler.throttle_get(self.shard.lookup, key) def add(self, content, obj_id): return self.throttler.throttle_add(self.shard.write, obj_id, content) def save(self): return self.shard.save() diff --git a/swh/objstorage/backends/winery/stats.py b/swh/objstorage/backends/winery/stats.py new file mode 100644 index 0000000..396bc05 --- /dev/null +++ b/swh/objstorage/backends/winery/stats.py @@ -0,0 +1,87 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import os +import time + +logger = logging.getLogger(__name__) + + +class Stats: + def __init__(self, d): + if d is None: + self._stats_active = False + return + + self._stats_active = True + if not os.path.exists(d): + 
os.makedirs(d) + self._stats_filename = f"{d}/{os.getpid()}.csv" + self._stats_fd = open(self.stats_filename, "w") + self._stats_fd.write( + # time in seconds since epoch + "time," + # total number of objects written at this point in time + "object_write_count," + # total number of bytes written at this point in time + "bytes_write," + # total number of objects read at this point in time + "object_read_count," + # total number of bytes read at this point in time + "bytes_read" + "\n" + ) + self._stats_fd.flush() + self._stats_last_write = time.monotonic() + self._stats_flush_interval = 5 + self._stats = { + "object_write_count": 0, + "bytes_write": 0, + "object_read_count": 0, + "bytes_read": 0, + } + + @property + def stats_active(self): + return self._stats_active + + @property + def stats_filename(self): + return self._stats_filename + + def __del__(self): + if self.stats_active and not self._stats_fd.closed: + self._stats_print() + self._stats_fd.close() + + def _stats_print(self): + ll = ",".join( + str(self._stats[x]) + for x in [ + "object_write_count", + "bytes_write", + "object_read_count", + "bytes_read", + ] + ) + self._stats_fd.write(f"{int(time.monotonic())},{ll}\n") + self._stats_fd.flush() + + def _stats_maybe_print(self): + now = time.monotonic() + if now - self._stats_last_write > self._stats_flush_interval: + self._stats_print() + self._stats_last_write = now + + def stats_read(self, key, content): + self._stats["object_read_count"] += 1 + self._stats["bytes_read"] += len(key) + len(content) + self._stats_maybe_print() + + def stats_write(self, key, content): + self._stats["object_write_count"] += 1 + self._stats["bytes_write"] += len(key) + len(content) + self._stats_maybe_print() diff --git a/swh/objstorage/tests/conftest.py b/swh/objstorage/tests/conftest.py index a8f858e..aa3c778 100644 --- a/swh/objstorage/tests/conftest.py +++ b/swh/objstorage/tests/conftest.py @@ -1,47 +1,52 @@ def pytest_configure(config): config.addinivalue_line("markers", "shard_max_size: winery backend") def pytest_addoption(parser): + parser.addoption( + "--winery-bench-output-directory", + help="Directory in which the performance results are stored", + default="/tmp/winery", + ) parser.addoption( "--winery-bench-rw-workers", type=int, help="Number of Read/Write workers", default=1, ) parser.addoption( "--winery-bench-ro-workers", type=int, help="Number of Readonly workers", default=1, ) parser.addoption( "--winery-bench-duration", type=int, help="Duration of the benchmarks in seconds", default=1, ) parser.addoption( "--winery-shard-max-size", type=int, help="Size of the shard in bytes", default=10 * 1024 * 1024, ) parser.addoption( "--winery-bench-ro-worker-max-request", type=int, help="Number of requests a ro worker performs", default=1, ) parser.addoption( "--winery-bench-throttle-read", type=int, help="Maximum number of bytes per second read", default=100 * 1024 * 1024, ) parser.addoption( "--winery-bench-throttle-write", type=int, help="Maximum number of bytes per second write", default=100 * 1024 * 1024, ) diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py index dbc5441..d7b5ba3 100644 --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -1,374 +1,396 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file 
for more information +import os import time import pytest import sh from swh.objstorage import exc from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack +from swh.objstorage.backends.winery.stats import Stats from swh.objstorage.backends.winery.throttler import ( BandwidthCalculator, IOThrottler, LeakyBucket, Throttler, ) from swh.objstorage.factory import get_objstorage from .winery_benchmark import Bench, work from .winery_testing_helpers import PoolHelper, SharedBaseHelper @pytest.fixture def needs_ceph(): try: sh.ceph("--version") except sh.CommandNotFound: pytest.skip("the ceph CLI was not found") @pytest.fixture def ceph_pool(needs_ceph): pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() yield pool pool.images_clobber() pool.clobber() @pytest.fixture def storage(request, postgresql): marker = request.node.get_closest_marker("shard_max_size") if marker is None: shard_max_size = 1024 else: shard_max_size = marker.args[0] dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size, throttle_write=200 * 1024 * 1024, throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() # # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": DatabaseAdmin(dsn, database).drop_database() @pytest.fixture def winery(storage): return storage.winery def test_winery_sharedbase(winery): base = winery.base shard1 = base.whoami assert shard1 is not None assert shard1 == base.whoami id1 = base.id assert id1 is not None assert id1 == base.id def test_winery_add_get(winery): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" assert winery.add(content=content, obj_id=obj_id) == obj_id assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id assert winery.base.whoami == shard assert winery.get(obj_id) == content with pytest.raises(exc.ObjNotFoundError): winery.get(b"unknown") winery.shard.drop() @pytest.mark.shard_max_size(1) def test_winery_add_and_pack(winery, mocker): mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" assert winery.base.whoami != shard assert len(winery.packers) == 1 packer = winery.packers[0] packer.join() assert packer.exitcode == 0 def test_winery_delete(storage): with pytest.raises(PermissionError): storage.delete(None) def test_winery_get_shard_info(winery): assert winery.base.get_shard_info(1234) is None assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_packer(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" winery.add(content=content) winery.base.shard_packing_starts() packer = Packer(shard, **winery.args) try: assert packer.run() is True finally: packer.uninit() readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) assert readonly is True assert packing is False @pytest.mark.shard_max_size(10 * 1024 * 1024) 
def test_winery_get_object(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) winery.base.shard_packing_starts() assert pack(shard, **winery.args) is True assert winery.get(obj_id) == content def test_winery_ceph_pool(needs_ceph): name = "IMAGE" pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() pool.image_create(name) p = pool.image_path(name) assert p.endswith(name) something = "SOMETHING" open(p, "w").write(something) assert open(p).read(len(something)) == something assert pool.image_list() == [name] pool.image_remap_ro(name) pool.images_clobber() assert pool.image_list() == [name] pool.clobber() assert pool.image_list() == [] @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_bench_work(winery, ceph_pool, tmpdir): # # rw worker creates a shard # whoami = winery.base.whoami shards_info = list(winery.base.list_shards()) assert len(shards_info) == 1 shard, readonly, packing = shards_info[0] assert (readonly, packing) == (False, False) winery.args["dir"] = str(tmpdir) assert work("rw", winery.args) == "rw" shards_info = { name: (readonly, packing) for name, readonly, packing in winery.base.list_shards() } assert len(shards_info) == 2 assert shards_info[whoami] == (True, False) # # ro worker reads a shard # winery.args["ro_worker_max_request"] = 1 assert work("ro", winery.args) == "ro" @pytest.mark.asyncio async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) kwargs = { + "output_dir": pytestconfig.getoption("--winery-bench-output-directory"), "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), "ro_worker_max_request": pytestconfig.getoption( "--winery-bench-ro-worker-max-request" ), "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } - assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] + count = await Bench(kwargs).run() + assert count > 0 @pytest.mark.asyncio async def test_winery_bench_fake(pytestconfig, mocker): kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "duration": pytestconfig.getoption("--winery-bench-duration"), } def run(kind): time.sleep(kwargs["duration"] * 2) return kind mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] def test_winery_leaky_bucket_tick(mocker): total = 100 half = 50 b = LeakyBucket(total) sleep = mocker.spy(time, "sleep") assert b.current == b.total sleep.assert_not_called() # # Bucket is at 100, add(50) => drops to 50 # b.add(half) assert b.current == half sleep.assert_not_called() # # Bucket is at 50, add(50) => drops to 0 # b.add(half) assert b.current == 0 sleep.assert_not_called() # # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0 # b.add(half) assert b.current == 0 sleep.assert_called_once() # # Sleep more than one second, bucket is full again, i.e. 
at 100 # time.sleep(2) mocker.resetall() b.add(0) assert b.current == total sleep.assert_not_called() # # Bucket is full at 100 and and waits when requesting 150 which is # more than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() # # Bucket is empty and and waits when requesting 150 which is more # than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() def test_winery_leaky_bucket_reset(): b = LeakyBucket(100) assert b.total == 100 assert b.current == b.total b.reset(50) assert b.total == 50 assert b.current == b.total b.reset(100) assert b.total == 100 assert b.current == 50 def test_winery_bandwidth_calculator(mocker): now = 1 def monotonic(): return now mocker.patch("time.monotonic", side_effect=monotonic) b = BandwidthCalculator() assert b.get() == 0 count = 100 * 1024 * 1024 going_up = [] for t in range(b.duration): now += 1 b.add(count) going_up.append(b.get()) assert b.get() == count going_down = [] for t in range(b.duration - 1): now += 1 b.add(0) going_down.append(b.get()) going_down.reverse() assert going_up[:-1] == going_down assert len(b.history) == b.duration - 1 def test_winery_io_throttler(postgresql, mocker): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) DatabaseAdmin(dsn, "throttler").create_database() sleep = mocker.spy(time, "sleep") speed = 100 i = IOThrottler("read", base_dsn=dsn, throttle_read=100) count = speed i.add(count) sleep.assert_not_called() i.add(count) sleep.assert_called_once() # # Force slow down # mocker.resetall() i.sync_interval = 0 i.max_speed = 1 assert i.max_speed != i.bucket.total i.add(2) assert i.max_speed == i.bucket.total sleep.assert_called_once() def test_winery_throttler(postgresql): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100) base = {} key = "KEY" content = "CONTENT" def reader(k): return base[k] def writer(k, v): base[k] = v return True assert t.throttle_add(writer, key, content) is True assert t.throttle_get(reader, key) == content + + +def test_winery_stats(tmpdir): + s = Stats(None) + assert s.stats_active is False + s = Stats(tmpdir / "stats") + assert s.stats_active is True + assert os.path.exists(s.stats_filename) + size = os.path.getsize(s.stats_filename) + s._stats_flush_interval = 0 + k = "KEY" + v = "CONTENT" + s.stats_read(k, v) + s.stats_write(k, v) + s.stats_read(k, v) + s.stats_write(k, v) + s.__del__() + assert os.path.getsize(s.stats_filename) > size diff --git a/swh/objstorage/tests/winery_benchmark.py b/swh/objstorage/tests/winery_benchmark.py index 36ccc35..8671fcf 100644 --- a/swh/objstorage/tests/winery_benchmark.py +++ b/swh/objstorage/tests/winery_benchmark.py @@ -1,149 +1,168 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import concurrent.futures import logging import os import random import time +import psycopg2 + +from swh.objstorage.backends.winery.stats import Stats from swh.objstorage.factory import get_objstorage logger = logging.getLogger(__name__) def work(kind, args): return Worker(args).run(kind) -class Worker(object): +class Worker: def __init__(self, 
args): + self.stats = Stats(args.get("output_dir")) self.args = args def run(self, kind): getattr(self, kind)() return kind def ro(self): + try: + self._ro() + except psycopg2.OperationalError: + # It may happen when the database is dropped, just + # conclude the read loop gracefully and move on + pass + + def _ro(self): self.storage = get_objstorage( cls="winery", readonly=True, base_dsn=self.args["base_dsn"], shard_dsn=self.args["shard_dsn"], shard_max_size=self.args["shard_max_size"], throttle_read=self.args["throttle_read"], throttle_write=self.args["throttle_write"], + output_dir=self.args.get("output_dir"), ) with self.storage.winery.base.db.cursor() as c: while True: c.execute( "SELECT signature FROM signature2shard WHERE inflight = FALSE " "ORDER BY random() LIMIT %s", (self.args["ro_worker_max_request"],), ) if c.rowcount > 0: break logger.info(f"Worker(ro, {os.getpid()}): empty, waiting") time.sleep(1) logger.info(f"Worker(ro, {os.getpid()}): requesting {c.rowcount} objects") start = time.time() for row in c: obj_id = row[0].tobytes() - assert self.storage.get(obj_id) is not None + content = self.storage.get(obj_id) + assert content is not None + if self.stats.stats_active: + self.stats.stats_read(obj_id, content) elapsed = time.time() - start logger.info(f"Worker(ro, {os.getpid()}): finished ({elapsed:.2f}s)") def payloads_define(self): self.payloads = [ 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 3 * 1024 + 1, 10 * 1024 + 1, 13 * 1024 + 1, 16 * 1024 + 1, 70 * 1024 + 1, 80 * 1024 + 1, ] def rw(self): self.storage = get_objstorage( cls="winery", base_dsn=self.args["base_dsn"], shard_dsn=self.args["shard_dsn"], shard_max_size=self.args["shard_max_size"], throttle_read=self.args["throttle_read"], throttle_write=self.args["throttle_write"], + output_dir=self.args.get("output_dir"), ) self.payloads_define() random_content = open("/dev/urandom", "rb") logger.info(f"Worker(rw, {os.getpid()}): start") start = time.time() count = 0 while len(self.storage.winery.packers) == 0: content = random_content.read(random.choice(self.payloads)) - self.storage.add(content=content) + obj_id = self.storage.add(content=content) + if self.stats.stats_active: + self.stats.stats_write(obj_id, content) count += 1 logger.info(f"Worker(rw, {os.getpid()}): packing {count} objects") packer = self.storage.winery.packers[0] packer.join() assert packer.exitcode == 0 elapsed = time.time() - start logger.info(f"Worker(rw, {os.getpid()}): finished ({elapsed:.2f}s)") class Bench(object): def __init__(self, args): self.args = args def timer_start(self): self.start = time.time() def timeout(self): return time.time() - self.start > self.args["duration"] async def run(self): self.timer_start() loop = asyncio.get_running_loop() workers_count = self.args["rw_workers"] + self.args["ro_workers"] with concurrent.futures.ProcessPoolExecutor( max_workers=workers_count ) as executor: logger.info("Bench.run: running") self.count = 0 workers = set() def create_worker(kind): self.count += 1 logger.info(f"Bench.run: launched {kind} worker number {self.count}") return loop.run_in_executor(executor, work, kind, self.args) for kind in ["rw"] * self.args["rw_workers"] + ["ro"] * self.args[ "ro_workers" ]: workers.add(create_worker(kind)) while len(workers) > 0: logger.info(f"Bench.run: waiting for {len(workers)} workers") current = workers done, pending = await asyncio.wait( current, return_when=asyncio.FIRST_COMPLETED ) workers = pending for task in done: kind = task.result() logger.info(f"Bench.run: worker {kind} 
complete") if not self.timeout(): workers.add(create_worker(kind)) logger.info("Bench.run: finished") return self.count diff --git a/winery-test-environment/README.md b/winery-test-environment/README.md index fb7964a..6cd790b 100644 --- a/winery-test-environment/README.md +++ b/winery-test-environment/README.md @@ -1,76 +1,90 @@ This purpose of these instructions is to run `tox -e py3` in an environment that has access to a ceph cluster. It enables tests that would be otherwise be skipped and increases code coverage. The environment is composed of eight machines named ceph1 to ceph8. # Installation * pip install -r requirements.txt * ansible-galaxy install geerlingguy.docker # Create the machines ## libvirt * ensure virsh is available * ./build-vms.sh If the internet cnx is slow it may take a while before the OSD show up because they require downloading large docker images. ## fed4fire ### Create a base rspec specification. * /opt/jFed/jFed-Experimenter * In the General Tab * Create an experiment (New) * Add one Physical Node by dragging it * Right click on the node and choose "Configure Node" * Select testbed: Grid 5000 * Node => Specific hardware type: dahu-grenoble * Disk image => Bullseye base * Save under sample.rspec * Manually edit to duplicate the nodes ### Run the experiment. * /opt/jFed/jFed-Experimenter * In the General Tab * Open Local and load winery-test-environment/fed4fire.rspec * Edit ceph1 node to check if the Specific hardware type is dahu-grenoble * Click on Topology Viewer * Run * Give a unique name to the experiment * Start experiment * Once the provisionning is complete (Testing connectivity to resources on Grid5000) click "Export As" * Choose "Export Configuration Management Settings" * Save under /tmp/test.zip * fed4fire.sh test.zip # Install the machines * ansible-playbook -i inventory context/setup.yml ceph.yml bootstrap.yml osd.yml tests.yml # Run the tests It copies the content of the repository and "ssh ceph1 tox -e py3" * tox -e winery # Login into a machine For each host found in context/ssh-config * ssh -i context/cluster_key -F context/ssh-config ceph1 +# Run the benchmarks + +The `tox -e winery` command is used to run the benchmarks with the desired parameters. Upon completion the raw data can be found in the `winery-test-environment/context/stats` directory and is displayed on the standard output as well as rendered in a graph, if a display is available (see the `winery-test-environment/render-stats.py` for the details). + +### Example + +* tox -e winery -- -s --log-cli-level=INFO -vvv -k test_winery_bench_real --winery-bench-duration 30 --winery-shard-max-size $((10 * 1024 * 1024)) --winery-bench-ro-worker-max-request 2000 + +### Get all benchmark flags + +Run the following command and look for flags that start with `--winery-bench-` + +* tox -e winery -- --help + # Destroy ## libvirt * ./build-vms.sh stop $(seq 1 8) ## fed4fire It will expire on its own diff --git a/winery-test-environment/remote-tox.sh b/winery-test-environment/remote-tox.sh index f393499..c0108f4 100755 --- a/winery-test-environment/remote-tox.sh +++ b/winery-test-environment/remote-tox.sh @@ -1,31 +1,43 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information set -ex DIR=winery-test-environment SSH="ssh -i ${DIR}/context/cluster_key -F ${DIR}/context/ssh-config" function sanity_check() { if ! 
test -f ${DIR}/context/cluster_key ; then echo "${DIR}/context/cluster_key does not exist" echo "check ${DIR}/README.md for instructions." return 1 fi } -function copy() { +function copy_to() { RSYNC_RSH="$SSH" rsync -av --exclude=.mypy_cache --exclude=.coverage --exclude=.eggs --exclude=swh.objstorage.egg-info --exclude=winery-test-environment/context --exclude=.tox --exclude='*~' --exclude=__pycache__ --exclude='*.py[co]' $(git rev-parse --show-toplevel)/ debian@ceph1:/home/debian/swh-objstorage/ } +function copy_from() { + RSYNC_RSH="$SSH" rsync -av --delete debian@ceph1:/tmp/winery/ ${DIR}/context/stats/ +} + +function render() { + python ${DIR}/render-stats.py ${DIR}/context/stats/ +} + function run() { sanity_check || return 1 - copy || return 1 + copy_to || return 1 $SSH -t debian@ceph1 bash -c "'cd swh-objstorage ; ../venv/bin/tox -e py3 -- -k test_winery $*'" || return 1 + + copy_from || return 1 + + render || return 1 } run "$@" diff --git a/winery-test-environment/render-stats.py b/winery-test-environment/render-stats.py new file mode 100644 index 0000000..6da9dcc --- /dev/null +++ b/winery-test-environment/render-stats.py @@ -0,0 +1,59 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import sys + +from matplotlib import pyplot as plt +from matplotlib.ticker import FormatStrFormatter +import pandas as pd + + +def human(size, unit): + if size < 1024: + return f"{int(size)} {unit}/s" + elif size / 1024 < 1024: + return f"{round(size/1024, 1)} K{unit}/s" + elif size / (1024 * 1024) < 1024: + return f"{round(size / (1024 * 1024), 1)} M{unit}/s" + elif size / (1024 * 1024 * 1024) < 1024: + return f"{round(size / (1024 * 1024 * 1024), 1)} G{unit}/s" + + +def read_stats(stats): + dfs = [] + files = os.listdir(stats) + for file in files: + f = f"{stats}/{file}" + if not os.path.isfile(f): + continue + dfs.append(pd.read_csv(f)) + df = pd.concat(dfs) + df.set_index("time") + return df.sort_values(by=["time"]) + + +def main(stats): + df = read_stats(stats) + print(df) + t = df["time"].to_numpy() + sec = t[-1] - t[0] + a = df.sum() / sec + print(f'Bytes write {human(a["bytes_write"], "B")}') + print(f'Objects write {human(a["object_write_count"], "object")}') + print(f'Bytes read {human(a["bytes_read"], "B")}') + print(f'Objects read {human(a["object_read_count"], "object")}') + + df["date"] = pd.to_datetime(df["time"], unit="s") + + p = df.plot(x="time", y=["bytes_write", "bytes_read"]) + p.set_xlabel("Time") + p.yaxis.set_major_formatter(FormatStrFormatter("%.0f")) + p.set_ylabel("B/s") + plt.show() + + +if __name__ == "__main__": + main(sys.argv[1]) diff --git a/winery-test-environment/requirements.txt b/winery-test-environment/requirements.txt index 8649a7a..00b43ed 100644 --- a/winery-test-environment/requirements.txt +++ b/winery-test-environment/requirements.txt @@ -1,2 +1,5 @@ ansible mitogen +pandas +matplotlib +PyQt5
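
Note on the new Stats helper (not part of the patch itself): the collector added in
swh/objstorage/backends/winery/stats.py can be exercised on its own, outside the
benchmark workers and the packer. A minimal sketch, assuming swh.objstorage is
installed; the temporary directory and the byte strings are illustrative only:

    import tempfile

    from swh.objstorage.backends.winery.stats import Stats

    # Passing None keeps the collector inactive, which is what the winery
    # backend does when no output_dir is configured.
    assert Stats(None).stats_active is False

    with tempfile.TemporaryDirectory() as d:
        stats = Stats(d)  # creates <d>/<pid>.csv and writes the column header
        stats._stats_flush_interval = 0  # flush on every call, as in test_winery_stats
        stats.stats_write(b"OBJ_ID", b"CONTENT")  # 1 object, len(key) + len(content) bytes
        stats.stats_read(b"OBJ_ID", b"CONTENT")
        with open(stats.stats_filename) as f:
            print(f.read())

The resulting CSV (columns time, object_write_count, bytes_write, object_read_count,
bytes_read) is what winery-test-environment/render-stats.py aggregates and plots
after a benchmark run.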