D7010.diff
D7010: Add a command to generate a subdataset from a list of SWHIDs using S3
diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
--- a/swh/dataset/athena.py
+++ b/swh/dataset/athena.py
@@ -37,7 +37,7 @@
         )
         STORED AS ORC
         LOCATION '{location}/'
-        TBLPROPERTIES ("orc.compress"="ZLIB");
+        TBLPROPERTIES ("orc.compress"="ZSTD");
         """
     ).format(
         db=database_name,
@@ -142,6 +142,12 @@
     return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])
 
 
+def _s3_url_to_bucket_path(s3_url):
+    loc = s3_url.removeprefix("s3://")
+    bucket, path = loc.split("/", 1)
+    return bucket, path
+
+
 def run_query_get_results(
     database_name, query_string, output_location=None,
 ):
@@ -163,6 +169,105 @@
         ),
     )
 
-    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
-    bucket, path = loc.split("/", 1)
+    bucket, path = _s3_url_to_bucket_path(
+        result["ResultConfiguration"]["OutputLocation"]
+    )
     return s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
+
+
+def generate_subdataset(
+    dataset_db, subdataset_db, subdataset_s3_path, swhids_file, output_location=None,
+):
+    # Upload list of all the swhids included in the dataset
+    subdataset_bucket, subdataset_path = _s3_url_to_bucket_path(subdataset_s3_path)
+    s3_client = boto3.client("s3")
+    print(f"Uploading {swhids_file} to S3...")
+    s3_client.upload_file(
+        swhids_file,
+        subdataset_bucket,
+        os.path.join(subdataset_path, "swhids", "swhids.csv"),
+    )
+
+    athena_client = boto3.client("athena")
+    athena_client.output_location = output_location
+    athena_client.database_name = subdataset_db
+
+    # Create subdataset database
+    query(
+        athena_client,
+        create_database(subdataset_db),
+        desc="Creating {} database".format(subdataset_db),
+    )
+
+    # Create SWHID temporary table
+    create_swhid_table_query = textwrap.dedent(
+        """\
+        CREATE EXTERNAL TABLE IF NOT EXISTS {newdb}.swhids (
+            swhprefix string,
+            version int,
+            type string,
+            hash string
+        )
+        ROW FORMAT DELIMITED
+        FIELDS TERMINATED BY ':'
+        STORED AS TEXTFILE
+        LOCATION '{location}/swhids/'
+        """
+    ).format(newdb=subdataset_db, location=subdataset_s3_path)
+    query(
+        athena_client,
+        create_swhid_table_query,
+        desc="Creating SWHIDs table of subdataset",
+    )
+    query(
+        athena_client,
+        repair_table(subdataset_db, "swhids"),
+        desc="Refreshing table metadata for swhids table",
+    )
+
+    # Create join tables
+    query_tpl = textwrap.dedent(
+        """\
+        CREATE TABLE IF NOT EXISTS {newdb}.{table}
+        WITH (
+            format = 'ORC',
+            write_compression = 'ZSTD',
+            external_location = '{location}/{table}/'
+        )
+        AS SELECT * FROM {basedb}.{table}
+        WHERE {field} IN (select hash from swhids)
+        """
+    )
+    tables_join_field = [
+        ("origin", "lower(to_hex(sha1(to_utf8(url))))"),
+        ("origin_visit", "lower(to_hex(sha1(to_utf8(origin))))"),
+        ("origin_visit_status", "lower(to_hex(sha1(to_utf8(origin))))"),
+        ("snapshot", "id"),
+        ("snapshot_branch", "snapshot_id"),
+        ("release", "id"),
+        ("revision", "id"),
+        ("revision_history", "id"),
+        ("directory", "id"),
+        ("directory_entry", "directory_id"),
+        ("content", "sha1_git"),
+        ("skipped_content", "sha1_git"),
+    ]
+
+    for table, join_field in tables_join_field:
+        ctas_query = query_tpl.format(
+            newdb=subdataset_db,
+            basedb=dataset_db,
+            location=subdataset_s3_path,
+            table=table,
+            field=join_field,
+        )
+
+        # Temporary fix: Athena no longer supports >32MB rows, but some of
+        # the objects were added to the dataset before this restriction was
+        # in place.
+        if table == "revision":
+            ctas_query += " AND length(message) < 100000"
+
+        query(
+            athena_client, ctas_query, desc="Creating join table {}".format(table),
+        )
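As a side note, the behavior of the small _s3_url_to_bucket_path helper factored out above can be sketched as follows (str.removeprefix requires Python >= 3.9):

    >>> _s3_url_to_bucket_path("s3://my-bucket/path/to/key.csv")
    ('my-bucket', 'path/to/key.csv')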
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -181,3 +181,37 @@
         ),
         end="",
     )  # CSV already ends with \n
+
+
+@athena.command("gensubdataset")
+@click.option("--database", "-d", default="swh", help="Name of the base database")
+@click.option(
+    "--subdataset-database", required=True,
+    help="Name of the subdataset database to create"
+)
+@click.option(
+    "--subdataset-location",
+    required=True,
+    help="S3 prefix where the subdataset should be stored",
+)
+@click.option(
+    "--swhids",
+    required=True,
+    help="File containing the list of SWHIDs to include in the subdataset",
+)
+def athena_gensubdataset(database, subdataset_database, subdataset_location, swhids):
+    """
+    Generate a subdataset with Athena, from an existing database and a list
+    of SWHIDs. Athena will generate a new dataset with the same tables as in
+    the base dataset, but only containing the objects present in the SWHID
+    list.
+    """
+    from swh.dataset.athena import generate_subdataset
+
+    generate_subdataset(
+        database,
+        subdataset_database,
+        subdataset_location,
+        swhids,
+        os.path.join(subdataset_location, "queries"),
+    )
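For illustration, a minimal sketch of how the new function could be driven directly from Python, using the signature added in swh/dataset/athena.py above; the database names, bucket, and file path are hypothetical placeholders, and AWS credentials are assumed to be configured for boto3:

    from swh.dataset.athena import generate_subdataset

    # Each line of the SWHIDs file is a full SWHID such as
    # "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6"; the colon-delimited
    # "swhids" external table created above splits it into
    # (swhprefix, version, type, hash).
    generate_subdataset(
        "swh",                                        # base dataset database
        "swh_subset",                                 # subdataset database to create
        "s3://example-bucket/subdatasets/my-subset",  # subdataset S3 prefix
        "swhids.csv",                                 # SWHIDs to include
        output_location="s3://example-bucket/subdatasets/my-subset/queries",
    )

The same operation is exposed on the command line by the athena_gensubdataset command defined in swh/dataset/cli.py above, through its --database, --subdataset-database, --subdataset-location and --swhids options.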