diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
[mypy-confluent_kafka.*]
ignore_missing_imports = True
+[mypy-luigi.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
new file mode 100644
--- /dev/null
+++ b/requirements-luigi.txt
@@ -0,0 +1 @@
+luigi
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,7 @@
extras_require={
"testing": parse_requirements("test"),
"with-content": parse_requirements("swh-with-content"),
+ "luigi": parse_requirements("luigi"),
},
use_scm_version=True,
include_package_data=True,
diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/luigi.py
@@ -0,0 +1,534 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
+as an alternative to the CLI that can be composed with other tasks,
+such as swh-graph's.
+
+File layout
+-----------
+
+Tasks in this module work on "export directories", which have this layout::
+
+    swh_<date>[_<flavor>]/
+ edges/
+ origin/
+ snapshot/
+ ...
+ stamps/
+ origin
+ snapshot
+ ...
+ orc/
+ origin/
+ snapshot/
+ ...
+ stamps/
+ origin
+ snapshot
+ ...
+ meta/
+ export.json
+
+``stamps`` files are written after corresponding directories are written.
+Their presence indicates the corresponding directory was fully generated/copied.
+This allows skipping work that was already done, while ignoring interrupted jobs.
+
+``meta/export.json`` contains information about the dataset, for provenance tracking.
+For example:
+
+.. code-block:: json
+
+ {
+ "flavor": "full",
+ "export_start": "2022-11-08T11:00:54.998799+00:00",
+ "export_end": "2022-11-08T11:05:53.105519+00:00",
+ "brokers": [
+ "broker1.journal.staging.swh.network:9093"
+ ],
+ "prefix": "swh.journal.objects",
+ "formats": [
+ "edges",
+ "orc"
+ ],
+ "object_type": [
+ "origin",
+ "origin_visit"
+ ],
+ "hostname": "desktop5",
+ "privileged": false
+ }
+
+Running all on staging
+----------------------
+
+An easy way to run it (eg. on the staging database), is to have these config
+files:
+
+.. code-block:: yaml
+ :caption: graph.staging.yml
+
+ journal:
+ brokers:
+ - broker1.journal.staging.swh.network:9093
+ prefix: swh.journal.objects
+ sasl.mechanism: "SCRAM-SHA-512"
+ security.protocol: "sasl_ssl"
+ sasl.username: ""
+ sasl.password: ""
+ privileged: false
+ group_id: "-test-dataset-export"
+
+.. code-block:: yaml
+ :caption: luigi.cfg
+
+ [ExportGraph]
+ config=graph.staging.yml
+ processes=16
+
+ [RunAll]
+ formats=edges,orc
+ s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/
+
+And run this command, for example::
+
+ luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunAll \
+ --UploadToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
+ --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
+ --athena-db-name=vlorentz_20221109_staging
+
+Note that this arbitrarily divides config options between :file:`luigi.cfg` and the CLI
+for readability; but `they can be used interchangeably <https://luigi.readthedocs.io/en/stable/parameters.html#setting-parameter-value-for-other-classes>`__
+""" # noqa
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+import enum
+from pathlib import Path
+from typing import Hashable, Iterator, List, TypeVar
+
+import luigi
+
+from swh.dataset import cli
+from swh.dataset.relational import MAIN_TABLES
+
+_ObjectType = enum.Enum( # type: ignore[misc]
+ "_ObjectType", [obj_type for obj_type in MAIN_TABLES.keys()]
+)
+_Format = enum.Enum("_Format", list(cli.AVAILABLE_EXPORTERS)) # type: ignore[misc]
+
+
+T = TypeVar("T", bound=Hashable)
+
+
+def merge_lists(lists: Iterator[List[T]]) -> List[T]:
+ """Returns a list made of all items of the arguments, with no duplicate."""
+ res = set()
+ for list_ in lists:
+ res.update(set(list_))
+ return list(res)
+
+
+class PathParameter(luigi.PathParameter):
+ """
+ A parameter that is a local filesystem path.
+
+ If ``is_dir``, ``is_file``, or ``exists`` is :const:`True`, then existence of
+ the path (and optionally type) is checked.
+
+ If ``create`` is set, then ``is_dir`` must be :const:`True`, and the directory
+ is created if it does not already exist.
+ """
+
+ def __init__(
+ self,
+ is_dir: bool = False,
+ is_file: bool = False,
+ exists: bool = False,
+ create: bool = False,
+ **kwargs,
+ ):
+ if create and not is_dir:
+ raise ValueError("`is_dir` must be True if `create` is True")
+ if is_dir and is_file:
+ raise ValueError("`is_dir` and `is_file` are mutually exclusive")
+
+ super().__init__(**kwargs)
+
+ self.is_dir = is_dir
+ self.is_file = is_file
+ self.exists = exists
+ self.create = create
+
+ def parse(self, s: str) -> Path:
+ path = Path(s)
+
+ if self.create:
+ path.mkdir(parents=True, exist_ok=True)
+
+ if (self.exists or self.is_dir or self.is_file) and not path.exists():
+ raise ValueError(f"{s} does not exist")
+ if self.is_dir and not path.is_dir():
+ raise ValueError(f"{s} is not a directory")
+ if self.is_file and not path.is_file():
+ raise ValueError(f"{s} is not a file")
+
+ return path
+
+
+class S3PathParameter(luigi.Parameter):
+ """A parameter that strip trailing slashes"""
+
+ def normalize(self, s):
+ return s.rstrip("/")
+
+
+class FractionalFloatParameter(luigi.FloatParameter):
+ """A float parameter that must be between 0 and 1"""
+
+ def parse(self, s):
+ v = super().parse(s)
+
+ if not 0.0 <= v <= 1.0:
+ raise ValueError(f"{s} is not a float between 0 and 1")
+
+ return v
+
+
+def stamps_paths(formats: List[_Format], object_types: List[_ObjectType]) -> List[str]:
+ """Returns a list of (local FS or S3) paths used to mark tables are successfully
+ exported.
+ """
+ return [
+ f"{format_.name}/stamps/{object_type.name.lower()}"
+ for format_ in formats
+ for object_type in object_types
+ ]
+
+
+class ExportGraph(luigi.Task):
+ """Exports the entire graph to the local filesystem.
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.dataset.luigi ExportGraph \
+ --config=graph.prod.yml \
+ --local-export-path=export/ \
+ --formats=edges
+
+ which is equivalent to this CLI call:
+
+ swh dataset --config-file graph.prod.yml graph export export/ --formats=edges
+ """
+
+ config_file = PathParameter(is_file=True)
+ local_export_path = PathParameter(is_dir=True, create=True)
+ export_id = luigi.OptionalParameter(
+ default=None,
+ description="""
+ Unique ID of the export run. This is appended to the kafka
+ group_id config file option. If group_id is not set in the
+        'journal' section of the config file, defaults to 'swh-dataset-export-<export_id>'.
+ """,
+ )
+ formats = luigi.EnumListParameter(enum=_Format, batch_method=merge_lists)
+ processes = luigi.IntParameter(default=1, significant=False)
+ margin = FractionalFloatParameter(
+ default=1.0,
+ description="""
+ Offset margin to start consuming from. E.g. is set to '0.95',
+ consumers will start at 95%% of the last committed offset;
+ in other words, start earlier than last committed position.
+ """,
+ )
+ object_types = luigi.EnumListParameter(
+ enum=_ObjectType, default=list(_ObjectType), batch_method=merge_lists
+ )
+
+ def output(self) -> List[luigi.Target]:
+ """Returns stamp and meta paths on the local FS."""
+ return self._stamps() + [self._meta()]
+
+ def _stamps(self):
+ return [
+ luigi.LocalTarget(self.local_export_path / path)
+ for path in stamps_paths(self.formats, self.object_types)
+ ]
+
+ def _meta(self):
+ return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
+
+ def run(self) -> None:
+ """Runs the full export, then writes stamps, then :file:`meta.json`."""
+ import datetime
+ import json
+ import socket
+
+ from swh.core import config
+
+ # we are about to overwrite files, so remove any existing stamp
+ for output in self.output():
+ if output.exists():
+ output.remove()
+
+ conf = config.read(self.config_file)
+
+ start_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ cli.run_export_graph(
+ config=conf,
+ export_path=self.local_export_path,
+ export_formats=[format_.name for format_ in self.formats],
+ object_types=[obj_type.name.lower() for obj_type in self.object_types],
+ exclude_obj_types=set(),
+ export_id=self.export_id,
+ processes=self.processes,
+ margin=self.margin,
+ )
+ end_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ # Create stamps
+ for output in self._stamps():
+ output.makedirs()
+ with output.open("w") as fd:
+ pass
+
+ # Write export metadata
+ meta = {
+ "flavor": "full",
+ "export_start": start_date.isoformat(),
+ "export_end": end_date.isoformat(),
+ "brokers": conf["journal"]["brokers"],
+ "prefix": conf["journal"]["prefix"],
+ "formats": [format_.name for format_ in self.formats],
+ "object_type": [object_type.name for object_type in self.object_types],
+ "hostname": socket.getfqdn(),
+ "privileged": conf["journal"].get("privileged"),
+ }
+ with self._meta().open("w") as fd:
+ json.dump(meta, fd, indent=4)
+
+
+class UploadToS3(luigi.Task):
+ """Uploads a local dataset export to S3; creating automatically if it does
+ not exist.
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.dataset.luigi UploadToS3 \
+ --config=graph.prod.yml \
+ --local-export-path=export/ \
+ --formats=edges \
+ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
+ """
+
+ local_export_path = PathParameter(is_dir=True, create=True, significant=False)
+ formats = luigi.EnumListParameter(enum=_Format, batch_method=merge_lists)
+ object_types = luigi.EnumListParameter(
+ enum=_ObjectType, default=list(_ObjectType), batch_method=merge_lists
+ )
+ s3_export_path = S3PathParameter()
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns a :class:`ExportGraph` task that writes local files at the
+ expected location."""
+ return [
+ ExportGraph(
+ local_export_path=self.local_export_path,
+ formats=self.formats,
+ object_types=self.object_types,
+ )
+ ]
+
+ def output(self) -> List[luigi.Target]:
+ """Returns stamp and meta paths on S3."""
+ return self._stamps() + [self._meta()]
+
+ def _stamps(self):
+ import luigi.contrib.s3
+
+ return [
+ luigi.contrib.s3.S3Target(f"{self.s3_export_path}/{path}")
+ for path in stamps_paths(self.formats, self.object_types)
+ ]
+
+ def _meta(self):
+ import luigi.contrib.s3
+
+ return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json")
+
+ def run(self) -> None:
+ """Copies all files: first the export itself, then stamps, then
+ :file:`meta.json`.
+ """
+ import os
+
+ import luigi.contrib.s3
+ import tqdm
+
+ client = luigi.contrib.s3.S3Client()
+
+ # recursively copy local files to S3, and end with stamps and export metadata
+ for format_ in self.formats:
+ for object_type in self.object_types:
+ local_dir = self.local_export_path / format_.name / object_type.name
+ s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}"
+ if not local_dir.exists():
+ # intermediary object types (eg. origin_visit, origin_visit_status)
+ # do not have their own directory
+ continue
+ for file_ in tqdm.tqdm(
+ list(os.listdir(local_dir)),
+ desc=f"Uploading {format_.name}/{object_type.name}/",
+ ):
+ client.put_multipart(
+ local_dir / file_, f"{s3_dir}/{file_}", ACL="public-read"
+ )
+
+ for stamp in stamps_paths(self.formats, self.object_types):
+ client.put_multipart(
+ self.local_export_path / stamp,
+ f"{self.s3_export_path}/{stamp}",
+ ACL="public-read",
+ )
+
+ client.put(
+ self.local_export_path / "meta" / "export.json",
+ self._meta().path,
+ ACL="public-read",
+ )
+
+
+class AthenaDatabaseTarget(luigi.Target):
+ """Target for the existence of a database on Athena."""
+
+ def __init__(self, name: str):
+ self.name = name
+
+ def exists(self) -> bool:
+ import boto3
+
+ client = boto3.client("athena")
+ database_list = client.list_databases(CatalogName="AwsDataCatalog")
+ for database in database_list["DatabaseList"]:
+ if database["Name"] == self.name:
+ # TODO: check all expected tables are present
+ return True
+ return False
+
+
+class CreateAthena(luigi.Task):
+ """Creates tables on AWS Athena pointing to a given graph dataset on S3.
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.dataset.luigi CreateAthena \
+ --ExportGraph-config=graph.staging.yml \
+ --athena-db-name=swh_20221108 \
+ --object-types=origin,origin_visit \
+ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
+ --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
+
+ which is equivalent to this CLI call:
+
+ swh dataset athena create \
+ --database-name swh_20221108 \
+ --location-prefix s3://softwareheritage/graph/swh_2022-11-08 \
+ --output-location s3://softwareheritage/graph/tmp/athena \
+ --replace-tables
+ """
+
+ object_types = luigi.EnumListParameter(
+ enum=_ObjectType, default=list(_ObjectType), batch_method=merge_lists
+ )
+ s3_export_path = S3PathParameter()
+ s3_athena_output_location = S3PathParameter()
+ athena_db_name = luigi.Parameter()
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ if not self.s3_export_path.replace("-", "").endswith(f"/{self.athena_db_name}"):
+ raise ValueError(
+ f"S3 export path ({self.s3_export_path}) does not match "
+ f"Athena database name ({self.athena_db_name})"
+ )
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns the corresponding :class:`UploadToS3` instance, with ORC as only
+ format."""
+ return [
+ UploadToS3(
+ formats=[_Format.orc], # type: ignore[attr-defined]
+ object_types=self.object_types,
+ s3_export_path=self.s3_export_path,
+ )
+ ]
+
+ def output(self) -> List[luigi.Target]:
+ """Returns an instance of :class:`AthenaDatabaseTarget`."""
+ return [AthenaDatabaseTarget(self.athena_db_name)]
+
+ def run(self) -> None:
+ """Creates tables from the ORC dataset."""
+ from swh.dataset.athena import create_tables
+
+ create_tables(
+ self.athena_db_name,
+ self.s3_export_path,
+ output_location=self.s3_athena_output_location,
+ replace=True,
+ )
+
+
+class RunAll(luigi.Task):
+ """Runs both the S3 and Athena export.
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.dataset.luigi RunAll \
+ --ExportGraph-config=graph.staging.yml \
+ --ExportGraph-processes=12 \
+ --UploadToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
+ --formats=edges \
+ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
+ --athena-db-name=swh_20221108 \
+ --object-types=origin,origin_visit \
+ --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
+ """
+
+ formats = luigi.EnumListParameter(enum=_Format, batch_method=merge_lists)
+ object_types = luigi.EnumListParameter(
+ enum=_ObjectType, default=list(_ObjectType), batch_method=merge_lists
+ )
+ s3_export_path = S3PathParameter()
+ s3_athena_output_location = S3PathParameter()
+ athena_db_name = luigi.Parameter()
+
+ def requires(self) -> List[luigi.Task]:
+        # CreateAthena depends on UploadToS3(formats=[orc]), so we need to
+ # explicitly depend on UploadToS3(formats=self.formats) here, to also
+ # export the formats requested by the user.
+ return [
+ CreateAthena(
+ object_types=self.object_types,
+ s3_export_path=self.s3_export_path,
+ s3_athena_output_location=self.s3_athena_output_location,
+ athena_db_name=self.athena_db_name,
+ ),
+ UploadToS3(
+ formats=self.formats,
+ object_types=self.object_types,
+ s3_export_path=self.s3_export_path,
+ ),
+ ]
+
+ def complete(self) -> bool:
+ # Dependencies perform their own completeness check, and this task
+ # does no work itself
+ return False
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -3,6 +3,7 @@
[testenv]
extras =
+ luigi
testing
deps =
pytest-cov
@@ -44,6 +45,7 @@
whitelist_externals = make
usedevelop = true
extras =
+ luigi
testing
deps =
# fetch and install swh-docs in develop mode
@@ -63,6 +65,7 @@
whitelist_externals = make
usedevelop = true
extras =
+ luigi
testing
deps =
# install swh-docs in develop mode