diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py index 6cbacaa..be1a1a4 100644 --- a/swh/objstorage/backends/winery/objstorage.py +++ b/swh/objstorage/backends/winery/objstorage.py @@ -1,177 +1,175 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Process from swh.objstorage import exc from swh.objstorage.interface import ObjId from swh.objstorage.objstorage import ObjStorage from .roshard import ROShard from .rwshard import RWShard from .sharedbase import SharedBase from .stats import Stats logger = logging.getLogger(__name__) class WineryObjStorage(ObjStorage): def __init__(self, **kwargs): super().__init__(**kwargs) if kwargs.get("readonly"): self.winery = WineryReader(**kwargs) else: self.winery = WineryWriter(**kwargs) def uninit(self): self.winery.uninit() def get(self, obj_id: ObjId) -> bytes: return self.winery.get(obj_id) def check_config(self, *, check_write): return True def __contains__(self, obj_id): return obj_id in self.winery def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: self.winery.add(content, obj_id, check_presence) def check(self, obj_id: ObjId) -> None: return self.winery.check(obj_id) def delete(self, obj_id: ObjId): raise PermissionError("Delete is not allowed.") class WineryBase: def __init__(self, **kwargs): self.args = kwargs self.init() def init(self): self.base = SharedBase(**self.args) def uninit(self): self.base.uninit() def __contains__(self, obj_id): return self.base.contains(obj_id) class WineryReader(WineryBase): def __init__(self, **kwargs): super().__init__(**kwargs) self.shards = {} def roshard(self, name): if name not in self.shards: shard = ROShard(name, **self.args) shard.load() self.shards[name] = shard return self.shards[name] def get(self, obj_id: ObjId) -> bytes: shard_info = self.base.get(obj_id) if shard_info is None: raise exc.ObjNotFoundError(obj_id) name, readonly = shard_info if readonly: shard = self.roshard(name) content = shard.get(obj_id) del shard else: shard = RWShard(name, **self.args) content = shard.get(obj_id) if content is None: raise exc.ObjNotFoundError(obj_id) return content def pack(shard, **kwargs): return Packer(shard, **kwargs).run() class Packer: def __init__(self, shard, **kwargs): self.stats = Stats(kwargs.get("output_dir")) self.args = kwargs self.shard = shard self.init() def init(self): self.rw = RWShard(self.shard, **self.args) self.ro = ROShard(self.shard, **self.args) def uninit(self): del self.ro self.rw.uninit() def run(self): self.ro.create(self.rw.count()) for obj_id, content in self.rw.all(): self.ro.add(content, obj_id) if self.stats.stats_active: self.stats.stats_read(obj_id, content) self.stats.stats_write(obj_id, content) self.ro.save() base = SharedBase(**self.args) base.shard_packing_ends(self.shard) base.uninit() self.rw.uninit() self.rw.drop() return True class WineryWriter(WineryReader): def __init__(self, **kwargs): super().__init__(**kwargs) self.packers = [] self.init() def init(self): super().init() self.shard = RWShard(self.base.whoami, **self.args) def uninit(self): self.shard.uninit() super().uninit() - def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: + def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: if check_presence and obj_id in self: - return obj_id + return shard = self.base.add_phase_1(obj_id) if shard != self.base.id: # this object is the responsibility of another shard - return obj_id + return self.shard.add(obj_id, content) self.base.add_phase_2(obj_id) if self.shard.is_full(): self.pack() - return obj_id - def check(self, obj_id: ObjId) -> None: # load all shards packing == True and not locked (i.e. packer # was interrupted for whatever reason) run pack for each of them pass def pack(self): self.base.shard_packing_starts() p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) self.uninit() p.start() self.packers.append(p) self.init() def __del__(self): for p in self.packers: p.kill() p.join() diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py index 1b2ba8b..0f017ce 100644 --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -1,402 +1,399 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import time import pytest import sh from swh.objstorage import exc from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack from swh.objstorage.backends.winery.stats import Stats from swh.objstorage.backends.winery.throttler import ( BandwidthCalculator, IOThrottler, LeakyBucket, Throttler, ) from swh.objstorage.factory import get_objstorage from swh.objstorage.objstorage import compute_hash from swh.objstorage.utils import call_async from .winery_benchmark import Bench, work from .winery_testing_helpers import PoolHelper, SharedBaseHelper @pytest.fixture def needs_ceph(): try: sh.ceph("--version") except sh.CommandNotFound: pytest.skip("the ceph CLI was not found") @pytest.fixture def ceph_pool(needs_ceph): pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() yield pool pool.images_clobber() pool.clobber() @pytest.fixture def storage(request, postgresql): marker = request.node.get_closest_marker("shard_max_size") if marker is None: shard_max_size = 1024 else: shard_max_size = marker.args[0] dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size, throttle_write=200 * 1024 * 1024, throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() # # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": DatabaseAdmin(dsn, database).drop_database() @pytest.fixture def winery(storage): return storage.winery def test_winery_sharedbase(winery): base = winery.base shard1 = base.whoami assert shard1 is not None assert shard1 == base.whoami id1 = base.id assert id1 is not None assert id1 == base.id def test_winery_add_get(winery): shard = winery.base.whoami content = b"SOMETHING" - obj_id = winery.add(content=content, obj_id=compute_hash(content, "sha256")) + obj_id = compute_hash(content, "sha256") assert ( obj_id.hex() == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" ) - assert winery.add(content=content, obj_id=obj_id) == obj_id - assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id + winery.add(content=content, obj_id=obj_id) + winery.add(content=content, obj_id=obj_id) + winery.add(content=content, obj_id=obj_id, check_presence=False) assert winery.base.whoami == shard assert winery.get(obj_id) == content with pytest.raises(exc.ObjNotFoundError): winery.get(b"unknown") winery.shard.drop() @pytest.mark.shard_max_size(1) def test_winery_add_and_pack(winery, mocker): mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) shard = winery.base.whoami content = b"SOMETHING" - obj_id = winery.add(content=content, obj_id=compute_hash(content, "sha256")) - assert ( - obj_id.hex() - == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" - ) + winery.add(content=content, obj_id=compute_hash(content, "sha256")) assert winery.base.whoami != shard assert len(winery.packers) == 1 packer = winery.packers[0] packer.join() assert packer.exitcode == 0 def test_winery_delete(storage): with pytest.raises(PermissionError): storage.delete(None) def test_winery_get_shard_info(winery): assert winery.base.get_shard_info(1234) is None assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_packer(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" winery.add(content=content) winery.base.shard_packing_starts() packer = Packer(shard, **winery.args) try: assert packer.run() is True finally: packer.uninit() readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) assert readonly is True assert packing is False @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_get_object(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) winery.base.shard_packing_starts() assert pack(shard, **winery.args) is True assert winery.get(obj_id) == content def test_winery_ceph_pool(needs_ceph): name = "IMAGE" pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() pool.image_create(name) p = pool.image_path(name) assert p.endswith(name) something = "SOMETHING" open(p, "w").write(something) assert open(p).read(len(something)) == something assert pool.image_list() == [name] pool.image_remap_ro(name) pool.images_clobber() assert pool.image_list() == [name] pool.clobber() assert pool.image_list() == [] @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_bench_work(winery, ceph_pool, tmpdir): # # rw worker creates a shard # whoami = winery.base.whoami shards_info = list(winery.base.list_shards()) assert len(shards_info) == 1 shard, readonly, packing = shards_info[0] assert (readonly, packing) == (False, False) winery.args["dir"] = str(tmpdir) assert work("rw", winery.args) == "rw" shards_info = { name: (readonly, packing) for name, readonly, packing in winery.base.list_shards() } assert len(shards_info) == 2 assert shards_info[whoami] == (True, False) # # ro worker reads a shard # winery.args["ro_worker_max_request"] = 1 assert work("ro", winery.args) == "ro" def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) kwargs = { "output_dir": pytestconfig.getoption("--winery-bench-output-directory"), "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), "ro_worker_max_request": pytestconfig.getoption( "--winery-bench-ro-worker-max-request" ), "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } count = call_async(Bench(kwargs).run) assert count > 0 def test_winery_bench_fake(pytestconfig, mocker): kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "duration": pytestconfig.getoption("--winery-bench-duration"), } def run(kind): time.sleep(kwargs["duration"] * 2) return kind mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) assert call_async(Bench(kwargs).run) == kwargs["rw_workers"] + kwargs["ro_workers"] def test_winery_leaky_bucket_tick(mocker): total = 100 half = 50 b = LeakyBucket(total) sleep = mocker.spy(time, "sleep") assert b.current == b.total sleep.assert_not_called() # # Bucket is at 100, add(50) => drops to 50 # b.add(half) assert b.current == half sleep.assert_not_called() # # Bucket is at 50, add(50) => drops to 0 # b.add(half) assert b.current == 0 sleep.assert_not_called() # # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0 # b.add(half) assert b.current == 0 sleep.assert_called_once() # # Sleep more than one second, bucket is full again, i.e. at 100 # time.sleep(2) mocker.resetall() b.add(0) assert b.current == total sleep.assert_not_called() # # Bucket is full at 100 and and waits when requesting 150 which is # more than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() # # Bucket is empty and and waits when requesting 150 which is more # than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() def test_winery_leaky_bucket_reset(): b = LeakyBucket(100) assert b.total == 100 assert b.current == b.total b.reset(50) assert b.total == 50 assert b.current == b.total b.reset(100) assert b.total == 100 assert b.current == 50 def test_winery_bandwidth_calculator(mocker): now = 1 def monotonic(): return now mocker.patch("time.monotonic", side_effect=monotonic) b = BandwidthCalculator() assert b.get() == 0 count = 100 * 1024 * 1024 going_up = [] for t in range(b.duration): now += 1 b.add(count) going_up.append(b.get()) assert b.get() == count going_down = [] for t in range(b.duration - 1): now += 1 b.add(0) going_down.append(b.get()) going_down.reverse() assert going_up[:-1] == going_down assert len(b.history) == b.duration - 1 def test_winery_io_throttler(postgresql, mocker): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) DatabaseAdmin(dsn, "throttler").create_database() sleep = mocker.spy(time, "sleep") speed = 100 i = IOThrottler("read", base_dsn=dsn, throttle_read=100) count = speed i.add(count) sleep.assert_not_called() i.add(count) sleep.assert_called_once() # # Force slow down # mocker.resetall() i.sync_interval = 0 i.max_speed = 1 assert i.max_speed != i.bucket.total i.add(2) assert i.max_speed == i.bucket.total sleep.assert_called_once() def test_winery_throttler(postgresql): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100) base = {} key = "KEY" content = "CONTENT" def reader(k): return base[k] def writer(k, v): base[k] = v return True assert t.throttle_add(writer, key, content) is True assert t.throttle_get(reader, key) == content def test_winery_stats(tmpdir): s = Stats(None) assert s.stats_active is False s = Stats(tmpdir / "stats") assert s.stats_active is True assert os.path.exists(s.stats_filename) size = os.path.getsize(s.stats_filename) s._stats_flush_interval = 0 k = "KEY" v = "CONTENT" s.stats_read(k, v) s.stats_write(k, v) s.stats_read(k, v) s.stats_write(k, v) s.__del__() assert os.path.getsize(s.stats_filename) > size