diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -6,7 +6,7 @@
 import logging
 from pathlib import Path
 import shlex
-from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
@@ -248,6 +248,193 @@
     webgraph.compress(graph_name, input_dataset, output_directory, steps, conf)
 
 
+def get_all_subclasses(cls):
+    """Recursively lists all the direct and indirect subclasses of ``cls``.
+
+    A class reachable through several inheritance paths is returned once per
+    path; callers needing unique classes must deduplicate."""
+    all_subclasses = []
+
+    for subclass in cls.__subclasses__():
+        all_subclasses.append(subclass)
+        all_subclasses.extend(get_all_subclasses(subclass))
+
+    return all_subclasses
+
+
+@graph_cli_group.command()
+@click.option(
+    "--base-directory",
+    required=True,
+    type=PathlibPath(),
+    help="""The base directory where all datasets and compressed graphs are.
+    Its subdirectories should be named after a date (and optional flavor).
+    For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+    "--base-sensitive-directory",
+    required=False,
+    type=PathlibPath(),
+    help="""The base directory for any data that should not be publicly available
+    (eg. because it contains people's names).
+    For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+    "--athena-prefix",
+    required=False,
+    type=str,
+    help="""A prefix for the Athena Database that will be created and/or used.
+    For example: ``swh``.""",
+)
+@click.option(
+    "--s3-prefix",
+    required=False,
+    type=str,
+    help="""The base S3 "directory" where all datasets and compressed graphs are.
+    Its subdirectories should be named after a date (and optional flavor).
+    For example: ``s3://softwareheritage/graph/``.""",
+)
+@click.option(
+    "--graph-base-directory",
+    required=False,
+    type=PathlibPath(),
+    help="""Overrides the path of the graph to use. Defaults to the value of
+    ``{base_directory}/{dataset_name}/compressed/``.
+    For example: ``/dev/shm/swh-graph/default/``.""",
+)
+@click.option(
+    "--dataset-name",
+    required=True,
+    type=str,
+    help="""Should be a date and optionally a flavor, which will be used
+    as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
+)
+@click.option(
+    "--luigi-config",
+    type=PathlibPath(),
+    help="""Extra options to add to ``luigi.cfg``, following the same format.
+    This overrides any option that would be otherwise set automatically.""",
+)
+@click.argument("luigi_param", nargs=-1)
+@click.pass_context
+def luigi(
+    ctx,
+    base_directory: Path,
+    graph_base_directory: Optional[Path],
+    base_sensitive_directory: Optional[Path],
+    s3_prefix: Optional[str],
+    athena_prefix: Optional[str],
+    dataset_name: str,
+    luigi_config: Optional[Path],
+    luigi_param: List[str],
+):
+    r"""
+    Calls Luigi with the given task and params, and automatically
+    configures paths based on --base-directory and --dataset-name.
+
+    The list of Luigi params should be prefixed with ``--`` so they are not interpreted
+    by the ``swh`` CLI. For example::
+
+        swh graph luigi \
+            --base-directory ~/tmp/ \
+            --dataset-name 2022-12-05_test \
+            -- \
+            RunAll \
+            --local-scheduler
+
+    to pass ``RunAll --local-scheduler`` as Luigi params.
+
+    Exits with the return code of the ``luigi`` process.
+    """
+    import configparser
+    import os
+    import subprocess
+    import tempfile
+
+    import luigi
+
+    # Populate the list of subclasses of luigi.Task
+    import swh.dataset.luigi  # noqa
+    import swh.graph.luigi  # noqa
+
+    config = configparser.ConfigParser()
+
+    dataset_path = base_directory / dataset_name
+
+    default_values = dict(
+        local_export_path=dataset_path,
+        export_task_type="ExportGraph",
+        compression_task_type="CompressGraph",
+        local_graph_path=dataset_path / "compressed",
+        topological_order_path=dataset_path / "topology/topological_order_dfs.csv.zst",
+        origin_contributors_path=dataset_path / "datasets/contribution_graph.csv.zst",
+    )
+
+    if graph_base_directory:
+        default_values["local_graph_path"] = graph_base_directory
+
+    if s3_prefix:
+        dataset_s3_prefix = f"{s3_prefix.rstrip('/')}/{dataset_name}"
+        default_values["s3_export_path"] = dataset_s3_prefix
+        default_values["s3_graph_path"] = f"{dataset_s3_prefix}/compressed"
+
+    if base_sensitive_directory:
+        sensitive_path = base_sensitive_directory / dataset_name
+        default_values["deanonymized_origin_contributors_path"] = (
+            sensitive_path / "datasets/contribution_graph.deanonymized.csv.zst"
+        )
+        default_values["deanonymization_table_path"] = (
+            sensitive_path / "persons_sha256_to_name.csv.zst"
+        )
+
+    if athena_prefix:
+        default_values[
+            "athena_db_name"
+        ] = f"{athena_prefix}_{dataset_name.replace('-', '')}"
+
+    for task_cls in get_all_subclasses(luigi.Task):
+        task_name = task_cls.__name__
+        # If the task has an argument with one of the known names, add the default
+        # value to its config.
+        task_config = {
+            arg_name: str(arg_value)
+            for arg_name, arg_value in default_values.items()
+            if hasattr(task_cls, arg_name)
+        }
+        if task_config:
+            config[task_name] = task_config
+
+    # If any config is provided, add it.
+    # This may override default arguments configured above.
+    if luigi_config is not None:
+        # read_file() rather than read(): the latter silently skips files it
+        # cannot open, which would hide a typo in --luigi-config.
+        with luigi_config.open() as luigi_config_fd:
+            config.read_file(luigi_config_fd)
+
+    with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd:
+        config.write(fd)
+        fd.flush()
+
+        proc = subprocess.run(
+            [
+                "luigi",
+                "--module",
+                "swh.dataset.luigi",
+                "--module",
+                "swh.graph.luigi",
+                *luigi_param,
+            ],
+            env={
+                # The generated config must win over any LUIGI_CONFIG_PATH
+                # inherited from the caller's environment.
+                **os.environ,
+                "LUIGI_CONFIG_PATH": fd.name,
+            },
+        )
+        ctx.exit(proc.returncode)
+
+
 def main():
     return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
 
diff --git a/swh/graph/tests/test_cli.py b/swh/graph/tests/test_cli.py
--- a/swh/graph/tests/test_cli.py
+++ b/swh/graph/tests/test_cli.py
@@ -1,13 +1,15 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import Dict
 
 from click.testing import CliRunner
+import pytest
 import yaml
 
 from swh.graph.cli import graph_cli_group
@@ -56,3 +58,48 @@
 
     assert int(properties["nodes"]) == 24
     assert int(properties["arcs"]) == 28
+
+
+@pytest.mark.parametrize("exit_code", [0, 1])
+def test_luigi(mocker, tmpdir, exit_code):
+    """calls Luigi with the given configuration"""
+    # subprocess.run is mocked, so this only checks the command line and
+    # environment built by the CLI, and that Luigi's exit code is propagated
+    runner = CliRunner()
+
+    subprocess_run = mocker.patch("subprocess.run")
+    subprocess_run.return_value.returncode = exit_code
+
+    result = runner.invoke(
+        graph_cli_group,
+        [
+            "luigi",
+            "--base-directory",
+            f"{tmpdir}/base_dir",
+            "--dataset-name",
+            "2022-12-07",
+            "--",
+            "foo",
+            "bar",
+            "--baz",
+            "qux",
+        ],
+        catch_exceptions=False,
+    )
+    assert result.exit_code == exit_code, result
+
+    luigi_config_path = subprocess_run.mock_calls[0][2]["env"]["LUIGI_CONFIG_PATH"]
+    subprocess_run.assert_called_once_with(
+        [
+            "luigi",
+            "--module",
+            "swh.dataset.luigi",
+            "--module",
+            "swh.graph.luigi",
+            "foo",
+            "bar",
+            "--baz",
+            "qux",
+        ],
+        env={**os.environ, "LUIGI_CONFIG_PATH": luigi_config_path},
+    )