diff --git a/swh/graph/luigi/__init__.py b/swh/graph/luigi/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/luigi/__init__.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This package contains `Luigi <https://luigi.readthedocs.io/>`_ tasks.
+These come in two kinds:
+
+* in :mod:`swh.graph.luigi.compressed_graph`: an alternative to the 'swh graph compress'
+ CLI that can be composed with other tasks, such as swh-dataset's
+* in other submodules: tasks driving the creation of specific datasets that are
+ generated using the compressed graph
+
+
+The overall directory structure is::
+
+ base_dir/
+     <date>[_<flavor>]/
+         edges/
+             ...
+         orc/
+             ...
+         compressed/
+             graph.graph
+             graph.mph
+             ...
+             meta/
+                 export.json
+                 compression.json
+         datasets/
+             contribution_graph.csv.zst
+         topology/
+             topological_order_dfs.csv.zst
+
+And optionally::
+
+ sensitive_base_dir/
+     <date>[_<flavor>]/
+         persons_sha256_to_name.csv.zst
+         datasets/
+             contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from . import compressed_graph, origin_contributors
+
+
+class RunAll(luigi.Task):
+ """Runs dataset export, graph compression, and generates datasets using the graph."""
+
+ def requires(self) -> List[luigi.Task]:
+ from swh.dataset.luigi import RunExportAll
+
+ # Technically RunExportAll and DeanonymizeOriginContributors together depend
+ # on everything else, but it's best to be explicit
+ return [
+ RunExportAll(),
+ compressed_graph.LocalGraph(),
+ origin_contributors.ListOriginContributors(),
+ origin_contributors.DeanonymizeOriginContributors(),
+ ]
+
+ def complete(self) -> bool:
+ # Dependencies perform their own completeness check, and this task
+ # does no work itself
+ return False
diff --git a/swh/graph/luigi.py b/swh/graph/luigi/compressed_graph.py
rename from swh/graph/luigi.py
rename to swh/graph/luigi/compressed_graph.py
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi/compressed_graph.py
@@ -4,8 +4,8 @@
# See top-level LICENSE file for more information
"""
-Luigi tasks
-===========
+Luigi tasks for compression
+===========================
This module contains `Luigi `_ tasks,
as an alternative to the CLI that can be composed with other tasks,
@@ -19,13 +19,15 @@
In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`,
:file:`graph.mph`, ...), tasks in this module produce this directory structure::
- swh_[_]/
- graph.graph
- graph.mph
- ...
- meta/
- export.json
- compression.json
+ base_dir/
+     <date>[_<flavor>]/
+         compressed/
+             graph.graph
+             graph.mph
+             ...
+             meta/
+                 export.json
+                 compression.json
``graph.meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.
@@ -73,8 +75,7 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from pathlib import Path
-from typing import Dict, List, Tuple
+from typing import List
import luigi
@@ -435,220 +436,3 @@
def _meta(self):
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
-
-
-def _run_script(script: str, output_path: Path) -> None:
- import os
- import subprocess
-
- from .config import check_config
-
- conf: Dict = {} # TODO: configurable
-
- conf = check_config(conf)
- env = {
- **os.environ.copy(),
- "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
- "CLASSPATH": conf["classpath"],
- }
-
- tmp_output_path = Path(f"{output_path}.tmp")
-
- subprocess.run(
- ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
- )
-
- # Atomically write the output file
- tmp_output_path.replace(output_path)
-
-
-class TopoSort(luigi.Task):
- """Creates a file that contains all SWHIDs in topological order from a compressed
- graph."""
-
- local_graph_path = luigi.PathParameter()
- topological_order_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
-
- def requires(self) -> List[luigi.Task]:
- """Returns an instance of :class:`LocalGraph`."""
- return [LocalGraph(local_graph_path=self.local_graph_path)]
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the topological order."""
- return luigi.LocalTarget(self.topological_order_path)
-
- def run(self) -> None:
- """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
- object_types = "rev,rel,snp,ori"
- class_name = "org.softwareheritage.graph.utils.TopoSort"
- script = f"""
- java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
- | pv --line-mode --wait \
- | zstdmt -19
- """
- _run_script(script, self.topological_order_path)
-
-
-class ListOriginContributors(luigi.Task):
- """Creates a file that contains all SWHIDs in topological order from a compressed
- graph."""
-
- local_graph_path = luigi.PathParameter()
- topological_order_path = luigi.PathParameter()
- origin_contributors_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
-
- def requires(self) -> List[luigi.Task]:
- """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`."""
- return [
- LocalGraph(local_graph_path=self.local_graph_path),
- TopoSort(
- local_graph_path=self.local_graph_path,
- topological_order_path=self.topological_order_path,
- graph_name=self.graph_name,
- ),
- ]
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the topological order."""
- return luigi.LocalTarget(self.origin_contributors_path)
-
- def run(self) -> None:
- """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
- class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
- script = f"""
- zstdcat {self.topological_order_path} \
- | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
- | pv --line-mode --wait \
- | zstdmt -19
- """
- _run_script(script, self.origin_contributors_path)
-
-
-class ExportDeanonymizationTable(luigi.Task):
- """Exports (from swh-storage) a .csv.zst file that contains the columns:
- ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``.
-
- The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
- in the compressed graph, and the latter two are the original name."""
-
- storage_dsn = luigi.Parameter(
- default="service=swh",
- description="postgresql DSN of the swh-storage database to read from.",
- )
- deanonymization_table_path = luigi.PathParameter()
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the table."""
- return luigi.LocalTarget(self.deanonymization_table_path)
-
- def run(self) -> None:
- """Runs a postgresql query to compute the table."""
-
- _run_script(
- f"""
- psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
- """, # noqa
- self.deanonymization_table_path,
- )
-
-
-class DeanonymizeOriginContributors(luigi.Task):
- """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
- but with ``person_base64`` and ``person_escaped`` columns in addition to
- ``person_id``.
-
- This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
- instead of names); which may not be true depending on how the swh-dataset export
- cas configured.
- """
-
- local_graph_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
- origin_contributors_path = luigi.PathParameter()
- deanonymization_table_path = luigi.PathParameter()
- deanonymized_origin_contributors_path = luigi.PathParameter()
-
- def requires(self) -> List[luigi.Task]:
- """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
- and :class:`ExportDeanonymizationTable`."""
- return [
- LocalGraph(local_graph_path=self.local_graph_path),
- ListOriginContributors(
- local_graph_path=self.local_graph_path,
- origin_contributors_path=self.origin_contributors_path,
- ),
- ExportDeanonymizationTable(
- deanonymization_table_path=self.deanonymization_table_path,
- ),
- ]
-
- def output(self) -> luigi.Target:
- """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
- but with ``person_base64`` and ``person_escaped`` columns in addition to
- ``person_id``"""
- return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
-
- def run(self) -> None:
- """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset
- and the deanonymization table in memory, then uses them to map each row
- in the original (anonymized) contributors list to the deanonymized one."""
- # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
- # was configured to do so); this should add support for it.
-
- import base64
- import csv
-
- import pyzstd
-
- # Load the deanonymization table, to map sha256(name) to base64(name)
- # and escape(name)
- sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
- with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
- csv_reader = csv.reader(fd)
- header = next(csv_reader)
- assert header == ["sha256_base64", "base64", "escaped"], header
- for line in csv_reader:
- (base64_sha256_name, base64_name, escaped_name) = line
- sha256_name = base64.b64decode(base64_sha256_name)
- name = base64.b64decode(base64_name)
- sha256_to_names[sha256_name] = (name, escaped_name)
-
- # Combine with the list of sha256(name), to get the list of base64(name)
- # and escape(name)
- persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
- with pyzstd.open(persons_path, "rb") as fd:
- person_id_to_names: List[Tuple[bytes, str]] = [
- sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
- for line in fd
- ]
-
- tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
-
- # Finally, write a new table of origin_contributors, by reading the anonymized
- # table line-by-line and deanonymizing each id
-
- # Open temporary output for writes as CSV
- with pyzstd.open(tmp_output_path, "wt") as output_fd:
- csv_writer = csv.writer(output_fd, lineterminator="\n")
- # write header
- csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
-
- # Open input for reads as CSV
- with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
- csv_reader = csv.reader(input_fd)
- header = next(csv_reader)
- assert header == ["origin_SWHID", "person_id"], header
- for (origin_swhid, person_id) in csv_reader:
- if person_id == "null":
- # FIXME: workaround for a bug in contribution graphs generated
- # before 2022-12-01. Those were only used in tests and never
- # published, so the conditional can be removed when this is
- # productionized
- continue
- (name, escaped_name) = person_id_to_names[int(person_id)]
- base64_name = base64.b64encode(name).decode("ascii")
- csv_writer.writerow((origin_swhid, base64_name, escaped_name))
-
- tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/misc_datasets.py b/swh/graph/luigi/misc_datasets.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/luigi/misc_datasets.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for various derived datasets
+========================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of derived datasets.
+
+File layout
+-----------
+
+This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
+is present, and generates/manipulates the following files::
+
+ base_dir/
+     <date>[_<flavor>]/
+         datasets/
+             contribution_graph.csv.zst
+         topology/
+             topological_order_dfs.csv.zst
+
+And optionally::
+
+ sensitive_base_dir/
+     <date>[_<flavor>]/
+         persons_sha256_to_name.csv.zst
+         datasets/
+             contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from .compressed_graph import LocalGraph
+from .utils import run_script
+
+
+class TopoSort(luigi.Task):
+ """Creates a file that contains all SWHIDs in topological order from a compressed
+ graph."""
+
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`LocalGraph`."""
+ return [LocalGraph(local_graph_path=self.local_graph_path)]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the topological order."""
+ return luigi.LocalTarget(self.topological_order_path)
+
+ def run(self) -> None:
+ """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
+ object_types = "rev,rel,snp,ori"
+ class_name = "org.softwareheritage.graph.utils.TopoSort"
+ script = f"""
+ java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """
+ run_script(script, self.topological_order_path)
diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/luigi/origin_contributors.py
@@ -0,0 +1,188 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for contribution graph
+==================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of the graph of contributions of people (pseudonymized
+by default).
+"""
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from pathlib import Path
+from typing import Dict, List, Tuple
+
+import luigi
+
+from .compressed_graph import LocalGraph
+from .misc_datasets import TopoSort
+from .utils import run_script
+
+
+class ListOriginContributors(luigi.Task):
+ """Creates a file that lists, for each origin of a compressed graph, the ids
+ of the persons who contributed to it."""
+
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ origin_contributors_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
+ and :class:`swh.graph.luigi.misc_datasets.TopoSort`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ TopoSort(
+ local_graph_path=self.local_graph_path,
+ topological_order_path=self.topological_order_path,
+ graph_name=self.graph_name,
+ ),
+ ]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the list of origin contributors."""
+ return luigi.LocalTarget(self.origin_contributors_path)
+
+ def run(self) -> None:
+ """Runs the ListOriginContributors Java class and compresses its output"""
+ class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
+ script = f"""
+ zstdcat {self.topological_order_path} \
+ | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """
+ run_script(script, self.origin_contributors_path)
+
+
+class ExportDeanonymizationTable(luigi.Task):
+ """Exports (from swh-storage) a .csv.zst file that contains the columns:
+ ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.
+
+ The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
+ in the compressed graph, and the latter two are the original name."""
+
+ storage_dsn = luigi.Parameter(
+ default="service=swh",
+ description="postgresql DSN of the swh-storage database to read from.",
+ )
+ deanonymization_table_path = luigi.PathParameter()
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the table."""
+ return luigi.LocalTarget(self.deanonymization_table_path)
+
+ def run(self) -> None:
+ """Runs a postgresql query to compute the table."""
+
+ run_script(
+ f"""
+ psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
+ """, # noqa
+ self.deanonymization_table_path,
+ )
+
+
+class DeanonymizeOriginContributors(luigi.Task):
+ """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
+ but with ``person_base64`` and ``person_escaped`` columns instead of
+ ``person_id``.
+
+ This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
+ instead of names); which may not be true depending on how the swh-dataset export
+ was configured.
+ """
+
+ local_graph_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+ origin_contributors_path = luigi.PathParameter()
+ deanonymization_table_path = luigi.PathParameter()
+ deanonymized_origin_contributors_path = luigi.PathParameter()
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
+ and :class:`ExportDeanonymizationTable`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ ListOriginContributors(
+ local_graph_path=self.local_graph_path,
+ origin_contributors_path=self.origin_contributors_path,
+ ),
+ ExportDeanonymizationTable(
+ deanonymization_table_path=self.deanonymization_table_path,
+ ),
+ ]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
+ but with ``person_base64`` and ``person_escaped`` columns instead of
+ ``person_id``"""
+ return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
+
+ def run(self) -> None:
+ """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
+ and the deanonymization table in memory, then uses them to map each row
+ in the original (anonymized) contributors list to the deanonymized one."""
+ # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
+ # was configured to do so); this should add support for it.
+
+ import base64
+ import csv
+
+ import pyzstd
+
+ # Load the deanonymization table, to map sha256(name) to base64(name)
+ # and escape(name)
+ sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
+ with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
+ csv_reader = csv.reader(fd)
+ header = next(csv_reader)
+ assert header == ["sha256_base64", "base64", "escaped"], header
+ for line in csv_reader:
+ (base64_sha256_name, base64_name, escaped_name) = line
+ sha256_name = base64.b64decode(base64_sha256_name)
+ name = base64.b64decode(base64_name)
+ sha256_to_names[sha256_name] = (name, escaped_name)
+
+ # Combine with the list of sha256(name), to get the list of base64(name)
+ # and escape(name)
+ persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
+ with pyzstd.open(persons_path, "rb") as fd:
+ person_id_to_names: List[Tuple[bytes, str]] = [
+ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
+ for line in fd
+ ]
+
+ tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
+
+ # Finally, write a new table of origin_contributors, by reading the anonymized
+ # table line-by-line and deanonymizing each id
+
+ # Open temporary output for writes as CSV
+ with pyzstd.open(tmp_output_path, "wt") as output_fd:
+ csv_writer = csv.writer(output_fd, lineterminator="\n")
+ # write header
+ csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
+
+ # Open input for reads as CSV
+ with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
+ csv_reader = csv.reader(input_fd)
+ header = next(csv_reader)
+ assert header == ["origin_SWHID", "person_id"], header
+ for (origin_swhid, person_id) in csv_reader:
+ if person_id == "null":
+ # FIXME: workaround for a bug in contribution graphs generated
+ # before 2022-12-01. Those were only used in tests and never
+ # published, so the conditional can be removed when this is
+ # productionized
+ continue
+ (name, escaped_name) = person_id_to_names[int(person_id)]
+ base64_name = base64.b64encode(name).decode("ascii")
+ csv_writer.writerow((origin_swhid, base64_name, escaped_name))
+
+ tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/utils.py b/swh/graph/luigi/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/luigi/utils.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+from typing import Dict
+
+
+def run_script(script: str, output_path: Path) -> None:
+ import os
+ import subprocess
+
+ from ..config import check_config
+
+ conf: Dict = {} # TODO: configurable
+
+ conf = check_config(conf)
+ env = {
+ **os.environ.copy(),
+ "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+ "CLASSPATH": conf["classpath"],
+ }
+
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ tmp_output_path = Path(f"{output_path}.tmp")
+
+ subprocess.run(
+ ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
+ )
+
+ # Atomically write the output file
+ tmp_output_path.replace(output_path)
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
--- a/swh/graph/tests/test_luigi.py
+++ b/swh/graph/tests/test_luigi.py
@@ -6,7 +6,7 @@
import json
from pathlib import Path
-from swh.graph.luigi import CompressGraph
+from swh.graph.luigi.compressed_graph import CompressGraph
from .test_cli import read_properties
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -7,7 +7,7 @@
from pathlib import Path
import subprocess
-from swh.graph.luigi import (
+from swh.graph.luigi.origin_contributors import (
DeanonymizeOriginContributors,
ExportDeanonymizationTable,
ListOriginContributors,
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
--- a/swh/graph/tests/test_toposort.py
+++ b/swh/graph/tests/test_toposort.py
@@ -6,7 +6,7 @@
from pathlib import Path
import subprocess
-from swh.graph.luigi import TopoSort
+from swh.graph.luigi.misc_datasets import TopoSort
DATA_DIR = Path(__file__).parents[0] / "dataset"