Page MenuHomeSoftware Heritage

D8919.id32176.diff
No OneTemporary

D8919.id32176.diff

diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -6,7 +6,7 @@
import logging
from pathlib import Path
import shlex
-from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
@@ -248,6 +248,182 @@
webgraph.compress(graph_name, input_dataset, output_directory, steps, conf)
+def get_all_subclasses(cls):
+    """Returns every direct and indirect subclass of ``cls``.
+
+    The result is in depth-first order: each direct subclass is immediately
+    followed by its own descendants. A class reachable through several
+    inheritance paths is listed once per path.
+    """
+    descendants = []
+
+    for child in cls.__subclasses__():
+        # The child itself first, then everything below it.
+        descendants += [child, *get_all_subclasses(child)]
+
+    return descendants
+
+
+@graph_cli_group.command()
+@click.option(
+    "--base-directory",
+    required=True,
+    type=PathlibPath(),
+    help="""The base directory where all datasets and compressed graphs are.
+    Its subdirectories should be named after a date (and optional flavor).
+    For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+    "--base-sensitive-directory",
+    required=False,
+    type=PathlibPath(),
+    help="""The base directory for any data that should not be publicly available
+    (eg. because it contains people's names).
+    For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+    "--athena-prefix",
+    required=False,
+    type=str,
+    help="""A prefix for the Athena Database that will be created and/or used.
+    For example: ``swh``.""",
+)
+@click.option(
+    "--s3-prefix",
+    required=False,
+    type=str,
+    help="""The base S3 "directory" where all datasets and compressed graphs are.
+    Its subdirectories should be named after a date (and optional flavor).
+    For example: ``s3://softwareheritage/graph/``.""",
+)
+@click.option(
+    "--graph-base-directory",
+    required=False,
+    type=PathlibPath(),
+    help="""Overrides the path of the graph to use. Defaults to the value of
+    ``{base_directory}/{dataset_name}/{compressed}/``.
+    For example: ``/dev/shm/swh-graph/default/``.""",
+)
+@click.option(
+    "--dataset-name",
+    required=True,
+    type=str,
+    help="""Should be a date and optionally a flavor, which will be used
+    as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
+)
+@click.option(
+    "--luigi-config",
+    type=PathlibPath(),
+    help="""Extra options to add to ``luigi.cfg``, following the same format.
+    This overrides any option that would otherwise be set automatically.""",
+)
+@click.argument("luigi_param", nargs=-1)
+@click.pass_context
+def luigi(
+    ctx,
+    base_directory: Path,
+    graph_base_directory: Optional[Path],
+    base_sensitive_directory: Optional[Path],
+    s3_prefix: Optional[str],
+    athena_prefix: Optional[str],
+    dataset_name: str,
+    luigi_config: Optional[Path],
+    luigi_param: List[str],
+):
+    """
+    Calls Luigi with the given task and params, and automatically
+    configures paths based on --base-directory and --dataset-name.
+
+    The list of Luigi params should be prefixed with ``--`` so they are not interpreted
+    by the ``swh`` CLI. For example::
+
+        swh graph luigi \
+                --base-directory ~/tmp/ \
+                --dataset-name 2022-12-05_test \
+                -- \
+                RunAll \
+                --local-scheduler
+
+    to pass ``RunAll --local-scheduler`` as Luigi params
+    """
+    # Imported lazily to keep the CLI startup time under control.
+    import configparser
+    import os
+    import subprocess
+    import tempfile
+
+    import luigi
+
+    # Populate the list of subclasses of luigi.Task
+    import swh.dataset.luigi  # noqa
+    import swh.graph.luigi  # noqa
+
+    config = configparser.ConfigParser()
+
+    dataset_path = base_directory / dataset_name
+
+    # Values derived from the command-line options, keyed by the name of the
+    # task parameter they are meant to fill in.
+    default_values = dict(
+        local_export_path=dataset_path,
+        export_task_type="ExportGraph",
+        compression_task_type="CompressGraph",
+        local_graph_path=dataset_path / "compressed",
+        topological_order_path=dataset_path / "topology/topological_order_dfs.csv.zst",
+        origin_contributors_path=dataset_path / "datasets/contribution_graph.csv.zst",
+    )
+
+    if graph_base_directory:
+        default_values["local_graph_path"] = graph_base_directory
+
+    if s3_prefix:
+        # Strip trailing slashes so the joined path has exactly one separator.
+        dataset_s3_prefix = f"{s3_prefix.rstrip('/')}/{dataset_name}"
+        default_values["s3_export_path"] = dataset_s3_prefix
+        default_values["s3_graph_path"] = f"{dataset_s3_prefix}/compressed"
+
+    if base_sensitive_directory:
+        sensitive_path = base_sensitive_directory / dataset_name
+        default_values["deanonymized_origin_contributors_path"] = (
+            sensitive_path / "datasets/contribution_graph.deanonymized.csv.zst"
+        )
+        default_values["deanonymization_table_path"] = (
+            sensitive_path / "persons_sha256_to_name.csv.zst"
+        )
+
+    if athena_prefix:
+        # Dashes are removed from the dataset name to build the database name.
+        default_values[
+            "athena_db_name"
+        ] = f"{athena_prefix}_{dataset_name.replace('-', '')}"
+
+    for task_cls in get_all_subclasses(luigi.Task):
+        task_name = task_cls.__name__
+        # If the task has an argument with one of the known names, add the default
+        # value to its config.
+        task_config = {
+            arg_name: str(arg_value)
+            for arg_name, arg_value in default_values.items()
+            if hasattr(task_cls, arg_name)
+        }
+        if task_config:
+            config[task_name] = task_config
+
+    # If any config is provided, add it.
+    # This may override default arguments configured above.
+    if luigi_config is not None:
+        # ConfigParser.read() silently skips files it cannot open, which would
+        # hide a mistyped path; read_file() on an explicitly opened file does not.
+        try:
+            with open(luigi_config) as extra_config_fd:
+                config.read_file(extra_config_fd)
+        except OSError as e:
+            raise click.BadParameter(
+                f"cannot read {luigi_config}: {e}", param_hint="--luigi-config"
+            ) from e
+
+    # The temporary file is removed when the ``with`` block exits, so the
+    # subprocess has to run (and finish) inside it.
+    with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd:
+        config.write(fd)
+        # Make sure the content is on disk before the subprocess reads it.
+        fd.flush()
+
+        proc = subprocess.run(
+            [
+                "luigi",
+                "--module",
+                "swh.dataset.luigi",
+                "--module",
+                "swh.graph.luigi",
+                *luigi_param,
+            ],
+            # os.environ is unpacked last, so a LUIGI_CONFIG_PATH already set in
+            # the caller's environment takes precedence over the generated file.
+            # NOTE(review): confirm this precedence is intended.
+            env={
+                "LUIGI_CONFIG_PATH": fd.name,
+                **os.environ,
+            },
+        )
+        # Propagate Luigi's exit status through click, rather than through the
+        # ``exit`` builtin that the ``site`` module provides for interactive use.
+        ctx.exit(proc.returncode)
+
+
def main():
return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
diff --git a/swh/graph/tests/test_cli.py b/swh/graph/tests/test_cli.py
--- a/swh/graph/tests/test_cli.py
+++ b/swh/graph/tests/test_cli.py
@@ -1,13 +1,15 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict
from click.testing import CliRunner
+import pytest
import yaml
from swh.graph.cli import graph_cli_group
@@ -56,3 +58,49 @@
assert int(properties["nodes"]) == 24
assert int(properties["arcs"]) == 28
+
+
+@pytest.mark.parametrize("exit_code", [0, 1])
+def test_luigi(mocker, tmpdir, exit_code):
+    """calls Luigi with the given configuration"""
+    runner = CliRunner()
+
+    # Replace subprocess.run so no actual ``luigi`` process is spawned; the
+    # command is expected to exit with the return code of that (fake) process.
+    subprocess_run = mocker.patch("subprocess.run")
+    subprocess_run.return_value.returncode = exit_code
+
+    # ``tmpdir`` is the pytest fixture; no extra temporary directory is needed.
+    result = runner.invoke(
+        graph_cli_group,
+        [
+            "luigi",
+            "--base-directory",
+            f"{tmpdir}/base_dir",
+            "--dataset-name",
+            "2022-12-07",
+            "--",
+            "foo",
+            "bar",
+            "--baz",
+            "qux",
+        ],
+        catch_exceptions=False,
+    )
+    assert result.exit_code == exit_code, result
+
+    # Fail with a readable message before indexing into the recorded call.
+    subprocess_run.assert_called_once()
+
+    # The config file name is generated at run time, so it is read back from the
+    # recorded call instead of being hard-coded in the expected environment.
+    luigi_config_path = subprocess_run.call_args[1]["env"]["LUIGI_CONFIG_PATH"]
+    subprocess_run.assert_called_once_with(
+        [
+            "luigi",
+            "--module",
+            "swh.dataset.luigi",
+            "--module",
+            "swh.graph.luigi",
+            "foo",
+            "bar",
+            "--baz",
+            "qux",
+        ],
+        env={"LUIGI_CONFIG_PATH": luigi_config_path, **os.environ},
+    )

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 4:43 PM (2 h, 45 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214967

Event Timeline