import os
import sys
import concurrent.futures
import time
import yaml
import multiprocessing as mp
from urllib.parse import urlparse, parse_qs
from itertools import count
import signal
import logging

import zmq
import click

try:
    from setproctitle import setproctitle
except ImportError:
    setproctitle = None

from swh.core.statsd import statsd
from swh.objstorage import get_objstorage
from swh.objstorage.api.server import load_and_check_config

ctx = zmq.Context()
zurl = 'tcp://127.0.0.1:5556'
PIPELINE = 10


def generate_obj_ids(storage):
    '''Simple generator that produces obj_ids from the given objstorage.

    '''
    last_obj_id = None
    i = 0
    while True:
        logging.info('New storage listing batch from %s', last_obj_id)
        for obj_id in storage.list_content(last_obj_id):
            yield obj_id
            i += 1
            logging.debug('yield %s %s', i, obj_id)
            if last_obj_id == obj_id:
                logging.info('Exiting obj_id generator')
                # list_content returned an empty list, we are done here
                break
            last_obj_id = obj_id


def obj_ids_srv(cfg, time_limit=None):
    '''Reply to each request on the zmq REQ/REP channel with an obj_id from
    the given storage.

    '''
    if setproctitle:
        setproctitle('bench objstorage: obj id gen')
    logger = logging.getLogger('ObjID gen')
    logger.info('Starting obj_id filler')
    sock = ctx.socket(zmq.REP)
    sock.set_hwm(PIPELINE)
    sock.bind(zurl)
    storage = get_objstorage(**cfg)
    t0 = time.time()
    for i, obj_id in zip(count(), generate_obj_ids(storage)):
        if time_limit and (time.time() - t0) > time_limit:
            logger.info('Time limit reached')
            break
        logger.debug('waiting for request')
        recv = sock.recv()
        if recv == b'exit':
            logger.info('QUITTING 1')
            sock.send(b'')
            return
        logger.debug('OID: RCV %s', recv)
        sock.send(obj_id)
        logger.debug('OID: SNT %s', obj_id)
        statsd.increment('swh_objstorage_bench_objid_count')
        logger.debug('End of loop')
    else:
        logger.debug('No more ID to produce')
    while True:
        if sock.recv() == b'exit':
            logger.info('QUITTING 2')
            sock.send(b'')
            break
        logger.debug('OBJ_ID SEND EOT')
        sock.send(b'')  # send EOT, we are done


def pull_push_cli(cfg_from, cfg_to):
    '''Consume obj_ids from the zmq REQ/REP channel, grab the corresponding
    objects from the sto_from objstorage and push them to the sto_to
    objstorage. If the latter is None, objects are dropped.

    '''
    logger = logging.getLogger('P/P worker')
    if setproctitle:
        setproctitle('bench objstorage: P/P worker')
    sto_from = get_objstorage(**cfg_from)
    if cfg_to:
        sto_to = get_objstorage(**cfg_to)
    else:
        sto_to = None
    sock = ctx.socket(zmq.REQ)
    # sock.set_hwm(PIPELINE)
    sock.connect(zurl)
    logger.info('starting')
    t0 = time.time()
    vol = 0
    get_errors = 0
    put_errors = 0
    pid = os.getpid()
    for i in count():
        logger.debug('SEND REQ')
        sock.send(('pid:%s' % pid).encode())
        logger.debug('SENT REQ')
        obj_id = sock.recv()
        # logger.info('W: got %s', obj_id)
        if not obj_id:
            logger.info('Exiting. Processed %s objects', i - 1)
            return (i, vol, get_errors, put_errors)
        if (i % 100) == 0 and (time.time() - t0) > 1:
            logger.info('objects: %s', i)
            t0 = time.time()
        try:
            with statsd.timed('swh_objstorage_bench_get'):
                logger.debug('getting %r', obj_id)
                # logger.info('GET')
                obj = sto_from.get(obj_id)
                # logger.info('GET OK')
        except:  # noqa
            get_errors += 1
        else:
            if sto_to is not None:
                try:
                    with statsd.timed('swh_objstorage_bench_put'):
                        logger.debug('adding %r', obj_id)
                        sto_to.add(obj)
                except:  # noqa
                    put_errors += 1
            v = len(obj)
            vol += v
            statsd.increment('swh_objstorage_bench_volume', v)
        # logger.info('EOLoop')


def worker(sto_from, sto_to, n=2):
    logging.info('Starting worker with %s threads', n)
    if n > 1:
        with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
            futures = []
            for i in range(n):
                futures.append(executor.submit(
                    pull_push_cli, sto_from, sto_to))
            tot, vol, gerr, perr = map(
                sum, zip(*[f.result() for f in futures]))
            return tot, vol, gerr, perr
    else:
        return pull_push_cli(sto_from, sto_to)


@click.command('swh-objstorage-copy')
@click.argument('cfg-from', required=True)
@click.argument('cfg-to', required=False, default=None)
@click.option('--duration', '-t', type=int, default=None)
@click.option('--workers', '-w', type=int, default=2)
@click.option('--threads', '-c', type=int, default=2)
@click.option('--log-level', '-l', default='INFO',
              type=click.Choice(logging._nameToLevel.keys()))
@click.option('--message', default=None)
def copy(cfg_from, cfg_to, duration, workers, threads, log_level, message):
    """Copy from one objstorage to the other.

    This tool is dedicated to stress testing swh-objstorage backends.

    The objstorage arguments can be given as either a file name (the
    standard objstorage configuration file, in YAML; a sketch is shown
    below) or as a URL of the form:

        <cls>://?<arg1>=<value1>&[...]

    If this URL is a standard http(s) one, it is expected to be the URL of
    a running swh objstorage server. If the destination objstorage is not
    given, objects are dropped (for a read-only stress test).

    It will produce a simple performance summary as output, but it will
    also push statsd metrics to the UDP endpoint given by the
    STATSD_HOST/STATSD_PORT environment variables (these default to
    localhost:8125), if you want to see real-time curves of how the copy
    is performing.
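
    As a rough sketch, a YAML configuration file for a pathslicing backend
    could look like the following (the argument names mirror the URL form
    above; the exact args depend on the chosen objstorage backend):

        objstorage:
          cls: pathslicing
          args:
            root: /tmp/objstorage
            slicing: 0:4/8:10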
    Examples:

    - Generate random objects and put them in a running objstorage
      listening on port 5003:

        swh-objstorage-copy random:// http://127.0.0.1:5003

    - Same as above, but limit the generator to 1000 fixed-size objects:

        swh-objstorage-copy 'random://?total=1000&filesize=1024' \\
            http://127.0.0.1:5003

    - Pull with 16 workers x 2 threads from a local pathslicer, for 2
      minutes only:

        swh-objstorage-copy -t 120 -w 16 -c 2 \\
            'pathslicing://?root=/tmp/objstorage&slicing=0:4/8:10'

    """
    if setproctitle:
        setproctitle('bench objstorage: main')
    logging.basicConfig(
        level=log_level,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )
    # Ignore SIGINT while spawning child processes so they inherit SIG_IGN;
    # the original handler is restored in the parent once they are started.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    storage = {'from': None, 'to': None}
    for url, name in ((cfg_from, 'from'), (cfg_to, 'to')):
        if not url:
            continue
        cfg = urlparse(url)
        cfg_dict = {}
        if cfg.scheme in ('http', 'https'):
            cfg_dict = {'objstorage': {
                'cls': 'remote',
                'args': {'url': url}}}
        elif cfg.scheme:
            args = {k: v[0] for k, v in parse_qs(cfg.query).items()}
            cfg_dict = {'objstorage': {
                'cls': cfg.scheme,
                'args': args,
            }}
        elif url:
            cfg_dict = load_and_check_config(url)
        if cfg_dict:
            storage[name] = cfg_dict['objstorage']
            # storage[name] = get_objstorage(**cfg_dict['objstorage'])

    logging.info('Pulling from %s', storage['from'])
    if storage['to'] is not None:
        logging.info('Pushing to %s', storage['to'])
    else:
        logging.info('Dropping objects')
    logging.info("Let's start...")

    futures = []
    pool = mp.Pool(workers)
    for i in range(workers):
        futures.append(pool.apply_async(
            worker, (storage['from'], storage['to'], threads)))
    idprov = mp.Process(target=obj_ids_srv,
                        args=(storage['from'], duration))
    idprov.start()
    signal.signal(signal.SIGINT, original_sigint_handler)

    t0 = time.time()
    tot, vol, get_errors, put_errors = map(
        sum, zip(*[f.get() for f in futures]))
    logging.info('All workers are done...')
    dt = time.time() - t0
    while 0:  # True:
        if all(f.ready() for f in futures):
            dt = time.time() - t0
            logging.info('All workers are done...')
            break
        time.sleep(0.01)
    # print([f.result() for f in futures])

    sock = ctx.socket(zmq.REQ)
    # sock.set_hwm(PIPELINE)
    sock.connect(zurl)
    logging.info('we are done, tell it to objidgen')
    sock.send(b'exit')
    sock.recv()

    pool.close()
    pool.join()
    idprov.join()

    results = {
        'config': {
            'workers': workers,
            'threads': threads,
            'storage-in': storage['from'],
            'storage-out': storage['to'],
            'note': message or '',
        },
        'volume': {'total': vol, 'rate': vol / dt},
        'objects': {'total': tot, 'rate': tot / dt},
        'time': dt,
        'errors': {'get': get_errors, 'put': put_errors},
    }
    print(yaml.dump([results], default_flow_style=False))


if __name__ == '__main__':
    copy()
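
# Usage sketch only, not a tested command line. Assumptions: the module is
# exposed as the `swh-objstorage-copy` entry point named in the click command
# above, and swh.core.statsd honours the STATSD_HOST / STATSD_PORT environment
# variables as described in the help text (statsd.example.org is a placeholder
# host):
#
#   STATSD_HOST=statsd.example.org STATSD_PORT=8125 \
#       swh-objstorage-copy -w 8 -c 4 --message 'pathslicer read test' \
#       'pathslicing://?root=/tmp/objstorage&slicing=0:4/8:10'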