diff --git a/swh/graph/luigi/__init__.py b/swh/graph/luigi/__init__.py
new file mode 100644
index 0000000..a82ff9b
--- /dev/null
+++ b/swh/graph/luigi/__init__.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This package contains `Luigi <https://luigi.readthedocs.io/>`_ tasks.
+These come in two kinds:
+
+* in :mod:`swh.graph.luigi.compressed_graph`: an alternative to the 'swh graph compress'
+ CLI that can be composed with other tasks, such as swh-dataset's
+* in other submodules: tasks driving the creation of specific datasets that are
+ generated using the compressed graph
+
+
+The overall directory structure is::
+
+ base_dir/
+ <date>[_<flavor>]/
+ edges/
+ ...
+ orc/
+ ...
+ compressed/
+ graph.graph
+ graph.mph
+ ...
+ meta/
+ export.json
+ compression.json
+ datasets/
+ contribution_graph.csv.zst
+ topology/
+ topological_order_dfs.csv.zst
+
+And optionally::
+
+ sensitive_base_dir/
+ <date>[_<flavor>]/
+ persons_sha256_to_name.csv.zst
+ datasets/
+ contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from . import compressed_graph, origin_contributors
+
+
+class RunAll(luigi.Task):
+ """Runs dataset export, graph compression, and generates datasets using the graph."""
+
+ def requires(self) -> List[luigi.Task]:
+ from swh.dataset.luigi import RunExportAll
+
+ # Technically RunExportAll and DeanonymizeOriginContributors together depend
+ # on everything else, but it's best to be explicit
+ return [
+ RunExportAll(),
+ compressed_graph.LocalGraph(),
+ origin_contributors.ListOriginContributors(),
+ origin_contributors.DeanonymizeOriginContributors(),
+ ]
+
+ def complete(self) -> bool:
+ # Dependencies perform their own completeness check, and this task
+ # does no work itself
+ return False
diff --git a/swh/graph/luigi.py b/swh/graph/luigi/compressed_graph.py
similarity index 61%
rename from swh/graph/luigi.py
rename to swh/graph/luigi/compressed_graph.py
index a190a80..9f52c7e 100644
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi/compressed_graph.py
@@ -1,654 +1,438 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
-Luigi tasks
-===========
+Luigi tasks for compression
+===========================
This module contains `Luigi `_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-dataset's.
Unlike the CLI, this requires the graph to be named `graph`.
File layout
-----------
In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`,
:file:`graph.mph`, ...), tasks in this module produce this directory structure::
- swh_[_]/
- graph.graph
- graph.mph
- ...
- meta/
- export.json
- compression.json
+ base_dir/
+ <date>[_<flavor>]/
+ compressed/
+ graph.graph
+ graph.mph
+ ...
+ meta/
+ export.json
+ compression.json
``graph.meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.
``graph.meta/compression.json`` contains information about the compression itself,
for provenance tracking.
For example:
.. code-block:: json
[
{
"steps": null,
"export_start": "2022-11-08T11:00:54.998799+00:00",
"export_end": "2022-11-08T11:05:53.105519+00:00",
"object_type": [
"origin",
"origin_visit"
],
"hostname": "desktop5",
"conf": {},
"tool": {
"name": "swh.graph",
"version": "2.2.0"
}
}
]
When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
S3 layout
---------
As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk.
However, this is undesirable on at-rest/long-term storage like on S3, because
some are very sparse (eg. :file:`graph.property.committer_timestamp.bin` can be
quickly compressed from 300 to 1GB).
Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
when downloading.
The layout is otherwise the same as the file layout.
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from pathlib import Path
-from typing import Dict, List, Tuple
+from typing import List
import luigi
from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter
class CompressGraph(luigi.Task):
local_export_path = luigi.PathParameter(significant=False)
local_graph_path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
significant=False,
description="""
Size of work batches to use while compressing.
Larger is faster, but consumes more resources.
""",
)
object_types = list(ObjectType)
# To make this configurable, we could use this:
# object_types = luigi.EnumListParameter(
# enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
# )
# then use swh.dataset.luigi._export_metadata_has_object_types to check in
# .meta/export.json that all objects are present before skipping the task
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`LocalExport` task."""
return [
LocalExport(
local_export_path=self.local_export_path,
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
)
]
def output(self) -> List[luigi.LocalTarget]:
"""Returns the ``meta/*.json`` targets"""
return [self._export_meta(), self._compression_meta()]
def _export_meta(self) -> luigi.Target:
"""Returns the metadata on the dataset export"""
return luigi.LocalTarget(self.local_graph_path / "meta/export.json")
def _compression_meta(self) -> luigi.Target:
"""Returns the metadata on the compression pipeline"""
return luigi.LocalTarget(self.local_graph_path / "meta/compression.json")
def run(self):
"""Runs the full compression pipeline, then writes :file:`meta/compression.json`
This does not support running individual steps yet."""
import datetime
import json
import shutil
import socket
import pkg_resources
from swh.graph import webgraph
conf = {} # TODO: make this configurable
steps = None # TODO: make this configurable
if self.batch_size:
conf["batch_size"] = self.batch_size
# Delete stamps. Otherwise interrupting this compression pipeline may leave
# stamps from a previous successful compression
if self._export_meta().exists():
self._export_meta().remove()
if self._compression_meta().exists():
self._compression_meta().remove()
# Make sure we don't accidentally append to existing files
if self.local_graph_path.exists():
shutil.rmtree(self.local_graph_path)
output_directory = self.local_graph_path
graph_name = "graph"
def progress_cb(percentage: int, step: webgraph.CompressionStep):
self.set_progress_percentage(percentage)
self.set_status_message(f"Running {step.name} (step #{step.value})")
start_date = datetime.datetime.now(tz=datetime.timezone.utc)
webgraph.compress(
graph_name,
self.local_export_path / "orc",
output_directory,
steps,
conf,
)
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
# Copy dataset export metadata
with self._export_meta().open("w") as write_fd:
with (self.local_export_path / "meta" / "export.json").open() as read_fd:
write_fd.write(read_fd.read())
# Append metadata about this compression pipeline
if self._compression_meta().exists():
with self._compression_meta().open("w") as fd:
meta = json.load(fd)
else:
meta = []
meta.append(
{
"steps": steps,
"compression_start": start_date.isoformat(),
"compression_end": end_date.isoformat(),
"object_type": [object_type.name for object_type in self.object_types],
"hostname": socket.getfqdn(),
"conf": conf,
"tool": {
"name": "swh.graph",
"version": pkg_resources.get_distribution("swh.graph").version,
},
}
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
class UploadGraphToS3(luigi.Task):
"""Uploads a local compressed graphto S3; creating automatically if it does
not exist.
Example invocation::
luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
--local-graph-path=graph/ \
--s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
"""
local_graph_path = luigi.PathParameter(significant=False)
s3_graph_path = S3PathParameter()
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`CompressGraph` task that writes local files at the
expected location."""
return [
CompressGraph(
local_graph_path=self.local_graph_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on S3."""
return [self._meta()]
def _meta(self):
import luigi.contrib.s3
return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")
def run(self) -> None:
"""Copies all files: first the graph itself, then :file:`meta/compression.json`."""
import subprocess
import tempfile
import luigi.contrib.s3
import tqdm
compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
seen_compression_metadata = False
client = luigi.contrib.s3.S3Client()
# recursively copy local files to S3, and end with compression metadata
paths = list(self.local_graph_path.glob("**/*"))
for (i, path) in tqdm.tqdm(
list(enumerate(paths)),
desc="Uploading compressed graph",
):
if path == compression_metadata_path:
# Write it last
seen_compression_metadata = True
continue
if path.is_dir():
continue
relative_path = path.relative_to(self.local_graph_path)
self.set_progress_percentage(int(i * 100 / len(paths)))
if path.suffix == ".bin":
# Large sparse file; store it compressed on S3.
with tempfile.NamedTemporaryFile(
prefix=path.stem, suffix=".bin.zst"
) as fd:
self.set_status_message(f"Compressing {relative_path}")
subprocess.run(
["zstdmt", "--force", "--keep", path, "-o", fd.name], check=True
)
self.set_status_message(f"Uploading {relative_path} (compressed)")
client.put_multipart(
fd.name,
f"{self.s3_graph_path}/{relative_path}.zst",
ACL="public-read",
)
else:
self.set_status_message(f"Uploading {relative_path}")
client.put_multipart(
path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
)
assert (
seen_compression_metadata
), "did not see meta/compression.json in directory listing"
# Write it last, to act as a stamp
client.put(
compression_metadata_path,
self._meta().path,
ACL="public-read",
)
class DownloadGraphFromS3(luigi.Task):
"""Downloads a local dataset graph from S3.
This performs the inverse operation of :class:`UploadGraphToS3`
Example invocation::
luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
--local-graph-path=graph/ \
--s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
"""
local_graph_path = luigi.PathParameter()
s3_graph_path = S3PathParameter(significant=False)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
UploadGraphToS3(
local_graph_path=self.local_graph_path,
s3_graph_path=self.s3_graph_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_graph_path / "meta" / "export.json")
def run(self) -> None:
"""Copies all files: first the graph itself, then :file:`meta/compression.json`."""
import subprocess
import tempfile
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
compression_metadata_path = f"{self.s3_graph_path}/meta/compression.json"
seen_compression_metadata = False
# recursively copy local files to S3, and end with compression metadata
files = list(client.list(self.s3_graph_path))
for (i, file_) in tqdm.tqdm(
list(enumerate(files)),
desc="Downloading",
):
if file_ == compression_metadata_path:
# Will copy it last
seen_compression_metadata = True
continue
self.set_progress_percentage(int(i * 100 / len(files)))
local_path = self.local_graph_path / file_
local_path.parent.mkdir(parents=True, exist_ok=True)
if file_.endswith(".bin.zst"):
# The file was compressed before uploading to S3, we need it
# to be decompressed locally
with tempfile.NamedTemporaryFile(
prefix=local_path.stem, suffix=".bin.zst"
) as fd:
self.set_status_message(f"Downloading {file_} (compressed)")
client.get(
f"{self.s3_graph_path}/{file_}",
fd.name,
)
self.set_status_message(f"Decompressing {file_}")
subprocess.run(
[
"zstdmt",
"--force",
"-d",
fd.name,
"-o",
str(local_path)[0:-4],
],
check=True,
)
else:
self.set_status_message(f"Downloading {file_}")
client.get(
f"{self.s3_graph_path}/{file_}",
str(local_path),
)
assert (
seen_compression_metadata
), "did not see meta/compression.json in directory listing"
# Write it last, to act as a stamp
client.get(
compression_metadata_path,
self._meta().path,
)
class LocalGraph(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadGraphFromS3`.
"""
local_graph_path = luigi.PathParameter()
compression_task_type = luigi.TaskParameter(
default=DownloadGraphFromS3,
significant=False,
description="""The task used to get the compressed graph if it is not present.
Should be either ``swh.graph.luigi.CompressGraph`` or
``swh.graph.luigi.DownloadGraphFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`CompressGraph` or
:class:`DownloadGraphFromS3` depending on the value of
:attr:`compression_task_type`."""
if issubclass(self.compression_task_type, CompressGraph):
return [
CompressGraph(
local_graph_path=self.local_graph_path,
)
]
elif issubclass(self.compression_task_type, DownloadGraphFromS3):
return [
DownloadGraphFromS3(
local_graph_path=self.local_graph_path,
)
]
else:
raise ValueError(
f"Unexpected compression_task_type: "
f"{self.compression_task_type.__name__}"
)
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
-
-
-def _run_script(script: str, output_path: Path) -> None:
- import os
- import subprocess
-
- from .config import check_config
-
- conf: Dict = {} # TODO: configurable
-
- conf = check_config(conf)
- env = {
- **os.environ.copy(),
- "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
- "CLASSPATH": conf["classpath"],
- }
-
- tmp_output_path = Path(f"{output_path}.tmp")
-
- subprocess.run(
- ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
- )
-
- # Atomically write the output file
- tmp_output_path.replace(output_path)
-
-
-class TopoSort(luigi.Task):
- """Creates a file that contains all SWHIDs in topological order from a compressed
- graph."""
-
- local_graph_path = luigi.PathParameter()
- topological_order_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
-
- def requires(self) -> List[luigi.Task]:
- """Returns an instance of :class:`LocalGraph`."""
- return [LocalGraph(local_graph_path=self.local_graph_path)]
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the topological order."""
- return luigi.LocalTarget(self.topological_order_path)
-
- def run(self) -> None:
- """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
- object_types = "rev,rel,snp,ori"
- class_name = "org.softwareheritage.graph.utils.TopoSort"
- script = f"""
- java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
- | pv --line-mode --wait \
- | zstdmt -19
- """
- _run_script(script, self.topological_order_path)
-
-
-class ListOriginContributors(luigi.Task):
- """Creates a file that contains all SWHIDs in topological order from a compressed
- graph."""
-
- local_graph_path = luigi.PathParameter()
- topological_order_path = luigi.PathParameter()
- origin_contributors_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
-
- def requires(self) -> List[luigi.Task]:
- """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`."""
- return [
- LocalGraph(local_graph_path=self.local_graph_path),
- TopoSort(
- local_graph_path=self.local_graph_path,
- topological_order_path=self.topological_order_path,
- graph_name=self.graph_name,
- ),
- ]
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the topological order."""
- return luigi.LocalTarget(self.origin_contributors_path)
-
- def run(self) -> None:
- """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
- class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
- script = f"""
- zstdcat {self.topological_order_path} \
- | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
- | pv --line-mode --wait \
- | zstdmt -19
- """
- _run_script(script, self.origin_contributors_path)
-
-
-class ExportDeanonymizationTable(luigi.Task):
- """Exports (from swh-storage) a .csv.zst file that contains the columns:
- ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``.
-
- The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
- in the compressed graph, and the latter two are the original name."""
-
- storage_dsn = luigi.Parameter(
- default="service=swh",
- description="postgresql DSN of the swh-storage database to read from.",
- )
- deanonymization_table_path = luigi.PathParameter()
-
- def output(self) -> luigi.Target:
- """.csv.zst file that contains the table."""
- return luigi.LocalTarget(self.deanonymization_table_path)
-
- def run(self) -> None:
- """Runs a postgresql query to compute the table."""
-
- _run_script(
- f"""
- psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
- """, # noqa
- self.deanonymization_table_path,
- )
-
-
-class DeanonymizeOriginContributors(luigi.Task):
- """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
- but with ``person_base64`` and ``person_escaped`` columns in addition to
- ``person_id``.
-
- This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
- instead of names); which may not be true depending on how the swh-dataset export
- cas configured.
- """
-
- local_graph_path = luigi.PathParameter()
- graph_name = luigi.Parameter(default="graph")
- origin_contributors_path = luigi.PathParameter()
- deanonymization_table_path = luigi.PathParameter()
- deanonymized_origin_contributors_path = luigi.PathParameter()
-
- def requires(self) -> List[luigi.Task]:
- """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
- and :class:`ExportDeanonymizationTable`."""
- return [
- LocalGraph(local_graph_path=self.local_graph_path),
- ListOriginContributors(
- local_graph_path=self.local_graph_path,
- origin_contributors_path=self.origin_contributors_path,
- ),
- ExportDeanonymizationTable(
- deanonymization_table_path=self.deanonymization_table_path,
- ),
- ]
-
- def output(self) -> luigi.Target:
- """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
- but with ``person_base64`` and ``person_escaped`` columns in addition to
- ``person_id``"""
- return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
-
- def run(self) -> None:
- """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset
- and the deanonymization table in memory, then uses them to map each row
- in the original (anonymized) contributors list to the deanonymized one."""
- # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
- # was configured to do so); this should add support for it.
-
- import base64
- import csv
-
- import pyzstd
-
- # Load the deanonymization table, to map sha256(name) to base64(name)
- # and escape(name)
- sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
- with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
- csv_reader = csv.reader(fd)
- header = next(csv_reader)
- assert header == ["sha256_base64", "base64", "escaped"], header
- for line in csv_reader:
- (base64_sha256_name, base64_name, escaped_name) = line
- sha256_name = base64.b64decode(base64_sha256_name)
- name = base64.b64decode(base64_name)
- sha256_to_names[sha256_name] = (name, escaped_name)
-
- # Combine with the list of sha256(name), to get the list of base64(name)
- # and escape(name)
- persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
- with pyzstd.open(persons_path, "rb") as fd:
- person_id_to_names: List[Tuple[bytes, str]] = [
- sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
- for line in fd
- ]
-
- tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
-
- # Finally, write a new table of origin_contributors, by reading the anonymized
- # table line-by-line and deanonymizing each id
-
- # Open temporary output for writes as CSV
- with pyzstd.open(tmp_output_path, "wt") as output_fd:
- csv_writer = csv.writer(output_fd, lineterminator="\n")
- # write header
- csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
-
- # Open input for reads as CSV
- with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
- csv_reader = csv.reader(input_fd)
- header = next(csv_reader)
- assert header == ["origin_SWHID", "person_id"], header
- for (origin_swhid, person_id) in csv_reader:
- if person_id == "null":
- # FIXME: workaround for a bug in contribution graphs generated
- # before 2022-12-01. Those were only used in tests and never
- # published, so the conditional can be removed when this is
- # productionized
- continue
- (name, escaped_name) = person_id_to_names[int(person_id)]
- base64_name = base64.b64encode(name).decode("ascii")
- csv_writer.writerow((origin_swhid, base64_name, escaped_name))
-
- tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/misc_datasets.py b/swh/graph/luigi/misc_datasets.py
new file mode 100644
index 0000000..f755a7f
--- /dev/null
+++ b/swh/graph/luigi/misc_datasets.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for various derived datasets
+========================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of derived datasets.
+
+File layout
+-----------
+
+This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
+is present, and generates/manipulates the following files::
+
+ base_dir/
+ <date>[_<flavor>]/
+ datasets/
+ contribution_graph.csv.zst
+ topology/
+ topological_order_dfs.csv.zst
+
+And optionally::
+
+ sensitive_base_dir/
+ <date>[_<flavor>]/
+ persons_sha256_to_name.csv.zst
+ datasets/
+ contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from .compressed_graph import LocalGraph
+from .utils import run_script
+
+
+class TopoSort(luigi.Task):
+ """Creates a file that contains all SWHIDs in topological order from a compressed
+ graph."""
+
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`LocalGraph`."""
+ return [LocalGraph(local_graph_path=self.local_graph_path)]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the topological order."""
+ return luigi.LocalTarget(self.topological_order_path)
+
+ def run(self) -> None:
+ """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
+ object_types = "rev,rel,snp,ori"
+ class_name = "org.softwareheritage.graph.utils.TopoSort"
+ script = f"""
+ java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """
+ run_script(script, self.topological_order_path)
diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py
new file mode 100644
index 0000000..ce404aa
--- /dev/null
+++ b/swh/graph/luigi/origin_contributors.py
@@ -0,0 +1,188 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for contribution graph
+==================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of the graph of contributions of people (pseudonymized
+by default).
+"""
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from pathlib import Path
+from typing import Dict, List, Tuple
+
+import luigi
+
+from .compressed_graph import LocalGraph
+from .misc_datasets import TopoSort
+from .utils import run_script
+
+
+class ListOriginContributors(luigi.Task):
+ """Creates a file that lists the contributors to each origin, using a compressed
+ graph."""
+
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ origin_contributors_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
+ and :class:`swh.graph.luigi.misc_datasets.TopoSort`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ TopoSort(
+ local_graph_path=self.local_graph_path,
+ topological_order_path=self.topological_order_path,
+ graph_name=self.graph_name,
+ ),
+ ]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the list of origin contributors."""
+ return luigi.LocalTarget(self.origin_contributors_path)
+
+ def run(self) -> None:
+ """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses"""
+ class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
+ script = f"""
+ zstdcat {self.topological_order_path} \
+ | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """
+ run_script(script, self.origin_contributors_path)
+
+
+class ExportDeanonymizationTable(luigi.Task):
+ """Exports (from swh-storage) a .csv.zst file that contains the columns:
+ ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.
+
+ The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
+ in the compressed graph, and the latter two are the original name."""
+
+ storage_dsn = luigi.Parameter(
+ default="service=swh",
+ description="postgresql DSN of the swh-storage database to read from.",
+ )
+ deanonymization_table_path = luigi.PathParameter()
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the table."""
+ return luigi.LocalTarget(self.deanonymization_table_path)
+
+ def run(self) -> None:
+ """Runs a postgresql query to compute the table."""
+
+ run_script(
+ f"""
+ psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
+ """, # noqa
+ self.deanonymization_table_path,
+ )
+
+
+class DeanonymizeOriginContributors(luigi.Task):
+ """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
+ but with ``person_base64`` and ``person_escaped`` columns in addition to
+ ``person_id``.
+
+ This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
+ instead of names); which may not be true depending on how the swh-dataset export
+ was configured.
+ """
+
+ local_graph_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+ origin_contributors_path = luigi.PathParameter()
+ deanonymization_table_path = luigi.PathParameter()
+ deanonymized_origin_contributors_path = luigi.PathParameter()
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
+ and :class:`ExportDeanonymizationTable`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ ListOriginContributors(
+ local_graph_path=self.local_graph_path,
+ origin_contributors_path=self.origin_contributors_path,
+ ),
+ ExportDeanonymizationTable(
+ deanonymization_table_path=self.deanonymization_table_path,
+ ),
+ ]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
+ but with ``person_base64`` and ``person_escaped`` columns in addition to
+ ``person_id``"""
+ return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
+
+ def run(self) -> None:
+ """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
+ and the deanonymization table in memory, then uses them to map each row
+ in the original (anonymized) contributors list to the deanonymized one."""
+ # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
+ # was configured to do so); this should add support for it.
+
+ import base64
+ import csv
+
+ import pyzstd
+
+ # Load the deanonymization table, to map sha256(name) to base64(name)
+ # and escape(name)
+ sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
+ with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
+ csv_reader = csv.reader(fd)
+ header = next(csv_reader)
+ assert header == ["sha256_base64", "base64", "escaped"], header
+ for line in csv_reader:
+ (base64_sha256_name, base64_name, escaped_name) = line
+ sha256_name = base64.b64decode(base64_sha256_name)
+ name = base64.b64decode(base64_name)
+ sha256_to_names[sha256_name] = (name, escaped_name)
+
+ # Combine with the list of sha256(name), to get the list of base64(name)
+ # and escape(name)
+ persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
+ with pyzstd.open(persons_path, "rb") as fd:
+ person_id_to_names: List[Tuple[bytes, str]] = [
+ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
+ for line in fd
+ ]
+
+ tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
+
+ # Finally, write a new table of origin_contributors, by reading the anonymized
+ # table line-by-line and deanonymizing each id
+
+ # Open temporary output for writes as CSV
+ with pyzstd.open(tmp_output_path, "wt") as output_fd:
+ csv_writer = csv.writer(output_fd, lineterminator="\n")
+ # write header
+ csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
+
+ # Open input for reads as CSV
+ with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
+ csv_reader = csv.reader(input_fd)
+ header = next(csv_reader)
+ assert header == ["origin_SWHID", "person_id"], header
+ for (origin_swhid, person_id) in csv_reader:
+ if person_id == "null":
+ # FIXME: workaround for a bug in contribution graphs generated
+ # before 2022-12-01. Those were only used in tests and never
+ # published, so the conditional can be removed when this is
+ # productionized
+ continue
+ (name, escaped_name) = person_id_to_names[int(person_id)]
+ base64_name = base64.b64encode(name).decode("ascii")
+ csv_writer.writerow((origin_swhid, base64_name, escaped_name))
+
+ tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/utils.py b/swh/graph/luigi/utils.py
new file mode 100644
index 0000000..ae60091
--- /dev/null
+++ b/swh/graph/luigi/utils.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+from typing import Dict
+
+
+def run_script(script: str, output_path: Path) -> None:
+    """Run ``script`` through ``bash -c``, redirecting its stdout to ``output_path``.
+
+    The swh-graph Java configuration is exposed to the script through the
+    ``CLASSPATH`` and ``JAVA_TOOL_OPTIONS`` environment variables, so scripts
+    can invoke the Java graph tooling directly.
+
+    Output is first written to a sibling ``.tmp`` file and renamed at the end,
+    so ``output_path`` never exists in a half-written state.
+
+    Raises:
+        subprocess.CalledProcessError: if the script exits with a non-zero
+            status (because of ``check=True``).
+    """
+    import os
+    import subprocess
+
+    from ..config import check_config
+
+    conf: Dict = {} # TODO: configurable
+
+    # check_config() fills in the defaults (classpath, JVM options) for the
+    # empty configuration above.
+    conf = check_config(conf)
+    env = {
+        **os.environ.copy(),
+        "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+        "CLASSPATH": conf["classpath"],
+    }
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    tmp_output_path = Path(f"{output_path}.tmp")
+
+    # NOTE(review): ``script`` is interpolated into a shell command line;
+    # callers must only pass trusted, shell-safe scripts.
+    subprocess.run(
+        ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
+    )
+
+    # Atomically write the output file
+    tmp_output_path.replace(output_path)
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
index 1cce95c..a05bb54 100644
--- a/swh/graph/tests/test_luigi.py
+++ b/swh/graph/tests/test_luigi.py
@@ -1,36 +1,36 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from pathlib import Path
-from swh.graph.luigi import CompressGraph
+from swh.graph.luigi.compressed_graph import CompressGraph
from .test_cli import read_properties
DATA_DIR = Path(__file__).parents[0] / "dataset"
def test_compressgraph(tmpdir):
    """Compressing the example dataset produces a graph with the known
    node/arc counts, copies the export metadata verbatim, and records the
    compression configuration in ``meta/compression.json``."""
    tmpdir = Path(tmpdir)
    task = CompressGraph(
        local_export_path=DATA_DIR,
        local_graph_path=tmpdir / "compressed_graph",
        batch_size=1000, # go fast on the trivial dataset
    )
    # Run the task directly, without going through the Luigi scheduler.
    task.run()
    properties = read_properties(tmpdir / "compressed_graph" / "graph.properties")
    # Known counts for the example dataset in tests/dataset.
    assert int(properties["nodes"]) == 24
    assert int(properties["arcs"]) == 28
    # meta/export.json must be copied as-is from the export directory.
    export_meta_path = tmpdir / "compressed_graph/meta/export.json"
    assert export_meta_path.read_bytes() == (DATA_DIR / "meta/export.json").read_bytes()
    # meta/compression.json must record the configuration actually used.
    compression_meta_path = tmpdir / "compressed_graph/meta/compression.json"
    assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
index 883872c..233ad14 100644
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -1,186 +1,186 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from pathlib import Path
import subprocess
-from swh.graph.luigi import (
+from swh.graph.luigi.origin_contributors import (
DeanonymizeOriginContributors,
ExportDeanonymizationTable,
ListOriginContributors,
)
from swh.model.model import (
ObjectType,
Person,
Release,
Revision,
RevisionType,
TimestampWithTimezone,
)
from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER
DATA_DIR = Path(__file__).parents[0] / "dataset"
# FIXME: do not hardcode ids here; they should be dynamically loaded
# from the test graph
# Anonymized contribution table, as produced by ListOriginContributors: each
# row maps an origin SWHID to an opaque integer person id.
ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_id
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,2
"""
# Maps base64(sha256(fullname)) to (base64(fullname), escaped display name),
# as produced by ExportDeanonymizationTable.
DEANONYMIZATION_TABLE = """\
sha256_base64,base64,escaped
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe
""" # noqa
# base64(sha256(fullname)) values, one per line; the line number is the
# person id, mimicking the graph's ``.persons.csv.zst`` file.
PERSONS = """\
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
"""
# Expected output of DeanonymizeOriginContributors on the three inputs above.
DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_base64,person_escaped
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
""" # noqa
def test_list_origin_contributors(tmpdir):
    """Running ListOriginContributors on the example compressed graph produces
    the expected anonymized (origin SWHID, person id) CSV."""
    tmpdir = Path(tmpdir)
    topological_order_path = tmpdir / "topo_order.csv.zst"
    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
    # zstd-compress the fixture topological order, since the task reads a
    # ``.zst`` input file.
    subprocess.run(
        ["zstdmt", "-o", topological_order_path],
        input=TOPOLOGICAL_ORDER.encode(),
        check=True,
    )
    task = ListOriginContributors(
        local_graph_path=DATA_DIR / "compressed",
        topological_order_path=topological_order_path,
        origin_contributors_path=origin_contributors_path,
        graph_name="example",
    )
    # Run the task directly, without going through the Luigi scheduler.
    task.run()
    csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()
    assert csv_text == ORIGIN_CONTRIBUTORS
def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
    """ExportDeanonymizationTable dumps, for every release author and revision
    author/committer in storage, a row mapping base64(sha256(fullname)) to the
    base64-encoded fullname and the escaped display name.

    Row order is not guaranteed, so rows are compared as sets.
    """
    tmpdir = Path(tmpdir)
    tstz = TimestampWithTimezone.from_datetime(
        datetime.datetime.now(tz=datetime.timezone.utc)
    )
    # Seed storage with one release (one author)...
    # The fullnames below must hash to the rows of DEANONYMIZATION_TABLE
    # (e.g. base64-decoding "Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+" gives
    # b"John Doe <jdoe@example.org>").
    swh_storage.release_add(
        [
            Release(
                name=b"v1.0",
                message=b"first release",
                author=Person.from_fullname(b"John Doe <jdoe@example.org>"),
                target=b"\x00" * 20,
                target_type=ObjectType.REVISION,
                synthetic=True,
            )
        ]
    )
    # ...and one revision whose author and committer are two distinct persons.
    # NOTE(review): which Jane Doe identity is author vs. committer does not
    # affect this test (both end up in the table) — confirm against the
    # contribution-graph semantics if it starts to matter.
    swh_storage.revision_add(
        [
            Revision(
                message=b"first commit",
                author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"),
                committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"),
                date=tstz,
                committer_date=tstz,
                directory=b"\x00" * 20,
                type=RevisionType.GIT,
                synthetic=True,
            )
        ]
    )
    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
    task = ExportDeanonymizationTable(
        storage_dsn=swh_storage_postgresql.dsn,
        deanonymization_table_path=deanonymization_table_path,
    )
    # Run the task directly, without going through the Luigi scheduler.
    task.run()
    csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode()
    (header, *rows) = csv_text.split("\n")
    (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")
    assert header == expected_header
    assert rows.pop() == "", "Missing trailing newline"
    expected_rows.pop()
    # Row order is unspecified, hence the set comparison.
    assert set(rows) == set(expected_rows)
def test_deanonymize_origin_contributors(tmpdir):
    """End-to-end check of DeanonymizeOriginContributors: given the anonymized
    contributors table, the graph's persons list, and the deanonymization
    table, it produces the expected deanonymized CSV."""
    tmpdir = Path(tmpdir)
    persons_path = tmpdir / "example.persons.csv.zst"
    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
    deanonymized_origin_contributors_path = (
        tmpdir / "origin_contributors.deanonymized.csv.zst"
    )
    # zstd-compress the three fixture inputs, since the task reads ``.zst``
    # files.
    subprocess.run(
        ["zstdmt", "-o", origin_contributors_path],
        input=ORIGIN_CONTRIBUTORS.encode(),
        check=True,
    )
    subprocess.run(
        ["zstdmt", "-o", persons_path],
        input=PERSONS.encode(),
        check=True,
    )
    subprocess.run(
        ["zstdmt", "-o", deanonymization_table_path],
        input=DEANONYMIZATION_TABLE.encode(),
        check=True,
    )
    task = DeanonymizeOriginContributors(
        local_graph_path=tmpdir,
        origin_contributors_path=origin_contributors_path,
        deanonymization_table_path=deanonymization_table_path,
        deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
        graph_name="example",
    )
    # Run the task directly, without going through the Luigi scheduler.
    task.run()
    csv_text = subprocess.check_output(
        ["zstdcat", deanonymized_origin_contributors_path]
    ).decode()
    assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
index 6d35628..67dde59 100644
--- a/swh/graph/tests/test_toposort.py
+++ b/swh/graph/tests/test_toposort.py
@@ -1,67 +1,67 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from pathlib import Path
import subprocess
-from swh.graph.luigi import TopoSort
+from swh.graph.luigi.misc_datasets import TopoSort
DATA_DIR = Path(__file__).parents[0] / "dataset"
# FIXME: the order of sample ancestors should not be hardcoded
# FIXME: swh:1:snp:0000000000000000000000000000000000000022,3,1,swh has three possible
# sample ancestors; they should not be hardecoded here
# Expected output of the TopoSort task on the example graph: each row gives a
# SWHID, its ancestor and successor counts, and up to two sample ancestors.
EXPECTED = """\
SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2
swh:1:rev:0000000000000000000000000000000000000003,0,1,,
swh:1:rev:0000000000000000000000000000000000000009,1,4,swh:1:rev:0000000000000000000000000000000000000003,
swh:1:rel:0000000000000000000000000000000000000010,1,2,swh:1:rev:0000000000000000000000000000000000000009,
swh:1:snp:0000000000000000000000000000000000000020,2,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0,swh:1:snp:0000000000000000000000000000000000000020,
swh:1:rev:0000000000000000000000000000000000000013,1,1,swh:1:rev:0000000000000000000000000000000000000009,
swh:1:rev:0000000000000000000000000000000000000018,1,2,swh:1:rev:0000000000000000000000000000000000000013,
swh:1:rel:0000000000000000000000000000000000000019,1,0,swh:1:rev:0000000000000000000000000000000000000018,
swh:1:rel:0000000000000000000000000000000000000021,1,1,swh:1:rev:0000000000000000000000000000000000000018,
swh:1:snp:0000000000000000000000000000000000000022,3,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1,0,swh:1:snp:0000000000000000000000000000000000000022,
"""
def test_toposort(tmpdir):
    """Running TopoSort on the example compressed graph yields the rows of
    EXPECTED.  Since several valid topological orders exist, only the row set,
    the first row, and the possible last rows are asserted, not the full
    ordering."""
    tmpdir = Path(tmpdir)
    topological_order_path = tmpdir / "topo_order.csv.zst"
    task = TopoSort(
        local_graph_path=DATA_DIR / "compressed",
        topological_order_path=topological_order_path,
        graph_name="example",
    )
    # Run the task directly, without going through the Luigi scheduler.
    task.run()
    csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode()
    (header, *rows) = csv_text.split("\n")
    (expected_header, *expected_lines) = EXPECTED.split("\n")
    assert header == expected_header
    # The only possible first line
    assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"
    # Same rows, in some valid order (both sides still include the trailing
    # empty line produced by split()).
    assert set(rows) == set(expected_lines)
    assert rows.pop() == "", "Missing trailing newline"
    # The only three possible last lines
    assert rows[-1] in [
        "swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0"
        ",swh:1:snp:0000000000000000000000000000000000000020,",
        "swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1,0"
        ",swh:1:snp:0000000000000000000000000000000000000022,",
        "swh:1:rel:0000000000000000000000000000000000000019,1,0"
        ",swh:1:rev:0000000000000000000000000000000000000018,",
    ]