
diff --git a/mypy.ini b/mypy.ini
index eb12064..f609020 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,27 +1,33 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True

# 3rd party libraries without stubs (yet)

[mypy-pkg_resources.*]
ignore_missing_imports = True

[mypy-pytest.*]
ignore_missing_imports = True

[mypy-tqdm.*]
ignore_missing_imports = True

[mypy-confluent_kafka.*]
ignore_missing_imports = True

[mypy-pyorc.*]
ignore_missing_imports = True

[mypy-plyvel.*]
ignore_missing_imports = True

+[mypy-boto3.*]
+ignore_missing_imports = True
+
+[mypy-botocore.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
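The two new overrides mirror the boto3/botocore imports introduced in swh/dataset/athena.py below: both libraries ship without type stubs, so an unmodified mypy run would reject their imports. A minimal sketch of the effect (hypothetical demo file, for illustration only):

    # demo.py -- with the overrides above, the import below passes type
    # checking; without them, mypy reports an error along the lines of
    # "Cannot find implementation or library stub for module named 'boto3'".
    import boto3

    client = boto3.client("athena")  # treated as Any by mypy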
diff --git a/requirements.txt b/requirements.txt
index 7368380..b7f41f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
# Add here external Python module dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec of
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+boto3
click
tqdm
pyorc
plyvel
diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
new file mode 100644
index 0000000..73250f9
--- /dev/null
+++ b/swh/dataset/athena.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+
+import datetime
+import logging
+import os
+import sys
+import textwrap
+import time
+
+import boto3
+import botocore.exceptions
+
+from swh.dataset.relational import TABLES
+
+
+def create_database(database_name):
+    return "CREATE DATABASE IF NOT EXISTS {};".format(database_name)
+
+
+def drop_table(database_name, table):
+    return "DROP TABLE IF EXISTS {}.{};".format(database_name, table)
+
+
+def create_table(database_name, table, location_prefix):
+    req = textwrap.dedent(
+        """\
+        CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table} (
+        {fields}
+        )
+        STORED AS ORC
+        LOCATION '{location}/'
+        TBLPROPERTIES ("orc.compress"="ZLIB");
+        """
+    ).format(
+        db=database_name,
+        table=table,
+        fields=",\n".join(
+            [
+                "    `{}` {}".format(col_name, col_type)
+                for col_name, col_type in TABLES[table]
+            ]
+        ),
+        location=os.path.join(location_prefix, "orc", table),
+    )
+    return req
+
+
+def repair_table(database_name, table):
+    return "MSCK REPAIR TABLE {}.{};".format(database_name, table)
+
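+# For a hypothetical table "example" with columns (`id` bigint, `url` string)
+# and location prefix "s3://example-bucket/graph", create_table() above
+# renders the following DDL (a sketch; actual columns come from TABLES):
+#
+#   CREATE EXTERNAL TABLE IF NOT EXISTS swh.example (
+#       `id` bigint,
+#       `url` string
+#   )
+#   STORED AS ORC
+#   LOCATION 's3://example-bucket/graph/orc/example/'
+#   TBLPROPERTIES ("orc.compress"="ZLIB");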
+
+def query(client, query_string, *, desc="Querying", delay_secs=0.5, silent=False):
+    def log(*args, **kwargs):
+        if not silent:
+            print(*args, **kwargs, flush=True, file=sys.stderr)
+
+    log(desc, end="...")
+    query_options = {
+        "QueryString": query_string,
+        "ResultConfiguration": {},
+        "QueryExecutionContext": {},
+    }
+    # output_location/database_name are attached to the client by the callers;
+    # guard with getattr() since create_tables() only sets the former.
+    if getattr(client, "output_location", None):
+        query_options["ResultConfiguration"]["OutputLocation"] = client.output_location
+    if getattr(client, "database_name", None):
+        query_options["QueryExecutionContext"]["Database"] = client.database_name
+    try:
+        res = client.start_query_execution(**query_options)
+    except botocore.exceptions.ClientError as e:
+        raise RuntimeError(
+            str(e) + "\n\nQuery:\n" + textwrap.indent(query_string, " " * 2)
+        ) from e
+    qid = res["QueryExecutionId"]
+    # Poll until the query reaches a terminal state.
+    while True:
+        time.sleep(delay_secs)
+        log(".", end="")
+        execution = client.get_query_execution(QueryExecutionId=qid)
+        status = execution["QueryExecution"]["Status"]
+        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
+            break
+    log(" {}.".format(status["State"]))
+    if status["State"] != "SUCCEEDED":
+        raise RuntimeError(
+            status["StateChangeReason"]
+            + "\n\nQuery:\n"
+            + textwrap.indent(query_string, " " * 2)
+        )
+
+    return execution["QueryExecution"]
+
+
+def create_tables(database_name, dataset_location, output_location=None, replace=False):
+    client = boto3.client("athena")
+    client.output_location = output_location
+    query(
+        client,
+        create_database(database_name),
+        desc="Creating {} database".format(database_name),
+    )
+
+    if replace:
+        for table in TABLES:
+            query(
+                client,
+                drop_table(database_name, table),
+                desc="Dropping table {}".format(table),
+            )
+
+    for table in TABLES:
+        query(
+            client,
+            create_table(database_name, table, dataset_location),
+            desc="Creating table {}".format(table),
+        )
+
+    for table in TABLES:
+        query(
+            client,
+            repair_table(database_name, table),
+            desc="Refreshing table metadata for {}".format(table),
+        )
+
+
+def human_size(n, units=("bytes", "KB", "MB", "GB", "TB", "PB", "EB")):
+    """Return a human-readable byte count, e.g. human_size(2048) == "2 KB"."""
+    return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])
+
+
+def run_query_get_results(
+    database_name, query_string, output_location=None,
+):
+    athena = boto3.client("athena")
+    athena.output_location = output_location
+    athena.database_name = database_name
+
+    s3 = boto3.client("s3")
+
+    result = query(athena, query_string, silent=True)
+    logging.info(
+        "Scanned %s in %s",
+        human_size(result["Statistics"]["DataScannedInBytes"]),
+        datetime.timedelta(
+            milliseconds=result["Statistics"]["TotalExecutionTimeInMillis"]
+        ),
+    )
+
+    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
+    bucket, path = loc.split("/", 1)
+    result = s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
+    print(result, end="")  # CSV already ends with \n
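The module's two entry points, create_tables and run_query_get_results, are wrapped by the new CLI subcommands below. As a minimal sketch of driving them directly (bucket names are placeholders; AWS credentials and region are assumed to be configured in the environment):

    from swh.dataset.athena import create_tables, run_query_get_results

    # Build (or rebuild, with replace=True) the ORC-backed external tables.
    create_tables(
        "swh",                        # database name
        "s3://example-bucket/graph",  # dataset location prefix (placeholder)
        output_location="s3://example-bucket/athena-out",  # placeholder
        replace=True,
    )

    # Run an ad-hoc query; the result CSV is fetched from S3 and printed.
    run_query_get_results(
        "swh",
        "SELECT COUNT(*) FROM origin;",
        output_location="s3://example-bucket/athena-out",
    )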
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
index 58ea746..dfae757 100644
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -1,123 +1,180 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
import pathlib
+import sys

import click

from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.dataset.exporters.edges import GraphEdgesExporter
from swh.dataset.exporters.orc import ORCExporter
from swh.dataset.journalprocessor import ParallelJournalProcessor


@swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False),
    help="Configuration file.",
)
@click.pass_context
def dataset_cli_group(ctx, config_file):
    """Software Heritage Dataset Tools"""
    from swh.core import config

    ctx.ensure_object(dict)
    conf = config.read(config_file)
    ctx.obj["config"] = conf


@dataset_cli_group.group("graph")
@click.pass_context
def graph(ctx):
    """Manage graph export"""
    pass


AVAILABLE_EXPORTERS = {
    "edges": GraphEdgesExporter,
    "orc": ORCExporter,
}


@graph.command("export")
@click.argument("export-path", type=click.Path())
@click.option("--export-id", "-e", help="Unique ID of the export run.")
@click.option(
    "--formats",
    "-f",
    type=click.STRING,
    default=",".join(AVAILABLE_EXPORTERS.keys()),
    show_default=True,
    help="Formats to export.",
)
@click.option("--processes", "-p", default=1, help="Number of parallel processes")
@click.option(
    "--exclude",
    type=click.STRING,
    help="Comma-separated list of object types to exclude",
)
@click.pass_context
def export_graph(ctx, export_path, export_id, formats, exclude, processes):
    """Export the Software Heritage graph as an edge dataset."""
    import uuid

    config = ctx.obj["config"]
    if not export_id:
        export_id = str(uuid.uuid4())

    exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])}
    export_formats = [c.strip() for c in formats.split(",")]
    for f in export_formats:
        if f not in AVAILABLE_EXPORTERS:
            raise click.BadOptionUsage(
                option_name="formats", message=f"{f} is not an available format."
            )

    # Run the exporter for each edge type.
    object_types = [
        "origin",
        "origin_visit",
        "origin_visit_status",
        "snapshot",
        "release",
        "revision",
        "directory",
        "content",
        "skipped_content",
    ]
    for obj_type in object_types:
        if obj_type in exclude_obj_types:
            continue
        exporters = [
            (AVAILABLE_EXPORTERS[f], {"export_path": os.path.join(export_path, f)},)
            for f in export_formats
        ]
        parallel_exporter = ParallelJournalProcessor(
            config,
            exporters,
            export_id,
            obj_type,
            node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
            processes=processes,
        )
        print("Exporting {}:".format(obj_type))
        parallel_exporter.run()


@graph.command("sort")
@click.argument("export-path", type=click.Path())
@click.pass_context
def sort_graph(ctx, export_path):
    config = ctx.obj["config"]
    from swh.dataset.exporters.edges import sort_graph_nodes

    sort_graph_nodes(export_path, config)
+
+
+@dataset_cli_group.group("athena")
+@click.pass_context
+def athena(ctx):
+    """Manage remote AWS Athena database"""
+    pass
+
+
+@athena.command("create")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to create"
+)
+@click.option(
+    "--location-prefix",
+    "-l",
+    required=True,
+    help="S3 prefix where the dataset can be found",
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.option(
+    "-r", "--replace-tables", is_flag=True, help="Replace the tables that already exist"
+)
+def athena_create(
+    database_name, location_prefix, output_location=None, replace_tables=False
+):
+    """Create tables on AWS Athena pointing to a given graph dataset on S3."""
+    from swh.dataset.athena import create_tables
+
+    create_tables(
+        database_name,
+        location_prefix,
+        output_location=output_location,
+        replace=replace_tables,
+    )
+
+
+@athena.command("query")
+@click.option(
+    "--database-name", "-d", default="swh", help="Name of the database to query"
+)
+@click.option(
+    "-o", "--output-location", help="S3 prefix where results should be stored"
+)
+@click.argument("query_file", type=click.File("r"), default=sys.stdin)
+def athena_query(
+    database_name, query_file, output_location=None,
+):
+    """Run an SQL query file (defaults to stdin) against the AWS Athena database"""
+    from swh.dataset.athena import run_query_get_results
+
+    run_query_get_results(
+        database_name, query_file.read(), output_location=output_location,
+    )
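End to end, the new subcommands would be invoked along these lines (bucket names and paths are placeholders):

    # Create the Athena tables over a dataset previously uploaded to S3
    swh dataset athena create -d swh -l s3://example-bucket/graph \
        -o s3://example-bucket/athena-out -r

    # Query them, reading SQL from stdin and printing the result CSV
    echo "SELECT COUNT(*) FROM origin;" | swh dataset athena query -d swh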
