Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/cli.py
# Copyright (C) 2019-2022 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from pathlib import Path | from pathlib import Path | ||||
import shlex | import shlex | ||||
from typing import TYPE_CHECKING, Any, Dict, Set, Tuple | from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import click | import click | ||||
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | ||||
from swh.core.cli import swh as swh_cli_group | from swh.core.cli import swh as swh_cli_group | ||||
▲ Show 20 Lines • Show All 225 Lines • ▼ Show 20 Lines | def compress(ctx, input_dataset, output_directory, graph_name, steps): | ||||
try: | try: | ||||
conf = ctx.obj["config"]["graph"]["compress"] | conf = ctx.obj["config"]["graph"]["compress"] | ||||
except KeyError: | except KeyError: | ||||
conf = {} # use defaults | conf = {} # use defaults | ||||
webgraph.compress(graph_name, input_dataset, output_directory, steps, conf) | webgraph.compress(graph_name, input_dataset, output_directory, steps, conf) | ||||
def get_all_subclasses(cls):
    """Return every direct and indirect subclass of ``cls``.

    Subclasses are listed in depth-first pre-order, following the order of
    ``__subclasses__()`` at each level. A class that is reachable through
    several parents (diamond inheritance) is listed only once, at its first
    occurrence, and its own subtree is walked only once.

    Args:
        cls: the class whose descendants are collected; ``cls`` itself is not
            part of the result.

    Returns:
        A list of classes without duplicates.
    """
    all_subclasses = []
    seen = set()

    def visit(klass):
        for subclass in klass.__subclasses__():
            if subclass in seen:
                # Already reached through another parent: its descendants
                # were collected right after its first occurrence.
                continue
            seen.add(subclass)
            all_subclasses.append(subclass)
            visit(subclass)

    visit(cls)
    return all_subclasses
@graph_cli_group.command()
@click.option(
    "--base-directory",
    required=True,
    type=PathlibPath(),
    help="""The base directory where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--base-sensitive-directory",
    required=False,
    type=PathlibPath(),
    help="""The base directory for any data that should not be publicly available
    (eg. because it contains people's names).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--athena-prefix",
    required=False,
    type=str,
    help="""A prefix for the Athena Database that will be created and/or used.
    For example: ``swh``.""",
)
@click.option(
    "--s3-prefix",
    required=False,
    type=str,
    help="""The base S3 "directory" where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``s3://softwareheritage/graph/``.""",
)
@click.option(
    "--graph-base-directory",
    required=False,
    type=PathlibPath(),
    help="""Overrides the path of the graph to use. Defaults to the value of
    ``{base_directory}/{dataset_name}/{compressed}/``.
    For example: ``/dev/shm/swh-graph/default/``.""",
)
@click.option(
    "--dataset-name",
    required=True,
    type=str,
    help="""Should be a date and optionally a flavor, which will be used
    as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
)
@click.option(
    "--luigi-config",
    type=PathlibPath(),
    help="""Extra options to add to ``luigi.cfg``, following the same format.
    This overrides any option that would otherwise be set automatically.""",
)
@click.argument("luigi_param", nargs=-1)
@click.pass_context
def luigi(
    ctx,
    base_directory: Path,
    graph_base_directory: Optional[Path],
    base_sensitive_directory: Optional[Path],
    s3_prefix: Optional[str],
    athena_prefix: Optional[str],
    dataset_name: str,
    luigi_config: Optional[Path],
    luigi_param: Tuple[str, ...],
):
    """
    Calls Luigi with the given task and params, and automatically
    configures paths based on --base-directory and --dataset-name.

    The list of Luigi params should be prefixed with ``--`` so they are not interpreted
    by the ``swh`` CLI. For example::

        swh graph luigi \
                --base-directory ~/tmp/ \
                --dataset-name 2022-12-05_test \
                -- \
                RunAll \
                --local-scheduler

    to pass ``RunAll --local-scheduler`` as Luigi params
    """
    # Imports are kept local so that they do not slow down CLI startup.
    import configparser
    import os
    import subprocess
    import sys
    import tempfile

    import luigi

    # Populate the list of subclasses of luigi.Task
    import swh.dataset.luigi  # noqa
    import swh.graph.luigi  # noqa

    config = configparser.ConfigParser()

    dataset_path = base_directory / dataset_name

    # Values offered to every task that declares an attribute of the same name.
    default_values: Dict[str, Any] = dict(
        local_export_path=dataset_path,
        export_task_type="ExportGraph",
        compression_task_type="CompressGraph",
        local_graph_path=dataset_path / "compressed",
        topological_order_path=dataset_path / "topology/topological_order_dfs.csv.zst",
        origin_contributors_path=dataset_path / "datasets/contribution_graph.csv.zst",
    )

    if graph_base_directory:
        default_values["local_graph_path"] = graph_base_directory

    if s3_prefix:
        # Strip trailing slashes so the joined prefix has exactly one separator.
        dataset_s3_prefix = f"{s3_prefix.rstrip('/')}/{dataset_name}"
        default_values["s3_export_path"] = dataset_s3_prefix
        default_values["s3_graph_path"] = f"{dataset_s3_prefix}/compressed"

    if base_sensitive_directory:
        sensitive_path = base_sensitive_directory / dataset_name
        default_values["deanonymized_origin_contributors_path"] = (
            sensitive_path / "datasets/contribution_graph.deanonymized.csv.zst"
        )
        default_values["deanonymization_table_path"] = (
            sensitive_path / "persons_sha256_to_name.csv.zst"
        )

    if athena_prefix:
        # Dashes are removed from the dataset name when building the DB name.
        default_values["athena_db_name"] = (
            f"{athena_prefix}_{dataset_name.replace('-', '')}"
        )

    for task_cls in get_all_subclasses(luigi.Task):
        task_name = task_cls.__name__
        # If the task has an argument with one of the known names, add the default
        # value to its config.
        task_config = {
            arg_name: str(arg_value)
            for arg_name, arg_value in default_values.items()
            if hasattr(task_cls, arg_name)
        }
        if task_config:
            config[task_name] = task_config

    # If any config is provided, add it.
    # This may override default arguments configured above.
    if luigi_config is not None:
        # ConfigParser.read() silently skips files it cannot open and returns the
        # list of files it actually parsed, so an empty result means the
        # user-provided file was ignored: report it instead of running without it.
        if not config.read(luigi_config):
            raise click.BadParameter(
                f"could not read Luigi configuration file {luigi_config}",
                param_hint="--luigi-config",
            )

    with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd:
        config.write(fd)
        # Make sure the whole config is on disk before the subprocess reads it.
        fd.flush()

        proc = subprocess.run(
            [
                "luigi",
                "--module",
                "swh.dataset.luigi",
                "--module",
                "swh.graph.luigi",
                *luigi_param,
            ],
            env={
                # os.environ is unpacked last, so a LUIGI_CONFIG_PATH already set
                # in the caller's environment takes precedence over the generated
                # file. NOTE(review): confirm this precedence is intended.
                "LUIGI_CONFIG_PATH": fd.name,
                **os.environ,
            },
        )

    # Propagate Luigi's exit status; sys.exit() is used rather than the
    # site-provided exit() builtin, which is not guaranteed to exist.
    sys.exit(proc.returncode)
def main():
    """Invoke ``graph_cli_group`` with ``SWH_GRAPH`` as automatic envvar prefix.

    Returns whatever the group invocation returns.
    """
    result = graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
    return result


if __name__ == "__main__":
    main()
s/It/Its/