Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/winery/bench.py
- This file was added.
| # Copyright (C) 2021 The Software Heritage developers | |||||
olasd: Could you consider `s/bench/benchmark/` (at least in the module name)? I do wonder if it should… | |||||
dacharyAuthorUnsubmitted Done Inline ActionsAgreed, it was moved and renamed into winery_benchmark.py dachary: Agreed, it was moved and renamed into winery_benchmark.py | |||||
| # See the AUTHORS file at the top-level directory of this distribution | |||||
| # License: GNU General Public License version 3, or any later version | |||||
| # See top-level LICENSE file for more information | |||||
| import asyncio | |||||
| import concurrent.futures | |||||
| import logging | |||||
| import os | |||||
| import random | |||||
| import time | |||||
| from swh.objstorage.factory import get_objstorage | |||||
| LOGGER = logging.getLogger(__name__) | |||||
| def work(kind, args): | |||||
| return Worker(args).run(kind) | |||||
| class Worker(object): | |||||
| def __init__(self, args): | |||||
| self.args = args | |||||
| def run(self, kind): | |||||
| getattr(self, kind)() | |||||
| return kind | |||||
| def ro(self): | |||||
| self.winery = get_objstorage( | |||||
| cls="winery", | |||||
| readonly=True, | |||||
| base_dsn=self.args["base_dsn"], | |||||
| shard_dsn=self.args["shard_dsn"], | |||||
| shard_max_size=self.args["shard_max_size"], | |||||
| ) | |||||
| with self.winery.base.db.cursor() as c: | |||||
| while True: | |||||
| c.execute( | |||||
| "SELECT signature FROM signature2shard WHERE inflight = FALSE " | |||||
| "ORDER BY random() LIMIT %s", | |||||
| (self.args["ro_worker_max_request"],), | |||||
| ) | |||||
| if c.rowcount > 0: | |||||
| break | |||||
| LOGGER.info(f"Worker(ro, {os.getpid()}): empty, waiting") | |||||
| time.sleep(1) | |||||
| LOGGER.info(f"Worker(ro, {os.getpid()}): requesting {c.rowcount} objects") | |||||
| start = time.time() | |||||
| for row in c: | |||||
| obj_id = row[0].tobytes() | |||||
| assert self.winery.get(obj_id) is not None | |||||
| elapsed = time.time() - start | |||||
| LOGGER.info(f"Worker(ro, {os.getpid()}): finished ({elapsed:.2f}s)") | |||||
| def payloads_define(self): | |||||
| self.payloads = [ | |||||
| 3 * 1024, | |||||
| 3 * 1024, | |||||
| 3 * 1024, | |||||
| 3 * 1024, | |||||
| 3 * 1024, | |||||
| 10 * 1024, | |||||
| 13 * 1024, | |||||
| 16 * 1024, | |||||
| 70 * 1024, | |||||
| 80 * 1024, | |||||
| ] | |||||
| def rw(self): | |||||
| self.winery = get_objstorage( | |||||
| cls="winery", | |||||
| base_dsn=self.args["base_dsn"], | |||||
| shard_dsn=self.args["shard_dsn"], | |||||
| shard_max_size=self.args["shard_max_size"], | |||||
| ) | |||||
| self.payloads_define() | |||||
| random_content = open("/dev/urandom", "rb") | |||||
| LOGGER.info(f"Worker(rw, {os.getpid()}): start") | |||||
| start = time.time() | |||||
| count = 0 | |||||
| while len(self.winery.packers) == 0: | |||||
| content = random_content.read(random.choice(self.payloads)) | |||||
olasdUnsubmitted Not Done Inline ActionsThis generates only contents that are neatly aligned on 1024 byte multiple boundaries. Maybe it would make sense to add some fuzz/randomness to these object sizes, to exercise some obviously-unaligned reads in the ceph cluster. olasd: This generates only contents that are neatly aligned on 1024 byte multiple boundaries. Maybe it… | |||||
dacharyAuthorUnsubmitted Done Inline ActionsGood point. I added +1 to all of them which makes them all non-aligned. Neatly aligned objects are the exception anyways. dachary: Good point. I added +1 to all of them which makes them all non-aligned. Neatly aligned objects… | |||||
| self.winery.add(content=content) | |||||
| count += 1 | |||||
| LOGGER.info(f"Worker(rw, {os.getpid()}): packing {count} objects") | |||||
| packer = self.winery.packers[0] | |||||
| packer.join() | |||||
| assert packer.exitcode == 0 | |||||
| elapsed = time.time() - start | |||||
| LOGGER.info(f"Worker(rw, {os.getpid()}): finished ({elapsed:.2f}s)") | |||||
| class Bench(object): | |||||
| def __init__(self, args): | |||||
| self.args = args | |||||
| def timer_start(self): | |||||
| self.start = time.time() | |||||
| def timeout(self): | |||||
| return time.time() - self.start > self.args["duration"] | |||||
| async def run(self): | |||||
| self.timer_start() | |||||
| loop = asyncio.get_running_loop() | |||||
| workers_count = self.args["rw_workers"] + self.args["ro_workers"] | |||||
| with concurrent.futures.ProcessPoolExecutor( | |||||
| max_workers=workers_count | |||||
| ) as executor: | |||||
| LOGGER.info("Bench.run: running") | |||||
| self.count = 0 | |||||
| workers = set() | |||||
| def create_worker(kind): | |||||
| self.count += 1 | |||||
| LOGGER.info(f"Bench.run: launched {kind} worker number {self.count}") | |||||
| return loop.run_in_executor(executor, work, kind, self.args) | |||||
| for kind in ["rw"] * self.args["rw_workers"] + ["ro"] * self.args[ | |||||
| "ro_workers" | |||||
| ]: | |||||
| workers.add(create_worker(kind)) | |||||
| while len(workers) > 0: | |||||
| LOGGER.info(f"Bench.run: waiting for {len(workers)} workers") | |||||
| current = workers | |||||
| done, pending = await asyncio.wait( | |||||
| current, return_when=asyncio.FIRST_COMPLETED | |||||
| ) | |||||
| workers = pending | |||||
| for task in done: | |||||
| kind = task.result() | |||||
| LOGGER.info(f"Bench.run: worker {kind} complete") | |||||
| if not self.timeout(): | |||||
| workers.add(create_worker(kind)) | |||||
| LOGGER.info("Bench.run: finished") | |||||
| return self.count | |||||
Could you consider s/bench/benchmark/ (at least in the module name)? I do wonder if it should move to the tests submodule too.