Paste P490: benchmark script cassandra vs postgresql
Authored by vlorentz on Aug 8 2019, 12:21 PM.
from collections import defaultdict
import csv
import itertools
import os
import random
import statistics
import time

from swh.storage import get_storage

SKIP_N_FIRST = 20000  # number of leading rows to skip in each sample file
SAMPLE_SIZE = 1000    # number of queries measured per benchmark
CONTENT_HASH_ALGOS = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
def random_sha1():
    """Return 20 random bytes, i.e. an (almost certainly unknown) sha1."""
    return os.urandom(20)


random_sha1s = (random_sha1() for _ in itertools.count())
class Timer:
    """Context manager measuring the wall-clock time of a block."""
    def __init__(self):
        self._start_time = self._end_time = None

    def __enter__(self):
        self._start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._end_time = time.time()

    def __call__(self):
        return self._end_time - self._start_time
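
# Illustration (not part of the original benchmark): a Timer is entered
# around the timed block, then called to get the elapsed seconds:
#   with Timer() as t:
#       time.sleep(0.01)
#   elapsed = t()  # roughly 0.01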
# Cassandra storage under test; the objstorage is in-memory, so only
# metadata queries are measured.
args = {
    'keyspace': 'swh_test',
    'hosts': ['128.93.66.190'],
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
}
cassandra_storage = get_storage('cassandra', args)

# Reference PostgreSQL storage.
args = {
    # swh-replica db
    'db': 'dbname=softwareheritage user=guest host=somerset.internal.softwareheritage.org port=5433',
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
}
postgres_storage = get_storage('local', args)

cassandra_storage.check_config(check_write=False)
postgres_storage.check_config(check_write=False)
def run_timer(inputs):
    """Run each (bucket, method_name, query) triple against both storages
    and record per-bucket response times. Results are fully consumed, so
    the timings include iterating over them."""
    cassandra_times = defaultdict(list)
    postgres_times = defaultdict(list)
    nb_queries = 0
    for (bucket, method_name, query) in inputs:
        method = getattr(cassandra_storage, method_name)
        with Timer() as cassandra_timer:
            res = method(query)
            if res is not None:
                res = list(res)
        if not res or not res[0]:
            continue  # Missing from Cassandra DB
        method = getattr(postgres_storage, method_name)
        with Timer() as postgres_timer:
            res = method(query)
            if res is not None:
                res = list(res)
        cassandra_times[bucket].append(cassandra_timer())
        postgres_times[bucket].append(postgres_timer())
        nb_queries += 1
        if nb_queries >= SAMPLE_SIZE:
            break
    return (dict(cassandra_times), dict(postgres_times))
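
# Illustration (hypothetical query value): run_timer() expects an iterable
# of (bucket, method_name, query) triples, e.g.
#   run_timer([(None, 'revision_get', [b'\x00' * 20])])
# times a single revision_get call on both backends, bucketed under None.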
def iter_contents():
    """Yield {hash_algo: hash} dicts read from the content sample file."""
    with open('/home/dev/samples/content.csv') as fd:
        reader = csv.reader(fd)
        header = next(reader)
        for row in reader:
            yield {hash_: bytes.fromhex(cell[2:])  # skip the leading '\x'
                   for (hash_, cell) in zip(header, row)
                   if hash_ in CONTENT_HASH_ALGOS}
def iter_ids(file_name):
    """Yield ids (as bytes) from the first column of a sample file."""
    with open('/home/dev/samples/{}'.format(file_name)) as fd:
        reader = csv.reader(fd)
        rows = itertools.islice(reader, SKIP_N_FIRST, None)
        for row in rows:
            yield bytes.fromhex(row[0][2:])
def format_stats_on_bucket(bucket):
    """Format the mean (whole ms) and standard deviation (one decimal of ms)
    of a list of durations expressed in seconds."""
    return '\tavg = {} ms,\tstdev = {} ms'.format(
        int(statistics.mean(bucket)*1000),
        int(statistics.stdev(bucket)*10000)/10)
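
# Illustration (made-up durations, in seconds):
#   format_stats_on_bucket([0.010, 0.012]) == '\tavg = 11 ms,\tstdev = 1.4 ms'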
def bench_content_find():
    contents = iter_contents()
    contents = itertools.islice(contents, SKIP_N_FIRST, None)
    # Look up each content by a single, randomly chosen, hash algorithm,
    # and bucket the timings by that algorithm.
    random_hashes = (random.choice(CONTENT_HASH_ALGOS)
                     for _ in itertools.count())
    inputs = ((hash_, 'content_find', {hash_: dict_[hash_]})
              for hash_, dict_ in zip(random_hashes, contents))
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for content_find:')
    for hash_ in CONTENT_HASH_ALGOS:
        if hash_ in cassandra_times:
            print('\thash_algo = {}\t(sample size={}):'.format(
                hash_, len(cassandra_times[hash_])))
            print('\t\tcassandra:{}'.format(
                format_stats_on_bucket(cassandra_times[hash_])))
            print('\t\tpostgres:{}'.format(
                format_stats_on_bucket(postgres_times[hash_])))
    print()
def bench_get_one(method_name, ids, fn_id_to_query=lambda id_: [id_]):
    inputs = ((None, method_name, fn_id_to_query(id_)) for id_ in ids)
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for {} (1 arg)\t(sample size={}):'.format(
        method_name, len(cassandra_times[None])))
    print('\tcassandra:{}'.format(
        format_stats_on_bucket(cassandra_times[None])))
    print('\tpostgres:{}'.format(
        format_stats_on_bucket(postgres_times[None])))
    print()
def grouper(iterable, n):
    """Collect data into fixed-length chunks or blocks."""
    # grouper('ABCDEFG', 3) --> ABC DEF; unlike the itertools recipe, there
    # is no fillvalue, so zip() drops the incomplete tail group.
    args = [iter(iterable)] * n
    return zip(*args)
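
# Illustration of the tail-dropping behavior:
#   list(grouper('ABCDEFG', 3)) == [('A', 'B', 'C'), ('D', 'E', 'F')]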
def bench_get_100(method_name, ids):
    groups = grouper(ids, 100)
    inputs = ((None, method_name, group) for group in groups)
    (cassandra_times, postgres_times) = run_timer(inputs)
    print('Benchmark results for {} (100 args)\t(sample size={}):'.format(
        method_name, len(cassandra_times[None])))
    print('\tcassandra:{}'.format(
        format_stats_on_bucket(cassandra_times[None])))
    print('\tpostgres:{}'.format(
        format_stats_on_bucket(postgres_times[None])))
    print()
# Run the benchmarks.
bench_content_find()

bench_get_one('content_missing_per_sha1', random_sha1s)
bench_get_100('content_missing_per_sha1', random_sha1s)

rev_ids = iter_ids('revision.csv')
bench_get_one('revision_get', rev_ids)
bench_get_100('revision_get', rev_ids)
bench_get_one('revision_missing', random_sha1s)
bench_get_100('revision_missing', random_sha1s)

# directory_ls and snapshot_get take a single id, not a list of ids.
bench_get_one('directory_ls', iter_ids('directory.csv'),
              fn_id_to_query=lambda id_: id_)
bench_get_one('directory_missing', random_sha1s)
bench_get_100('directory_missing', random_sha1s)

rel_ids = iter_ids('release.csv')
bench_get_one('release_get', rel_ids)
bench_get_100('release_get', rel_ids)
bench_get_one('release_missing', random_sha1s)
bench_get_100('release_missing', random_sha1s)

snap_ids = iter_ids('snapshot.csv')
bench_get_one('snapshot_get', snap_ids,
              fn_id_to_query=lambda id_: id_)