diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py index e73a5d2..a3f4489 100644 --- a/swh/graph/luigi/origin_contributors.py +++ b/swh/graph/luigi/origin_contributors.py @@ -1,198 +1,199 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ Luigi tasks for contribution graph ================================== This module contains `Luigi `_ tasks driving the creation of the graph of contributions of people (pseudonymized by default). """ # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path from typing import Dict, Iterable, List, Tuple, cast import luigi from .compressed_graph import LocalGraph from .misc_datasets import TopoSort from .utils import run_script class ListOriginContributors(luigi.Task): """Creates a file that contains all SWHIDs in topological order from a compressed graph.""" local_graph_path = luigi.PathParameter() topological_order_path = luigi.PathParameter() origin_contributors_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") def requires(self) -> List[luigi.Task]: """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` and :class:`swh.graph.luigi.misc_datasets.TopoSort`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), TopoSort( local_graph_path=self.local_graph_path, topological_order_path=self.topological_order_path, graph_name=self.graph_name, ), ] def output(self) -> luigi.Target: """.csv.zst file that contains the topological order.""" return luigi.LocalTarget(self.origin_contributors_path) def run(self) -> None: """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" class_name = "org.softwareheritage.graph.utils.ListOriginContributors" script = f""" zstdcat {self.topological_order_path} \ | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ | pv --line-mode --wait \ | zstdmt -19 """ run_script(script, self.origin_contributors_path) class ExportDeanonymizationTable(luigi.Task): """Exports (from swh-storage) a .csv.zst file that contains the columns: ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. The first column is the anonymized full name found in :file:`graph.persons.csv.zst` in the compressed graph, and the latter two are the original name.""" storage_dsn = luigi.Parameter( default="service=swh", description="postgresql DSN of the swh-storage database to read from.", ) deanonymization_table_path = luigi.PathParameter() def output(self) -> luigi.Target: """.csv.zst file that contains the table.""" return luigi.LocalTarget(self.deanonymization_table_path) def run(self) -> None: """Runs a postgresql query to compute the table.""" run_script( f""" psql '{self.storage_dsn}' -c "\ COPY ( SELECT encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \ encode(fullname, 'base64') as base64, \ encode(fullname, 'escape') as escaped \ FROM person \ ) TO STDOUT CSV HEADER \ " | zstdmt -19 """, # noqa self.deanonymization_table_path, ) class DeanonymizeOriginContributors(luigi.Task): """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, but with ``person_base64`` and ``person_escaped`` columns in addition to ``person_id``. This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names instead of names); which may not be true depending on how the swh-dataset export was configured. """ local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") origin_contributors_path = luigi.PathParameter() deanonymization_table_path = luigi.PathParameter() deanonymized_origin_contributors_path = luigi.PathParameter() def requires(self) -> List[luigi.Task]: """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, and :class:`ExportDeanonymizationTable`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), ListOriginContributors( local_graph_path=self.local_graph_path, origin_contributors_path=self.origin_contributors_path, ), ExportDeanonymizationTable( deanonymization_table_path=self.deanonymization_table_path, ), ] def output(self) -> luigi.Target: """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, but with ``person_base64`` and ``person_escaped`` columns in addition to ``person_id``""" return luigi.LocalTarget(self.deanonymized_origin_contributors_path) def run(self) -> None: """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset and the deanonymization table in memory, then uses them to map each row in the original (anonymized) contributors list to the deanonymized one.""" # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export # was configured to do so); this should add support for it. import base64 import csv import pyzstd # Load the deanonymization table, to map sha256(name) to base64(name) # and escape(name) sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} with pyzstd.open(self.deanonymization_table_path, "rt") as fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], fd)) header = next(csv_reader) assert header == ["sha256_base64", "base64", "escaped"], header for line in csv_reader: (base64_sha256_name, base64_name, escaped_name) = line sha256_name = base64.b64decode(base64_sha256_name) name = base64.b64decode(base64_name) sha256_to_names[sha256_name] = (name, escaped_name) # Combine with the list of sha256(name), to get the list of base64(name) # and escape(name) persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" with pyzstd.open(persons_path, "rb") as fd: person_id_to_names: List[Tuple[bytes, str]] = [ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) for line in fd ] tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") + tmp_output_path.parent.mkdir(parents=True, exist_ok=True) # Finally, write a new table of origin_contributors, by reading the anonymized # table line-by-line and deanonymizing each id # Open temporary output for writes as CSV with pyzstd.open(tmp_output_path, "wt") as output_fd: csv_writer = csv.writer(output_fd, lineterminator="\n") # write header csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) # Open input for reads as CSV with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], input_fd)) header = next(csv_reader) assert header == ["origin_SWHID", "person_id"], header for (origin_swhid, person_id) in csv_reader: if person_id == "null": # FIXME: workaround for a bug in contribution graphs generated # before 2022-12-01. Those were only used in tests and never # published, so the conditional can be removed when this is # productionized continue (name, escaped_name) = person_id_to_names[int(person_id)] base64_name = base64.b64encode(name).decode("ascii") csv_writer.writerow((origin_swhid, base64_name, escaped_name)) tmp_output_path.replace(self.deanonymized_origin_contributors_path) diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py index 233ad14..1749775 100644 --- a/swh/graph/tests/test_origin_contributors.py +++ b/swh/graph/tests/test_origin_contributors.py @@ -1,186 +1,186 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from pathlib import Path import subprocess from swh.graph.luigi.origin_contributors import ( DeanonymizeOriginContributors, ExportDeanonymizationTable, ListOriginContributors, ) from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, TimestampWithTimezone, ) from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER DATA_DIR = Path(__file__).parents[0] / "dataset" # FIXME: do not hardcode ids here; they should be dynamically loaded # from the test graph ORIGIN_CONTRIBUTORS = """\ origin_SWHID,person_id swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0 swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,2 """ DEANONYMIZATION_TABLE = """\ sha256_base64,base64,escaped 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe """ # noqa PERSONS = """\ aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U= UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs= 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI= """ DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\ origin_SWHID,person_base64,person_escaped swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe """ # noqa def test_list_origin_contributors(tmpdir): tmpdir = Path(tmpdir) topological_order_path = tmpdir / "topo_order.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" subprocess.run( ["zstdmt", "-o", topological_order_path], input=TOPOLOGICAL_ORDER.encode(), check=True, ) task = ListOriginContributors( local_graph_path=DATA_DIR / "compressed", topological_order_path=topological_order_path, origin_contributors_path=origin_contributors_path, graph_name="example", ) task.run() csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode() assert csv_text == ORIGIN_CONTRIBUTORS def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage): tmpdir = Path(tmpdir) tstz = TimestampWithTimezone.from_datetime( datetime.datetime.now(tz=datetime.timezone.utc) ) swh_storage.release_add( [ Release( name=b"v1.0", message=b"first release", author=Person.from_fullname(b"John Doe "), target=b"\x00" * 20, target_type=ObjectType.REVISION, synthetic=True, ) ] ) swh_storage.revision_add( [ Revision( message=b"first commit", author=Person.from_fullname(b"Jane Doe "), committer=Person.from_fullname(b"Jane Doe "), date=tstz, committer_date=tstz, directory=b"\x00" * 20, type=RevisionType.GIT, synthetic=True, ) ] ) deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" task = ExportDeanonymizationTable( storage_dsn=swh_storage_postgresql.dsn, deanonymization_table_path=deanonymization_table_path, ) task.run() csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode() (header, *rows) = csv_text.split("\n") (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n") assert header == expected_header assert rows.pop() == "", "Missing trailing newline" expected_rows.pop() assert set(rows) == set(expected_rows) def test_deanonymize_origin_contributors(tmpdir): tmpdir = Path(tmpdir) persons_path = tmpdir / "example.persons.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" deanonymized_origin_contributors_path = ( - tmpdir / "origin_contributors.deanonymized.csv.zst" + tmpdir / "sensitive" / "origin_contributors.deanonymized.csv.zst" ) subprocess.run( ["zstdmt", "-o", origin_contributors_path], input=ORIGIN_CONTRIBUTORS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", persons_path], input=PERSONS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", deanonymization_table_path], input=DEANONYMIZATION_TABLE.encode(), check=True, ) task = DeanonymizeOriginContributors( local_graph_path=tmpdir, origin_contributors_path=origin_contributors_path, deanonymization_table_path=deanonymization_table_path, deanonymized_origin_contributors_path=deanonymized_origin_contributors_path, graph_name="example", ) task.run() csv_text = subprocess.check_output( ["zstdcat", deanonymized_origin_contributors_path] ).decode() assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS