diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -8,6 +8,7 @@ import os import pathlib import sys +from typing import Any, Dict, List, Optional, Set import click @@ -93,23 +94,10 @@ ), ) @click.pass_context -def export_graph( - ctx, export_path, export_id, formats, exclude, object_types, processes, margin -): +def export_graph(ctx, export_path, formats, exclude, object_types, **kwargs): """Export the Software Heritage graph as an edge dataset.""" - from importlib import import_module - import logging - import resource - import uuid - - from swh.dataset.journalprocessor import ParallelJournalProcessor - - logger = logging.getLogger(__name__) config = ctx.obj["config"] - if not export_id: - export_id = str(uuid.uuid4()) - if object_types: object_types = {o.strip() for o in object_types.split(",")} invalid_object_types = object_types - set(MAIN_TABLES.keys()) @@ -128,6 +116,38 @@ option_name="formats", message=f"{f} is not an available format." ) + run_export_graph( + config, + pathlib.Path(export_path), + export_formats, + list(object_types), + exclude_obj_types=exclude_obj_types, + **kwargs, + ) + + +def run_export_graph( + config: Dict[str, Any], + export_path: pathlib.Path, + export_formats: List[str], + object_types: List[str], + exclude_obj_types: Set[str], + export_id: Optional[str], + processes: int, + margin: Optional[float], +): + from importlib import import_module + import logging + import resource + import uuid + + from swh.dataset.journalprocessor import ParallelJournalProcessor + + logger = logging.getLogger(__name__) + + if not export_id: + export_id = str(uuid.uuid4()) + # Enforce order (from origin to contents) to reduce number of holes in the graph. object_types = [ obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types @@ -171,18 +191,14 @@ if obj_type in exclude_obj_types: continue exporters = [ - ( - exporter_cls[f], - {"export_path": os.path.join(export_path, f)}, - ) - for f in export_formats + (exporter_cls[f], {"export_path": export_path / f}) for f in export_formats ] parallel_exporter = ParallelJournalProcessor( config, exporters, export_id, obj_type, - node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type, + node_sets_path=export_path / ".node_sets" / obj_type, processes=processes, offset_margin=margin, )