diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
index 97c7611..37a94db 100644
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -1,298 +1,314 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import os
 import pathlib
 import sys
+from typing import Any, Dict, List, Optional, Set
 
 import click
 
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.dataset.relational import MAIN_TABLES
 
 
 @swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS)
 @click.option(
     "--config-file",
     "-C",
     default=None,
     type=click.Path(exists=True, dir_okay=False),
     help="Configuration file.",
 )
 @click.pass_context
 def dataset_cli_group(ctx, config_file):
     """Dataset Tools.
 
     A set of tools to export datasets from the Software Heritage Archive in
     various formats.
 
     """
     from swh.core import config
 
     ctx.ensure_object(dict)
     conf = config.read(config_file)
     ctx.obj["config"] = conf
 
 
 @dataset_cli_group.group("graph")
 @click.pass_context
 def graph(ctx):
     """Manage graph export"""
     pass
 
 
 AVAILABLE_EXPORTERS = {
     "edges": "swh.dataset.exporters.edges:GraphEdgesExporter",
     "orc": "swh.dataset.exporters.orc:ORCExporter",
 }
 
 
 @graph.command("export")
 @click.argument("export-path", type=click.Path())
 @click.option(
     "--export-id",
     "-e",
     help=(
         "Unique ID of the export run. This is appended to the kafka "
         "group_id config file option. If group_id is not set in the "
         "'journal' section of the config file, defaults to 'swh-dataset-export-'."
     ),
 )
 @click.option(
     "--formats",
     "-f",
     type=click.STRING,
     default=",".join(AVAILABLE_EXPORTERS.keys()),
     show_default=True,
     help="Formats to export.",
 )
 @click.option("--processes", "-p", default=1, help="Number of parallel processes")
 @click.option(
     "--exclude",
     type=click.STRING,
     help="Comma-separated list of object types to exclude",
 )
 @click.option(
     "--types",
     "object_types",
     type=click.STRING,
     help="Comma-separated list of object types to export",
 )
 @click.option(
     "--margin",
     type=click.FloatRange(0, 1),
     help=(
         "Offset margin to start consuming from. E.g. if set to '0.95', "
         "consumers will start at 95% of the last committed offset; "
         "in other words, start earlier than last committed position."
     ),
 )
 @click.pass_context
-def export_graph(
-    ctx, export_path, export_id, formats, exclude, object_types, processes, margin
-):
+def export_graph(ctx, export_path, formats, exclude, object_types, **kwargs):
     """Export the Software Heritage graph as an edge dataset."""
-    from importlib import import_module
-    import logging
-    import resource
-    import uuid
-
-    from swh.dataset.journalprocessor import ParallelJournalProcessor
-
-    logger = logging.getLogger(__name__)
     config = ctx.obj["config"]
 
-    if not export_id:
-        export_id = str(uuid.uuid4())
-
     if object_types:
         object_types = {o.strip() for o in object_types.split(",")}
         invalid_object_types = object_types - set(MAIN_TABLES.keys())
         if invalid_object_types:
             raise click.BadOptionUsage(
                 option_name="types",
                 message=f"Invalid object types: {', '.join(invalid_object_types)}.",
             )
     else:
         object_types = set(MAIN_TABLES.keys())
     exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])}
     export_formats = [c.strip() for c in formats.split(",")]
     for f in export_formats:
         if f not in AVAILABLE_EXPORTERS:
             raise click.BadOptionUsage(
                 option_name="formats", message=f"{f} is not an available format."
             )
+    run_export_graph(
+        config,
+        pathlib.Path(export_path),
+        export_formats,
+        list(object_types),
+        exclude_obj_types=exclude_obj_types,
+        **kwargs,
+    )
+
+
+def run_export_graph(
+    config: Dict[str, Any],
+    export_path: pathlib.Path,
+    export_formats: List[str],
+    object_types: List[str],
+    exclude_obj_types: Set[str],
+    export_id: Optional[str],
+    processes: int,
+    margin: Optional[float],
+):
+    from importlib import import_module
+    import logging
+    import resource
+    import uuid
+
+    from swh.dataset.journalprocessor import ParallelJournalProcessor
+
+    logger = logging.getLogger(__name__)
+
+    if not export_id:
+        export_id = str(uuid.uuid4())
+
     # Enforce order (from origin to contents) to reduce number of holes in the graph.
     object_types = [
         obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types
     ]
 
     # ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the number of
     # processes, this can exceed the maximum number of file descriptors (soft limit
     # defaults to 1024 on Debian), so let's increase it.
     (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
     nb_shards = 256  # TODO: make this configurable or detect nb of kafka partitions
     open_fds_per_shard = 61  # estimated with plyvel==1.3.0 and libleveldb1d==1.22-3
     spare = 1024  # for everything other than LevelDB
     want_fd = nb_shards * open_fds_per_shard + spare
     if hard < want_fd:
         logger.warning(
             "Hard limit of open file descriptors (%d) is lower than ideal (%d)",
             hard,
             want_fd,
         )
     if soft < want_fd:
         want_fd = min(want_fd, hard)
         logger.info(
             "Soft limit of open file descriptors (%d) is too low, increasing to %d",
             soft,
             want_fd,
         )
         resource.setrlimit(resource.RLIMIT_NOFILE, (want_fd, hard))
 
     def importcls(clspath):
         mod, cls = clspath.split(":")
         m = import_module(mod)
         return getattr(m, cls)
 
     exporter_cls = dict(
         (fmt, importcls(clspath))
         for (fmt, clspath) in AVAILABLE_EXPORTERS.items()
         if fmt in export_formats
     )
     # Run the exporter for each edge type.
     for obj_type in object_types:
         if obj_type in exclude_obj_types:
             continue
         exporters = [
-            (
-                exporter_cls[f],
-                {"export_path": os.path.join(export_path, f)},
-            )
-            for f in export_formats
+            (exporter_cls[f], {"export_path": export_path / f}) for f in export_formats
         ]
         parallel_exporter = ParallelJournalProcessor(
             config,
             exporters,
             export_id,
             obj_type,
-            node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
+            node_sets_path=export_path / ".node_sets" / obj_type,
             processes=processes,
             offset_margin=margin,
         )
         print("Exporting {}:".format(obj_type))
         parallel_exporter.run()
 
 
 @graph.command("sort")
 @click.argument("export-path", type=click.Path())
 @click.pass_context
 def sort_graph(ctx, export_path):
     config = ctx.obj["config"]
     from swh.dataset.exporters.edges import sort_graph_nodes
 
     sort_graph_nodes(export_path, config)
 
 
 @dataset_cli_group.group("athena")
 @click.pass_context
 def athena(ctx):
     """Manage and query a remote AWS Athena database"""
     pass
 
 
 @athena.command("create")
 @click.option(
     "--database-name", "-d", default="swh", help="Name of the database to create"
 )
 @click.option(
     "--location-prefix",
     "-l",
     required=True,
     help="S3 prefix where the dataset can be found",
 )
 @click.option(
     "-o", "--output-location", help="S3 prefix where results should be stored"
 )
 @click.option(
     "-r", "--replace-tables", is_flag=True, help="Replace the tables that already exist"
 )
 def athena_create(
     database_name, location_prefix, output_location=None, replace_tables=False
 ):
     """Create tables on AWS Athena pointing to a given graph dataset on S3."""
     from swh.dataset.athena import create_tables
 
     create_tables(
         database_name,
         location_prefix,
         output_location=output_location,
         replace=replace_tables,
     )
 
 
 @athena.command("query")
 @click.option(
     "--database-name", "-d", default="swh", help="Name of the database to query"
 )
 @click.option(
     "-o", "--output-location", help="S3 prefix where results should be stored"
 )
 @click.argument("query_file", type=click.File("r"), default=sys.stdin)
 def athena_query(
     database_name, query_file, output_location=None,
 ):
     """Query the AWS Athena database with a given command"""
     from swh.dataset.athena import run_query_get_results
 
     print(
         run_query_get_results(
             database_name, query_file.read(), output_location=output_location,
         ),
         end="",
     )  # CSV already ends with \n
 
 
 @athena.command("gensubdataset")
 @click.option("--database", "-d", default="swh", help="Name of the base database")
 @click.option(
     "--subdataset-database",
     required=True,
     help="Name of the subdataset database to create",
 )
 @click.option(
     "--subdataset-location",
     required=True,
     help="S3 prefix where the subdataset should be stored",
 )
 @click.option(
     "--swhids",
     required=True,
     help="File containing the list of SWHIDs to include in the subdataset",
 )
 def athena_gensubdataset(database, subdataset_database, subdataset_location, swhids):
     """
     Generate a subdataset with Athena, from an existing database and a list
     of SWHIDs. Athena will generate a new dataset with the same tables as
     in the base dataset, but only containing the objects present in the
     SWHID list.
     """
     from swh.dataset.athena import generate_subdataset
 
     generate_subdataset(
         database,
         subdataset_database,
         subdataset_location,
         swhids,
         os.path.join(subdataset_location, "queries"),
     )
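For illustration only, a minimal sketch of how the run_export_graph helper extracted by this patch could be driven directly from Python (e.g. from a test or another script) instead of through the Click command. The journal settings, paths, and chosen object types below are placeholder assumptions, not values taken from the patch.

# Hypothetical usage sketch -- not part of the diff above.
import pathlib

from swh.dataset.cli import run_export_graph

# Placeholder Kafka/journal settings; a real deployment supplies its own
# brokers and credentials in the 'journal' section of the configuration.
config = {
    "journal": {
        "brokers": ["kafka.example.org:9092"],
        "group_id": "swh-dataset-export",
    }
}

run_export_graph(
    config,
    pathlib.Path("/srv/swh/export"),
    export_formats=["orc"],               # any subset of AVAILABLE_EXPORTERS
    object_types=["origin", "snapshot"],  # keys of MAIN_TABLES
    exclude_obj_types=set(),
    export_id=None,                       # a UUID is generated when left unset
    processes=4,
    margin=None,                          # start from the last committed offset
)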