"""Ad-hoc benchmark comparing read queries on the Cassandra and PostgreSQL
backends of swh.storage."""

from collections import defaultdict
import csv
import itertools
import os
import random
import statistics
import time

from swh.storage import get_storage

# Skip the first rows of each sample file (so repeated runs don't hit rows
# already warmed in caches), then measure up to SAMPLE_SIZE queries.
SKIP_N_FIRST = 20000
SAMPLE_SIZE = 1000

CONTENT_HASH_ALGOS = ['sha1', 'sha1_git', 'sha256', 'blake2s256']


def random_sha1():
    return os.urandom(20)


# Infinite stream of random ids, used to benchmark the *_missing endpoints.
random_sha1s = (random_sha1() for _ in itertools.count())


class Timer:
    """Context manager measuring the wall-clock duration of its block."""
    def __init__(self):
        self._start_time = self._end_time = None

    def __enter__(self):
        self._start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._end_time = time.time()

    def __call__(self):
        return self._end_time - self._start_time


args = {
    'keyspace': 'swh_test',
    'hosts': ['128.93.66.190'],
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
}
cassandra_storage = get_storage('cassandra', args)

args = {
    # swh-replica db
    'db': 'dbname=softwareheritage user=guest host=somerset.internal.softwareheritage.org port=5433',
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
}
postgres_storage = get_storage('local', args)

cassandra_storage.check_config(check_write=False)
postgres_storage.check_config(check_write=False)


def run_timer(inputs):
    """Runs each (bucket, method_name, query) input against both storages,
    and returns the timings grouped by bucket, as a pair of dicts."""
    cassandra_times = defaultdict(list)
    postgres_times = defaultdict(list)
    nb_queries = 0
    for (bucket, method_name, query) in inputs:
        method = getattr(cassandra_storage, method_name)
        with Timer() as cassandra_timer:
            res = method(query)
            if res is not None:
                # Materialize lazy results so the timer covers the whole
                # query, not just its submission.
                res = list(res)
        if not res or not res[0]:
            continue  # Missing from Cassandra DB

        method = getattr(postgres_storage, method_name)
        with Timer() as postgres_timer:
            res = method(query)
            if res is not None:
                res = list(res)

        cassandra_times[bucket].append(cassandra_timer())
        postgres_times[bucket].append(postgres_timer())
        nb_queries += 1
        if nb_queries >= SAMPLE_SIZE:
            break

    return (dict(cassandra_times), dict(postgres_times))


def iter_contents():
    with open('/home/dev/samples/content.csv') as fd:
        reader = csv.reader(fd)
        header = next(reader)
        for row in reader:
            # Strip the two-character '\x' prefix of each hex-encoded hash.
            yield {hash_: bytes.fromhex(cell[2:])
                   for (hash_, cell) in zip(header, row)
                   if hash_ in CONTENT_HASH_ALGOS}


def iter_ids(file_name):
    with open('/home/dev/samples/{}'.format(file_name)) as fd:
        reader = csv.reader(fd)
        rows = itertools.islice(reader, SKIP_N_FIRST, None)
        for row in rows:
            yield bytes.fromhex(row[0][2:])


def format_stats_on_bucket(bucket):
    # Mean in whole milliseconds; stdev in milliseconds with one decimal.
    return '\tavg = {} ms,\tstdev = {} ms'.format(
        int(statistics.mean(bucket)*1000),
        int(statistics.stdev(bucket)*10000)/10)


def bench_content_find():
    contents = iter_contents()
    contents = itertools.islice(contents, SKIP_N_FIRST, None)
    random_hashes = (random.choice(CONTENT_HASH_ALGOS)
                     for _ in itertools.count())
    # Query each content by a single, randomly chosen, hash algorithm; that
    # algorithm also serves as the bucket for the statistics.
    inputs = ((hash_, 'content_find', {hash_: dict_[hash_]})
              for hash_, dict_ in zip(random_hashes, contents))
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for content_find:')
    for hash_ in CONTENT_HASH_ALGOS:
        if hash_ in cassandra_times:
            print('\thash_algo = {}\t(sample size={}):'.format(
                hash_, len(cassandra_times[hash_])))
            print('\t\tcassandra:{}'.format(
                format_stats_on_bucket(cassandra_times[hash_])))
            print('\t\tpostgres:{}'.format(
                format_stats_on_bucket(postgres_times[hash_])))
    print()


def bench_get_one(method_name, ids, fn_id_to_query=lambda id_: [id_]):
    inputs = ((None, method_name, fn_id_to_query(id_)) for id_ in ids)
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for {} (1 arg)\t(sample size={}):'.format(
        method_name,
        len(cassandra_times[None])))
    print('\tcassandra:{}'.format(
        format_stats_on_bucket(cassandra_times[None])))
    print('\tpostgres:{}'.format(
        format_stats_on_bucket(postgres_times[None])))
    print()


def grouper(iterable, n):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3) --> ABC DEF (the trailing incomplete chunk
    # is dropped, since zip() stops at the shortest iterator)
    args = [iter(iterable)] * n
    return zip(*args)


def bench_get_100(method_name, ids):
    groups = grouper(ids, 100)
    inputs = ((None, method_name, group) for group in groups)
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for {} (100 args)\t(sample size={}):'.format(
        method_name, len(cassandra_times[None])))
    print('\tcassandra:{}'.format(
        format_stats_on_bucket(cassandra_times[None])))
    print('\tpostgres:{}'.format(
        format_stats_on_bucket(postgres_times[None])))
    print()


bench_content_find()

bench_get_one('content_missing_per_sha1', random_sha1s)
bench_get_100('content_missing_per_sha1', random_sha1s)

rev_ids = iter_ids('revision.csv')
bench_get_one('revision_get', rev_ids)
bench_get_100('revision_get', rev_ids)

bench_get_one('revision_missing', random_sha1s)
bench_get_100('revision_missing', random_sha1s)

# directory_ls takes a single id rather than a list of ids.
bench_get_one('directory_ls', iter_ids('directory.csv'),
              fn_id_to_query=lambda id_: id_)

bench_get_one('directory_missing', random_sha1s)
bench_get_100('directory_missing', random_sha1s)

rel_ids = iter_ids('release.csv')
bench_get_one('release_get', rel_ids)
bench_get_100('release_get', rel_ids)

bench_get_one('release_missing', random_sha1s)
bench_get_100('release_missing', random_sha1s)

snap_ids = iter_ids('snapshot.csv')
bench_get_one('snapshot_get', snap_ids, fn_id_to_query=lambda id_: id_)
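
# A minimal usage sketch of the helpers above, kept as a comment so it does
# not affect the benchmark run. It shows the input shape run_timer expects
# ((bucket, method_name, query) tuples) and what it returns (per-bucket
# lists of durations in seconds); 'revision_get' is just one example method,
# mirroring what bench_get_one does internally.
#
#     inputs = ((None, 'revision_get', [id_])
#               for id_ in iter_ids('revision.csv'))
#     (cassandra_times, postgres_times) = run_timer(inputs)
#     print(format_stats_on_bucket(cassandra_times[None]))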