diff --git a/requirements-swh.txt b/requirements-swh.txt index 02a6eb4..ddd74f3 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[http] >= 0.3 swh.model >= 0.0.27 -swh.perfecthash +swh.perfecthash >= 0.1.2 diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py index d362625..4a9474a 100644 --- a/swh/objstorage/backends/winery/objstorage.py +++ b/swh/objstorage/backends/winery/objstorage.py @@ -1,173 +1,185 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Process +from swh.model import hashutil from swh.objstorage import exc -from swh.objstorage.objstorage import ObjStorage, compute_hash +from swh.objstorage.objstorage import ObjStorage from .roshard import ROShard from .rwshard import RWShard from .sharedbase import SharedBase from .stats import Stats logger = logging.getLogger(__name__) +def compute_hash(content): + algo = "sha256" + return hashutil.MultiHash.from_data(content, hash_names=[algo],).digest().get(algo) + + class WineryObjStorage(ObjStorage): def __init__(self, **kwargs): super().__init__(**kwargs) if kwargs.get("readonly"): self.winery = WineryReader(**kwargs) else: self.winery = WineryWriter(**kwargs) def uninit(self): self.winery.uninit() def get(self, obj_id): return self.winery.get(obj_id) def check_config(self, *, check_write): return True def __contains__(self, obj_id): return obj_id in self.winery def add(self, content, obj_id=None, check_presence=True): return self.winery.add(content, obj_id, check_presence) def check(self, obj_id): return self.winery.check(obj_id) def delete(self, obj_id): raise PermissionError("Delete is not allowed.") class WineryBase: def __init__(self, **kwargs): self.args = kwargs self.init() def init(self): self.base = SharedBase(**self.args) def uninit(self): self.base.uninit() def __contains__(self, obj_id): return self.base.contains(obj_id) class WineryReader(WineryBase): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.shards = {} + def roshard(self, name): - shard = ROShard(name, **self.args) - shard.load() - return shard + if name not in self.shards: + shard = ROShard(name, **self.args) + shard.load() + self.shards[name] = shard + return self.shards[name] def get(self, obj_id): shard_info = self.base.get(obj_id) if shard_info is None: raise exc.ObjNotFoundError(obj_id) name, readonly = shard_info if readonly: shard = self.roshard(name) content = shard.get(obj_id) del shard else: shard = RWShard(name, **self.args) content = shard.get(obj_id) if content is None: raise exc.ObjNotFoundError(obj_id) return content def pack(shard, **kwargs): return Packer(shard, **kwargs).run() class Packer: def __init__(self, shard, **kwargs): self.stats = Stats(kwargs.get("output_dir")) self.args = kwargs self.shard = shard self.init() def init(self): self.rw = RWShard(self.shard, **self.args) self.ro = ROShard(self.shard, **self.args) def uninit(self): del self.ro self.rw.uninit() def run(self): self.ro.create(self.rw.count()) for obj_id, content in self.rw.all(): self.ro.add(content, obj_id) if self.stats.stats_active: self.stats.stats_read(obj_id, content) self.stats.stats_write(obj_id, content) self.ro.save() base = SharedBase(**self.args) base.shard_packing_ends(self.shard) base.uninit() self.rw.uninit() self.rw.drop() return True class WineryWriter(WineryReader): def __init__(self, **kwargs): super().__init__(**kwargs) self.packers = [] self.init() def init(self): super().init() self.shard = RWShard(self.base.whoami, **self.args) def uninit(self): self.shard.uninit() super().uninit() def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id shard = self.base.add_phase_1(obj_id) if shard != self.base.id: # this object is the responsibility of another shard return obj_id self.shard.add(obj_id, content) self.base.add_phase_2(obj_id) if self.shard.is_full(): self.pack() return obj_id def check(self, obj_id): # load all shards packing == True and not locked (i.e. packer # was interrupted for whatever reason) run pack for each of them pass def pack(self): self.base.shard_packing_starts() p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) self.uninit() p.start() self.packers.append(p) self.init() def __del__(self): for p in self.packers: p.kill() p.join() diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py index d7b5ba3..b39b176 100644 --- a/swh/objstorage/tests/test_objstorage_winery.py +++ b/swh/objstorage/tests/test_objstorage_winery.py @@ -1,396 +1,402 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import time import pytest import sh from swh.objstorage import exc from swh.objstorage.backends.winery.database import DatabaseAdmin from swh.objstorage.backends.winery.objstorage import Packer, pack from swh.objstorage.backends.winery.stats import Stats from swh.objstorage.backends.winery.throttler import ( BandwidthCalculator, IOThrottler, LeakyBucket, Throttler, ) from swh.objstorage.factory import get_objstorage from .winery_benchmark import Bench, work from .winery_testing_helpers import PoolHelper, SharedBaseHelper @pytest.fixture def needs_ceph(): try: sh.ceph("--version") except sh.CommandNotFound: pytest.skip("the ceph CLI was not found") @pytest.fixture def ceph_pool(needs_ceph): pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() yield pool pool.images_clobber() pool.clobber() @pytest.fixture def storage(request, postgresql): marker = request.node.get_closest_marker("shard_max_size") if marker is None: shard_max_size = 1024 else: shard_max_size = marker.args[0] dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) storage = get_objstorage( cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=shard_max_size, throttle_write=200 * 1024 * 1024, throttle_read=100 * 1024 * 1024, ) yield storage storage.winery.uninit() # # pytest-postgresql will not remove databases that it did not # create between tests (only at the very end). # d = DatabaseAdmin(dsn) for database in d.list_databases(): if database != postgresql.info.dbname and database != "tests_tmpl": DatabaseAdmin(dsn, database).drop_database() @pytest.fixture def winery(storage): return storage.winery def test_winery_sharedbase(winery): base = winery.base shard1 = base.whoami assert shard1 is not None assert shard1 == base.whoami id1 = base.id assert id1 is not None assert id1 == base.id def test_winery_add_get(winery): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) - assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert ( + obj_id.hex() + == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" + ) assert winery.add(content=content, obj_id=obj_id) == obj_id assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id assert winery.base.whoami == shard assert winery.get(obj_id) == content with pytest.raises(exc.ObjNotFoundError): winery.get(b"unknown") winery.shard.drop() @pytest.mark.shard_max_size(1) def test_winery_add_and_pack(winery, mocker): mocker.patch("swh.objstorage.backends.winery.objstorage.pack", return_value=True) shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) - assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579" + assert ( + obj_id.hex() + == "866878b165607851782d8d233edf0c261172ff67926330d3bbd10c705b92d24f" + ) assert winery.base.whoami != shard assert len(winery.packers) == 1 packer = winery.packers[0] packer.join() assert packer.exitcode == 0 def test_winery_delete(storage): with pytest.raises(PermissionError): storage.delete(None) def test_winery_get_shard_info(winery): assert winery.base.get_shard_info(1234) is None assert SharedBaseHelper(winery.base).get_shard_info_by_name("nothing") is None @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_packer(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" winery.add(content=content) winery.base.shard_packing_starts() packer = Packer(shard, **winery.args) try: assert packer.run() is True finally: packer.uninit() readonly, packing = SharedBaseHelper(winery.base).get_shard_info_by_name(shard) assert readonly is True assert packing is False @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_get_object(winery, ceph_pool): shard = winery.base.whoami content = b"SOMETHING" obj_id = winery.add(content=content) winery.base.shard_packing_starts() assert pack(shard, **winery.args) is True assert winery.get(obj_id) == content def test_winery_ceph_pool(needs_ceph): name = "IMAGE" pool = PoolHelper(shard_max_size=10 * 1024 * 1024) pool.clobber() pool.pool_create() pool.image_create(name) p = pool.image_path(name) assert p.endswith(name) something = "SOMETHING" open(p, "w").write(something) assert open(p).read(len(something)) == something assert pool.image_list() == [name] pool.image_remap_ro(name) pool.images_clobber() assert pool.image_list() == [name] pool.clobber() assert pool.image_list() == [] @pytest.mark.shard_max_size(10 * 1024 * 1024) def test_winery_bench_work(winery, ceph_pool, tmpdir): # # rw worker creates a shard # whoami = winery.base.whoami shards_info = list(winery.base.list_shards()) assert len(shards_info) == 1 shard, readonly, packing = shards_info[0] assert (readonly, packing) == (False, False) winery.args["dir"] = str(tmpdir) assert work("rw", winery.args) == "rw" shards_info = { name: (readonly, packing) for name, readonly, packing in winery.base.list_shards() } assert len(shards_info) == 2 assert shards_info[whoami] == (True, False) # # ro worker reads a shard # winery.args["ro_worker_max_request"] = 1 assert work("ro", winery.args) == "ro" @pytest.mark.asyncio async def test_winery_bench_real(pytestconfig, postgresql, ceph_pool): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) kwargs = { "output_dir": pytestconfig.getoption("--winery-bench-output-directory"), "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "shard_max_size": pytestconfig.getoption("--winery-shard-max-size"), "ro_worker_max_request": pytestconfig.getoption( "--winery-bench-ro-worker-max-request" ), "duration": pytestconfig.getoption("--winery-bench-duration"), "base_dsn": dsn, "shard_dsn": dsn, "throttle_read": pytestconfig.getoption("--winery-bench-throttle-read"), "throttle_write": pytestconfig.getoption("--winery-bench-throttle-write"), } count = await Bench(kwargs).run() assert count > 0 @pytest.mark.asyncio async def test_winery_bench_fake(pytestconfig, mocker): kwargs = { "rw_workers": pytestconfig.getoption("--winery-bench-rw-workers"), "ro_workers": pytestconfig.getoption("--winery-bench-ro-workers"), "duration": pytestconfig.getoption("--winery-bench-duration"), } def run(kind): time.sleep(kwargs["duration"] * 2) return kind mocker.patch("swh.objstorage.tests.winery_benchmark.Worker.run", side_effect=run) assert await Bench(kwargs).run() == kwargs["rw_workers"] + kwargs["ro_workers"] def test_winery_leaky_bucket_tick(mocker): total = 100 half = 50 b = LeakyBucket(total) sleep = mocker.spy(time, "sleep") assert b.current == b.total sleep.assert_not_called() # # Bucket is at 100, add(50) => drops to 50 # b.add(half) assert b.current == half sleep.assert_not_called() # # Bucket is at 50, add(50) => drops to 0 # b.add(half) assert b.current == 0 sleep.assert_not_called() # # Bucket is at 0, add(50) => waits until it is at 50 and then drops to 0 # b.add(half) assert b.current == 0 sleep.assert_called_once() # # Sleep more than one second, bucket is full again, i.e. at 100 # time.sleep(2) mocker.resetall() b.add(0) assert b.current == total sleep.assert_not_called() # # Bucket is full at 100 and and waits when requesting 150 which is # more than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() # # Bucket is empty and and waits when requesting 150 which is more # than it can contain # b.add(total + half) assert b.current == 0 sleep.assert_called_once() mocker.resetall() def test_winery_leaky_bucket_reset(): b = LeakyBucket(100) assert b.total == 100 assert b.current == b.total b.reset(50) assert b.total == 50 assert b.current == b.total b.reset(100) assert b.total == 100 assert b.current == 50 def test_winery_bandwidth_calculator(mocker): now = 1 def monotonic(): return now mocker.patch("time.monotonic", side_effect=monotonic) b = BandwidthCalculator() assert b.get() == 0 count = 100 * 1024 * 1024 going_up = [] for t in range(b.duration): now += 1 b.add(count) going_up.append(b.get()) assert b.get() == count going_down = [] for t in range(b.duration - 1): now += 1 b.add(0) going_down.append(b.get()) going_down.reverse() assert going_up[:-1] == going_down assert len(b.history) == b.duration - 1 def test_winery_io_throttler(postgresql, mocker): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) DatabaseAdmin(dsn, "throttler").create_database() sleep = mocker.spy(time, "sleep") speed = 100 i = IOThrottler("read", base_dsn=dsn, throttle_read=100) count = speed i.add(count) sleep.assert_not_called() i.add(count) sleep.assert_called_once() # # Force slow down # mocker.resetall() i.sync_interval = 0 i.max_speed = 1 assert i.max_speed != i.bucket.total i.add(2) assert i.max_speed == i.bucket.total sleep.assert_called_once() def test_winery_throttler(postgresql): dsn = ( f"postgres://{postgresql.info.user}" f":@{postgresql.info.host}:{postgresql.info.port}" ) t = Throttler(base_dsn=dsn, throttle_read=100, throttle_write=100) base = {} key = "KEY" content = "CONTENT" def reader(k): return base[k] def writer(k, v): base[k] = v return True assert t.throttle_add(writer, key, content) is True assert t.throttle_get(reader, key) == content def test_winery_stats(tmpdir): s = Stats(None) assert s.stats_active is False s = Stats(tmpdir / "stats") assert s.stats_active is True assert os.path.exists(s.stats_filename) size = os.path.getsize(s.stats_filename) s._stats_flush_interval = 0 k = "KEY" v = "CONTENT" s.stats_read(k, v) s.stats_write(k, v) s.stats_read(k, v) s.stats_write(k, v) s.__del__() assert os.path.getsize(s.stats_filename) > size diff --git a/winery-test-environment/grid5000.yml b/winery-test-environment/grid5000.yml index aedf3fc..b53203e 100644 --- a/winery-test-environment/grid5000.yml +++ b/winery-test-environment/grid5000.yml @@ -1,78 +1,81 @@ # https://www.grid5000.fr/w/Docker#Using_docker-cache.grid5000.fr - hosts: mon gather_facts: no become: true tasks: - name: Add the user 'debian' user: name: debian - name: Allow 'debian' group to have passwordless sudo lineinfile: dest: /etc/sudoers state: present regexp: '^%debian' line: '%debian ALL=(ALL) NOPASSWD: ALL' validate: visudo -cf %s - name: mkdir /home/debian/.ssh file: path: /home/debian/.ssh state: directory mode: 0700 owner: debian group: debian - name: copy authorized_keys to /home/debian shell: | cp /root/.ssh/authorized_keys /home/debian/.ssh/authorized_keys chown debian:debian /home/debian/.ssh/authorized_keys chmod 0600 /home/debian/.ssh/authorized_keys - hosts: osd become: true tasks: # do that before lvm gets a chance to investigate and get the wrong idea - # about /dev/sdc on grid5000 because there surely will be leftovers from + # about /dev/disk2 on grid5000 because there surely will be leftovers from # whoever used the machine last - - name: zap /dev/sdc + - name: clear leftovers from the disk to be used for OSDs shell: | - dd if=/dev/zero of=/dev/sdc count=100 bs=1024k + dd if=/dev/zero of=/dev/disk2 count=100 bs=1024k + touch /etc/dd.done + args: + creates: /etc/dd.done - hosts: all become: true pre_tasks: - name: mkdir /etc/docker file: path: /etc/docker state: directory mode: 755 roles: - geerlingguy.docker tasks: - name: docker cache copy: content: | { "registry-mirrors": [ "http://docker-cache.grid5000.fr" ], "bip": "192.168.42.1/24" } dest: /etc/docker/daemon.json - name: systemctl restart docker service: name: docker state: restarted diff --git a/winery-test-environment/osd.yml b/winery-test-environment/osd.yml index 44081a3..8c5e32f 100644 --- a/winery-test-environment/osd.yml +++ b/winery-test-environment/osd.yml @@ -1,40 +1,40 @@ --- - hosts: osd gather_facts: no become: true tasks: - name: add host shell: | ceph orch host add {{ inventory_hostname }} delegate_to: ceph1 - hosts: osd gather_facts: no become: true tasks: - name: wait for host shell: | ceph orch host ls | grep '^{{ inventory_hostname }} ' delegate_to: ceph1 register: host until: host is success retries: 30 delay: 5 - hosts: osd gather_facts: no become: true tasks: # the desired side effect here is twofold # * device zap blocks until the osd daemon is ready on the target host - # * on grid5000 /dev/sdc needs to be applied - - name: zap /dev/sdc + # * on grid5000 /dev/disk2 needs to be applied + - name: zap /dev/disk2 shell: | - ceph orch device zap {{ inventory_hostname }} /dev/sdc --force || true + ceph orch device zap {{ inventory_hostname }} /dev/disk2 --force || true delegate_to: ceph1