diff --git a/mypy.ini b/mypy.ini
index eb12064..f609020 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,27 +1,33 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True


# 3rd party libraries without stubs (yet)

[mypy-pkg_resources.*]
ignore_missing_imports = True

[mypy-pytest.*]
ignore_missing_imports = True

[mypy-tqdm.*]
ignore_missing_imports = True

[mypy-confluent_kafka.*]
ignore_missing_imports = True

[mypy-pyorc.*]
ignore_missing_imports = True

[mypy-plyvel.*]
ignore_missing_imports = True

+[mypy-boto3.*]
+ignore_missing_imports = True
+
+[mypy-botocore.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
index 7368380..b7f41f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+boto3
click
tqdm
pyorc
plyvel
diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
new file mode 100644
index 0000000..73250f9
--- /dev/null
+++ b/swh/dataset/athena.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+
+import datetime
+import logging
+import os
+import sys
+import textwrap
+import time
+
+import boto3
+import botocore.exceptions
+
+from swh.dataset.relational import TABLES
+
+
+def create_database(database_name):
+    return "CREATE DATABASE IF NOT EXISTS {};".format(database_name)
+
+
+def drop_table(database_name, table):
+    return "DROP TABLE IF EXISTS {}.{};".format(database_name, table)
+
+
+def create_table(database_name, table, location_prefix):
+    req = textwrap.dedent(
+        """\
+        CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table} (
+        {fields}
+        )
+        STORED AS ORC
+        LOCATION '{location}/'
+        TBLPROPERTIES ("orc.compress"="ZLIB");
+        """
+    ).format(
+        db=database_name,
+        table=table,
+        fields=",\n".join(
+            [
+                "    `{}` {}".format(col_name, col_type)
+                for col_name, col_type in TABLES[table]
+            ]
+        ),
+        location=os.path.join(location_prefix, "orc", table),
+    )
+    return req
+
+
+def repair_table(database_name, table):
+    return "MSCK REPAIR TABLE {}.{};".format(database_name, table)
+
+
+def query(client, query_string, *, desc="Querying", delay_secs=0.5, silent=False):
+    def log(*args, **kwargs):
+        if not silent:
+            print(*args, **kwargs, flush=True, file=sys.stderr)
+
+    log(desc, end="...")
+    query_options = {
+        "QueryString": query_string,
+        "ResultConfiguration": {},
+        "QueryExecutionContext": {},
+    }
+    # output_location and database_name are optional attributes set on the
+    # client by the callers below; use getattr so a caller that only sets one
+    # of them (e.g. create_tables) does not hit an AttributeError.
+    if getattr(client, "output_location", None):
+        query_options["ResultConfiguration"]["OutputLocation"] = client.output_location
+    if getattr(client, "database_name", None):
+        query_options["QueryExecutionContext"]["Database"] = client.database_name
+    try:
+        res = client.start_query_execution(**query_options)
+    except botocore.exceptions.ClientError as e:
+        raise RuntimeError(
+            str(e) + "\n\nQuery:\n" + textwrap.indent(query_string, " " * 2)
+        )
+    qid = res["QueryExecutionId"]
+    while True:
+        time.sleep(delay_secs)
+        log(".", end="")
+        execution = client.get_query_execution(QueryExecutionId=qid)
+        status = execution["QueryExecution"]["Status"]
+        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
+            break
+    log(" {}.".format(status["State"]))
+    if status["State"] != "SUCCEEDED":
+        raise RuntimeError(
+            status["StateChangeReason"]
+            + "\n\nQuery:\n"
+            + textwrap.indent(query_string, " " * 2)
+        )
+
+    return execution["QueryExecution"]
+
+
+def create_tables(database_name, dataset_location, output_location=None, replace=False):
+    client = boto3.client("athena")
+    client.output_location = output_location
+    query(
+        client,
+        create_database(database_name),
+        desc="Creating {} database".format(database_name),
+    )
+
+    if replace:
+        for table in TABLES:
+            query(
+                client,
+                drop_table(database_name, table),
+                desc="Dropping table {}".format(table),
+            )
+
+    for table in TABLES:
+        query(
+            client,
+            create_table(database_name, table, dataset_location),
+            desc="Creating table {}".format(table),
+        )
+
+    for table in TABLES:
+        query(
+            client,
+            repair_table(database_name, table),
+            desc="Refreshing table metadata for {}".format(table),
+        )
+
+
+def human_size(n, units=["bytes", "KB", "MB", "GB", "TB", "PB", "EB"]):
+    """ Returns a human readable string representation of bytes """
+    return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])
+
+
+def run_query_get_results(
+    database_name, query_string, output_location=None,
+):
+    athena = boto3.client("athena")
+    athena.output_location = output_location
+    athena.database_name = database_name
+
+    s3 = boto3.client("s3")
+
+    result = query(athena, query_string, silent=True)
+    logging.info(
+        "Scanned %s in %s",
+        human_size(result["Statistics"]["DataScannedInBytes"]),
+        datetime.timedelta(
+            milliseconds=result["Statistics"]["TotalExecutionTimeInMillis"]
+        ),
+    )
+
+    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
+    bucket, path = loc.split("/", 1)
+    result = s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
+    print(result, end="")  # CSV already ends with \n
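
Note: the swh.dataset.athena module introduced above can also be driven directly from Python, outside the CLI. The following is a minimal sketch, not part of the diff: the S3 bucket and prefixes (s3://example-bucket/...) and the example query are hypothetical placeholders, it assumes AWS credentials are already configured for boto3, and it assumes an "origin" table exists in TABLES; only create_tables and run_query_get_results come from the module above.

from swh.dataset.athena import create_tables, run_query_get_results

# Create the ORC-backed external tables in the "swh" database, pointing at a
# hypothetical dataset location on S3 (placeholder bucket and prefixes).
create_tables(
    "swh",
    "s3://example-bucket/graph",
    output_location="s3://example-bucket/athena-results",
    replace=False,
)

# Run a query and print the resulting CSV to stdout, as the CLI command does.
run_query_get_results(
    "swh",
    "SELECT COUNT(*) FROM origin;",
    output_location="s3://example-bucket/athena-results",
)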
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
index 58ea746..dfae757 100644
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -1,123 +1,180 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
import pathlib
+import sys

import click

from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.dataset.exporters.edges import GraphEdgesExporter
from swh.dataset.exporters.orc import ORCExporter
from swh.dataset.journalprocessor import ParallelJournalProcessor


@swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False),
    help="Configuration file.",
)
@click.pass_context
def dataset_cli_group(ctx, config_file):
    """Software Heritage Dataset Tools"""
    from swh.core import config

    ctx.ensure_object(dict)
    conf = config.read(config_file)
    ctx.obj["config"] = conf


@dataset_cli_group.group("graph")
@click.pass_context
def graph(ctx):
    """Manage graph export"""
    pass


AVAILABLE_EXPORTERS = {
    "edges": GraphEdgesExporter,
    "orc": ORCExporter,
}


@graph.command("export")
@click.argument("export-path", type=click.Path())
@click.option("--export-id", "-e", help="Unique ID of the export run.")
@click.option(
    "--formats",
    "-f",
    type=click.STRING,
    default=",".join(AVAILABLE_EXPORTERS.keys()),
    show_default=True,
    help="Formats to export.",
)
@click.option("--processes", "-p", default=1, help="Number of parallel processes")
@click.option(
    "--exclude",
    type=click.STRING,
    help="Comma-separated list of object types to exclude",
)
@click.pass_context
def export_graph(ctx, export_path, export_id, formats, exclude, processes):
    """Export the Software Heritage graph as an edge dataset."""
    import uuid

    config = ctx.obj["config"]

    if not export_id:
        export_id = str(uuid.uuid4())

    exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])}
    export_formats = [c.strip() for c in formats.split(",")]
    for f in export_formats:
        if f not in AVAILABLE_EXPORTERS:
            raise click.BadOptionUsage(
                option_name="formats", message=f"{f} is not an available format."
            )

    # Run the exporter for each edge type.
    object_types = [
        "origin",
        "origin_visit",
        "origin_visit_status",
        "snapshot",
        "release",
        "revision",
        "directory",
        "content",
        "skipped_content",
    ]
    for obj_type in object_types:
        if obj_type in exclude_obj_types:
            continue
        exporters = [
            (AVAILABLE_EXPORTERS[f], {"export_path": os.path.join(export_path, f)},)
            for f in export_formats
        ]
        parallel_exporter = ParallelJournalProcessor(
            config,
            exporters,
            export_id,
            obj_type,
            node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
            processes=processes,
        )
        print("Exporting {}:".format(obj_type))
        parallel_exporter.run()


@graph.command("sort")
@click.argument("export-path", type=click.Path())
@click.pass_context
def sort_graph(ctx, export_path):
    config = ctx.obj["config"]
    from swh.dataset.exporters.edges import sort_graph_nodes

    sort_graph_nodes(export_path, config)
+
+
+@dataset_cli_group.group("athena")
+@click.pass_context
+def athena(ctx):
+    """Manage remote AWS Athena database"""
+    pass
+
+
+@athena.command("create")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to create"
+)
+@click.option(
+    "--location-prefix",
+    "-l",
+    required=True,
+    help="S3 prefix where the dataset can be found",
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.option(
+    "-r", "--replace-tables", is_flag=True, help="Replace the tables that already exist"
+)
+def athena_create(
+    database_name, location_prefix, output_location=None, replace_tables=False
+):
+    """Create tables on AWS Athena pointing to a given graph dataset on S3."""
+    from swh.dataset.athena import create_tables
+
+    create_tables(
+        database_name,
+        location_prefix,
+        output_location=output_location,
+        replace=replace_tables,
+    )
+
+
+@athena.command("query")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to query"
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.argument("query_file", type=click.File("r"), default=sys.stdin)
+def athena_query(
+    database_name, query_file, output_location=None,
+):
+    """Query the AWS Athena database with a given command"""
+    from swh.dataset.athena import run_query_get_results
+
+    run_query_get_results(
+        database_name, query_file.read(), output_location=output_location,
+    )
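
Note: for reference, the new subcommands defined above would be invoked roughly as follows; this is a usage sketch with hypothetical S3 URIs and query file name (only the subcommand and option names come from the click declarations above):

swh dataset athena create --database-name swh --location-prefix s3://example-bucket/graph --output-location s3://example-bucket/athena-results
swh dataset athena query --database-name swh --output-location s3://example-bucket/athena-results my_query.sql

When no query file is given, athena_query reads the query from standard input, since the query_file argument defaults to sys.stdin.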