diff --git a/mypy.ini b/mypy.ini
index dffcb6e..d91201c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,21 +1,24 @@
 [mypy]
 namespace_packages = True
 warn_unused_ignores = True
 exclude = (?x)(
   ^swh/graph/grpc
 )
 
 # 3rd party libraries without stubs (yet)
+[mypy-luigi.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
 [mypy-psutil.*]
 ignore_missing_imports = True
 
 [mypy-py4j.*]
 ignore_missing_imports = True
 
 [mypy-pytest.*]
 ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
new file mode 100644
index 0000000..dc52dc6
--- /dev/null
+++ b/requirements-luigi.txt
@@ -0,0 +1 @@
+luigi
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
new file mode 100644
index 0000000..3c7e36b
--- /dev/null
+++ b/requirements-swh-luigi.txt
@@ -0,0 +1 @@
+swh.dataset[luigi] >= v0.3.2
diff --git a/setup.py b/setup.py
index 089e343..6cd90e5 100755
--- a/setup.py
+++ b/setup.py
@@ -1,77 +1,80 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from glob import glob
 from io import open
 from os import path
 
 from setuptools import find_packages, setup
 
 here = path.abspath(path.dirname(__file__))
 
 # Get the long description from the README file
 with open(path.join(here, "README.rst"), encoding="utf-8") as f:
     long_description = f.read()
 
 
 def parse_requirements(name=None):
     if name:
         reqf = "requirements-%s.txt" % name
     else:
         reqf = "requirements.txt"
 
     requirements = []
     if not path.exists(reqf):
         return requirements
 
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith("#"):
                 continue
             requirements.append(line)
     return requirements
 
 
 JAR_PATHS = list(glob("java/target/swh-graph-*.jar"))
 
 setup(
     name="swh.graph",
     description="Software Heritage graph service",
     long_description=long_description,
     long_description_content_type="text/x-rst",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DGRPH",
     packages=find_packages(),
     install_requires=parse_requirements() + parse_requirements("swh"),
     tests_require=parse_requirements("test"),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
-    extras_require={"testing": parse_requirements("test")},
+    extras_require={
+        "testing": parse_requirements("test"),
+        "luigi": parse_requirements("luigi") + parse_requirements("swh-luigi"),
+    },
     include_package_data=True,
     data_files=[("share/swh-graph", JAR_PATHS)],
     entry_points="""
         [console_scripts]
         swh-graph=swh.graph.cli:main
         [swh.cli.subcommands]
         graph=swh.graph.cli
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 3 - Alpha",
     ],
     project_urls={
         "Bug Reports": "https://forge.softwareheritage.org/maniphest",
         "Funding": "https://www.softwareheritage.org/donate",
         "Source": "https://forge.softwareheritage.org/source/swh-graph",
         "Documentation": "https://docs.softwareheritage.org/devel/swh-graph/",
     },
 )
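The luigi extra declared above is optional: the new swh.graph.luigi module (added below) is only importable once the extra dependencies are installed, e.g. with pip install "swh.graph[luigi]". As a purely illustrative sketch (not part of the diff), a caller wanting a clearer error message could guard the import like this:

    import importlib.util

    if importlib.util.find_spec("luigi") is None:
        # Hypothetical guard: point users at the optional extra added in setup.py
        raise RuntimeError(
            "Luigi support is optional; install it with: pip install 'swh.graph[luigi]'"
        )

    from swh.graph.luigi import CompressGraph  # safe to import past this point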
+ """, + ) + + def requires(self) -> List[luigi.Task]: + return [ + LocalExport( + local_export_path=self.local_export_path, + formats=[Format.orc], # type: ignore[attr-defined] + object_types=self.object_types, + ) + ] + + def output(self) -> List[luigi.LocalTarget]: + return self._stamps() + [self._export_meta(), self._compression_meta()] + + def _stamps_dir(self) -> Path: + # TODO: read export.json to use it as stamp for the list of object types + # (instead of adding one file per object type) + return self.output_directory / f"{self.graph_name}.stamps" + + def _stamps(self) -> List[luigi.Target]: + return [ + luigi.LocalTarget(self._stamps_dir() / object_type.name) + for object_type in self.object_types + ] + + def _export_meta(self) -> luigi.Target: + """Returns the metadata on the dataset export""" + return luigi.LocalTarget( + self.output_directory / f"{self.graph_name}.meta" / "export.json" + ) + + def _compression_meta(self) -> luigi.Target: + """Returns the metadata on the compression pipeline""" + return luigi.LocalTarget( + self.output_directory / f"{self.graph_name}.meta" / "compression.json" + ) + + def run(self): + import datetime + import json + import shutil + import socket + + import pkg_resources + + from swh.graph import webgraph + + conf = {} # TODO: make this configurable + steps = None # TODO: make this configurable + + if self.batch_size: + conf["batch_size"] = self.batch_size + + # Delete stamps. Otherwise interrupting this compression pipeline may leave + # stamps from a previous successful compression + if self._stamps_dir().exists(): + shutil.rmtree(self._stamps_dir()) + if self._export_meta().exists(): + self._export_meta().remove() + if self._compression_meta().exists(): + self._compression_meta().remove() + + start_date = datetime.datetime.now(tz=datetime.timezone.utc) + webgraph.compress( + self.graph_name, + self.local_export_path / "orc", + self.output_directory, + steps, + conf, + ) + end_date = datetime.datetime.now(tz=datetime.timezone.utc) + + # Create stamps + for output in self._stamps(): + output.makedirs() + with output.open("w") as fd: + pass + + # Copy dataset export metadata + with self._export_meta().open("w") as write_fd: + with (self.local_export_path / "meta" / "export.json").open() as read_fd: + write_fd.write(read_fd.read()) + + # Append metadata about this compression pipeline + if self._compression_meta().exists(): + with self._compression_meta().open("w") as fd: + meta = json.load(fd) + else: + meta = [] + + meta.append( + { + "steps": steps, + "compression_start": start_date.isoformat(), + "compression_end": end_date.isoformat(), + "object_type": [object_type.name for object_type in self.object_types], + "hostname": socket.getfqdn(), + "conf": conf, + "tool": { + "name": "swh.graph", + "version": pkg_resources.get_distribution("swh.graph").version, + }, + } + ) + with self._compression_meta().open("w") as fd: + json.dump(meta, fd, indent=4) diff --git a/swh/graph/tests/dataset/meta/export.json b/swh/graph/tests/dataset/meta/export.json new file mode 100644 index 0000000..ec2b5ed --- /dev/null +++ b/swh/graph/tests/dataset/meta/export.json @@ -0,0 +1,13 @@ +{ + "object_type": [ + "origin_visit", + "revision", + "skipped_content", + "directory", + "snapshot", + "origin", + "release", + "content", + "origin_visit_status" +] +} diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py new file mode 100644 index 0000000..938179e --- /dev/null +++ b/swh/graph/tests/test_luigi.py @@ -0,0 +1,40 @@ +# Copyright (C) 2022 The 
diff --git a/swh/graph/tests/dataset/meta/export.json b/swh/graph/tests/dataset/meta/export.json
new file mode 100644
index 0000000..ec2b5ed
--- /dev/null
+++ b/swh/graph/tests/dataset/meta/export.json
@@ -0,0 +1,13 @@
+{
+    "object_type": [
+        "origin_visit",
+        "revision",
+        "skipped_content",
+        "directory",
+        "snapshot",
+        "origin",
+        "release",
+        "content",
+        "origin_visit_status"
+]
+}
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
new file mode 100644
index 0000000..938179e
--- /dev/null
+++ b/swh/graph/tests/test_luigi.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+from pathlib import Path
+
+from swh.graph.luigi import CompressGraph
+
+from .test_cli import read_properties
+
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+def test_compressgraph(tmpdir):
+    tmpdir = Path(tmpdir)
+
+    task = CompressGraph(
+        local_export_path=DATA_DIR,
+        output_directory=tmpdir / "compressed_graph",
+        graph_name="example",
+        batch_size=1000,  # go fast on the trivial dataset
+    )
+
+    task.run()
+
+    properties = read_properties(tmpdir / "compressed_graph" / "example.properties")
+
+    assert int(properties["nodes"]) == 21
+    assert int(properties["arcs"]) == 23
+
+    export_meta_path = tmpdir / "compressed_graph/example.meta/export.json"
+    assert (
+        export_meta_path.read_bytes()
+        == (DATA_DIR / "meta/export.json").read_bytes()
+    )
+
+    compression_meta_path = tmpdir / "compressed_graph/example.meta/compression.json"
+    assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
diff --git a/tox.ini b/tox.ini
index 654d161..c81f9b8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,79 +1,82 @@
 [tox]
 envlist=black,flake8,mypy,py3
 
 [testenv]
 extras =
   testing
+  luigi
 deps =
   pytest-cov
 whitelist_externals = mvn sh
 commands =
   sh -c 'if ! [ -d {envdir}/share/swh-graph ]; then mvn -f java/pom.xml compile assembly:single; mkdir {envdir}/share/swh-graph; cp java/target/*.jar {envdir}/share/swh-graph; fi'
   pytest --cov={envsitepackagesdir}/swh/graph \
          {envsitepackagesdir}/swh/graph \
          --doctest-modules \
          --cov-branch {posargs}
 
 [testenv:black]
 skip_install = true
 deps =
   black==22.10.0
 commands =
   {envpython} -m black --check swh
 
 [testenv:flake8]
 skip_install = true
 deps =
   flake8==5.0.4
   flake8-bugbear==22.9.23
   pycodestyle==2.9.1
 commands =
   {envpython} -m flake8
 
 [testenv:mypy]
 extras =
   testing
 deps =
   mypy==0.942
 commands =
   mypy swh
 
 # build documentation outside swh-environment using the current
 # git HEAD of swh-docs, is executed on CI for each diff to prevent
 # breaking doc build
 [testenv:sphinx]
 whitelist_externals = make
 usedevelop = true
 extras =
   testing
+  luigi
 deps =
   # fetch and install swh-docs in develop mode
   -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
 setenv =
   SWH_PACKAGE_DOC_TOX_BUILD = 1
   # turn warnings into errors
   SPHINXOPTS = -W
 commands =
   make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
 
 # build documentation only inside swh-environment using local state
 # of swh-docs package
 [testenv:sphinx-dev]
 whitelist_externals = make
 usedevelop = true
 extras =
   testing
+  luigi
 deps =
   # install swh-docs in develop mode
   -e ../swh-docs
 setenv =
   SWH_PACKAGE_DOC_TOX_BUILD = 1
   # turn warnings into errors
   SPHINXOPTS = -W
 commands =
   make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
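Once a compression run (or the test above) has completed, the provenance information described in the swh/graph/luigi.py docstring can be inspected directly. Below is a small, hypothetical reader assuming the output directory and graph name used in test_compressgraph; the path is an assumption, not something the diff creates by itself:

    import json
    from pathlib import Path

    # Hypothetical location: output_directory / "<graph_name>.meta" / "compression.json"
    meta_path = Path("compressed_graph") / "example.meta" / "compression.json"

    for record in json.loads(meta_path.read_text()):
        # One record is appended per (partial) run of the compression pipeline.
        print(
            record["tool"]["name"],
            record["tool"]["version"],
            record["compression_start"],
            record["compression_end"],
            record["conf"],
        )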