D7010.diff

diff --git a/swh/dataset/athena.py b/swh/dataset/athena.py
--- a/swh/dataset/athena.py
+++ b/swh/dataset/athena.py
@@ -37,7 +37,7 @@
        )
        STORED AS ORC
        LOCATION '{location}/'
-        TBLPROPERTIES ("orc.compress"="ZLIB");
+        TBLPROPERTIES ("orc.compress"="ZSTD");
        """
    ).format(
        db=database_name,
@@ -142,6 +142,12 @@
    return f"{n} " + units[0] if n < 1024 else human_size(n >> 10, units[1:])


+def _s3_url_to_bucket_path(s3_url):
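+    # Split an "s3://bucket/some/path" URL into ("bucket", "some/path").
+    # (str.removeprefix requires Python 3.9 or later.)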
+ loc = s3_url.removeprefix("s3://")
+ bucket, path = loc.split("/", 1)
+ return bucket, path
+
+
def run_query_get_results(
    database_name, query_string, output_location=None,
):
@@ -163,6 +169,105 @@
        ),
    )
-    loc = result["ResultConfiguration"]["OutputLocation"][len("s3://") :]
-    bucket, path = loc.split("/", 1)
+    bucket, path = _s3_url_to_bucket_path(
+        result["ResultConfiguration"]["OutputLocation"]
+    )
    return s3.get_object(Bucket=bucket, Key=path)["Body"].read().decode()
+
+
+def generate_subdataset(
+ dataset_db, subdataset_db, subdataset_s3_path, swhids_file, output_location=None,
+):
+    # Upload the list of all SWHIDs to include in the subdataset
+ subdataset_bucket, subdataset_path = _s3_url_to_bucket_path(subdataset_s3_path)
+ s3_client = boto3.client("s3")
+ print(f"Uploading {swhids_file} to S3...")
+ s3_client.upload_file(
+ swhids_file,
+ subdataset_bucket,
+ os.path.join(subdataset_path, "swhids", "swhids.csv"),
+ )
+
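+    # output_location and database_name are attached to the client as ad-hoc
+    # attributes, presumably read by the query() helper for each execution.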
+ athena_client = boto3.client("athena")
+ athena_client.output_location = output_location
+ athena_client.database_name = subdataset_db
+
+ # Create subdataset database
+ query(
+ athena_client,
+ create_database(subdataset_db),
+ desc="Creating {} database".format(subdataset_db),
+ )
+
+ # Create SWHID temporary table
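+    # A SWHID such as "swh:1:rev:<hash>" splits on ':' into the four columns
+    # declared below; the hash column drives the joins further down.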
+ create_swhid_table_query = textwrap.dedent(
+ """\
+ CREATE EXTERNAL TABLE IF NOT EXISTS {newdb}.swhids (
+ swhprefix string,
+ version int,
+ type string,
+ hash string
+ )
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY ':'
+ STORED AS TEXTFILE
+ LOCATION '{location}/swhids/'
+ """
+ ).format(newdb=subdataset_db, location=subdataset_s3_path)
+ query(
+ athena_client,
+ create_swhid_table_query,
+ desc="Creating SWHIDs table of subdataset",
+ )
+ query(
+ athena_client,
+ repair_table(subdataset_db, "swhids"),
+ desc="Refreshing table metadata for swhids table",
+ )
+
+ # Create join tables
+ query_tpl = textwrap.dedent(
+ """\
+ CREATE TABLE IF NOT EXISTS {newdb}.{table}
+ WITH (
+ format = 'ORC',
+ write_compression = 'ZSTD',
+ external_location = '{location}/{table}/'
+ )
+ AS SELECT * FROM {basedb}.{table}
+    WHERE {field} IN (SELECT hash FROM swhids)
+ """
+ )
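+    # (table, join expression) pairs: origin tables are matched through the
+    # sha1 of their URL, the other tables directly through their id column.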
+ tables_join_field = [
+ ("origin", "lower(to_hex(sha1(to_utf8(url))))"),
+ ("origin_visit", "lower(to_hex(sha1(to_utf8(origin))))"),
+ ("origin_visit_status", "lower(to_hex(sha1(to_utf8(origin))))"),
+ ("snapshot", "id"),
+ ("snapshot_branch", "snapshot_id"),
+ ("release", "id"),
+ ("revision", "id"),
+ ("revision_history", "id"),
+ ("directory", "id"),
+ ("directory_entry", "directory_id"),
+ ("content", "sha1_git"),
+ ("skipped_content", "sha1_git"),
+ ]
+
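+    # One CTAS query per table copies only the matching rows into the
+    # subdataset's S3 location.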
+ for table, join_field in tables_join_field:
+ ctas_query = query_tpl.format(
+ newdb=subdataset_db,
+ basedb=dataset_db,
+ location=subdataset_s3_path,
+ table=table,
+ field=join_field,
+ )
+
+ # Temporary fix: Athena no longer supports >32MB rows, but some of
+ # the objects were added to the dataset before this restriction was
+ # in place.
+ if table == "revision":
+ ctas_query += " AND length(message) < 100000"
+
+ query(
+ athena_client, ctas_query, desc="Creating join table {}".format(table),
+ )
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -181,3 +181,37 @@
        ),
        end="",
    )  # CSV already ends with \n
+
+
+@athena.command("gensubdataset")
+@click.option("--database", "-d", default="swh", help="Name of the base database")
+@click.option(
+    "--subdataset-database",
+    required=True,
+    help="Name of the subdataset database to create",
+)
+@click.option(
+ "--subdataset-location",
+ required=True,
+ help="S3 prefix where the subdataset should be stored",
+)
+@click.option(
+ "--swhids",
+ required=True,
+ help="File containing the list of SWHIDs to include in the subdataset",
+)
+def athena_gensubdataset(database, subdataset_database, subdataset_location, swhids):
+ """
+ Generate a subdataset with Athena, from an existing database and a list
+ of SWHIDs. Athena will generate a new dataset with the same tables as in
+ the base dataset, but only containing the objects present in the SWHID
+ list.
+ """
+ from swh.dataset.athena import generate_subdataset
+
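+    # Query results are staged under a "queries/" prefix next to the
+    # subdataset tables.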
+ generate_subdataset(
+ database,
+ subdataset_database,
+ subdataset_location,
+ swhids,
+ os.path.join(subdataset_location, "queries"),
+ )
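
Example invocation of the new command (a sketch: the full "swh dataset athena
gensubdataset" entry point is assumed from the surrounding CLI module, and the
database name, S3 prefix and SWHID list file are placeholders):

  swh dataset athena gensubdataset \
      --database swh \
      --subdataset-database swh_subdataset \
      --subdataset-location s3://example-bucket/subdataset \
      --swhids swhids.csv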
