diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,9 @@
 
 # 3rd party libraries without stubs (yet)
 
+[mypy-luigi.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
new file mode 100644
--- /dev/null
+++ b/requirements-luigi.txt
@@ -0,0 +1 @@
+luigi
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
new file mode 100644
--- /dev/null
+++ b/requirements-swh-luigi.txt
@@ -0,0 +1 @@
+swh.dataset[luigi] >= v0.3.2
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,7 +52,10 @@
     tests_require=parse_requirements("test"),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
-    extras_require={"testing": parse_requirements("test")},
+    extras_require={
+        "testing": parse_requirements("test"),
+        "luigi": parse_requirements("luigi"),
+    },
     include_package_data=True,
     data_files=[("share/swh-graph", JAR_PATHS)],
     entry_points="""
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/luigi.py
@@ -0,0 +1,172 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
+as an alternative to the CLI that can be composed with other tasks,
+such as swh-dataset's.
+
+Unlike the CLI, this requires the graph to be named `graph`.
+
+File layout
+-----------
+
+In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`,
+:file:`graph.mph`, ...), tasks in this module produce this directory structure:
+
+    swh_<date>[_<flavor>]/
+        graph.graph
+        graph.mph
+        ...
+        meta/
+            export.json
+            compression.json
+
+``graph.meta/export.json`` is copied from the ORC dataset exported by
+:mod:`swh.dataset.luigi`.
+
+``graph.meta/compression.json`` contains information about the compression itself,
+for provenance tracking.
+For example:
+
+.. code-block:: json
+
+    [
+        {
+            "steps": null,
+            "compression_start": "2022-11-08T11:00:54.998799+00:00",
+            "compression_end": "2022-11-08T11:05:53.105519+00:00",
+            "object_type": [
+                "origin",
+                "origin_visit"
+            ],
+            "hostname": "desktop5",
+            "conf": {},
+            "tool": {
+                "name": "swh.graph",
+                "version": "2.2.0"
+            }
+        }
+    ]
+
+When the compression pipeline is run in separate steps, each of the steps is recorded
+as an object in the root list.
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from swh.dataset.luigi import Format, LocalExport, ObjectType
+
+
+class CompressGraph(luigi.Task):
+    local_export_path = luigi.PathParameter()
+    local_graph_path = luigi.PathParameter()
+    batch_size = luigi.IntParameter(
+        default=0,
+        significant=False,
+        description="""
+        Size of work batches to use while compressing.
+        Larger is faster, but consumes more resources.
+        """,
+    )
+
+    object_types = list(ObjectType)
+    # To make this configurable, we could use this:
+    # object_types = luigi.EnumListParameter(
+    #     enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
+    # )
+    # then use swh.dataset.luigi._export_metadata_has_object_types to check in
+    # .meta/export.json that all objects are present before skipping the task
+
+    def requires(self) -> List[luigi.Task]:
+        return [
+            LocalExport(
+                local_export_path=self.local_export_path,
+                formats=[Format.orc],  # type: ignore[attr-defined]
+                object_types=self.object_types,
+            )
+        ]
+
+    def output(self) -> List[luigi.LocalTarget]:
+        return [self._export_meta(), self._compression_meta()]
+
+    def _export_meta(self) -> luigi.Target:
+        """Returns the metadata on the dataset export"""
+        return luigi.LocalTarget(self.local_graph_path / "meta/export.json")
+
+    def _compression_meta(self) -> luigi.Target:
+        """Returns the metadata on the compression pipeline"""
+        return luigi.LocalTarget(self.local_graph_path / "meta/compression.json")
+
+    def run(self):
+        import datetime
+        import json
+        import socket
+
+        import pkg_resources
+
+        from swh.graph import webgraph
+
+        conf = {}  # TODO: make this configurable
+        steps = None  # TODO: make this configurable
+
+        if self.batch_size:
+            conf["batch_size"] = self.batch_size
+
+        # Delete stamps. Otherwise interrupting this compression pipeline may leave
+        # stamps from a previous successful compression
+        if self._export_meta().exists():
+            self._export_meta().remove()
+        if self._compression_meta().exists():
+            self._compression_meta().remove()
+
+        output_directory = self.local_graph_path
+        graph_name = "graph"
+
+        start_date = datetime.datetime.now(tz=datetime.timezone.utc)
+        webgraph.compress(
+            graph_name,
+            self.local_export_path / "orc",
+            output_directory,
+            steps,
+            conf,
+        )
+        end_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+        # Copy dataset export metadata
+        with self._export_meta().open("w") as write_fd:
+            with (self.local_export_path / "meta" / "export.json").open() as read_fd:
+                write_fd.write(read_fd.read())
+
+        # Append metadata about this compression pipeline.
+        # NOTE: open read-only here; opening with "w" would truncate the file
+        # before json.load could read it.
+        if self._compression_meta().exists():
+            with self._compression_meta().open() as fd:
+                meta = json.load(fd)
+        else:
+            meta = []
+
+        meta.append(
+            {
+                "steps": steps,
+                "compression_start": start_date.isoformat(),
+                "compression_end": end_date.isoformat(),
+                "object_type": [object_type.name for object_type in self.object_types],
+                "hostname": socket.getfqdn(),
+                "conf": conf,
+                "tool": {
+                    "name": "swh.graph",
+                    "version": pkg_resources.get_distribution("swh.graph").version,
+                },
+            }
+        )
+        with self._compression_meta().open("w") as fd:
+            json.dump(meta, fd, indent=4)
diff --git a/swh/graph/tests/dataset/meta/export.json b/swh/graph/tests/dataset/meta/export.json
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/dataset/meta/export.json
@@ -0,0 +1,13 @@
+{
+    "object_type": [
+        "origin_visit",
+        "revision",
+        "skipped_content",
+        "directory",
+        "snapshot",
+        "origin",
+        "release",
+        "content",
+        "origin_visit_status"
+    ]
+}
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/test_luigi.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+from pathlib import Path
+
+from swh.graph.luigi import CompressGraph
+
+from .test_cli import read_properties
+
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+def test_compressgraph(tmpdir):
+    tmpdir = Path(tmpdir)
+
+    task = CompressGraph(
+        local_export_path=DATA_DIR,
+        local_graph_path=tmpdir / "compressed_graph",
+        batch_size=1000,  # go fast on the trivial dataset
+    )
+
+    task.run()
+
+    properties = read_properties(tmpdir / "compressed_graph" / "graph.properties")
+
+    assert int(properties["nodes"]) == 21
+    assert int(properties["arcs"]) == 23
+
+    export_meta_path = tmpdir / "compressed_graph/meta/export.json"
+    assert export_meta_path.read_bytes() == (DATA_DIR / "meta/export.json").read_bytes()
+
+    compression_meta_path = tmpdir / "compressed_graph/meta/compression.json"
+    assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -4,6 +4,7 @@
 [testenv]
 extras =
   testing
+  luigi
 deps =
   pytest-cov
 whitelist_externals =
@@ -48,6 +49,7 @@
 usedevelop = true
 extras =
   testing
+  luigi
 deps =
   # fetch and install swh-docs in develop mode
   -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
@@ -67,6 +69,7 @@
 usedevelop = true
 extras =
   testing
+  luigi
 deps =
   # install swh-docs in develop mode
   -e ../swh-docs