Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/cli.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import os | import os | ||||
import pathlib | import pathlib | ||||
import sys | import sys | ||||
from typing import Any, Dict, List, Optional, Set | |||||
import click | import click | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.core.cli import swh as swh_cli_group | from swh.core.cli import swh as swh_cli_group | ||||
from swh.dataset.relational import MAIN_TABLES | from swh.dataset.relational import MAIN_TABLES | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | @click.option( | ||||
type=click.FloatRange(0, 1), | type=click.FloatRange(0, 1), | ||||
help=( | help=( | ||||
"Offset margin to start consuming from. E.g. is set to '0.95', " | "Offset margin to start consuming from. E.g. is set to '0.95', " | ||||
"consumers will start at 95% of the last committed offset; " | "consumers will start at 95% of the last committed offset; " | ||||
"in other words, start earlier than last committed position." | "in other words, start earlier than last committed position." | ||||
), | ), | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def export_graph(ctx, export_path, formats, exclude, object_types, **kwargs):
    """Export the Software Heritage graph as an edge dataset."""
    # NOTE(review): click presumably renders the docstring above as the
    # command's help text, so implementation notes live in comments.
    #
    # This entry point only parses and validates the comma-separated option
    # values; the export itself is delegated to run_export_graph(). Options
    # that need no validation here arrive in **kwargs and are forwarded
    # untouched.
    config = ctx.obj["config"]

    known_types = set(MAIN_TABLES.keys())
    if object_types:
        object_types = {o.strip() for o in object_types.split(",")}
        invalid_object_types = object_types - known_types
        if invalid_object_types:
            # Sort so the message does not depend on set iteration order.
            listing = ", ".join(sorted(invalid_object_types))
            raise click.BadOptionUsage(
                option_name="types",
                message=f"Invalid object types: {listing}.",
            )
    else:
        # No explicit selection: export every known object type.
        object_types = known_types

    exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])}

    export_formats = [c.strip() for c in formats.split(",")]
    for f in export_formats:
        if f not in AVAILABLE_EXPORTERS:
            raise click.BadOptionUsage(
                option_name="formats", message=f"{f} is not an available format."
            )

    run_export_graph(
        config,
        pathlib.Path(export_path),
        export_formats,
        list(object_types),
        exclude_obj_types=exclude_obj_types,
        **kwargs,
    )
def run_export_graph( | |||||
config: Dict[str, Any], | |||||
export_path: pathlib.Path, | |||||
export_formats: List[str], | |||||
object_types: List[str], | |||||
exclude_obj_types: Set[str], | |||||
export_id: Optional[str], | |||||
processes: int, | |||||
margin: Optional[float], | |||||
): | |||||
from importlib import import_module | |||||
import logging | |||||
import resource | |||||
import uuid | |||||
from swh.dataset.journalprocessor import ParallelJournalProcessor | |||||
logger = logging.getLogger(__name__) | |||||
if not export_id: | |||||
export_id = str(uuid.uuid4()) | |||||
# Enforce order (from origin to contents) to reduce number of holes in the graph. | # Enforce order (from origin to contents) to reduce number of holes in the graph. | ||||
object_types = [ | object_types = [ | ||||
obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types | obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types | ||||
] | ] | ||||
# ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the number of | # ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the number of | ||||
# processes, this can exceed the maximum number of file descriptors (soft limit | # processes, this can exceed the maximum number of file descriptors (soft limit | ||||
# defaults to 1024 on Debian), so let's increase it. | # defaults to 1024 on Debian), so let's increase it. | ||||
Show All 27 Lines | exporter_cls = dict( | ||||
for (fmt, clspath) in AVAILABLE_EXPORTERS.items() | for (fmt, clspath) in AVAILABLE_EXPORTERS.items() | ||||
if fmt in export_formats | if fmt in export_formats | ||||
) | ) | ||||
# Run the exporter for each edge type. | # Run the exporter for each edge type. | ||||
for obj_type in object_types: | for obj_type in object_types: | ||||
if obj_type in exclude_obj_types: | if obj_type in exclude_obj_types: | ||||
continue | continue | ||||
exporters = [ | exporters = [ | ||||
( | (exporter_cls[f], {"export_path": export_path / f}) for f in export_formats | ||||
exporter_cls[f], | |||||
{"export_path": os.path.join(export_path, f)}, | |||||
) | |||||
for f in export_formats | |||||
] | ] | ||||
parallel_exporter = ParallelJournalProcessor( | parallel_exporter = ParallelJournalProcessor( | ||||
config, | config, | ||||
exporters, | exporters, | ||||
export_id, | export_id, | ||||
obj_type, | obj_type, | ||||
node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type, | node_sets_path=export_path / ".node_sets" / obj_type, | ||||
processes=processes, | processes=processes, | ||||
offset_margin=margin, | offset_margin=margin, | ||||
) | ) | ||||
print("Exporting {}:".format(obj_type)) | print("Exporting {}:".format(obj_type)) | ||||
parallel_exporter.run() | parallel_exporter.run() | ||||
@graph.command("sort") | @graph.command("sort") | ||||
▲ Show 20 Lines • Show All 105 Lines • Show Last 20 Lines |