Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/winery/bench.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
olasd: Could you consider `s/bench/benchmark/` (at least in the module name)? I do wonder if it should move to the tests submodule too.
dachary (author): Agreed, it was moved and renamed into winery_benchmark.py.
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import asyncio | |||||
import concurrent.futures | |||||
import logging | |||||
import os | |||||
import random | |||||
import time | |||||
from swh.objstorage.factory import get_objstorage | |||||
LOGGER = logging.getLogger(__name__) | |||||
def work(kind, args):
    """Build a ``Worker`` from ``args``, run its ``kind`` job, return the result.

    This is the callable submitted to the process pool by ``Bench.run``.
    """
    worker = Worker(args)
    return worker.run(kind)
class Worker(object):
    """A single benchmark worker.

    ``args`` is the benchmark configuration mapping. The methods of this
    class read the keys ``base_dsn``, ``shard_dsn``, ``shard_max_size`` and
    ``ro_worker_max_request`` from it.
    """

    def __init__(self, args):
        self.args = args

    def run(self, kind):
        """Call the method named ``kind`` (e.g. ``"ro"`` or ``"rw"``).

        Returns ``kind`` so the caller knows which sort of worker finished.
        """
        getattr(self, kind)()
        return kind

    def ro(self):
        """Read back a random batch of objects and log how long it took.

        Polls the ``signature2shard`` table once per second until at least
        one row with ``inflight = FALSE`` is returned, then fetches each
        selected object through the read-only winery objstorage.
        """
        self.winery = get_objstorage(
            cls="winery",
            readonly=True,
            base_dsn=self.args["base_dsn"],
            shard_dsn=self.args["shard_dsn"],
            shard_max_size=self.args["shard_max_size"],
        )
        pid = os.getpid()
        with self.winery.base.db.cursor() as c:
            while True:
                c.execute(
                    "SELECT signature FROM signature2shard WHERE inflight = FALSE "
                    "ORDER BY random() LIMIT %s",
                    (self.args["ro_worker_max_request"],),
                )
                if c.rowcount > 0:
                    break
                LOGGER.info(f"Worker(ro, {pid}): empty, waiting")
                time.sleep(1)
            LOGGER.info(f"Worker(ro, {pid}): requesting {c.rowcount} objects")
            # Monotonic clock: elapsed time must not be skewed by wall-clock
            # adjustments happening while the benchmark runs.
            start = time.monotonic()
            for row in c:
                obj_id = row[0].tobytes()
                # Keep the read outside the assert: with ``python -O`` the
                # assert statement is removed, and the benchmarked call
                # would silently disappear with it.
                content = self.winery.get(obj_id)
                assert content is not None
            elapsed = time.monotonic() - start
            LOGGER.info(f"Worker(ro, {pid}): finished ({elapsed:.2f}s)")

    def payloads_define(self):
        """Set ``self.payloads`` to the object sizes (in bytes) to pick from.

        Small sizes are repeated so that ``random.choice`` favours them.
        Every size is one byte past a 1024-byte boundary: neatly aligned
        objects are the exception in practice, and unaligned sizes exercise
        unaligned reads in the storage backend.
        """
        kib = 1024
        self.payloads = [
            3 * kib + 1,
            3 * kib + 1,
            3 * kib + 1,
            3 * kib + 1,
            3 * kib + 1,
            10 * kib + 1,
            13 * kib + 1,
            16 * kib + 1,
            70 * kib + 1,
            80 * kib + 1,
        ]

    def rw(self):
        """Add random objects until a packer appears, then wait for it.

        Objects of randomly chosen sizes are added until the objstorage
        reports at least one entry in ``packers``. The first packer is then
        joined and its exit code is asserted to be 0.
        """
        self.winery = get_objstorage(
            cls="winery",
            base_dsn=self.args["base_dsn"],
            shard_dsn=self.args["shard_dsn"],
            shard_max_size=self.args["shard_max_size"],
        )
        self.payloads_define()
        pid = os.getpid()
        # The context manager guarantees the file descriptor is released
        # even when adding an object raises.
        with open("/dev/urandom", "rb") as random_content:
            LOGGER.info(f"Worker(rw, {pid}): start")
            start = time.monotonic()
            count = 0
            while len(self.winery.packers) == 0:
                content = random_content.read(random.choice(self.payloads))
                self.winery.add(content=content)
                count += 1
        LOGGER.info(f"Worker(rw, {pid}): packing {count} objects")
        packer = self.winery.packers[0]
        packer.join()
        assert packer.exitcode == 0
        elapsed = time.monotonic() - start
        LOGGER.info(f"Worker(rw, {pid}): finished ({elapsed:.2f}s)")
class Bench(object):
    """Keep a pool of ``ro``/``rw`` workers busy for a fixed duration.

    ``args`` is the benchmark configuration mapping. This class reads the
    keys ``rw_workers``, ``ro_workers`` and ``duration`` (seconds) from it
    and passes the whole mapping on to every worker.
    """

    def __init__(self, args):
        self.args = args

    def timer_start(self):
        """Record the starting point used by ``timeout``."""
        # Monotonic clock: the duration check must not be affected by
        # wall-clock adjustments made while the benchmark is running.
        self.start = time.monotonic()

    def timeout(self):
        """Return True once more than ``duration`` seconds have elapsed."""
        return time.monotonic() - self.start > self.args["duration"]

    async def run(self):
        """Run the benchmark and return the number of workers launched.

        Starts ``rw_workers`` + ``ro_workers`` workers in a process pool.
        Whenever a worker completes before the timeout, a new worker of the
        same kind is launched. Once the timeout is reached, the remaining
        workers are awaited and no new one is started.
        """
        self.timer_start()
        loop = asyncio.get_running_loop()
        workers_count = self.args["rw_workers"] + self.args["ro_workers"]
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=workers_count
        ) as executor:
            LOGGER.info("Bench.run: running")
            self.count = 0
            workers = set()

            def create_worker(kind):
                self.count += 1
                LOGGER.info(f"Bench.run: launched {kind} worker number {self.count}")
                return loop.run_in_executor(executor, work, kind, self.args)

            kinds = ["rw"] * self.args["rw_workers"] + ["ro"] * self.args["ro_workers"]
            for kind in kinds:
                workers.add(create_worker(kind))

            while workers:
                LOGGER.info(f"Bench.run: waiting for {len(workers)} workers")
                done, workers = await asyncio.wait(
                    workers, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    # result() re-raises any exception raised in the worker.
                    kind = task.result()
                    LOGGER.info(f"Bench.run: worker {kind} complete")
                    if not self.timeout():
                        workers.add(create_worker(kind))

            LOGGER.info("Bench.run: finished")
            return self.count
Could you consider s/bench/benchmark/ (at least in the module name)? I do wonder if it should move to the tests submodule too.