diff --git a/seirl/awsupload/parquet/gen_schema.py b/seirl/awsupload/parquet/gen_schema.py
index 11f0579..57d7d4f 100755
--- a/seirl/awsupload/parquet/gen_schema.py
+++ b/seirl/awsupload/parquet/gen_schema.py
@@ -1,71 +1,80 @@
 #!/usr/bin/env python3
 import boto3
 import botocore.exceptions
+import sys
 import textwrap
 import time
 
 from tables import TABLES
 
 OUTPUT_LOCATION = 's3://softwareheritage/queries/'
 
 
 def create_database(database_name):
     return 'CREATE DATABASE IF NOT EXISTS {};'.format(database_name)
 
 
+def drop_table(table):
+    return 'DROP TABLE IF EXISTS swh.{};'.format(table['name'])
+
+
 def create_table(table):
     l = ['CREATE EXTERNAL TABLE IF NOT EXISTS swh.{} ('.format(table['name'])]
     for i, (column_name, column_type) in enumerate(table['columns']):
         l.append('    `{}` {}{}'.format(
             column_name,
             column_type.upper(),
             ',' if i < len(table['columns']) - 1 else ''))
     l.append(')')
     l.append('STORED AS PARQUET')
     l.append("LOCATION 's3://softwareheritage/graph/{}/'"
              .format(table['name']))
     l.append('TBLPROPERTIES ("parquet.compress"="SNAPPY");')
     return '\n'.join(l)
 
 
 def repair_table(table):
     return 'MSCK REPAIR TABLE {};'.format(table['name'])
 
 
 def query(client, query_string, *, desc='Querying', delay_secs=0.5):
     print(desc, end='...', flush=True)
     try:
         res = client.start_query_execution(
             QueryString=query_string,
             ResultConfiguration={'OutputLocation': OUTPUT_LOCATION}
         )
     except botocore.exceptions.ClientError as e:
         raise RuntimeError(str(e) + '\n\nQuery:\n' +
                            textwrap.indent(query_string, ' ' * 2))
     qid = res['QueryExecutionId']
     while True:
         time.sleep(delay_secs)
         print('.', end='', flush=True)
         execution = client.get_query_execution(QueryExecutionId=qid)
         status = execution['QueryExecution']['Status']
         if status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
             break
     print(' {}.'.format(status['State']), flush=True)
     if status['State'] != 'SUCCEEDED':
         raise RuntimeError(status['StateChangeReason'] + '\n\nQuery:\n' +
                            textwrap.indent(query_string, ' ' * 2))
 
 
 def main():
     client = boto3.client('athena')
     query(client, create_database('swh'), desc='Creating swh database')
+    if len(sys.argv) >= 2 and sys.argv[1] == '--replace-tables':
+        for table in TABLES:
+            query(client, drop_table(table),
+                  desc='Dropping table {}'.format(table['name']))
     for table in TABLES:
         query(client, create_table(table),
               desc='Creating table {}'.format(table['name']))
     for table in TABLES:
         query(client, repair_table(table),
               desc='Refreshing table metadata for {}'.format(table['name']))
 
 
 if __name__ == '__main__':
     main()
diff --git a/seirl/awsupload/parquet/tables.py b/seirl/awsupload/parquet/tables.py
index 6a0986f..2e195f8 100644
--- a/seirl/awsupload/parquet/tables.py
+++ b/seirl/awsupload/parquet/tables.py
@@ -1,152 +1,153 @@
 TABLES = [
     {
         'name': 'origin_visit',
         'columns': [
-            ('origin', 'int'),
-            ('visit', 'int'),
-            ('date', 'timestamp'),
+            ('origin', 'bigint'),
+            ('visit', 'bigint'),
+            ('date', 'bigint'),
             ('status', 'string'),
-            ('snapshot_id', 'int'),
+            ('metadata', 'string'),
+            ('snapshot_id', 'bigint'),
         ],
         'partition': None
     },
     {
         'name': 'origin',
         'columns': [
-            ('id', 'int'),
+            ('id', 'bigint'),
             ('type', 'string'),
             ('url', 'string'),
         ],
         'partition': None
     },
     {
         'name': 'snapshot_branches',
         'columns': [
-            ('snapshot_id', 'int'),
-            ('branch_id', 'int'),
+            ('snapshot_id', 'bigint'),
+            ('branch_id', 'bigint'),
         ],
         'partition': None
     },
     {
         'name': 'snapshot_branch',
         'columns': [
-            ('object_id', 'int'),
+            ('object_id', 'bigint'),
             ('name', 'binary'),
             ('target', 'binary'),
             ('target_type', 'string'),
         ],
         'partition': None
     },
     {
         'name': 'snapshot',
         'columns': [
-            ('object_id', 'int'),
+            ('object_id', 'bigint'),
             ('id', 'binary'),
         ],
         'partition': 'id'
     },
     {
         'name': 'release',
         'columns': [
             ('id', 'binary'),
             ('target', 'binary'),
-            ('date', 'timestamp'),
-            ('date_offset', 'int'),
+            ('date', 'bigint'),
+            ('date_offset', 'smallint'),
             ('name', 'binary'),
             ('comment', 'binary'),
-            ('author', 'int'),
+            ('author', 'bigint'),
             ('target_type', 'string'),
         ],
         'partition': 'id'
     },
     {
         'name': 'revision_history',
         'columns': [
             ('id', 'binary'),
             ('parent_id', 'binary'),
             ('parent_rank', 'int'),
         ],
         'partition': 'id'
     },
     {
         'name': 'revision',
         'columns': [
             ('id', 'binary'),
-            ('date', 'timestamp'),
-            ('date_offset', 'int'),
-            ('committer_date', 'timestamp'),
-            ('committer_date_offset', 'int'),
+            ('date', 'bigint'),
+            ('date_offset', 'smallint'),
+            ('committer_date', 'bigint'),
+            ('committer_date_offset', 'smallint'),
             ('type', 'string'),
             ('directory', 'binary'),
             ('message', 'binary'),
-            ('author', 'int'),
-            ('committer', 'int'),
+            ('author', 'bigint'),
+            ('committer', 'bigint'),
         ],
         'partition': 'id'
     },
     {
         'name': 'person',
         'columns': [
             # Don't export personal information
-            ('id', 'int'),
+            ('id', 'bigint'),
         ],
         'partition': 'id'
     },
     {
         'name': 'directory_entry_rev',
         'columns': [
-            ('id', 'int'),
+            ('id', 'bigint'),
             ('target', 'binary'),
             ('name', 'binary'),
             ('perms', 'int'),
         ],
         'partition': 'target'
     },
     {
         'name': 'directory_entry_file',
         'columns': [
-            ('id', 'int'),
+            ('id', 'bigint'),
             ('target', 'binary'),
             ('name', 'binary'),
             ('perms', 'int'),
         ],
         'partition': 'target'
     },
     {
         'name': 'directory_entry_dir',
         'columns': [
-            ('id', 'int'),
+            ('id', 'bigint'),
             ('target', 'binary'),
             ('name', 'binary'),
             ('perms', 'int'),
         ],
         'partition': 'target'
     },
     {
         'name': 'directory',
         'columns': [
             ('id', 'binary'),
-            ('dir_entries', 'array<int>'),
-            ('file_entries', 'array<int>'),
-            ('rev_entries', 'array<int>'),
+            ('dir_entries', 'array<bigint>'),
+            ('file_entries', 'array<bigint>'),
+            ('rev_entries', 'array<bigint>'),
         ],
         'partition': 'id'
     },
     {
         'name': 'skipped_content',
         'columns': [
             ('sha1', 'binary'),
             ('sha1_git', 'binary'),
-            ('length', 'int'),
+            ('length', 'bigint'),
         ],
         'partition': 'sha1_git'
     },
     {
         'name': 'content',
         'columns': [
             ('sha1', 'binary'),
             ('sha1_git', 'binary'),
-            ('length', 'int'),
+            ('length', 'bigint'),
         ],
         'partition': 'sha1_git'
     },
 ]
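
To review the DDL that these table definitions produce without touching AWS, the statement
generators can be driven directly. A minimal dry-run sketch, not part of the patch (it assumes
gen_schema.py and tables.py are importable from the working directory):

    #!/usr/bin/env python3
    # Hypothetical dry-run companion to gen_schema.py: print every SQL
    # statement the script would submit to Athena, executing none of them.
    from gen_schema import (create_database, create_table, drop_table,
                            repair_table)
    from tables import TABLES

    statements = [create_database('swh')]
    statements += [drop_table(t) for t in TABLES]    # what --replace-tables runs
    statements += [create_table(t) for t in TABLES]
    statements += [repair_table(t) for t in TABLES]
    print('\n\n'.join(statements))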
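
Since the date columns are now BIGINT rather than TIMESTAMP, consumers have to convert at query
time. Assuming the exported values are Unix epoch seconds (an assumption; the diff does not state
the resolution), Presto's from_unixtime() recovers a timestamp. A sketch using the query() helper
above, with results written under OUTPUT_LOCATION:

    import boto3
    from gen_schema import query  # the Athena polling helper defined above

    client = boto3.client('athena')
    # Assumes swh.origin_visit.date holds epoch seconds; rescale first
    # (e.g. date / 1000) if the export uses milli- or microseconds.
    query(client, '''
        SELECT origin, visit, from_unixtime(date) AS visit_date, status
        FROM swh.origin_visit
        LIMIT 10;
    ''', desc='Sampling origin_visit')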