diff --git a/swh/graph/luigi/__init__.py b/swh/graph/luigi/__init__.py new file mode 100644 --- /dev/null +++ b/swh/graph/luigi/__init__.py @@ -0,0 +1,75 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" +Luigi tasks +=========== + +This package contains `Luigi <https://luigi.readthedocs.io/>`_ tasks. +These come in two kinds: + +* in :mod:`swh.graph.luigi.compressed_graph`: an alternative to the 'swh graph compress' + CLI that can be composed with other tasks, such as swh-dataset's +* in other submodules: tasks driving the creation of specific datasets that are + generated using the compressed graph + + +The overall directory structure is:: + + base_dir/ + <date>[_<flavor>]/ + edges/ + ... + orc/ + ... + compressed/ + graph.graph + graph.mph + ... + meta/ + export.json + compression.json + datasets/ + contribution_graph.csv.zst + topology/ + topological_order_dfs.csv.zst + +And optionally:: + + sensitive_base_dir/ + <date>[_<flavor>]/ + persons_sha256_to_name.csv.zst + datasets/ + contribution_graph.deanonymized.csv.zst +""" + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control +from typing import List + +import luigi + +from . 
import compressed_graph, origin_contributors + + +class RunAll(luigi.Task): + """Runs dataset export, graph compression, and generates datasets using the graph.""" + + def requires(self) -> List[luigi.Task]: + from swh.dataset.luigi import RunExportAll + + # Technically RunExportAll and DeanonymizeOriginContributors together depend + # on everything else, but it's best to be explicit + return [ + RunExportAll(), + compressed_graph.LocalGraph(), + origin_contributors.ListOriginContributors(), + origin_contributors.DeanonymizeOriginContributors(), + ] + + def complete(self) -> bool: + # Dependencies perform their own completeness check, and this task + # does no work itself + return False diff --git a/swh/graph/luigi.py b/swh/graph/luigi/compressed_graph.py rename from swh/graph/luigi.py rename to swh/graph/luigi/compressed_graph.py --- a/swh/graph/luigi.py +++ b/swh/graph/luigi/compressed_graph.py @@ -4,8 +4,8 @@ # See top-level LICENSE file for more information """ -Luigi tasks -=========== +Luigi tasks for compression +=========================== This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks, as an alternative to the CLI that can be composed with other tasks, @@ -19,13 +19,15 @@ In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`, :file:`graph.mph`, ...), tasks in this module produce this directory structure:: - swh_<date>[_<flavor>]/ - graph.graph - graph.mph - ... - meta/ - export.json - compression.json + base_dir/ + <date>[_<flavor>]/ + compressed/ + graph.graph + graph.mph + ... + meta/ + export.json + compression.json ``graph.meta/export.json`` is copied from the ORC dataset exported by :mod:`swh.dataset.luigi`. 
@@ -73,8 +75,7 @@ # WARNING: do not import unnecessary things here to keep cli startup time under # control -from pathlib import Path -from typing import Dict, List, Tuple +from typing import List import luigi @@ -435,220 +436,3 @@ def _meta(self): return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") - - -def _run_script(script: str, output_path: Path) -> None: - import os - import subprocess - - from .config import check_config - - conf: Dict = {} # TODO: configurable - - conf = check_config(conf) - env = { - **os.environ.copy(), - "JAVA_TOOL_OPTIONS": conf["java_tool_options"], - "CLASSPATH": conf["classpath"], - } - - tmp_output_path = Path(f"{output_path}.tmp") - - subprocess.run( - ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True - ) - - # Atomically write the output file - tmp_output_path.replace(output_path) - - -class TopoSort(luigi.Task): - """Creates a file that contains all SWHIDs in topological order from a compressed - graph.""" - - local_graph_path = luigi.PathParameter() - topological_order_path = luigi.PathParameter() - graph_name = luigi.Parameter(default="graph") - - def requires(self) -> List[luigi.Task]: - """Returns an instance of :class:`LocalGraph`.""" - return [LocalGraph(local_graph_path=self.local_graph_path)] - - def output(self) -> luigi.Target: - """.csv.zst file that contains the topological order.""" - return luigi.LocalTarget(self.topological_order_path) - - def run(self) -> None: - """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" - object_types = "rev,rel,snp,ori" - class_name = "org.softwareheritage.graph.utils.TopoSort" - script = f""" - java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ - | pv --line-mode --wait \ - | zstdmt -19 - """ - _run_script(script, self.topological_order_path) - - -class ListOriginContributors(luigi.Task): - """Creates a file that contains all SWHIDs in topological order from a compressed - graph.""" - - 
local_graph_path = luigi.PathParameter() - topological_order_path = luigi.PathParameter() - origin_contributors_path = luigi.PathParameter() - graph_name = luigi.Parameter(default="graph") - - def requires(self) -> List[luigi.Task]: - """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`.""" - return [ - LocalGraph(local_graph_path=self.local_graph_path), - TopoSort( - local_graph_path=self.local_graph_path, - topological_order_path=self.topological_order_path, - graph_name=self.graph_name, - ), - ] - - def output(self) -> luigi.Target: - """.csv.zst file that contains the topological order.""" - return luigi.LocalTarget(self.origin_contributors_path) - - def run(self) -> None: - """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" - class_name = "org.softwareheritage.graph.utils.ListOriginContributors" - script = f""" - zstdcat {self.topological_order_path} \ - | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ - | pv --line-mode --wait \ - | zstdmt -19 - """ - _run_script(script, self.origin_contributors_path) - - -class ExportDeanonymizationTable(luigi.Task): - """Exports (from swh-storage) a .csv.zst file that contains the columns: - ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. 
- - The first column is the anonymized full name found in :file:`graph.persons.csv.zst` - in the compressed graph, and the latter two are the original name.""" - - storage_dsn = luigi.Parameter( - default="service=swh", - description="postgresql DSN of the swh-storage database to read from.", - ) - deanonymization_table_path = luigi.PathParameter() - - def output(self) -> luigi.Target: - """.csv.zst file that contains the table.""" - return luigi.LocalTarget(self.deanonymization_table_path) - - def run(self) -> None: - """Runs a postgresql query to compute the table.""" - - _run_script( - f""" - psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19 - """, # noqa - self.deanonymization_table_path, - ) - - -class DeanonymizeOriginContributors(luigi.Task): - """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, - but with ``person_base64`` and ``person_escaped`` columns in addition to - ``person_id``. - - This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names - instead of names); which may not be true depending on how the swh-dataset export - cas configured. 
- """ - - local_graph_path = luigi.PathParameter() - graph_name = luigi.Parameter(default="graph") - origin_contributors_path = luigi.PathParameter() - deanonymization_table_path = luigi.PathParameter() - deanonymized_origin_contributors_path = luigi.PathParameter() - - def requires(self) -> List[luigi.Task]: - """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, - and :class:`ExportDeanonymizationTable`.""" - return [ - LocalGraph(local_graph_path=self.local_graph_path), - ListOriginContributors( - local_graph_path=self.local_graph_path, - origin_contributors_path=self.origin_contributors_path, - ), - ExportDeanonymizationTable( - deanonymization_table_path=self.deanonymization_table_path, - ), - ] - - def output(self) -> luigi.Target: - """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, - but with ``person_base64`` and ``person_escaped`` columns in addition to - ``person_id``""" - return luigi.LocalTarget(self.deanonymized_origin_contributors_path) - - def run(self) -> None: - """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset - and the deanonymization table in memory, then uses them to map each row - in the original (anonymized) contributors list to the deanonymized one.""" - # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export - # was configured to do so); this should add support for it. 
- - import base64 - import csv - - import pyzstd - - # Load the deanonymization table, to map sha256(name) to base64(name) - # and escape(name) - sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} - with pyzstd.open(self.deanonymization_table_path, "rt") as fd: - csv_reader = csv.reader(fd) - header = next(csv_reader) - assert header == ["sha256_base64", "base64", "escaped"], header - for line in csv_reader: - (base64_sha256_name, base64_name, escaped_name) = line - sha256_name = base64.b64decode(base64_sha256_name) - name = base64.b64decode(base64_name) - sha256_to_names[sha256_name] = (name, escaped_name) - - # Combine with the list of sha256(name), to get the list of base64(name) - # and escape(name) - persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" - with pyzstd.open(persons_path, "rb") as fd: - person_id_to_names: List[Tuple[bytes, str]] = [ - sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) - for line in fd - ] - - tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") - - # Finally, write a new table of origin_contributors, by reading the anonymized - # table line-by-line and deanonymizing each id - - # Open temporary output for writes as CSV - with pyzstd.open(tmp_output_path, "wt") as output_fd: - csv_writer = csv.writer(output_fd, lineterminator="\n") - # write header - csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) - - # Open input for reads as CSV - with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: - csv_reader = csv.reader(input_fd) - header = next(csv_reader) - assert header == ["origin_SWHID", "person_id"], header - for (origin_swhid, person_id) in csv_reader: - if person_id == "null": - # FIXME: workaround for a bug in contribution graphs generated - # before 2022-12-01. 
Those were only used in tests and never - # published, so the conditional can be removed when this is - # productionized - continue - (name, escaped_name) = person_id_to_names[int(person_id)] - base64_name = base64.b64encode(name).decode("ascii") - csv_writer.writerow((origin_swhid, base64_name, escaped_name)) - - tmp_output_path.replace(self.deanonymized_origin_contributors_path) diff --git a/swh/graph/luigi/misc_datasets.py b/swh/graph/luigi/misc_datasets.py new file mode 100644 --- /dev/null +++ b/swh/graph/luigi/misc_datasets.py @@ -0,0 +1,70 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" +Luigi tasks for various derived datasets +======================================== + +This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks +driving the creation of derived datasets. + +File layout +----------- + +This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`) +is present, and generates/manipulates the following files:: + + base_dir/ + <date>[_<flavor>]/ + datasets/ + contribution_graph.csv.zst + topology/ + topological_order_dfs.csv.zst + +And optionally:: + + sensitive_base_dir/ + <date>[_<flavor>]/ + persons_sha256_to_name.csv.zst + datasets/ + contribution_graph.deanonymized.csv.zst +""" + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control +from typing import List + +import luigi + +from .compressed_graph import LocalGraph +from .utils import run_script + + +class TopoSort(luigi.Task): + """Creates a file that contains all SWHIDs in topological order from a compressed + graph.""" + + local_graph_path = luigi.PathParameter() + topological_order_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + + def requires(self) -> List[luigi.Task]: + """Returns an instance of :class:`LocalGraph`.""" + return 
[LocalGraph(local_graph_path=self.local_graph_path)] + + def output(self) -> luigi.Target: + """.csv.zst file that contains the topological order.""" + return luigi.LocalTarget(self.topological_order_path) + + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" + object_types = "rev,rel,snp,ori" + class_name = "org.softwareheritage.graph.utils.TopoSort" + script = f""" + java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ + | pv --line-mode --wait \ + | zstdmt -19 + """ + run_script(script, self.topological_order_path) diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py new file mode 100644 --- /dev/null +++ b/swh/graph/luigi/origin_contributors.py @@ -0,0 +1,188 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" +Luigi tasks for contribution graph +================================== + +This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks +driving the creation of the graph of contributions of people (pseudonymized +by default). 
+""" +# WARNING: do not import unnecessary things here to keep cli startup time under +# control +from pathlib import Path +from typing import Dict, List, Tuple + +import luigi + +from .compressed_graph import LocalGraph +from .misc_datasets import TopoSort +from .utils import run_script + + +class ListOriginContributors(luigi.Task): + """Creates a file that lists the contributors (person ids) of each origin, from a + compressed graph and its topological order.""" + + local_graph_path = luigi.PathParameter() + topological_order_path = luigi.PathParameter() + origin_contributors_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + + def requires(self) -> List[luigi.Task]: + """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` + and :class:`swh.graph.luigi.misc_datasets.TopoSort`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + TopoSort( + local_graph_path=self.local_graph_path, + topological_order_path=self.topological_order_path, + graph_name=self.graph_name, + ), + ] + + def output(self) -> luigi.Target: + """.csv.zst file that contains the list of origin contributors.""" + return luigi.LocalTarget(self.origin_contributors_path) + + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses""" + class_name = "org.softwareheritage.graph.utils.ListOriginContributors" + script = f""" + zstdcat {self.topological_order_path} \ + | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ + | pv --line-mode --wait \ + | zstdmt -19 + """ + run_script(script, self.origin_contributors_path) + + +class ExportDeanonymizationTable(luigi.Task): + """Exports (from swh-storage) a .csv.zst file that contains the columns: + ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``. 
+ + The first column is the anonymized full name found in :file:`graph.persons.csv.zst` + in the compressed graph, and the latter two are the original name.""" + + storage_dsn = luigi.Parameter( + default="service=swh", + description="postgresql DSN of the swh-storage database to read from.", + ) + deanonymization_table_path = luigi.PathParameter() + + def output(self) -> luigi.Target: + """.csv.zst file that contains the table.""" + return luigi.LocalTarget(self.deanonymization_table_path) + + def run(self) -> None: + """Runs a postgresql query to compute the table.""" + + run_script( + f""" + psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19 + """, # noqa + self.deanonymization_table_path, + ) + + +class DeanonymizeOriginContributors(luigi.Task): + """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, + but with ``person_base64`` and ``person_escaped`` columns instead of + ``person_id``. + + This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names + instead of names); which may not be true depending on how the swh-dataset export + was configured. 
+ """ + + local_graph_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + origin_contributors_path = luigi.PathParameter() + deanonymization_table_path = luigi.PathParameter() + deanonymized_origin_contributors_path = luigi.PathParameter() + + def requires(self) -> List[luigi.Task]: + """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, + and :class:`ExportDeanonymizationTable`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + ListOriginContributors( + local_graph_path=self.local_graph_path, + origin_contributors_path=self.origin_contributors_path, + ), + ExportDeanonymizationTable( + deanonymization_table_path=self.deanonymization_table_path, + ), + ] + + def output(self) -> luigi.Target: + """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, + but with ``person_base64`` and ``person_escaped`` columns instead of + ``person_id``""" + return luigi.LocalTarget(self.deanonymized_origin_contributors_path) + + def run(self) -> None: + """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset) + and the deanonymization table in memory, then uses them to map each row + in the original (anonymized) contributors list to the deanonymized one.""" + # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export + # was configured to do so); this should add support for it. 
+ + import base64 + import csv + + import pyzstd + + # Load the deanonymization table, to map sha256(name) to base64(name) + # and escape(name) + sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} + with pyzstd.open(self.deanonymization_table_path, "rt") as fd: + csv_reader = csv.reader(fd) + header = next(csv_reader) + assert header == ["sha256_base64", "base64", "escaped"], header + for line in csv_reader: + (base64_sha256_name, base64_name, escaped_name) = line + sha256_name = base64.b64decode(base64_sha256_name) + name = base64.b64decode(base64_name) + sha256_to_names[sha256_name] = (name, escaped_name) + + # Combine with the list of sha256(name), to get the list of base64(name) + # and escape(name) + persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" + with pyzstd.open(persons_path, "rb") as fd: + person_id_to_names: List[Tuple[bytes, str]] = [ + sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) + for line in fd + ] + + tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") + + # Finally, write a new table of origin_contributors, by reading the anonymized + # table line-by-line and deanonymizing each id + + # Open temporary output for writes as CSV + with pyzstd.open(tmp_output_path, "wt") as output_fd: + csv_writer = csv.writer(output_fd, lineterminator="\n") + # write header + csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) + + # Open input for reads as CSV + with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: + csv_reader = csv.reader(input_fd) + header = next(csv_reader) + assert header == ["origin_SWHID", "person_id"], header + for (origin_swhid, person_id) in csv_reader: + if person_id == "null": + # FIXME: workaround for a bug in contribution graphs generated + # before 2022-12-01. 
Those were only used in tests and never + # published, so the conditional can be removed when this is + # productionized + continue + (name, escaped_name) = person_id_to_names[int(person_id)] + base64_name = base64.b64encode(name).decode("ascii") + csv_writer.writerow((origin_swhid, base64_name, escaped_name)) + + tmp_output_path.replace(self.deanonymized_origin_contributors_path) diff --git a/swh/graph/luigi/utils.py b/swh/graph/luigi/utils.py new file mode 100644 --- /dev/null +++ b/swh/graph/luigi/utils.py @@ -0,0 +1,34 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from pathlib import Path +from typing import Dict + + +def run_script(script: str, output_path: Path) -> None: + import os + import subprocess + + from ..config import check_config + + conf: Dict = {} # TODO: configurable + + conf = check_config(conf) + env = { + **os.environ.copy(), + "JAVA_TOOL_OPTIONS": conf["java_tool_options"], + "CLASSPATH": conf["classpath"], + } + + output_path.parent.mkdir(parents=True, exist_ok=True) + + tmp_output_path = Path(f"{output_path}.tmp") + + subprocess.run( + ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True + ) + + # Atomically write the output file + tmp_output_path.replace(output_path) diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py --- a/swh/graph/tests/test_luigi.py +++ b/swh/graph/tests/test_luigi.py @@ -6,7 +6,7 @@ import json from pathlib import Path -from swh.graph.luigi import CompressGraph +from swh.graph.luigi.compressed_graph import CompressGraph from .test_cli import read_properties diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py --- a/swh/graph/tests/test_origin_contributors.py +++ b/swh/graph/tests/test_origin_contributors.py @@ -7,7 
+7,7 @@ from pathlib import Path import subprocess -from swh.graph.luigi import ( +from swh.graph.luigi.origin_contributors import ( DeanonymizeOriginContributors, ExportDeanonymizationTable, ListOriginContributors, diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py --- a/swh/graph/tests/test_toposort.py +++ b/swh/graph/tests/test_toposort.py @@ -6,7 +6,7 @@ from pathlib import Path import subprocess -from swh.graph.luigi import TopoSort +from swh.graph.luigi.misc_datasets import TopoSort DATA_DIR = Path(__file__).parents[0] / "dataset"