diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -23,5 +23,11 @@
 [mypy-plyvel.*]
 ignore_missing_imports = True
 
+[mypy-boto3.*]
+ignore_missing_imports = True
+
+[mypy-botocore.*]
+ignore_missing_imports = True
+
 # [mypy-add_your_lib_here.*]
 # ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+boto3
 click
 tqdm
 pyorc
diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/athena.py
@@ -0,0 +1,167 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+This module implements the "athena" subcommands for the CLI. It can set up and
+query a remote AWS Athena database.
+"""
+
+import datetime
+import logging
+import os
+import sys
+import textwrap
+import time
+
+import boto3
+import botocore.exceptions
+
+from swh.dataset.relational import TABLES
+
+
+def create_database(database_name):
+    return "CREATE DATABASE IF NOT EXISTS {};".format(database_name)
+
+
+def drop_table(database_name, table):
+    return "DROP TABLE IF EXISTS {}.{};".format(database_name, table)
+
+
+def create_table(database_name, table, location_prefix):
+    req = textwrap.dedent(
+        """\
+        CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table} (
+        {fields}
+        )
+        STORED AS ORC
+        LOCATION '{location}/'
+        TBLPROPERTIES ("orc.compress"="ZLIB");
+        """
+    ).format(
+        db=database_name,
+        table=table,
+        fields=",\n".join(
+            [
+                "    `{}` {}".format(col_name, col_type)
+                for col_name, col_type in TABLES[table]
+            ]
+        ),
+        location=os.path.join(location_prefix, "orc", table),
+    )
+    return req
+
+
+def repair_table(database_name, table):
+    return "MSCK REPAIR TABLE {}.{};".format(database_name, table)
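+
+
+# For illustration: with database_name "swh" and the public dataset prefix
+# "s3://softwareheritage/graph" (example values), create_table() above emits
+# DDL along these lines for the "origin" table:
+#
+#   CREATE EXTERNAL TABLE IF NOT EXISTS swh.origin (
+#       `url` string
+#   )
+#   STORED AS ORC
+#   LOCATION 's3://softwareheritage/graph/orc/origin/'
+#   TBLPROPERTIES ("orc.compress"="ZLIB");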
+
+
+def query(client, query_string, *, desc="Querying", delay_secs=0.5, silent=False):
+    def log(*args, **kwargs):
+        if not silent:
+            print(*args, **kwargs, flush=True, file=sys.stderr)
+
+    log(desc, end="...")
+    query_options = {
+        "QueryString": query_string,
+        "ResultConfiguration": {},
+        "QueryExecutionContext": {},
+    }
+    # output_location and database_name are optional attributes stashed on the
+    # boto3 client by callers; tolerate clients that never set them.
+    if getattr(client, "output_location", None):
+        query_options["ResultConfiguration"]["OutputLocation"] = client.output_location
+    if getattr(client, "database_name", None):
+        query_options["QueryExecutionContext"]["Database"] = client.database_name
+    try:
+        res = client.start_query_execution(**query_options)
+    except botocore.exceptions.ClientError as e:
+        raise RuntimeError(
+            str(e) + "\n\nQuery:\n" + textwrap.indent(query_string, " " * 2)
+        ) from e
+    qid = res["QueryExecutionId"]
+    # Poll until the query reaches a terminal state.
+    while True:
+        time.sleep(delay_secs)
+        log(".", end="")
+        execution = client.get_query_execution(QueryExecutionId=qid)
+        status = execution["QueryExecution"]["Status"]
+        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
+            break
+    log(" {}.".format(status["State"]))
+    if status["State"] != "SUCCEEDED":
+        raise RuntimeError(
+            status["StateChangeReason"]
+            + "\n\nQuery:\n"
+            + textwrap.indent(query_string, " " * 2)
+        )
+
+    return execution["QueryExecution"]
+
+
+def create_tables(database_name, dataset_location, output_location=None, replace=False):
+    """
+    Create the Software Heritage Dataset tables on AWS Athena.
+
+    Athena works on external columnar data stored in S3, but requires a schema
+    for each table to run queries. This creates all the necessary tables
+    remotely by using the relational schemas in swh.dataset.relational.
+    """
+    client = boto3.client("athena")
+    client.output_location = output_location
+
+    query(
+        client,
+        create_database(database_name),
+        desc="Creating {} database".format(database_name),
+    )
+
+    if replace:
+        for table in TABLES:
+            query(
+                client,
+                drop_table(database_name, table),
+                desc="Dropping table {}".format(table),
+            )
+
+    for table in TABLES:
+        query(
+            client,
+            create_table(database_name, table, dataset_location),
+            desc="Creating table {}".format(table),
+        )
+
+    for table in TABLES:
+        query(
+            client,
+            repair_table(database_name, table),
+            desc="Refreshing table metadata for {}".format(table),
+        )
+
+
+def human_size(n, units=["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]):
+    """Returns a human readable string representation of bytes"""
+    return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])
+
+
+def run_query_get_results(
+    database_name, query_string, output_location=None,
+):
+    """
+    Run a query on AWS Athena and return the resulting data in CSV format.
+    """
+    athena = boto3.client("athena")
+    athena.output_location = output_location
+    athena.database_name = database_name
+
+    s3 = boto3.client("s3")
+
+    result = query(athena, query_string, silent=True)
+    logging.info(
+        "Scanned %s in %s",
+        human_size(result["Statistics"]["DataScannedInBytes"]),
+        datetime.timedelta(
+            milliseconds=result["Statistics"]["TotalExecutionTimeInMillis"]
+        ),
+    )
+
+    # Athena stores results as a CSV object in S3; fetch and return its body.
+    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
+    bucket, path = loc.split("/", 1)
+    return s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
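+
+
+# For illustration (example bucket names, assuming AWS credentials are
+# configured for boto3):
+#
+#   create_tables(
+#       "swh", "s3://softwareheritage/graph",
+#       output_location="s3://mybucket/athena-results/",
+#   )
+#   csv_data = run_query_get_results(
+#       "swh", "SELECT COUNT(*) FROM origin;",
+#       output_location="s3://mybucket/athena-results/",
+#   )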
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -7,6 +7,7 @@
 # control
 import os
 import pathlib
+import sys
 
 import click
 
@@ -121,3 +122,62 @@
     from swh.dataset.exporters.edges import sort_graph_nodes
 
     sort_graph_nodes(export_path, config)
+
+
+@dataset_cli_group.group("athena")
+@click.pass_context
+def athena(ctx):
+    """Manage and query a remote AWS Athena database"""
+    pass
+
+
+@athena.command("create")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to create"
+)
+@click.option(
+    "--location-prefix",
+    "-l",
+    required=True,
+    help="S3 prefix where the dataset can be found",
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.option(
+    "-r", "--replace-tables", is_flag=True, help="Replace the tables that already exist"
+)
+def athena_create(
+    database_name, location_prefix, output_location=None, replace_tables=False
+):
+    """Create tables on AWS Athena pointing to a given graph dataset on S3."""
+    from swh.dataset.athena import create_tables
+
+    create_tables(
+        database_name,
+        location_prefix,
+        output_location=output_location,
+        replace=replace_tables,
+    )
+
+
+@athena.command("query")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to query"
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.argument("query_file", type=click.File("r"), default=sys.stdin)
+def athena_query(
+    database_name, query_file, output_location=None,
+):
+    """Run an SQL query against the AWS Athena database"""
+    from swh.dataset.athena import run_query_get_results
+
+    print(
+        run_query_get_results(
+            database_name, query_file.read(), output_location=output_location,
+        ),
+        end="",
+    )  # CSV already ends with \n
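+
+
+# For illustration, the resulting CLI usage (example S3 prefix):
+#
+#   $ swh dataset athena create -d swh -l s3://softwareheritage/graph
+#   $ echo "SELECT COUNT(*) FROM origin;" | swh dataset athena query -d swh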
("date_offset", "smallint"), + ("committer", "binary"), + ("committer_date", "timestamp"), + ("committer_offset", "smallint"), + ("directory", "string"), + ], + "directory": [ + ("id", "string"), + ], + "directory_entry": [ + ("directory_id", "string"), + ("name", "binary"), + ("type", "string"), + ("target", "string"), + ("perms", "int"), + ], + "content": [ + ("sha1", "string"), + ("sha1_git", "string"), + ("sha256", "string"), + ("blake2s256", "string"), + ("length", "bigint"), + ("status", "string"), + ], + "skipped_content": [ + ("sha1", "string"), + ("sha1_git", "string"), + ("sha256", "string"), + ("blake2s256", "string"), + ("length", "bigint"), + ("status", "string"), + ("reason", "string"), + ], +} +# fmt: on