diff --git a/mypy.ini b/mypy.ini
index dffcb6e..d91201c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,21 +1,24 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True
exclude = (?x)(
^swh/graph/grpc
)
# 3rd party libraries without stubs (yet)
+[mypy-luigi.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-psutil.*]
ignore_missing_imports = True
[mypy-py4j.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
new file mode 100644
index 0000000..dc52dc6
--- /dev/null
+++ b/requirements-luigi.txt
@@ -0,0 +1 @@
+luigi
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
new file mode 100644
index 0000000..3c7e36b
--- /dev/null
+++ b/requirements-swh-luigi.txt
@@ -0,0 +1 @@
+swh.dataset[luigi] >= v0.3.2
diff --git a/setup.py b/setup.py
index 089e343..6cd90e5 100755
--- a/setup.py
+++ b/setup.py
@@ -1,77 +1,80 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from glob import glob
from io import open
from os import path
from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.rst"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
JAR_PATHS = list(glob("java/target/swh-graph-*.jar"))
setup(
name="swh.graph",
description="Software Heritage graph service",
long_description=long_description,
long_description_content_type="text/x-rst",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DGRPH",
packages=find_packages(),
install_requires=parse_requirements() + parse_requirements("swh"),
tests_require=parse_requirements("test"),
setup_requires=["setuptools-scm"],
use_scm_version=True,
- extras_require={"testing": parse_requirements("test")},
+ extras_require={
+ "testing": parse_requirements("test"),
+ "luigi": parse_requirements("luigi"),
+ },
include_package_data=True,
data_files=[("share/swh-graph", JAR_PATHS)],
entry_points="""
[console_scripts]
swh-graph=swh.graph.cli:main
[swh.cli.subcommands]
graph=swh.graph.cli
""",
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-graph",
"Documentation": "https://docs.softwareheritage.org/devel/swh-graph/",
},
)
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
new file mode 100644
index 0000000..027b7ae
--- /dev/null
+++ b/swh/graph/luigi.py
@@ -0,0 +1,198 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
+as an alternative to the CLI that can be composed with other tasks,
+such as swh-dataset's.
+
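+These tasks can be scheduled like any other Luigi task, either from the ``luigi``
+command-line tool or programmatically. A minimal programmatic sketch (placeholder
+paths, and assuming the ``luigi`` extra is installed)::
+
+    from pathlib import Path
+
+    import luigi
+
+    from swh.graph.luigi import CompressGraph
+
+    luigi.build(
+        [
+            CompressGraph(
+                local_export_path=Path("export"),
+                output_directory=Path("compressed"),
+            )
+        ],
+        local_scheduler=True,
+    )
+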
+File layout
+-----------
+
+(This section assumes a graph named ``graph``.)
+
+In addition to files documented in :ref:`graph-compression` (e.g. :file:`graph.graph`,
+:file:`graph.mph`, ...), tasks in this module produce this directory structure::
+
+    swh_<date>[_<flavor>]/
+ graph.graph
+ graph.mph
+ ...
+ graph.stamps/
+ origin
+ snapshot
+ ...
+ graph.meta/
+ export.json
+ compression.json
+
+``stamps`` files are written after the corresponding graph files are written.
+Their presence indicates that the corresponding graph was fully generated/copied;
+this allows skipping work that was already done, while ignoring interrupted jobs.
+
+``graph.meta/export.json`` is copied from the ORC dataset exported by
+:mod:`swh.dataset.luigi`.
+
+``graph.meta/compression.json`` contains information about the compression itself,
+for provenance tracking.
+For example:
+
+.. code-block:: json
+
+ [
+ {
+            "steps": null,
+            "compression_start": "2022-11-08T11:00:54.998799+00:00",
+            "compression_end": "2022-11-08T11:05:53.105519+00:00",
+ "object_type": [
+ "origin",
+ "origin_visit"
+ ],
+ "hostname": "desktop5",
+ "conf": {},
+ "tool": {
+ "name": "swh.graph",
+                "version": "2.2.0"
+ }
+ }
+ ]
+
+When the compression pipeline is run in separate steps, each of the steps is recorded
+as an object in the root list.
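+
+A consumer of the compressed graph can read this file for provenance information,
+e.g. (a minimal sketch, assuming the layout described above)::
+
+    import json
+
+    with open("graph.meta/compression.json") as f:
+        compression_steps = json.load(f)
+    # The last entry describes the most recent compression run.
+    print(compression_steps[-1]["tool"]["version"])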
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from pathlib import Path
+from typing import List
+
+import luigi
+
+from swh.dataset.luigi import Format, LocalExport, ObjectType, merge_lists
+
+
+class CompressGraph(luigi.Task):
+ local_export_path = luigi.PathParameter()
+ object_types = luigi.EnumListParameter(
+ enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
+ )
+ output_directory = luigi.PathParameter()
+    graph_name = luigi.Parameter(default="graph")
+ batch_size = luigi.IntParameter(
+ default=0,
+ significant=False,
+ description="""
+ Size of work batches to use while compressing.
+ Larger is faster, but consumes more resources.
+ """,
+ )
+
+ def requires(self) -> List[luigi.Task]:
+ return [
+ LocalExport(
+ local_export_path=self.local_export_path,
+ formats=[Format.orc], # type: ignore[attr-defined]
+ object_types=self.object_types,
+ )
+ ]
+
+ def output(self) -> List[luigi.LocalTarget]:
+ return self._stamps() + [self._export_meta(), self._compression_meta()]
+
+ def _stamps_dir(self) -> Path:
+ # TODO: read export.json to use it as stamp for the list of object types
+ # (instead of adding one file per object type)
+ return self.output_directory / f"{self.graph_name}.stamps"
+
+    def _stamps(self) -> List[luigi.LocalTarget]:
+ return [
+ luigi.LocalTarget(self._stamps_dir() / object_type.name)
+ for object_type in self.object_types
+ ]
+
+    def _export_meta(self) -> luigi.LocalTarget:
+ """Returns the metadata on the dataset export"""
+ return luigi.LocalTarget(
+ self.output_directory / f"{self.graph_name}.meta" / "export.json"
+ )
+
+    def _compression_meta(self) -> luigi.LocalTarget:
+ """Returns the metadata on the compression pipeline"""
+ return luigi.LocalTarget(
+ self.output_directory / f"{self.graph_name}.meta" / "compression.json"
+ )
+
+ def run(self):
+ import datetime
+ import json
+ import shutil
+ import socket
+
+ import pkg_resources
+
+ from swh.graph import webgraph
+
+ conf = {} # TODO: make this configurable
+ steps = None # TODO: make this configurable
+
+ if self.batch_size:
+ conf["batch_size"] = self.batch_size
+
+        # Delete stamps first. Otherwise, an interrupted run could leave behind
+        # stamps from a previous successful compression, making the task look
+        # complete even though its outputs were partially overwritten.
+ if self._stamps_dir().exists():
+ shutil.rmtree(self._stamps_dir())
+ if self._export_meta().exists():
+ self._export_meta().remove()
+ if self._compression_meta().exists():
+ self._compression_meta().remove()
+
+ start_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ webgraph.compress(
+ self.graph_name,
+ self.local_export_path / "orc",
+ self.output_directory,
+ steps,
+ conf,
+ )
+ end_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ # Create stamps
+ for output in self._stamps():
+ output.makedirs()
+ with output.open("w") as fd:
+ pass
+
+ # Copy dataset export metadata
+ with self._export_meta().open("w") as write_fd:
+ with (self.local_export_path / "meta" / "export.json").open() as read_fd:
+ write_fd.write(read_fd.read())
+
+ # Append metadata about this compression pipeline
+ if self._compression_meta().exists():
+            with self._compression_meta().open("r") as fd:
+ meta = json.load(fd)
+ else:
+ meta = []
+
+ meta.append(
+ {
+ "steps": steps,
+ "compression_start": start_date.isoformat(),
+ "compression_end": end_date.isoformat(),
+ "object_type": [object_type.name for object_type in self.object_types],
+ "hostname": socket.getfqdn(),
+ "conf": conf,
+ "tool": {
+ "name": "swh.graph",
+ "version": pkg_resources.get_distribution("swh.graph").version,
+ },
+ }
+ )
+ with self._compression_meta().open("w") as fd:
+ json.dump(meta, fd, indent=4)
diff --git a/swh/graph/tests/dataset/meta/export.json b/swh/graph/tests/dataset/meta/export.json
new file mode 100644
index 0000000..ec2b5ed
--- /dev/null
+++ b/swh/graph/tests/dataset/meta/export.json
@@ -0,0 +1,13 @@
+{
+ "object_type": [
+ "origin_visit",
+ "revision",
+ "skipped_content",
+ "directory",
+ "snapshot",
+ "origin",
+ "release",
+ "content",
+ "origin_visit_status"
+]
+}
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
new file mode 100644
index 0000000..938179e
--- /dev/null
+++ b/swh/graph/tests/test_luigi.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+from pathlib import Path
+
+from swh.graph.luigi import CompressGraph
+
+from .test_cli import read_properties
+
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+def test_compressgraph(tmpdir):
+ tmpdir = Path(tmpdir)
+
+ task = CompressGraph(
+        local_export_path=DATA_DIR,
+ output_directory=tmpdir / "compressed_graph",
+ graph_name="example",
+ batch_size=1000, # go fast on the trivial dataset
+ )
+
+ task.run()
+
+ properties = read_properties(tmpdir / "compressed_graph" / "example.properties")
+
+ assert int(properties["nodes"]) == 21
+ assert int(properties["arcs"]) == 23
+
+ export_meta_path = tmpdir / "compressed_graph/example.meta/export.json"
+ assert (
+ export_meta_path.read_bytes()
+        == (DATA_DIR / "meta" / "export.json").read_bytes()
+ )
+
+ compression_meta_path = tmpdir / "compressed_graph/example.meta/compression.json"
+ assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
diff --git a/tox.ini b/tox.ini
index 654d161..c81f9b8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,79 +1,82 @@
[tox]
envlist=black,flake8,mypy,py3
[testenv]
extras =
testing
+ luigi
deps =
pytest-cov
whitelist_externals =
mvn
sh
commands =
sh -c 'if ! [ -d {envdir}/share/swh-graph ]; then mvn -f java/pom.xml compile assembly:single; mkdir {envdir}/share/swh-graph; cp java/target/*.jar {envdir}/share/swh-graph; fi'
pytest --cov={envsitepackagesdir}/swh/graph \
{envsitepackagesdir}/swh/graph \
--doctest-modules \
--cov-branch {posargs}
[testenv:black]
skip_install = true
deps =
black==22.10.0
commands =
{envpython} -m black --check swh
[testenv:flake8]
skip_install = true
deps =
flake8==5.0.4
flake8-bugbear==22.9.23
pycodestyle==2.9.1
commands =
{envpython} -m flake8
[testenv:mypy]
extras =
testing
deps =
mypy==0.942
commands =
mypy swh
# build documentation outside swh-environment using the current
# git HEAD of swh-docs, is executed on CI for each diff to prevent
# breaking doc build
[testenv:sphinx]
whitelist_externals = make
usedevelop = true
extras =
testing
+ luigi
deps =
# fetch and install swh-docs in develop mode
-e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
# build documentation only inside swh-environment using local state
# of swh-docs package
[testenv:sphinx-dev]
whitelist_externals = make
usedevelop = true
extras =
testing
+ luigi
deps =
# install swh-docs in develop mode
-e ../swh-docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs