# Software Heritage - Paste P820: bench objstorage (Active, Public)
#
# Authored by douardda on Oct 14 2020, 3:39 PM.
import os
import sys
import concurrent
import time
import yaml
import multiprocessing as mp
from urllib.parse import urlparse, parse_qs
from itertools import count
import signal
import logging
import zmq
import click
try:
from setproctitle import setproctitle
except ImportError:
setproctitle = None
from swh.core.statsd import statsd
from swh.objstorage import get_objstorage
from swh.objstorage.api.server import load_and_check_config
# Endpoint the obj_id server binds its REP socket to and the copy clients
# connect their REQ sockets to.
zurl = 'tcp://127.0.0.1:5556'
# High-water mark applied to the obj_id server socket.
PIPELINE = 10
# zmq context from which every socket of this module is created.
ctx = zmq.Context()
def generate_obj_ids(storage):
    '''Yield every obj_id listed by the given objstorage.

    ``storage.list_content(last_obj_id)`` is called repeatedly, passing the
    last obj_id yielded so far (None for the first call). Listing stops as
    soon as a call does not produce any obj_id past the previous one.
    An empty storage yields nothing.
    '''
    last_obj_id = None
    i = 0
    while True:
        logging.info('New storage listing batch from %s', last_obj_id)
        # Track the last id of this batch separately so that an empty
        # batch (even the very first one) is detected without relying on
        # a possibly unbound loop variable.
        batch_last = last_obj_id
        for obj_id in storage.list_content(last_obj_id):
            yield obj_id
            i += 1
            logging.debug('yield %s %s', i, obj_id)
            batch_last = obj_id
        if batch_last == last_obj_id:
            # list_content returned nothing new, we are done here
            logging.info('Exiting obj_id generator')
            break
        last_obj_id = batch_last
def obj_ids_srv(cfg, time_limit=None):
    '''Answer requests on a zmq REP socket with obj_ids from a storage.

    The REP socket is bound to ``zurl`` (high-water mark ``PIPELINE``).
    Each request is answered with the next obj_id listed from the
    objstorage built by ``get_objstorage(**cfg)``. Once the storage is
    exhausted, or ``time_limit`` seconds have elapsed, every further request
    is answered with an empty message (end of transmission). A ``b'exit'``
    request is acknowledged with an empty reply and stops the server.

    :param cfg: keyword arguments for ``get_objstorage``
    :param time_limit: optional duration in seconds; a falsy value means
        no limit
    '''
    if setproctitle:
        setproctitle('bench objstorage: obj id gen')
    logger = logging.getLogger('ObjID gen')
    logger.info('Starting obj_id filler')
    sock = ctx.socket(zmq.REP)
    try:
        sock.set_hwm(PIPELINE)
        sock.bind(zurl)
        storage = get_objstorage(**cfg)
        # monotonic clock: immune to wall-clock adjustments
        t0 = time.monotonic()
        for obj_id in generate_obj_ids(storage):
            if time_limit and (time.monotonic() - t0) > time_limit:
                logger.info('Time limit reached')
                break
            logger.debug('waiting for request')
            recv = sock.recv()
            if recv == b'exit':
                logger.info('QUITTING 1')
                sock.send(b'')
                return
            logger.debug('OID: RCV %s', recv)
            sock.send(obj_id)
            logger.debug('OID: SNT %s', obj_id)
            statsd.increment('swh_objstorage_bench_objid_count')
            logger.debug('End of loop')
        else:
            logger.debug('No more ID to produce')
        # Keep telling every requesting client we are done until the main
        # process asks us to exit.
        while True:
            if sock.recv() == b'exit':
                logger.info('QUITTING 2')
                sock.send(b'')
                break
            logger.debug('OBJ_ID SEND EOT')
            sock.send(b'')  # send EOT, we are done
    finally:
        sock.close()
def pull_push_cli(cfg_from, cfg_to):
    '''Copy objects whose obj_ids are handed out by the obj_id server.

    Repeatedly requests an obj_id on a zmq REQ socket connected to ``zurl``,
    gets the object from the objstorage built from *cfg_from* and adds it to
    the objstorage built from *cfg_to*. If *cfg_to* is falsy, fetched objects
    are dropped (read-only benchmark). Stops when the server replies with an
    empty message.

    Get/put failures are counted rather than raised, so that a single bad
    object does not end the benchmark.

    :param cfg_from: keyword arguments for ``get_objstorage`` (source)
    :param cfg_to: keyword arguments for ``get_objstorage`` (destination),
        or a falsy value to drop objects
    :returns: tuple ``(objects, volume, get_errors, put_errors)``
    '''
    logger = logging.getLogger('P/P worker')
    if setproctitle:
        setproctitle('bench objstorage: P/P worker')
    sto_from = get_objstorage(**cfg_from)
    sto_to = get_objstorage(**cfg_to) if cfg_to else None
    sock = ctx.socket(zmq.REQ)
    try:
        sock.connect(zurl)
        logger.info('starting')
        t0 = time.monotonic()
        vol = 0
        get_errors = 0
        put_errors = 0
        # the request payload is constant: identify ourselves by pid
        request = ('pid:%s' % os.getpid()).encode()
        for i in count():
            logger.debug('SEND REQ')
            sock.send(request)
            logger.debug('SENT REQ')
            obj_id = sock.recv()
            if not obj_id:
                # empty reply == end of transmission; i objects processed
                logger.info('Exiting. Processed %s objects', i)
                return (i, vol, get_errors, put_errors)
            if (i % 100) == 0 and (time.monotonic() - t0) > 1:
                logger.info('objects: %s', i)
                t0 = time.monotonic()
            try:
                with statsd.timed('swh_objstorage_bench_get'):
                    logger.debug('getting %r', obj_id)
                    obj = sto_from.get(obj_id)
            except Exception:
                logger.debug('get failed for %r', obj_id, exc_info=True)
                get_errors += 1
                continue
            if sto_to is not None:
                try:
                    with statsd.timed('swh_objstorage_bench_put'):
                        logger.debug('adding %r', obj_id)
                        sto_to.add(obj)
                except Exception:
                    logger.debug('put failed for %r', obj_id, exc_info=True)
                    put_errors += 1
            # volume counts every successfully read object
            v = len(obj)
            vol += v
            statsd.increment('swh_objstorage_bench_volume', v)
    finally:
        sock.close()
def worker(sto_from, sto_to, n=2):
    '''Run *n* pull/push clients concurrently and sum their statistics.

    :param sto_from: source objstorage config, passed to ``pull_push_cli``
    :param sto_to: destination objstorage config (or None to drop objects),
        passed to ``pull_push_cli``
    :param n: number of threads; a value below 2 runs a single client in
        the calling thread
    :returns: summed ``(objects, volume, get_errors, put_errors)``
    '''
    # ``import concurrent`` alone does not load the ``futures`` submodule,
    # so import it explicitly.
    from concurrent.futures import ThreadPoolExecutor

    logging.info('Starting worker with %s threads', n)
    if n <= 1:
        return pull_push_cli(sto_from, sto_to)
    with ThreadPoolExecutor(max_workers=n) as executor:
        futures = [executor.submit(pull_push_cli, sto_from, sto_to)
                   for _ in range(n)]
        results = [f.result() for f in futures]
    tot, vol, gerr, perr = map(sum, zip(*results))
    return tot, vol, gerr, perr
def _parse_storage_cfg(url):
    '''Turn a command-line storage argument into an objstorage config.

    :param url: an http(s) url of a running objstorage server, a
        ``<backend>://?<arg>=<value>&[...]`` url, the path of a yaml config
        file, or None/empty
    :returns: the objstorage config (as passed to ``get_objstorage(**cfg)``
        by the workers), or None when *url* is empty or the config file
        yields nothing
    '''
    if not url:
        return None
    parsed = urlparse(url)
    if parsed.scheme in ('http', 'https'):
        return {'cls': 'remote', 'args': {'url': url}}
    if parsed.scheme:
        # parse_qs gives lists; keep the first value of each argument
        args = {k: v[0] for k, v in parse_qs(parsed.query).items()}
        return {'cls': parsed.scheme, 'args': args}
    cfg_dict = load_and_check_config(url)
    return cfg_dict['objstorage'] if cfg_dict else None


@click.command('swh-objstorage-copy')
@click.argument('cfg-from', required=True)
@click.argument('cfg-to', required=False, default=None)
@click.option('--duration', '-t', type=int, default=None,
              help='Stop handing out obj_ids after this many seconds.')
@click.option('--workers', '-w', type=int, default=2,
              help='Number of worker processes.')
@click.option('--threads', '-c', type=int, default=2,
              help='Number of copy threads per worker process.')
@click.option('--log-level', '-l', default='INFO',
              type=click.Choice(logging._nameToLevel.keys()),
              help='Logging level.')
@click.option('--message', default=None,
              help='Free-form note stored in the results summary.')
def copy(cfg_from, cfg_to, duration, workers, threads, log_level, message):
    """Copy from one objstorage to the other.

    This tool is dedicated to stress testing swh-objstorage backends.

    The objstorage arguments can be given either as a file name (the
    standard objstorage config file in yaml) or as a url of the form
    <backend>://?<arg>=<value>&[...]

    If this url is a standard http one, it is expected to be the url of a
    running swh objstorage server.

    If the destination objstorage is not given, objects are dropped (for a
    read only stress test).

    It produces a simple performance summary as output, but it also pushes
    statsd metrics to the UDP endpoint given by the STATSD_HOST/STATSD_PORT
    environment variables (these default to localhost:8125), if you want to
    see real time curves of how the copy is performing.

    \b
    Examples:
    - Generate random objects and put them in a running objstorage
      listening on port 5003:
          swh-objstorage-copy random:// http://127.0.0.1:5003
    - Same as above, but limit the generator to 1000 fixed size objects:
          swh-objstorage-copy 'random://?total=1000&filesize=1024' \\
              http://127.0.0.1:5003
    - Pull with 16 workers x 2 threads from a local pathslicer only for 2mn:
          swh-objstorage-copy -t 120 -w 16 -c 2 \\
              'pathslicing://?root=/tmp/objstorage&slicing=0:4/8:10'
    """
    if setproctitle:
        setproctitle('bench objstorage: main')
    logging.basicConfig(
        level=log_level,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )
    storage = {
        'from': _parse_storage_cfg(cfg_from),
        'to': _parse_storage_cfg(cfg_to),
    }
    logging.info('Pulling from %s', storage['from'])
    if storage['to'] is not None:
        logging.info('Pushing to %s', storage['to'])
    else:
        logging.info('Droping objects')
    logging.info('Let\'s start...')

    # Ignore SIGINT while the child processes are created so that they
    # inherit SIG_IGN; only this process then reacts to Ctrl-C and takes
    # care of stopping the children (see the except clause below).
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        pool = mp.Pool(workers)
        futures = [
            pool.apply_async(
                worker, (storage['from'], storage['to'], threads))
            for _ in range(workers)
        ]
        idprov = mp.Process(
            target=obj_ids_srv, args=(storage['from'], duration))
        idprov.start()
    finally:
        signal.signal(signal.SIGINT, original_sigint_handler)

    # monotonic clock: immune to wall-clock adjustments
    t0 = time.monotonic()
    try:
        results = [f.get() for f in futures]
    except BaseException:
        # A worker failed or the user hit Ctrl-C: stop every child instead
        # of leaving the obj_id server blocked on its socket forever.
        logging.info('Aborting, terminating workers')
        pool.terminate()
        idprov.terminate()
        pool.join()
        idprov.join()
        raise
    dt = time.monotonic() - t0
    logging.info('All workers are done...')
    tot, vol, get_errors, put_errors = map(sum, zip(*results))

    if idprov.is_alive():
        # the obj_id server only stops on an explicit 'exit' request
        sock = ctx.socket(zmq.REQ)
        try:
            sock.connect(zurl)
            logging.info('we are done, tell it to objidgen')
            sock.send(b'exit')
            sock.recv()
        finally:
            sock.close()
    pool.close()
    pool.join()
    idprov.join()

    results = {
        'config': {
            'workers': workers,
            'threads': threads,
            'storage-in': storage['from'],
            'storage-out': storage['to'],
            'note': message or '',
        },
        'volume': {'total': vol, 'rate': vol / dt},
        'objects': {'total': tot, 'rate': tot / dt},
        'time': dt,
        'errors': {'get': get_errors, 'put': put_errors},
    }
    print(yaml.dump([results], default_flow_style=False))


if __name__ == '__main__':
    copy()