diff --git a/swh/graph/luigi/__init__.py b/swh/graph/luigi/__init__.py
new file mode 100644
index 0000000..a82ff9b
--- /dev/null
+++ b/swh/graph/luigi/__init__.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks
+===========
+
+This package contains `Luigi <https://luigi.readthedocs.io/>`_ tasks.
+These come in two kinds:
+
+* in :mod:`swh.graph.luigi.compressed_graph`: an alternative to the 'swh graph compress'
+  CLI that can be composed with other tasks, such as swh-dataset's
+* in other submodules: tasks driving the creation of specific datasets that are
+  generated using the compressed graph
+
+
+The overall directory structure is::
+
+    base_dir/
+        <date>[_<flavor>]/
+            edges/
+                ...
+            orc/
+                ...
+            compressed/
+                graph.graph
+                graph.mph
+                ...
+                meta/
+                    export.json
+                    compression.json
+            datasets/
+                contribution_graph.csv.zst
+            topology/
+                topological_order_dfs.csv.zst
+
+And optionally::
+
+    sensitive_base_dir/
+        <date>[_<flavor>]/
+            persons_sha256_to_name.csv.zst
+            datasets/
+                contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from . import compressed_graph, origin_contributors
+
+
+class RunAll(luigi.Task):
+    """Runs the dataset export and graph compression, then generates datasets
+    using the graph."""
+
+    def requires(self) -> List[luigi.Task]:
+        from swh.dataset.luigi import RunExportAll
+
+        # Technically RunExportAll and DeanonymizeOriginContributors together depend
+        # on everything else, but it's best to be explicit
+        return [
+            RunExportAll(),
+            compressed_graph.LocalGraph(),
+            origin_contributors.ListOriginContributors(),
+            origin_contributors.DeanonymizeOriginContributors(),
+        ]
+
+    def complete(self) -> bool:
+        # Dependencies perform their own completeness check, and this task
+        # does no work itself
+        return False
diff --git a/swh/graph/luigi.py b/swh/graph/luigi/compressed_graph.py
similarity index 61%
rename from swh/graph/luigi.py
rename to swh/graph/luigi/compressed_graph.py
index a190a80..9f52c7e 100644
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi/compressed_graph.py
@@ -1,654 +1,438 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
-Luigi tasks
-===========
+Luigi tasks for compression
+===========================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-dataset's.

Unlike the CLI, this requires the graph to be named `graph`.


File layout
-----------

In addition to files documented in :ref:`graph-compression` (eg.
:file:`graph.graph`, :file:`graph.mph`, ...), tasks in this module produce
this directory structure::

-    swh_<date>[_<flavor>]/
-        graph.graph
-        graph.mph
-        ...
-        meta/
-            export.json
-            compression.json
+    base_dir/
+        <date>[_<flavor>]/
+            compressed/
+                graph.graph
+                graph.mph
+                ...
+                meta/
+                    export.json
+                    compression.json

``meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.

``meta/compression.json`` contains information about the compression
itself, for provenance tracking. For example:
.. code-block:: json

    [
        {
            "steps": null,
            "compression_start": "2022-11-08T11:00:54.998799+00:00",
            "compression_end": "2022-11-08T11:05:53.105519+00:00",
            "object_type": [
                "origin",
                "origin_visit"
            ],
            "hostname": "desktop5",
            "conf": {},
            "tool": {
                "name": "swh.graph",
                "version": "2.2.0"
            }
        }
    ]

When the compression pipeline is run in separate steps, each of the steps is
recorded as an object in the root list.

S3 layout
---------

As ``.bin`` files are meant to be accessed randomly, they are uncompressed on
disk. However, this is undesirable on at-rest/long-term storage like on S3,
because some are very sparse (eg. :file:`graph.property.committer_timestamp.bin`
can be quickly compressed from 300GB to 1GB).

Therefore, these files are compressed to ``.bin.zst``, and need to be
decompressed when downloading.

The layout is otherwise the same as the file layout.
"""

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from pathlib import Path
-from typing import Dict, List, Tuple
+from typing import List

import luigi

from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter


class CompressGraph(luigi.Task):
    local_export_path = luigi.PathParameter(significant=False)
    local_graph_path = luigi.PathParameter()
    batch_size = luigi.IntParameter(
        default=0,
        significant=False,
        description="""
        Size of work batches to use while compressing.
        Larger is faster, but consumes more resources.
        """,
    )

    object_types = list(ObjectType)
    # To make this configurable, we could use this:
    #   object_types = luigi.EnumListParameter(
    #       enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
    #   )
    # then use swh.dataset.luigi._export_metadata_has_object_types to check in
    # .meta/export.json that all objects are present before skipping the task

    def requires(self) -> List[luigi.Task]:
        """Returns a :class:`LocalExport` task."""
        return [
            LocalExport(
                local_export_path=self.local_export_path,
                formats=[Format.orc],  # type: ignore[attr-defined]
                object_types=self.object_types,
            )
        ]

    def output(self) -> List[luigi.LocalTarget]:
        """Returns the ``meta/*.json`` targets"""
        return [self._export_meta(), self._compression_meta()]

    def _export_meta(self) -> luigi.Target:
        """Returns the metadata on the dataset export"""
        return luigi.LocalTarget(self.local_graph_path / "meta/export.json")

    def _compression_meta(self) -> luigi.Target:
        """Returns the metadata on the compression pipeline"""
        return luigi.LocalTarget(self.local_graph_path / "meta/compression.json")

    def run(self):
        """Runs the full compression pipeline, then writes
        :file:`meta/compression.json`.

        This does not support running individual steps yet."""
        import datetime
        import json
        import shutil
        import socket

        import pkg_resources

        from swh.graph import webgraph

        conf = {}  # TODO: make this configurable
        steps = None  # TODO: make this configurable

        if self.batch_size:
            conf["batch_size"] = self.batch_size

        # Delete stamps. Otherwise interrupting this compression pipeline may leave
        # stamps from a previous successful compression
        if self._export_meta().exists():
            self._export_meta().remove()
        if self._compression_meta().exists():
            self._compression_meta().remove()
        # Make sure we don't accidentally append to existing files
        if self.local_graph_path.exists():
            shutil.rmtree(self.local_graph_path)

        output_directory = self.local_graph_path
        graph_name = "graph"

        def progress_cb(percentage: int, step: webgraph.CompressionStep):
            self.set_progress_percentage(percentage)
            self.set_status_message(f"Running {step.name} (step #{step.value})")

        start_date = datetime.datetime.now(tz=datetime.timezone.utc)
        webgraph.compress(
            graph_name,
            self.local_export_path / "orc",
            output_directory,
            steps,
            conf,
        )
        end_date = datetime.datetime.now(tz=datetime.timezone.utc)

        # Copy dataset export metadata
        with self._export_meta().open("w") as write_fd:
            with (self.local_export_path / "meta" / "export.json").open() as read_fd:
                write_fd.write(read_fd.read())

        # Append metadata about this compression pipeline
        if self._compression_meta().exists():
            with self._compression_meta().open("r") as fd:
                meta = json.load(fd)
        else:
            meta = []

        meta.append(
            {
                "steps": steps,
                "compression_start": start_date.isoformat(),
                "compression_end": end_date.isoformat(),
                "object_type": [object_type.name for object_type in self.object_types],
                "hostname": socket.getfqdn(),
                "conf": conf,
                "tool": {
                    "name": "swh.graph",
                    "version": pkg_resources.get_distribution("swh.graph").version,
                },
            }
        )
        with self._compression_meta().open("w") as fd:
            json.dump(meta, fd, indent=4)


class UploadGraphToS3(luigi.Task):
    """Uploads a local compressed graph to S3; if it is not present locally, it is
    created automatically by the :class:`CompressGraph` dependency.

    Example invocation::

        luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
                --local-graph-path=graph/ \
                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
    """

    local_graph_path = luigi.PathParameter(significant=False)
    s3_graph_path = S3PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns a :class:`CompressGraph` task that writes local files at the
        expected location."""
        return [
            CompressGraph(
                local_graph_path=self.local_graph_path,
            )
        ]

    def output(self) -> List[luigi.Target]:
        """Returns stamp and meta paths on S3."""
        return [self._meta()]

    def _meta(self):
        import luigi.contrib.s3

        return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")

    def run(self) -> None:
        """Copies all files: first the graph itself, then
        :file:`meta/compression.json`."""
        import subprocess
        import tempfile

        import luigi.contrib.s3
        import tqdm

        compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
        seen_compression_metadata = False

        client = luigi.contrib.s3.S3Client()

        # recursively copy local files to S3, and end with compression metadata
        paths = list(self.local_graph_path.glob("**/*"))
        for (i, path) in tqdm.tqdm(
            list(enumerate(paths)),
            desc="Uploading compressed graph",
        ):
            if path == compression_metadata_path:
                # Write it last
                seen_compression_metadata = True
                continue
            if path.is_dir():
                continue
            relative_path = path.relative_to(self.local_graph_path)
            self.set_progress_percentage(int(i * 100 / len(paths)))
            if path.suffix == ".bin":
                # Large sparse file; store it compressed on S3.
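                # The file is compressed to a temporary .zst file first, then
                # uploaded; `--keep` leaves the uncompressed on-disk copy in
                # place, since the graph service still needs it for random
                # access.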
                with tempfile.NamedTemporaryFile(
                    prefix=path.stem, suffix=".bin.zst"
                ) as fd:
                    self.set_status_message(f"Compressing {relative_path}")
                    subprocess.run(
                        ["zstdmt", "--force", "--keep", path, "-o", fd.name],
                        check=True,
                    )
                    self.set_status_message(f"Uploading {relative_path} (compressed)")
                    client.put_multipart(
                        fd.name,
                        f"{self.s3_graph_path}/{relative_path}.zst",
                        ACL="public-read",
                    )
            else:
                self.set_status_message(f"Uploading {relative_path}")
                client.put_multipart(
                    path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
                )

        assert (
            seen_compression_metadata
        ), "did not see meta/compression.json in directory listing"

        # Write it last, to act as a stamp
        client.put(
            compression_metadata_path,
            self._meta().path,
            ACL="public-read",
        )


class DownloadGraphFromS3(luigi.Task):
    """Downloads a compressed graph from S3 to the local filesystem.

    This performs the inverse operation of :class:`UploadGraphToS3`.

    Example invocation::

        luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
                --local-graph-path=graph/ \
                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
    """

    local_graph_path = luigi.PathParameter()
    s3_graph_path = S3PathParameter(significant=False)

    def requires(self) -> List[luigi.Task]:
        """Returns an :class:`UploadGraphToS3` task that writes remote files at the
        expected location."""
        return [
            UploadGraphToS3(
                local_graph_path=self.local_graph_path,
                s3_graph_path=self.s3_graph_path,
            )
        ]

    def output(self) -> List[luigi.Target]:
        """Returns stamp and meta paths on the local filesystem."""
        return [self._meta()]

    def _meta(self):
        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")

    def run(self) -> None:
        """Copies all files: first the graph itself, then
        :file:`meta/compression.json`."""
        import subprocess
        import tempfile

        import luigi.contrib.s3
        import tqdm

        client = luigi.contrib.s3.S3Client()

        compression_metadata_path = f"{self.s3_graph_path}/meta/compression.json"
        seen_compression_metadata = False

        # recursively copy files from S3 to the local directory, and end with
        # compression metadata
        files = list(client.list(self.s3_graph_path))
        for (i, file_) in tqdm.tqdm(
            list(enumerate(files)),
            desc="Downloading",
        ):
            if file_ == compression_metadata_path:
                # Will copy it last
                seen_compression_metadata = True
                continue
            self.set_progress_percentage(int(i * 100 / len(files)))

            local_path = self.local_graph_path / file_
            local_path.parent.mkdir(parents=True, exist_ok=True)
            if file_.endswith(".bin.zst"):
                # The file was compressed before uploading to S3, we need it
                # to be decompressed locally
                with tempfile.NamedTemporaryFile(
                    prefix=local_path.stem, suffix=".bin.zst"
                ) as fd:
                    self.set_status_message(f"Downloading {file_} (compressed)")
                    client.get(
                        f"{self.s3_graph_path}/{file_}",
                        fd.name,
                    )
                    self.set_status_message(f"Decompressing {file_}")
                    subprocess.run(
                        [
                            "zstdmt",
                            "--force",
                            "-d",
                            fd.name,
                            "-o",
                            str(local_path)[0:-4],
                        ],
                        check=True,
                    )
            else:
                self.set_status_message(f"Downloading {file_}")
                client.get(
                    f"{self.s3_graph_path}/{file_}",
                    str(local_path),
                )

        assert (
            seen_compression_metadata
        ), "did not see meta/compression.json in directory listing"

        # Write it last, to act as a stamp
        client.get(
            compression_metadata_path,
            self._meta().path,
        )


class LocalGraph(luigi.Task):
    """Task that depends on a local compressed graph being present -- either
    produced directly by :class:`CompressGraph` or fetched via
    :class:`DownloadGraphFromS3`.
    """

    local_graph_path = luigi.PathParameter()
    compression_task_type = luigi.TaskParameter(
        default=DownloadGraphFromS3,
        significant=False,
        description="""The task used to get the compressed graph if it is not present.
        Should be either ``swh.graph.luigi.CompressGraph`` or
        ``swh.graph.luigi.DownloadGraphFromS3``.""",
    )

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of either :class:`CompressGraph` or
        :class:`DownloadGraphFromS3` depending on the value of
        :attr:`compression_task_type`."""
        if issubclass(self.compression_task_type, CompressGraph):
            return [
                CompressGraph(
                    local_graph_path=self.local_graph_path,
                )
            ]
        elif issubclass(self.compression_task_type, DownloadGraphFromS3):
            return [
                DownloadGraphFromS3(
                    local_graph_path=self.local_graph_path,
                )
            ]
        else:
            raise ValueError(
                f"Unexpected compression_task_type: "
                f"{self.compression_task_type.__name__}"
            )

    def output(self) -> List[luigi.Target]:
        """Returns stamp and meta paths on the local filesystem."""
        return [self._meta()]

    def _meta(self):
        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
-
-
-def _run_script(script: str, output_path: Path) -> None:
-    import os
-    import subprocess
-
-    from .config import check_config
-
-    conf: Dict = {}  # TODO: configurable
-
-    conf = check_config(conf)
-    env = {
-        **os.environ.copy(),
-        "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
-        "CLASSPATH": conf["classpath"],
-    }
-
-    tmp_output_path = Path(f"{output_path}.tmp")
-
-    subprocess.run(
-        ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
-    )
-
-    # Atomically write the output file
-    tmp_output_path.replace(output_path)
-
-
-class TopoSort(luigi.Task):
-    """Creates a file that contains all SWHIDs in topological order from a compressed
-    graph."""
-
-    local_graph_path = luigi.PathParameter()
-    topological_order_path = luigi.PathParameter()
-    graph_name = luigi.Parameter(default="graph")
-
-    def requires(self) -> List[luigi.Task]:
-        """Returns an instance of :class:`LocalGraph`."""
-        return [LocalGraph(local_graph_path=self.local_graph_path)]
-
-    def output(self) -> luigi.Target:
-        """.csv.zst file that contains the topological order."""
-        return luigi.LocalTarget(self.topological_order_path)
-
-    def run(self) -> None:
-        """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
-        object_types = "rev,rel,snp,ori"
-        class_name = "org.softwareheritage.graph.utils.TopoSort"
-        script = f"""
-        java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
-        | pv --line-mode --wait \
-        | zstdmt -19
-        """
-        _run_script(script, self.topological_order_path)
-
-
-class ListOriginContributors(luigi.Task):
-    """Creates a file that contains all SWHIDs in topological order from a compressed
-    graph."""
-
-    local_graph_path = luigi.PathParameter()
-    topological_order_path = luigi.PathParameter()
-    origin_contributors_path = luigi.PathParameter()
-    graph_name = luigi.Parameter(default="graph")
-
-    def requires(self) -> List[luigi.Task]:
-        """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`."""
-        return [
-            LocalGraph(local_graph_path=self.local_graph_path),
-            TopoSort(
-                local_graph_path=self.local_graph_path,
-                topological_order_path=self.topological_order_path,
-                graph_name=self.graph_name,
-            ),
-        ]
-
-    def output(self) -> luigi.Target:
-        """.csv.zst file that contains the topological order."""
-        return luigi.LocalTarget(self.origin_contributors_path)
-
-    def run(self) -> None:
-        """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
-        class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
-        script = f"""
-        zstdcat {self.topological_order_path} \
-        | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
-        | pv --line-mode --wait \
-        | zstdmt -19
-        """
-        _run_script(script, self.origin_contributors_path)
-
-
-class ExportDeanonymizationTable(luigi.Task):
-    """Exports (from swh-storage) a .csv.zst file that contains the columns:
-    ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.
-
-    The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
-    in the compressed graph, and the latter two are the original name."""
-
-    storage_dsn = luigi.Parameter(
-        default="service=swh",
-        description="postgresql DSN of the swh-storage database to read from.",
-    )
-    deanonymization_table_path = luigi.PathParameter()
-
-    def output(self) -> luigi.Target:
-        """.csv.zst file that contains the table."""
-        return luigi.LocalTarget(self.deanonymization_table_path)
-
-    def run(self) -> None:
-        """Runs a postgresql query to compute the table."""
-
-        _run_script(
-            f"""
-            psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
-            """,  # noqa
-            self.deanonymization_table_path,
-        )
-
-
-class DeanonymizeOriginContributors(luigi.Task):
-    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
-    but with ``person_base64`` and ``person_escaped`` columns in addition to
-    ``person_id``.
-
-    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
-    instead of names); which may not be true depending on how the swh-dataset export
-    was configured.
-    """
-
-    local_graph_path = luigi.PathParameter()
-    graph_name = luigi.Parameter(default="graph")
-    origin_contributors_path = luigi.PathParameter()
-    deanonymization_table_path = luigi.PathParameter()
-    deanonymized_origin_contributors_path = luigi.PathParameter()
-
-    def requires(self) -> List[luigi.Task]:
-        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
-        and :class:`ExportDeanonymizationTable`."""
-        return [
-            LocalGraph(local_graph_path=self.local_graph_path),
-            ListOriginContributors(
-                local_graph_path=self.local_graph_path,
-                origin_contributors_path=self.origin_contributors_path,
-            ),
-            ExportDeanonymizationTable(
-                deanonymization_table_path=self.deanonymization_table_path,
-            ),
-        ]
-
-    def output(self) -> luigi.Target:
-        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
-        but with ``person_base64`` and ``person_escaped`` columns in addition to
-        ``person_id``"""
-        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
-
-    def run(self) -> None:
-        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
-        and the deanonymization table in memory, then uses them to map each row
-        in the original (anonymized) contributors list to the deanonymized one."""
-        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
-        # was configured to do so); this should add support for it.
-
-        import base64
-        import csv
-
-        import pyzstd
-
-        # Load the deanonymization table, to map sha256(name) to base64(name)
-        # and escape(name)
-        sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
-        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
-            csv_reader = csv.reader(fd)
-            header = next(csv_reader)
-            assert header == ["sha256_base64", "base64", "escaped"], header
-            for line in csv_reader:
-                (base64_sha256_name, base64_name, escaped_name) = line
-                sha256_name = base64.b64decode(base64_sha256_name)
-                name = base64.b64decode(base64_name)
-                sha256_to_names[sha256_name] = (name, escaped_name)
-
-        # Combine with the list of sha256(name), to get the list of base64(name)
-        # and escape(name)
-        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
-        with pyzstd.open(persons_path, "rb") as fd:
-            person_id_to_names: List[Tuple[bytes, str]] = [
-                sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
-                for line in fd
-            ]
-
-        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
-
-        # Finally, write a new table of origin_contributors, by reading the anonymized
-        # table line-by-line and deanonymizing each id
-
-        # Open temporary output for writes as CSV
-        with pyzstd.open(tmp_output_path, "wt") as output_fd:
-            csv_writer = csv.writer(output_fd, lineterminator="\n")
-            # write header
-            csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
-
-            # Open input for reads as CSV
-            with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
-                csv_reader = csv.reader(input_fd)
-                header = next(csv_reader)
-                assert header == ["origin_SWHID", "person_id"], header
-                for (origin_swhid, person_id) in csv_reader:
-                    if person_id == "null":
-                        # FIXME: workaround for a bug in contribution graphs generated
-                        # before 2022-12-01. Those were only used in tests and never
-                        # published, so the conditional can be removed when this is
-                        # productionized
-                        continue
-                    (name, escaped_name) = person_id_to_names[int(person_id)]
-                    base64_name = base64.b64encode(name).decode("ascii")
-                    csv_writer.writerow((origin_swhid, base64_name, escaped_name))
-
-        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/misc_datasets.py b/swh/graph/luigi/misc_datasets.py
new file mode 100644
index 0000000..f755a7f
--- /dev/null
+++ b/swh/graph/luigi/misc_datasets.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for various derived datasets
+========================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of derived datasets.
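+
+For example, the :class:`TopoSort` task defined below can be invoked on its
+own, like the other tasks in this package (an illustrative invocation; the
+paths are placeholders to adapt to the local layout)::
+
+    luigi --local-scheduler --module swh.graph.luigi.misc_datasets TopoSort \
+            --local-graph-path=compressed/ \
+            --topological-order-path=topology/topological_order_dfs.csv.zst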
+
+File layout
+-----------
+
+This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
+is present, and generates/manipulates the following files::
+
+    base_dir/
+        <date>[_<flavor>]/
+            datasets/
+                contribution_graph.csv.zst
+            topology/
+                topological_order_dfs.csv.zst
+
+And optionally::
+
+    sensitive_base_dir/
+        <date>[_<flavor>]/
+            persons_sha256_to_name.csv.zst
+            datasets/
+                contribution_graph.deanonymized.csv.zst
+"""
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+from typing import List
+
+import luigi
+
+from .compressed_graph import LocalGraph
+from .utils import run_script
+
+
+class TopoSort(luigi.Task):
+    """Creates a file that contains all SWHIDs in topological order from a compressed
+    graph."""
+
+    local_graph_path = luigi.PathParameter()
+    topological_order_path = luigi.PathParameter()
+    graph_name = luigi.Parameter(default="graph")
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns an instance of :class:`LocalGraph`."""
+        return [LocalGraph(local_graph_path=self.local_graph_path)]
+
+    def output(self) -> luigi.Target:
+        """.csv.zst file that contains the topological order."""
+        return luigi.LocalTarget(self.topological_order_path)
+
+    def run(self) -> None:
+        """Runs org.softwareheritage.graph.utils.TopoSort and compresses its output"""
+        object_types = "rev,rel,snp,ori"
+        class_name = "org.softwareheritage.graph.utils.TopoSort"
+        script = f"""
+        java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
+        | pv --line-mode --wait \
+        | zstdmt -19
+        """
+        run_script(script, self.topological_order_path)
diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py
new file mode 100644
index 0000000..ce404aa
--- /dev/null
+++ b/swh/graph/luigi/origin_contributors.py
@@ -0,0 +1,188 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Luigi tasks for the contribution graph
+======================================
+
+This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
+driving the creation of the graph of contributions of people (pseudonymized
+by default).
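+
+Like the other tasks in this package, the final task of this module can be run
+from the command line (an illustrative invocation; paths are placeholders)::
+
+    luigi --local-scheduler --module swh.graph.luigi.origin_contributors \
+            DeanonymizeOriginContributors \
+            --local-graph-path=compressed/ \
+            --origin-contributors-path=datasets/contribution_graph.csv.zst \
+            --deanonymization-table-path=persons_sha256_to_name.csv.zst \
+            --deanonymized-origin-contributors-path=datasets/contribution_graph.deanonymized.csv.zst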
+""" +# WARNING: do not import unnecessary things here to keep cli startup time under +# control +from pathlib import Path +from typing import Dict, List, Tuple + +import luigi + +from .compressed_graph import LocalGraph +from .misc_datasets import TopoSort +from .utils import run_script + + +class ListOriginContributors(luigi.Task): + """Creates a file that contains all SWHIDs in topological order from a compressed + graph.""" + + local_graph_path = luigi.PathParameter() + topological_order_path = luigi.PathParameter() + origin_contributors_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + + def requires(self) -> List[luigi.Task]: + """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` + and :class:`swh.graph.luigi.misc_datasets.TopoSort`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + TopoSort( + local_graph_path=self.local_graph_path, + topological_order_path=self.topological_order_path, + graph_name=self.graph_name, + ), + ] + + def output(self) -> luigi.Target: + """.csv.zst file that contains the topological order.""" + return luigi.LocalTarget(self.origin_contributors_path) + + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" + class_name = "org.softwareheritage.graph.utils.ListOriginContributors" + script = f""" + zstdcat {self.topological_order_path} \ + | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ + | pv --line-mode --wait \ + | zstdmt -19 + """ + run_script(script, self.origin_contributors_path) + + +class ExportDeanonymizationTable(luigi.Task): + """Exports (from swh-storage) a .csv.zst file that contains the columns: + ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. + + The first column is the anonymized full name found in :file:`graph.persons.csv.zst` + in the compressed graph, and the latter two are the original name.""" + + storage_dsn = luigi.Parameter( + default="service=swh", + description="postgresql DSN of the swh-storage database to read from.", + ) + deanonymization_table_path = luigi.PathParameter() + + def output(self) -> luigi.Target: + """.csv.zst file that contains the table.""" + return luigi.LocalTarget(self.deanonymization_table_path) + + def run(self) -> None: + """Runs a postgresql query to compute the table.""" + + run_script( + f""" + psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19 + """, # noqa + self.deanonymization_table_path, + ) + + +class DeanonymizeOriginContributors(luigi.Task): + """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, + but with ``person_base64`` and ``person_escaped`` columns in addition to + ``person_id``. + + This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names + instead of names); which may not be true depending on how the swh-dataset export + cas configured. 
+ """ + + local_graph_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + origin_contributors_path = luigi.PathParameter() + deanonymization_table_path = luigi.PathParameter() + deanonymized_origin_contributors_path = luigi.PathParameter() + + def requires(self) -> List[luigi.Task]: + """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, + and :class:`ExportDeanonymizationTable`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + ListOriginContributors( + local_graph_path=self.local_graph_path, + origin_contributors_path=self.origin_contributors_path, + ), + ExportDeanonymizationTable( + deanonymization_table_path=self.deanonymization_table_path, + ), + ] + + def output(self) -> luigi.Target: + """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, + but with ``person_base64`` and ``person_escaped`` columns in addition to + ``person_id``""" + return luigi.LocalTarget(self.deanonymized_origin_contributors_path) + + def run(self) -> None: + """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset + and the deanonymization table in memory, then uses them to map each row + in the original (anonymized) contributors list to the deanonymized one.""" + # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export + # was configured to do so); this should add support for it. + + import base64 + import csv + + import pyzstd + + # Load the deanonymization table, to map sha256(name) to base64(name) + # and escape(name) + sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} + with pyzstd.open(self.deanonymization_table_path, "rt") as fd: + csv_reader = csv.reader(fd) + header = next(csv_reader) + assert header == ["sha256_base64", "base64", "escaped"], header + for line in csv_reader: + (base64_sha256_name, base64_name, escaped_name) = line + sha256_name = base64.b64decode(base64_sha256_name) + name = base64.b64decode(base64_name) + sha256_to_names[sha256_name] = (name, escaped_name) + + # Combine with the list of sha256(name), to get the list of base64(name) + # and escape(name) + persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" + with pyzstd.open(persons_path, "rb") as fd: + person_id_to_names: List[Tuple[bytes, str]] = [ + sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) + for line in fd + ] + + tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") + + # Finally, write a new table of origin_contributors, by reading the anonymized + # table line-by-line and deanonymizing each id + + # Open temporary output for writes as CSV + with pyzstd.open(tmp_output_path, "wt") as output_fd: + csv_writer = csv.writer(output_fd, lineterminator="\n") + # write header + csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) + + # Open input for reads as CSV + with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: + csv_reader = csv.reader(input_fd) + header = next(csv_reader) + assert header == ["origin_SWHID", "person_id"], header + for (origin_swhid, person_id) in csv_reader: + if person_id == "null": + # FIXME: workaround for a bug in contribution graphs generated + # before 2022-12-01. 
+                        # Those were only used in tests and never published, so the
+                        # conditional can be removed when this is productionized
+                        continue
+                    (name, escaped_name) = person_id_to_names[int(person_id)]
+                    base64_name = base64.b64encode(name).decode("ascii")
+                    csv_writer.writerow((origin_swhid, base64_name, escaped_name))
+
+        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/luigi/utils.py b/swh/graph/luigi/utils.py
new file mode 100644
index 0000000..ae60091
--- /dev/null
+++ b/swh/graph/luigi/utils.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+from typing import Dict
+
+
+def run_script(script: str, output_path: Path) -> None:
+    import os
+    import subprocess
+
+    from ..config import check_config
+
+    conf: Dict = {}  # TODO: configurable
+
+    conf = check_config(conf)
+    env = {
+        **os.environ.copy(),
+        "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+        "CLASSPATH": conf["classpath"],
+    }
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    tmp_output_path = Path(f"{output_path}.tmp")
+
+    subprocess.run(
+        ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
+    )
+
+    # Atomically write the output file
+    tmp_output_path.replace(output_path)
diff --git a/swh/graph/tests/test_luigi.py b/swh/graph/tests/test_luigi.py
index 1cce95c..a05bb54 100644
--- a/swh/graph/tests/test_luigi.py
+++ b/swh/graph/tests/test_luigi.py
@@ -1,36 +1,36 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
from pathlib import Path

-from swh.graph.luigi import CompressGraph
+from swh.graph.luigi.compressed_graph import CompressGraph

from .test_cli import read_properties

DATA_DIR = Path(__file__).parents[0] / "dataset"


def test_compressgraph(tmpdir):
    tmpdir = Path(tmpdir)

    task = CompressGraph(
        local_export_path=DATA_DIR,
        local_graph_path=tmpdir / "compressed_graph",
        batch_size=1000,  # go fast on the trivial dataset
    )

    task.run()

    properties = read_properties(tmpdir / "compressed_graph" / "graph.properties")

    assert int(properties["nodes"]) == 24
    assert int(properties["arcs"]) == 28

    export_meta_path = tmpdir / "compressed_graph/meta/export.json"
    assert export_meta_path.read_bytes() == (DATA_DIR / "meta/export.json").read_bytes()

    compression_meta_path = tmpdir / "compressed_graph/meta/compression.json"
    assert json.load(compression_meta_path.open())[0]["conf"] == {"batch_size": 1000}
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
index 883872c..233ad14 100644
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -1,186 +1,186 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from pathlib import Path
import subprocess

-from swh.graph.luigi import (
+from swh.graph.luigi.origin_contributors import (
    DeanonymizeOriginContributors,
    ExportDeanonymizationTable,
    ListOriginContributors,
)
from swh.model.model import (
    ObjectType,
    Person,
    Release,
    Revision,
    RevisionType,
    TimestampWithTimezone,
)

from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER

DATA_DIR = Path(__file__).parents[0] / "dataset"


# FIXME: do not hardcode ids here; they should be dynamically loaded
# from the test graph
ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_id
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,2
"""

DEANONYMIZATION_TABLE = """\
sha256_base64,base64,escaped
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
"""  # noqa

PERSONS = """\
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
"""

DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_base64,person_escaped
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
"""  # noqa


def test_list_origin_contributors(tmpdir):
    tmpdir = Path(tmpdir)

    topological_order_path = tmpdir / "topo_order.csv.zst"
    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"

    subprocess.run(
        ["zstdmt", "-o", topological_order_path],
        input=TOPOLOGICAL_ORDER.encode(),
        check=True,
    )

    task = ListOriginContributors(
        local_graph_path=DATA_DIR / "compressed",
        topological_order_path=topological_order_path,
        origin_contributors_path=origin_contributors_path,
        graph_name="example",
    )

    task.run()

    csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()

    assert csv_text == ORIGIN_CONTRIBUTORS


def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
    tmpdir = Path(tmpdir)

    tstz = TimestampWithTimezone.from_datetime(
        datetime.datetime.now(tz=datetime.timezone.utc)
    )
    swh_storage.release_add(
        [
            Release(
                name=b"v1.0",
                message=b"first release",
                author=Person.from_fullname(b"John Doe <jdoe@example.org>"),
                target=b"\x00" * 20,
                target_type=ObjectType.REVISION,
                synthetic=True,
            )
        ]
    )
    swh_storage.revision_add(
        [
            Revision(
                message=b"first commit",
                author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"),
                committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"),
                date=tstz,
                committer_date=tstz,
                directory=b"\x00" * 20,
                type=RevisionType.GIT,
                synthetic=True,
            )
        ]
    )

    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"

    task = ExportDeanonymizationTable(
        storage_dsn=swh_storage_postgresql.dsn,
        deanonymization_table_path=deanonymization_table_path,
    )

    task.run()

    csv_text = subprocess.check_output(
        ["zstdcat", deanonymization_table_path]
    ).decode()

    (header, *rows) = csv_text.split("\n")
    (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")

    assert header == expected_header

    assert rows.pop() == "", "Missing trailing newline"
    expected_rows.pop()

    assert set(rows) == set(expected_rows)


def test_deanonymize_origin_contributors(tmpdir):
    tmpdir = Path(tmpdir)

    persons_path = tmpdir / "example.persons.csv.zst"
    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
    deanonymized_origin_contributors_path = (
        tmpdir / "origin_contributors.deanonymized.csv.zst"
    )

    subprocess.run(
        ["zstdmt", "-o", origin_contributors_path],
        input=ORIGIN_CONTRIBUTORS.encode(),
        check=True,
    )

    subprocess.run(
        ["zstdmt", "-o", persons_path],
        input=PERSONS.encode(),
        check=True,
    )

    subprocess.run(
        ["zstdmt", "-o", deanonymization_table_path],
        input=DEANONYMIZATION_TABLE.encode(),
        check=True,
    )

    task = DeanonymizeOriginContributors(
        local_graph_path=tmpdir,
        origin_contributors_path=origin_contributors_path,
        deanonymization_table_path=deanonymization_table_path,
        deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
        graph_name="example",
    )

    task.run()

    csv_text = subprocess.check_output(
        ["zstdcat", deanonymized_origin_contributors_path]
    ).decode()

    assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
index 6d35628..67dde59 100644
--- a/swh/graph/tests/test_toposort.py
+++ b/swh/graph/tests/test_toposort.py
@@ -1,67 +1,67 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from pathlib import Path
import subprocess

-from swh.graph.luigi import TopoSort
+from swh.graph.luigi.misc_datasets import TopoSort

DATA_DIR = Path(__file__).parents[0] / "dataset"


# FIXME: the order of sample ancestors should not be hardcoded
# FIXME: swh:1:snp:0000000000000000000000000000000000000022,3,1,swh has three possible
# sample ancestors; they should not be hardcoded here
EXPECTED = """\
SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2
swh:1:rev:0000000000000000000000000000000000000003,0,1,,
swh:1:rev:0000000000000000000000000000000000000009,1,4,swh:1:rev:0000000000000000000000000000000000000003,
swh:1:rel:0000000000000000000000000000000000000010,1,2,swh:1:rev:0000000000000000000000000000000000000009,
swh:1:snp:0000000000000000000000000000000000000020,2,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0,swh:1:snp:0000000000000000000000000000000000000020,
swh:1:rev:0000000000000000000000000000000000000013,1,1,swh:1:rev:0000000000000000000000000000000000000009,
swh:1:rev:0000000000000000000000000000000000000018,1,2,swh:1:rev:0000000000000000000000000000000000000013,
swh:1:rel:0000000000000000000000000000000000000019,1,0,swh:1:rev:0000000000000000000000000000000000000018,
swh:1:rel:0000000000000000000000000000000000000021,1,1,swh:1:rev:0000000000000000000000000000000000000018,
swh:1:snp:0000000000000000000000000000000000000022,3,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1,0,swh:1:snp:0000000000000000000000000000000000000022,
"""


def test_toposort(tmpdir):
    tmpdir = Path(tmpdir)

    topological_order_path = tmpdir / "topo_order.csv.zst"

    task = TopoSort(
        local_graph_path=DATA_DIR / "compressed",
        topological_order_path=topological_order_path,
        graph_name="example",
    )

    task.run()

    csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode()

    (header, *rows) = csv_text.split("\n")
    (expected_header, *expected_lines) = EXPECTED.split("\n")
    assert header == expected_header

    # The only possible first line
    assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"

    assert set(rows) == set(expected_lines)

    assert rows.pop() == "", "Missing trailing newline"

    # The only three possible last lines
    assert rows[-1] in [
        "swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0"
        ",swh:1:snp:0000000000000000000000000000000000000020,",
        "swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1,0"
        ",swh:1:snp:0000000000000000000000000000000000000022,",
        "swh:1:rel:0000000000000000000000000000000000000019,1,0"
        ",swh:1:rev:0000000000000000000000000000000000000018,",
    ]