diff --git a/seirl/awsupload/parquet/gen_schema.py b/seirl/awsupload/parquet/gen_schema.py
new file mode 100755
index 0000000..11f0579
--- /dev/null
+++ b/seirl/awsupload/parquet/gen_schema.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+
+import boto3
+import botocore.exceptions
+import textwrap
+import time
+
+from tables import TABLES
+
+OUTPUT_LOCATION = 's3://softwareheritage/queries/'
+
+
+def create_database(database_name):
+    return 'CREATE DATABASE IF NOT EXISTS {};'.format(database_name)
+
+
+def create_table(table):
+    l = ['CREATE EXTERNAL TABLE IF NOT EXISTS swh.{} ('.format(table['name'])]
+    for i, (column_name, column_type) in enumerate(table['columns']):
+        l.append('    `{}` {}{}'.format(
+            column_name, column_type.upper(),
+            ',' if i < len(table['columns']) - 1 else ''))
+    l.append(')')
+    l.append('STORED AS PARQUET')
+    l.append("LOCATION 's3://softwareheritage/graph/{}/'"
+             .format(table['name']))
+    l.append('TBLPROPERTIES ("parquet.compress"="SNAPPY");')
+    return '\n'.join(l)
+
+
+def repair_table(table):
+    return 'MSCK REPAIR TABLE {};'.format(table['name'])
+
+
+def query(client, query_string, *, desc='Querying', delay_secs=0.5):
+    print(desc, end='...', flush=True)
+    try:
+        res = client.start_query_execution(
+            QueryString=query_string,
+            ResultConfiguration={'OutputLocation': OUTPUT_LOCATION}
+        )
+    except botocore.exceptions.ClientError as e:
+        raise RuntimeError(str(e) + '\n\nQuery:\n' +
+                           textwrap.indent(query_string, ' ' * 2))
+    qid = res['QueryExecutionId']
+    while True:
+        time.sleep(delay_secs)
+        print('.', end='', flush=True)
+        execution = client.get_query_execution(QueryExecutionId=qid)
+        status = execution['QueryExecution']['Status']
+        if status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
+            break
+    print(' {}.'.format(status['State']), flush=True)
+    if status['State'] != 'SUCCEEDED':
+        raise RuntimeError(status['StateChangeReason'] + '\n\nQuery:\n' +
+                           textwrap.indent(query_string, ' ' * 2))
+
+
+def main():
+    client = boto3.client('athena')
+    query(client, create_database('swh'), desc='Creating swh database')
+    for table in TABLES:
+        query(client, create_table(table),
+              desc='Creating table {}'.format(table['name']))
+    for table in TABLES:
+        query(client, repair_table(table),
+              desc='Refreshing table metadata for {}'.format(table['name']))
+
+
+if __name__ == '__main__':
+    main()
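For reference, a minimal sketch of how the DDL generated by create_table() can be inspected locally without touching Athena, using the origin entry of the new tables.py added below; the helper script is hypothetical (not part of the patch), and the expected output is traced by hand from create_table(), not captured from a real run:

# Hypothetical check, not part of the patch: print the Athena DDL for one table.
# Assumes gen_schema.py and tables.py are importable from the current directory.
from gen_schema import create_table
from tables import TABLES

origin = next(t for t in TABLES if t['name'] == 'origin')
print(create_table(origin))
# Expected output (traced by hand from create_table and tables.py):
# CREATE EXTERNAL TABLE IF NOT EXISTS swh.origin (
#     `id` INT,
#     `type` STRING,
#     `url` STRING
# )
# STORED AS PARQUET
# LOCATION 's3://softwareheritage/graph/origin/'
# TBLPROPERTIES ("parquet.compress"="SNAPPY");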
diff --git a/seirl/awsupload/parquet/parquet.py b/seirl/awsupload/parquet/parquet.py
index ca8f59e..62d4df7 100755
--- a/seirl/awsupload/parquet/parquet.py
+++ b/seirl/awsupload/parquet/parquet.py
@@ -1,265 +1,180 @@
 #!/usr/bin/env python3
 
 import datetime
 import os.path
 import pandas
 import pyarrow
 import pyarrow.parquet as parquet
 import queue
 import sqlalchemy
 import subprocess
 import threading
 import tqdm
 
+from tables import TABLES
+
 PARQUET_DATASET = '/srv/hdd/swh-parquet'
 ROW_CHUNK_SIZE = 10000
 PARQUET_SUPERCHUNK_SIZE = 500
 DB_CONN = ('postgresql://guest:guest@dbreplica0.euwest.azure.internal.'
            'softwareheritage.org:5433/softwareheritage')
 ESTIMATE = False
 NUM_WRITER_THREADS = 1
 
-TABLES = [
-    {
-        'name': 'origin_visit',
-        'columns': all,
-        'partition': None
-    },
-    {
-        'name': 'origin',
-        'columns': all,
-        'partition': None
-    },
-    {
-        'name': 'snapshot_branches',
-        'columns': all,
-        'partition': None
-    },
-    {
-        'name': 'snapshot_branch',
-        'columns': all,
-        'partition': None
-    },
-    {
-        'name': 'snapshot',
-        'columns': all,
-        'partition': 'id'
-    },
-    {
-        'name': 'release',
-        'columns': ['id', 'target', 'date', 'date_offset', 'name', 'comment',
-                    'author'],
-        'partition': 'id'
-    },
-    {
-        'name': 'revision_history',
-        'columns': all,
-        'partition': 'id'
-    },
-    {
-        'name': 'release',
-        'columns': ['id', 'target', 'date', 'date_offset', 'name',
-                    'comment', 'author', 'target_type'],
-        'partition': 'id'
-    },
-    {
-        'name': 'revision',
-        'columns': ['id', 'date', 'date_offset', 'committer_date',
-                    'committer_date_offset', 'type', 'directory', 'message',
-                    'author', 'committer'],
-        'partition': 'id'
-    },
-    {
-        'name': 'person',
-        'columns': ['id'],  # Don't export personal information
-        'partition': None
-    },
-    {
-        'name': 'directory_entry_rev',
-        'columns': all,
-        'partition': 'target'
-    },
-    {
-        'name': 'directory_entry_file',
-        'columns': all,
-        'partition': 'target'
-    },
-    {
-        'name': 'directory_entry_dir',
-        'columns': all,
-        'partition': 'target'
-    },
-    {
-        'name': 'directory',
-        'columns': ['id', 'dir_entries', 'file_entries', 'rev_entries'],
-        'partition': 'id'
-    },
-    {
-        'name': 'skipped_content',
-        'columns': ['sha1', 'sha1_git', 'length'],
-        'partition': 'sha1_git'
-    },
-    {
-        'name': 'content',
-        'columns': ['sha1', 'sha1_git', 'length'],
-        'partition': 'sha1_git'
-    },
-]
-
 
 def memory_usage():
     out = subprocess.check_output(['free', '-t', '-m'],
                                   universal_newlines=True)
     out = out.splitlines()[1].split()[1:]
     tot_m, used_m, free_m, *_ = map(int, out)
     return used_m / tot_m
 
 
 def dataframe_memoryview_to_bytes(dataframe):
     def converter(val):
         if type(val) == memoryview:
             return bytes(val)
         return val
 
     for col in dataframe.columns:
         for cell in dataframe[col]:
             if type(cell) == memoryview:
                 dataframe[col] = dataframe[col].apply(converter)
                 break
             if cell is not None:
                 continue
 
     return dataframe
 
 
 def add_slice_column(dataframe, column):
     dataframe['s'] = dataframe.apply(lambda r: r[column][0:1].hex(), axis=1)
     return dataframe
 
 
 def get_tqdm_remaining(pbar):
     try:
         old_format = pbar.bar_format
         pbar.bar_format = '{remaining}'
         remaining_str = repr(pbar)
         pbar.bar_format = old_format
         L = remaining_str.split(':')
         if len(L) < 3:
             L = [0] + L
         return datetime.timedelta(hours=int(L[0]), minutes=int(L[1]),
                                   seconds=int(L[2]))
     except Exception:
         return datetime.timedelta(seconds=0)
 
 
 def get_estimated_row_count(engine, table_name):
     res = engine.execute('SELECT reltuples FROM pg_class WHERE relname = %s',
                          (table_name,))
     return int(next(res)[0])
 
 
 class ParquetWriter:
     def __init__(self, table):
         self.table = table
         self.dataframes = []
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         self.flush()
 
     def write(self, dataframe):
         dataframe = dataframe_memoryview_to_bytes(dataframe)
         self.dataframes.append(dataframe)
         if len(self.dataframes) > PARQUET_SUPERCHUNK_SIZE:
             self.flush()
 
         usage = memory_usage()
         if usage > 0.4:
             print("Flushing because of high memory usage ({}%)"
                   .format(usage * 100))
             self.flush()
 
     def flush(self):
         dataframe = pandas.concat(self.dataframes)
         partition_cols = None
         # partitioning was a stupid idea
         if self.table['partition'] and False:
             dataframe = add_slice_column(dataframe,
                                          self.table['partition'])
             partition_cols = ['s']
 
         parquet_table = pyarrow.Table.from_pandas(dataframe)
         parquet_path = os.path.join(PARQUET_DATASET, self.table['name'])
         parquet.write_to_dataset(parquet_table, root_path=parquet_path,
                                  partition_cols=partition_cols)
 
         self.dataframes = []
 
 
 def export(table, pandas_iterator, total):
     with tqdm.tqdm(total=total) as pbar:
         with ParquetWriter(table) as writer:
             for i, dataframe in enumerate(pandas_iterator):
                 writer.write(dataframe)
                 pbar.update(len(dataframe.index))
         return get_tqdm_remaining(pbar)
 
 
 def write_worker(table, q, pbar):
     with ParquetWriter(table) as writer:
         while True:
             dataframe = q.get()
             if dataframe is None:
                 break
             writer.write(dataframe)
             pbar.update(len(dataframe.index))
             q.task_done()
 
 
 def export_parallel(table, pandas_iterator, total):
     with tqdm.tqdm(total=total) as pbar:
         q = queue.Queue(maxsize=NUM_WRITER_THREADS * 8)
         threads = []
         for i in range(NUM_WRITER_THREADS):
             t = threading.Thread(target=write_worker, args=(table, q, pbar))
             t.start()
             threads.append(t)
 
         for i, dataframe in enumerate(pandas_iterator):
             q.put(dataframe)
         q.join()
 
         for i in range(NUM_WRITER_THREADS):
             q.put(None)
         for t in threads:
             t.join()
 
         return get_tqdm_remaining(pbar)
 
 
 def main():
     estimate = datetime.timedelta()
     for table in TABLES[5:6]:
         conn = sqlalchemy.create_engine(DB_CONN, server_side_cursors=True)
         total = get_estimated_row_count(conn, table['name'])
         if ESTIMATE:
             it = pandas.read_sql_query("select {} from {} limit 400000"
                                        .format('*' if table['columns'] is all
                                                else ','.join(table['columns']),
                                                table['name']),
                                        DB_CONN, chunksize=ROW_CHUNK_SIZE)
         else:
             it = pandas.read_sql_table(table['name'], conn,
                                        chunksize=ROW_CHUNK_SIZE,
                                        columns=(None
                                                 if table['columns'] is all
                                                 else table['columns']))
         print('Table {}...'.format(table['name']))
         estimate += export_parallel(table, it, total)
     print(estimate)
 
 
 if __name__ == '__main__':
     main()
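Note that main() above still hard-codes a TABLES[5:6] slice and the old `table['columns'] is all` convention, while the new tables.py (below) stores (name, type) pairs. A minimal sketch of driving the export for one table by name under those assumptions; the driver script, the 'release' choice and the column_names helper are illustrative only, not part of the patch:

# Hypothetical driver, not part of the patch: export a single table chosen by name.
# Assumes parquet.py and tables.py are importable and the replica DB is reachable.
import pandas
import sqlalchemy

from parquet import (DB_CONN, ROW_CHUNK_SIZE, export_parallel,
                     get_estimated_row_count)
from tables import TABLES

table = next(t for t in TABLES if t['name'] == 'release')
engine = sqlalchemy.create_engine(DB_CONN, server_side_cursors=True)
total = get_estimated_row_count(engine, table['name'])
# tables.py stores (column_name, column_type) pairs; read_sql_table wants names.
column_names = [name for name, _ in table['columns']]
it = pandas.read_sql_table(table['name'], engine, chunksize=ROW_CHUNK_SIZE,
                           columns=column_names)
export_parallel(table, it, total)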
diff --git a/seirl/awsupload/parquet/tables.py b/seirl/awsupload/parquet/tables.py
new file mode 100644
index 0000000..74560e7
--- /dev/null
+++ b/seirl/awsupload/parquet/tables.py
@@ -0,0 +1,143 @@
+TABLES = [
+    {
+        'name': 'origin_visit',
+        'columns': [
+            ('origin', 'int'),
+            ('visit', 'int'),
+            ('date', 'timestamp'),
+            ('status', 'string'),
+            ('snapshot_id', 'int'),
+        ],
+        'partition': None
+    },
+    {
+        'name': 'origin',
+        'columns': [
+            ('id', 'int'),
+            ('type', 'string'),
+            ('url', 'string'),
+        ],
+        'partition': None
+    },
+    {
+        'name': 'snapshot_branches',
+        'columns': [
+            ('snapshot_id', 'int'),
+            ('branch_id', 'int'),
+        ],
+        'partition': None
+    },
+    {
+        'name': 'snapshot_branch',
+        'columns': [
+            ('object_id', 'int'),
+            ('name', 'binary'),
+            ('target', 'binary'),
+            ('target_type', 'string'),
+        ],
+        'partition': None
+    },
+    {
+        'name': 'snapshot',
+        'columns': [
+            ('object_id', 'int'),
+            ('id', 'binary'),
+        ],
+        'partition': 'id'
+    },
+    {
+        'name': 'release',
+        'columns': [
+            ('id', 'binary'),
+            ('target', 'binary'),
+            ('date', 'timestamp'),
+            ('date_offset', 'int'),
+            ('name', 'binary'),
+            ('comment', 'binary'),
+            ('author', 'int'),
+            ('target_type', 'string'),
+        ],
+        'partition': 'id'
+    },
+    {
+        'name': 'revision',
+        'columns': [
+            ('id', 'binary'),
+            ('date', 'timestamp'),
+            ('date_offset', 'int'),
+            ('committer_date', 'timestamp'),
+            ('committer_date_offset', 'int'),
+            ('type', 'string'),
+            ('directory', 'binary'),
+            ('message', 'binary'),
+            ('author', 'int'),
+            ('committer', 'int'),
+        ],
+        'partition': 'id'
+    },
+    {
+        'name': 'person',
+        'columns': [  # Don't export personal information
+            ('id', 'int'),
+        ],
+        'partition': 'id'
+    },
+    {
+        'name': 'directory_entry_rev',
+        'columns': [
+            ('id', 'int'),
+            ('target', 'binary'),
+            ('name', 'binary'),
+            ('perms', 'int'),
+        ],
+        'partition': 'target'
+    },
+    {
+        'name': 'directory_entry_file',
+        'columns': [
+            ('id', 'int'),
+            ('target', 'binary'),
+            ('name', 'binary'),
+            ('perms', 'int'),
+        ],
+        'partition': 'target'
+    },
+    {
+        'name': 'directory_entry_dir',
+        'columns': [
+            ('id', 'int'),
+            ('target', 'binary'),
+            ('name', 'binary'),
+            ('perms', 'int'),
+        ],
+        'partition': 'target'
+    },
+    {
+        'name': 'directory',
+        'columns': [
+            ('id', 'binary'),
+            ('dir_entries', 'array'),
+            ('file_entries', 'array'),
+            ('rev_entries', 'array'),
+        ],
+        'partition': 'id'
+    },
+    {
+        'name': 'skipped_content',
+        'columns': [
+            ('sha1', 'binary'),
+            ('sha1_git', 'binary'),
+            ('length', 'int'),
+        ],
+        'partition': 'sha1_git'
+    },
+    {
+        'name': 'content',
+        'columns': [
+            ('sha1', 'binary'),
+            ('sha1_git', 'binary'),
+            ('length', 'int'),
+        ],
+        'partition': 'sha1_git'
+    },
+]
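Once gen_schema.py has created the swh database and tables, the same query() helper can be reused for ad-hoc Athena queries; a minimal sketch, where the SELECT statement is only an example and results land under OUTPUT_LOCATION as with the DDL queries:

# Hypothetical usage, not part of the patch: run an ad-hoc Athena query through
# the query() helper defined in gen_schema.py above.
import boto3

from gen_schema import query

client = boto3.client('athena')
query(client, 'SELECT count(*) FROM swh.origin;',
      desc='Counting origins')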