diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
--- a/swh/dataset/athena.py
+++ b/swh/dataset/athena.py
@@ -37,7 +37,7 @@
         )
         STORED AS ORC
         LOCATION '{location}/'
-        TBLPROPERTIES ("orc.compress"="ZLIB");
+        TBLPROPERTIES ("orc.compress"="ZSTD");
         """
     ).format(
         db=database_name,
@@ -142,6 +142,12 @@
     return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])
 
 
+def _s3_url_to_bucket_path(s3_url):
+    loc = s3_url.removeprefix("s3://")
+    bucket, path = loc.split("/", 1)
+    return bucket, path
+
+
 def run_query_get_results(
     database_name, query_string, output_location=None,
 ):
@@ -163,6 +169,105 @@
         ),
     )
 
-    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
-    bucket, path = loc.split("/", 1)
+    bucket, path = _s3_url_to_bucket_path(
+        result["ResultConfiguration"]["OutputLocation"]
+    )
     return s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
+
+
+def generate_subdataset(
+    dataset_db, subdataset_db, subdataset_s3_path, swhids_file, output_location=None,
+):
+    # Upload the list of all the SWHIDs included in the subdataset
+    subdataset_bucket, subdataset_path = _s3_url_to_bucket_path(subdataset_s3_path)
+    s3_client = boto3.client("s3")
+    print(f"Uploading {swhids_file} to S3...")
+    s3_client.upload_file(
+        swhids_file,
+        subdataset_bucket,
+        os.path.join(subdataset_path, "swhids", "swhids.csv"),
+    )
+
+    athena_client = boto3.client("athena")
+    athena_client.output_location = output_location
+    athena_client.database_name = subdataset_db
+
+    # Create the subdataset database
+    query(
+        athena_client,
+        create_database(subdataset_db),
+        desc="Creating {} database".format(subdataset_db),
+    )
+
+    # Create a temporary external table over the uploaded SWHID list
+    create_swhid_table_query = textwrap.dedent(
+        """\
+        CREATE EXTERNAL TABLE IF NOT EXISTS {newdb}.swhids (
+            swhprefix string,
+            version int,
+            type string,
+            hash string
+        )
+        ROW FORMAT DELIMITED
+        FIELDS TERMINATED BY ':'
+        STORED AS TEXTFILE
+        LOCATION '{location}/swhids/'
+        """
+    ).format(newdb=subdataset_db, location=subdataset_s3_path)
+    query(
+        athena_client,
+        create_swhid_table_query,
+        desc="Creating SWHID table of the subdataset",
+    )
+    query(
+        athena_client,
+        repair_table(subdataset_db, "swhids"),
+        desc="Refreshing table metadata for the swhids table",
+    )
+
+    # Create the join tables with CTAS queries
+    query_tpl = textwrap.dedent(
+        """\
+        CREATE TABLE IF NOT EXISTS {newdb}.{table}
+        WITH (
+            format = 'ORC',
+            write_compression = 'ZSTD',
+            external_location = '{location}/{table}/'
+        )
+        AS SELECT * FROM {basedb}.{table}
+        WHERE {field} IN (SELECT hash FROM swhids)
+        """
+    )
+    tables_join_field = [
+        ("origin", "lower(to_hex(sha1(to_utf8(url))))"),
+        ("origin_visit", "lower(to_hex(sha1(to_utf8(origin))))"),
+        ("origin_visit_status", "lower(to_hex(sha1(to_utf8(origin))))"),
+        ("snapshot", "id"),
+        ("snapshot_branch", "snapshot_id"),
+        ("release", "id"),
+        ("revision", "id"),
+        ("revision_history", "id"),
+        ("directory", "id"),
+        ("directory_entry", "directory_id"),
+        ("content", "sha1_git"),
+        ("skipped_content", "sha1_git"),
+    ]
+
+    for table, join_field in tables_join_field:
+        ctas_query = query_tpl.format(
+            newdb=subdataset_db,
+            basedb=dataset_db,
+            location=subdataset_s3_path,
+            table=table,
+            field=join_field,
+        )
+
+        # Temporary fix: Athena no longer supports >32MB rows, but some of
+        # the objects were added to the dataset before this restriction was
+        # in place.
+        if table == "revision":
+            ctas_query += " AND length(message) < 100000"
+
+        query(
+            athena_client, ctas_query, desc="Creating join table {}".format(table),
+        )
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -181,3 +181,37 @@
         ),
         end="",
     )  # CSV already ends with \n
+
+
+@athena.command("gensubdataset")
+@click.option("--database", "-d", default="swh", help="Name of the base database")
+@click.option(
+    "--subdataset-database", required=True,
+    help="Name of the subdataset database to create",
+)
+@click.option(
+    "--subdataset-location",
+    required=True,
+    help="S3 prefix where the subdataset should be stored",
+)
+@click.option(
+    "--swhids",
+    required=True,
+    help="File containing the list of SWHIDs to include in the subdataset",
+)
+def athena_gensubdataset(database, subdataset_database, subdataset_location, swhids):
+    """
+    Generate a subdataset with Athena from an existing database and a list
+    of SWHIDs. Athena creates a new dataset with the same tables as the
+    base dataset, but containing only the objects present in the SWHID
+    list.
+    """
+    from swh.dataset.athena import generate_subdataset
+
+    generate_subdataset(
+        database,
+        subdataset_database,
+        subdataset_location,
+        swhids,
+        os.path.join(subdataset_location, "queries"),
+    )
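
For reference, a minimal usage sketch of the new generate_subdataset entry point; the bucket, subdataset database name, and SWHID file below are hypothetical placeholders. The SWHID file is expected to contain one SWHID per line (swh:1:<type>:<hash>), which the ':'-delimited external table created above splits into the swhprefix, version, type and hash columns:

from swh.dataset.athena import generate_subdataset

generate_subdataset(
    "swh",                             # base Athena database (the CLI default)
    "swh_subset",                      # hypothetical subdataset database name
    "s3://example-bucket/swh-subset",  # hypothetical S3 prefix for the subdataset
    "swhids.csv",                      # local file listing one SWHID per line
    output_location="s3://example-bucket/swh-subset/queries",  # Athena results
)

The new gensubdataset subcommand wraps the same call: --database, --subdataset-database, --subdataset-location and --swhids map to the arguments above, and Athena query results are stored under the "queries" prefix of the subdataset location.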