Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163848
D8919.id32176.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
D8919.id32176.diff
View Options
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -6,7 +6,7 @@
import logging
from pathlib import Path
import shlex
-from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
@@ -248,6 +248,182 @@
webgraph.compress(graph_name, input_dataset, output_directory, steps, conf)
+def get_all_subclasses(cls):
+ all_subclasses = []
+
+ for subclass in cls.__subclasses__():
+ all_subclasses.append(subclass)
+ all_subclasses.extend(get_all_subclasses(subclass))
+
+ return all_subclasses
+
+
+@graph_cli_group.command()
+@click.option(
+ "--base-directory",
+ required=True,
+ type=PathlibPath(),
+ help="""The base directory where all datasets and compressed graphs are.
+ Its subdirectories should be named after a date (and optional flavor).
+ For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+ "--base-sensitive-directory",
+ required=False,
+ type=PathlibPath(),
+ help="""The base directory for any data that should not be publicly available
+ (eg. because it contains people's names).
+ For example: ``/poolswh/softwareheritage/``.""",
+)
+@click.option(
+ "--athena-prefix",
+ required=False,
+ type=str,
+ help="""A prefix for the Athena Database that will be created and/or used.
+ For example: ``swh``.""",
+)
+@click.option(
+ "--s3-prefix",
+ required=False,
+ type=str,
+ help="""The base S3 "directory" where all datasets and compressed graphs are.
+ Its subdirectories should be named after a date (and optional flavor).
+ For example: ``s3://softwareheritage/graph/``.""",
+)
+@click.option(
+ "--graph-base-directory",
+ required=False,
+ type=PathlibPath(),
+ help="""Overrides the path of the graph to use. Defaults to the value of
+ ``{base_directory}/{dataset_name}/{compressed}/``.
+ For example: ``/dev/shm/swh-graph/default/``.""",
+)
+@click.option(
+ "--dataset-name",
+ required=True,
+ type=str,
+ help="""Should be a date and optionally a flavor, which will be used
+ as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
+)
+@click.option(
+ "--luigi-config",
+ type=PathlibPath(),
+ help="""Extra options to add to ``luigi.cfg``, following the same format.
+ This overrides any option that would be other set automatically.""",
+)
+@click.argument("luigi_param", nargs=-1)
+@click.pass_context
+def luigi(
+ ctx,
+ base_directory: Path,
+ graph_base_directory: Optional[Path],
+ base_sensitive_directory: Optional[Path],
+ s3_prefix: Optional[str],
+ athena_prefix: Optional[str],
+ dataset_name: str,
+ luigi_config: Optional[Path],
+ luigi_param: List[str],
+):
+ """
+ Calls Luigi with the given task and params, and automatically
+ configures paths based on --base-directory and --dataset-name.
+
+ The list of Luigi params should be prefixed with ``--`` so they are not interpreted
+ by the ``swh`` CLI. For example::
+
+ swh graph luigi \
+ --base-directory ~/tmp/ \
+ --dataset-name 2022-12-05_test ListOriginContributors \
+ -- \
+ RunAll \
+ --local-scheduler
+
+ to pass ``RunAll --local-scheduler`` as Luigi params
+ """
+ import configparser
+ import os
+ import subprocess
+ import tempfile
+
+ import luigi
+
+ # Popular the list of subclasses of luigi.Task
+ import swh.dataset.luigi # noqa
+ import swh.graph.luigi # noqa
+
+ config = configparser.ConfigParser()
+
+ dataset_path = base_directory / dataset_name
+
+ default_values = dict(
+ local_export_path=dataset_path,
+ export_task_type="ExportGraph",
+ compression_task_type="CompressGraph",
+ local_graph_path=dataset_path / "compressed",
+ topological_order_path=dataset_path / "topology/topological_order_dfs.csv.zst",
+ origin_contributors_path=dataset_path / "datasets/contribution_graph.csv.zst",
+ )
+
+ if graph_base_directory:
+ default_values["local_graph_path"] = graph_base_directory
+
+ if s3_prefix:
+ dataset_s3_prefix = f"{s3_prefix.rstrip('/')}/{dataset_name}"
+ default_values["s3_export_path"] = dataset_s3_prefix
+ default_values["s3_graph_path"] = f"{dataset_s3_prefix}/compressed"
+
+ if base_sensitive_directory:
+ sensitive_path = base_sensitive_directory / dataset_name
+ default_values["deanonymized_origin_contributors_path"] = (
+ sensitive_path / "datasets/contribution_graph.deanonymized.csv.zst"
+ )
+ default_values["deanonymization_table_path"] = (
+ sensitive_path / "persons_sha256_to_name.csv.zst"
+ )
+
+ if athena_prefix:
+ default_values[
+ "athena_db_name"
+ ] = f"{athena_prefix}_{dataset_name.replace('-', '')}"
+
+ for task_cls in get_all_subclasses(luigi.Task):
+ task_name = task_cls.__name__
+ # If the task has an argument with one of the known name, add the default value
+ # to its config.
+ task_config = {
+ arg_name: str(arg_value)
+ for arg_name, arg_value in default_values.items()
+ if hasattr(task_cls, arg_name)
+ }
+ if task_config:
+ config[task_name] = task_config
+
+ # If any config is provided, add it.
+ # This may override default arguments configured above.
+ if luigi_config is not None:
+ config.read(luigi_config)
+
+ with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd:
+ config.write(fd)
+ fd.flush()
+
+ proc = subprocess.run(
+ [
+ "luigi",
+ "--module",
+ "swh.dataset.luigi",
+ "--module",
+ "swh.graph.luigi",
+ *luigi_param,
+ ],
+ env={
+ "LUIGI_CONFIG_PATH": fd.name,
+ **os.environ,
+ },
+ )
+ exit(proc.returncode)
+
+
 def main():
+    """Run the ``graph_cli_group`` command group (``swh graph``).
+
+    ``auto_envvar_prefix="SWH_GRAPH"`` lets click read option values from
+    environment variables prefixed with ``SWH_GRAPH``.
+    """
     return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
diff --git a/swh/graph/tests/test_cli.py b/swh/graph/tests/test_cli.py
--- a/swh/graph/tests/test_cli.py
+++ b/swh/graph/tests/test_cli.py
@@ -1,13 +1,15 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict
from click.testing import CliRunner
+import pytest
import yaml
from swh.graph.cli import graph_cli_group
@@ -56,3 +58,49 @@
assert int(properties["nodes"]) == 24
assert int(properties["arcs"]) == 28
+
+
+@pytest.mark.parametrize("exit_code", [0, 1])
+def test_luigi(mocker, tmpdir, exit_code):
+ """calls Luigi with the given configuration"""
+ # bare bone configuration, to allow testing the compression pipeline
+ # with minimum RAM requirements on trivial graphs
+ runner = CliRunner()
+
+ subprocess_run = mocker.patch("subprocess.run")
+ subprocess_run.return_value.returncode = exit_code
+
+ with TemporaryDirectory(suffix=".swh-graph-test") as tmpdir:
+ result = runner.invoke(
+ graph_cli_group,
+ [
+ "luigi",
+ "--base-directory",
+ f"{tmpdir}/base_dir",
+ "--dataset-name",
+ "2022-12-07",
+ "--",
+ "foo",
+ "bar",
+ "--baz",
+ "qux",
+ ],
+ catch_exceptions=False,
+ )
+ assert result.exit_code == exit_code, result
+
+ luigi_config_path = subprocess_run.mock_calls[0][2]["env"]["LUIGI_CONFIG_PATH"]
+ subprocess_run.assert_called_once_with(
+ [
+ "luigi",
+ "--module",
+ "swh.dataset.luigi",
+ "--module",
+ "swh.graph.luigi",
+ "foo",
+ "bar",
+ "--baz",
+ "qux",
+ ],
+ env={"LUIGI_CONFIG_PATH": luigi_config_path, **os.environ},
+ )
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 4:43 PM (2 h, 45 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214967
Attached To
D8919: Add CLI script to generate Luigi config and call it
Event Timeline
Log In to Comment