diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
index 027b7ae..e2c19c5 100644
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -1,198 +1,172 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks
===========
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks.
They are an alternative to the CLI, and can be composed with other tasks,
such as swh-dataset's.
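For instance, a compression run can be scheduled directly from Python.
The following is a minimal sketch: ``luigi.build`` and the two task parameters
come from this module and Luigi itself, but the paths are placeholders for a
real ORC export and output location:

.. code-block:: python

    import luigi

    from swh.graph.luigi import CompressGraph

    luigi.build(
        [
            CompressGraph(
                # directory containing the orc/ and meta/ subdirectories
                # of a swh-dataset export (placeholder path)
                local_export_path="/srv/exports/swh_2022-11-08",
                # prefix of the compressed graph files to produce, ie.
                # /srv/graphs/swh_2022-11-08/graph.graph, graph.mph, ...
                local_graph_path="/srv/graphs/swh_2022-11-08/graph",
            )
        ],
        local_scheduler=True,
    )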
File layout
-----------
(This section assumes a graph named ``graph``.)
In addition to files documented in :ref:`graph-compression` (e.g. :file:`graph.graph`,
:file:`graph.mph`, ...), tasks in this module produce this directory structure::

    swh_<date>[_<flavor>]/
        graph.graph
        graph.mph
        ...
-        graph.stamps/
-            origin
-            snapshot
-            ...
        graph.meta/
            export.json
            compression.json
-``stamps`` files are written after the corresponding directories are written.
-Their presence indicates the corresponding graph was fully generated/copied.
-This allows skipping work that was already done, while ignoring interrupted jobs.
-
``graph.meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.
``graph.meta/compression.json`` contains information about the compression itself,
for provenance tracking.
For example:
.. code-block:: json

    [
        {
            "steps": null,
            "compression_start": "2022-11-08T11:00:54.998799+00:00",
            "compression_end": "2022-11-08T11:05:53.105519+00:00",
            "object_type": [
                "origin",
                "origin_visit"
            ],
            "hostname": "desktop5",
            "conf": {},
            "tool": {
                "name": "swh.graph",
                "version": "2.2.0"
            }
        }
    ]
When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from pathlib import Path
from typing import List
import luigi
-from swh.dataset.luigi import Format, LocalExport, ObjectType, merge_lists
+from swh.dataset.luigi import Format, LocalExport, ObjectType
class CompressGraph(luigi.Task):
local_export_path = luigi.PathParameter()
- object_types = luigi.EnumListParameter(
- enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
- )
- output_directory = luigi.PathParameter()
- graph_name = luigi.Parameter("graph")
+ local_graph_path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
significant=False,
description="""
Size of work batches to use while compressing.
Larger is faster, but consumes more resources.
""",
)
+ object_types = list(ObjectType)
+ # To make this configurable, we could use this:
+ # object_types = luigi.EnumListParameter(
+ # enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
+ # )
+ # then use swh.dataset.luigi._export_metadata_has_object_types to check in
+ # .meta/export.json that all objects are present before skipping the task
+
def requires(self) -> List[luigi.Task]:
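# Standard Luigi dependency resolution: if the ORC export is not already
# available at local_export_path, the LocalExport task runs first to produce it.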
return [
LocalExport(
local_export_path=self.local_export_path,
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
)
]
def output(self) -> List[luigi.LocalTarget]:
- return self._stamps() + [self._export_meta(), self._compression_meta()]
-
- def _stamps_dir(self) -> Path:
- # TODO: read export.json to use it as stamp for the list of object types
- # (instead of adding one file per object type)
- return self.output_directory / f"{self.graph_name}.stamps"
-
- def _stamps(self) -> List[luigi.Target]:
- return [
- luigi.LocalTarget(self._stamps_dir() / object_type.name)
- for object_type in self.object_types
- ]
+ return [self._export_meta(), self._compression_meta()]
def _export_meta(self) -> luigi.Target:
"""Returns the metadata on the dataset export"""
- return luigi.LocalTarget(
- self.output_directory / f"{self.graph_name}.meta" / "export.json"
- )
+ return luigi.LocalTarget(f"{self.local_graph_path}.meta/export.json")
def _compression_meta(self) -> luigi.Target:
"""Returns the metadata on the compression pipeline"""
- return luigi.LocalTarget(
- self.output_directory / f"{self.graph_name}.meta" / "compression.json"
- )
+ return luigi.LocalTarget(f"{self.local_graph_path}.meta/compression.json")
def run(self):
import datetime
import json
- import shutil
import socket
import pkg_resources
from swh.graph import webgraph
conf = {} # TODO: make this configurable
steps = None # TODO: make this configurable
if self.batch_size:
conf["batch_size"] = self.batch_size
# Delete outputs. Otherwise interrupting this compression pipeline may leave
# metadata files from a previous successful compression, which would make
# Luigi consider the task already complete.
- if self._stamps_dir().exists():
- shutil.rmtree(self._stamps_dir())
if self._export_meta().exists():
self._export_meta().remove()
if self._compression_meta().exists():
self._compression_meta().remove()
+ output_directory = self.local_graph_path.parent
+ graph_name = self.local_graph_path.name
+
start_date = datetime.datetime.now(tz=datetime.timezone.utc)
webgraph.compress(
- self.graph_name,
+ graph_name,
self.local_export_path / "orc",
- self.output_directory,
+ output_directory,
steps,
conf,
)
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
- # Create stamps
- for output in self._stamps():
- output.makedirs()
- with output.open("w") as fd:
- pass
-
# Copy dataset export metadata
with self._export_meta().open("w") as write_fd:
with (self.local_export_path / "meta" / "export.json").open() as read_fd:
write_fd.write(read_fd.read())
# Append metadata about this compression pipeline
if self._compression_meta().exists():
with self._compression_meta().open("w") as fd:
meta = json.load(fd)
else:
meta = []
meta.append(
{
"steps": steps,
"compression_start": start_date.isoformat(),
"compression_end": end_date.isoformat(),
"object_type": [object_type.name for object_type in self.object_types],
"hostname": socket.getfqdn(),
"conf": conf,
"tool": {
"name": "swh.graph",
"version": pkg_resources.get_distribution("swh.graph").version,
},
}
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
index 938179e..a0dd46a 100644
--- a/swh/graph/tests/test_luigi.py
+++ b/swh/graph/tests/test_luigi.py
@@ -1,40 +1,36 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from pathlib import Path
from swh.graph.luigi import CompressGraph
from .test_cli import read_properties
DATA_DIR = Path(__file__).parents[0] / "dataset"
def test_compressgraph(tmpdir):
tmpdir = Path(tmpdir)
task = CompressGraph(
- local_export_path=DATA_DIR / "dataset",
- output_directory=tmpdir / "compressed_graph",
- graph_name="example",
+ local_export_path=DATA_DIR,
+ local_graph_path=tmpdir / "compressed_graph" / "example",
batch_size=1000, # go fast on the trivial dataset
)
task.run()
properties = read_properties(tmpdir / "compressed_graph" / "example.properties")
assert int(properties["nodes"]) == 21
assert int(properties["arcs"]) == 23
export_meta_path = tmpdir / "compressed_graph/example.meta/export.json"
- assert (
- export_meta_path.read_bytes()
- == (DATA_DIR / "dataset/meta/export.json").read_bytes()
- )
+ assert export_meta_path.read_bytes() == (DATA_DIR / "meta/export.json").read_bytes()
compression_meta_path = tmpdir / "compressed_graph/example.meta/compression.json"
assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
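# To run this test on its own, something like the following should work,
# assuming a dev setup with swh-graph's compression dependencies (Java, etc.):
#
#   pytest swh/graph/tests/test_luigi.py -k test_compressgraph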