diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py
index e73a5d2..a3f4489 100644
--- a/swh/graph/luigi/origin_contributors.py
+++ b/swh/graph/luigi/origin_contributors.py
@@ -1,198 +1,199 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks for contribution graph
==================================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, cast
import luigi
from .compressed_graph import LocalGraph
from .misc_datasets import TopoSort
from .utils import run_script
class ListOriginContributors(luigi.Task):
    """Creates a .csv.zst file that lists, for each origin, the ids of the persons
    who contributed to it (``origin_SWHID,person_id`` rows), from a compressed
    graph and its topological order."""

    local_graph_path = luigi.PathParameter()
    topological_order_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`swh.graph.luigi.misc_datasets.TopoSort`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            TopoSort(
                local_graph_path=self.local_graph_path,
                topological_order_path=self.topological_order_path,
                graph_name=self.graph_name,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the list of contributors of each origin."""
        return luigi.LocalTarget(self.origin_contributors_path)

    def run(self) -> None:
        """Feeds the decompressed topological order to
        org.softwareheritage.graph.utils.ListOriginContributors and compresses
        its output."""
        # Imported here rather than at module level to keep cli startup time
        # under control.
        import shlex

        class_name = "org.softwareheritage.graph.utils.ListOriginContributors"

        # Paths are interpolated into a shell pipeline: quote them so that
        # spaces or quote characters in a path cannot split or break the
        # command. shlex.quote() leaves plain paths untouched.
        topological_order = shlex.quote(str(self.topological_order_path))
        graph_basename = shlex.quote(f"{self.local_graph_path}/{self.graph_name}")

        script = f"""
        zstdcat {topological_order} \
            | java {class_name} {graph_basename} \
            | pv --line-mode --wait \
            | zstdmt -19
        """
        run_script(script, self.origin_contributors_path)
class ExportDeanonymizationTable(luigi.Task):
    """Exports (from swh-storage) a .csv.zst file that contains the columns:
    ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.

    The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
    in the compressed graph, and the latter two are the original name."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Runs a postgresql query to compute the table."""
        # Imported here rather than at module level to keep cli startup time
        # under control.
        import shlex

        # A keyword/value DSN may itself contain single quotes (eg.
        # ``password='a b'``), so wrapping it in literal single quotes is not
        # enough; let shlex produce a correctly escaped shell word instead.
        quoted_dsn = shlex.quote(str(self.storage_dsn))

        run_script(
            f"""
            psql {quoted_dsn} -c "\
            COPY (
                SELECT
                    encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \
                    encode(fullname, 'base64') as base64, \
                    encode(fullname, 'escape') as escaped \
                FROM person \
            ) TO STDOUT CSV HEADER \
            " | zstdmt -19
            """,  # noqa
            self.deanonymization_table_path,
        )
class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``person_base64`` and ``person_escaped`` columns instead of
    ``person_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
    instead of names); which may not be true depending on how the swh-dataset export
    was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        and :class:`ExportDeanonymizationTable`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
                # person ids are indexes in <graph_name>.persons.csv.zst (see
                # run()), so the contributors must come from the same graph
                graph_name=self.graph_name,
                # NOTE(review): topological_order_path is not passed here;
                # presumably it is provided through Luigi's configuration or
                # command line -- confirm.
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``person_base64`` and ``person_escaped`` columns instead of
        ``person_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
        and the deanonymization table in memory, then uses them to map each row
        in the original (anonymized) contributors list to the deanonymized one."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
        # was configured to do so); this should add support for it.

        import base64
        import csv

        import pyzstd

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name)
        sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], fd))
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for line in csv_reader:
                (base64_sha256_name, base64_name, escaped_name) = line
                sha256_name = base64.b64decode(base64_sha256_name)
                name = base64.b64decode(base64_name)
                sha256_to_names[sha256_name] = (name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name). The position of a line in the persons file is the
        # person id used to index this list below.
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        with pyzstd.open(persons_path, "rb") as fd:
            person_id_to_names: List[Tuple[bytes, str]] = [
                # pop() drops each table entry as soon as it is consumed;
                # hashes missing from the table map to an empty name
                sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
                for line in fd
            ]

        # Write to a temporary file first, and only move it to the final path
        # once it is complete.
        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
        # The output may live in a directory that does not exist yet
        tmp_output_path.parent.mkdir(parents=True, exist_ok=True)

        # Finally, write a new table of origin_contributors, by reading the anonymized
        # table line-by-line and deanonymizing each id

        # Open temporary output for writes as CSV
        with pyzstd.open(tmp_output_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\n")
            # write header
            csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))

            # Open input for reads as CSV
            with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
                # TODO: remove that cast once we dropped Python 3.7 support
                csv_reader = csv.reader(cast(Iterable[str], input_fd))
                header = next(csv_reader)
                assert header == ["origin_SWHID", "person_id"], header
                for (origin_swhid, person_id) in csv_reader:
                    if person_id == "null":
                        # FIXME: workaround for a bug in contribution graphs generated
                        # before 2022-12-01. Those were only used in tests and never
                        # published, so the conditional can be removed when this is
                        # productionized
                        continue
                    (name, escaped_name) = person_id_to_names[int(person_id)]
                    base64_name = base64.b64encode(name).decode("ascii")
                    csv_writer.writerow((origin_swhid, base64_name, escaped_name))

        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
index 233ad14..1749775 100644
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -1,186 +1,186 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from pathlib import Path
import subprocess
from swh.graph.luigi.origin_contributors import (
DeanonymizeOriginContributors,
ExportDeanonymizationTable,
ListOriginContributors,
)
from swh.model.model import (
ObjectType,
Person,
Release,
Revision,
RevisionType,
TimestampWithTimezone,
)
from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER
# Directory holding the example dataset used by these tests
DATA_DIR = Path(__file__).parents[0] / "dataset"

# FIXME: do not hardcode ids here; they should be dynamically loaded
# from the test graph
ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_id
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,2
"""

# The "escaped" column is the plain-text form of the full name that the
# "base64" column of the same row encodes (name and e-mail address).
DEANONYMIZATION_TABLE = """\
sha256_base64,base64,escaped
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
"""  # noqa

# One anonymized person per line; the (0-based) line number is the person id
# used in ORIGIN_CONTRIBUTORS.
PERSONS = """\
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
"""

# ORIGIN_CONTRIBUTORS with each person id replaced by the base64 and escaped
# forms of the corresponding full name.
DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
origin_SWHID,person_base64,person_escaped
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
"""  # noqa
def test_list_origin_contributors(tmpdir):
    """Runs :class:`ListOriginContributors` on the example graph and compares
    its decompressed output with ``ORIGIN_CONTRIBUTORS``."""
    workdir = Path(tmpdir)
    topo_path = workdir / "topo_order.csv.zst"
    contributors_path = workdir / "origin_contributors.csv.zst"

    # The task reads its topological order from a zstd-compressed file
    compress_cmd = ["zstdmt", "-o", topo_path]
    subprocess.run(compress_cmd, input=TOPOLOGICAL_ORDER.encode(), check=True)

    # Run the task directly, without going through the Luigi scheduler
    ListOriginContributors(
        local_graph_path=DATA_DIR / "compressed",
        topological_order_path=topo_path,
        origin_contributors_path=contributors_path,
        graph_name="example",
    ).run()

    produced = subprocess.check_output(["zstdcat", contributors_path])
    assert produced.decode() == ORIGIN_CONTRIBUTORS
def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
    """Fills the storage with three distinct persons, runs
    :class:`ExportDeanonymizationTable` on it, and compares the exported rows
    (in any order) with ``DEANONYMIZATION_TABLE``."""
    tmpdir = Path(tmpdir)

    tstz = TimestampWithTimezone.from_datetime(
        datetime.datetime.now(tz=datetime.timezone.utc)
    )

    # The full names below (name *and* e-mail address) are the ones encoded in
    # the "base64" column of DEANONYMIZATION_TABLE; three distinct persons are
    # needed to get its three rows.
    swh_storage.release_add(
        [
            Release(
                name=b"v1.0",
                message=b"first release",
                author=Person.from_fullname(b"John Doe <jdoe@example.org>"),
                target=b"\x00" * 20,
                target_type=ObjectType.REVISION,
                synthetic=True,
            )
        ]
    )
    swh_storage.revision_add(
        [
            Revision(
                message=b"first commit",
                author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"),
                committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"),
                date=tstz,
                committer_date=tstz,
                directory=b"\x00" * 20,
                type=RevisionType.GIT,
                synthetic=True,
            )
        ]
    )

    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"

    task = ExportDeanonymizationTable(
        storage_dsn=swh_storage_postgresql.dsn,
        deanonymization_table_path=deanonymization_table_path,
    )

    task.run()

    csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode()

    (header, *rows) = csv_text.split("\n")
    (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")

    assert header == expected_header

    # Both texts end with a newline, so split() yields a trailing empty item
    assert rows.pop() == "", "Missing trailing newline"
    expected_rows.pop()

    # The order of rows in the export is not checked
    assert set(rows) == set(expected_rows)
def test_deanonymize_origin_contributors(tmpdir):
    """Runs :class:`DeanonymizeOriginContributors` on hand-written inputs and
    compares its decompressed output with ``DEANONYMIZED_ORIGIN_CONTRIBUTORS``."""
    workdir = Path(tmpdir)

    contributors_path = workdir / "origin_contributors.csv.zst"
    persons_path = workdir / "example.persons.csv.zst"
    table_path = workdir / "person_sha256_to_names.csv.zst"
    # The "sensitive" directory is not created by the test: the task is
    # expected to create the parent directory of its output.
    deanonymized_path = (
        workdir / "sensitive" / "origin_contributors.deanonymized.csv.zst"
    )

    # Write every input of the task as a zstd-compressed file
    fixtures = (
        (contributors_path, ORIGIN_CONTRIBUTORS),
        (persons_path, PERSONS),
        (table_path, DEANONYMIZATION_TABLE),
    )
    for (fixture_path, fixture_text) in fixtures:
        subprocess.run(
            ["zstdmt", "-o", fixture_path],
            input=fixture_text.encode(),
            check=True,
        )

    # Run the task directly, without going through the Luigi scheduler
    DeanonymizeOriginContributors(
        local_graph_path=workdir,
        origin_contributors_path=contributors_path,
        deanonymization_table_path=table_path,
        deanonymized_origin_contributors_path=deanonymized_path,
        graph_name="example",
    ).run()

    produced = subprocess.check_output(["zstdcat", deanonymized_path])
    assert produced.decode() == DEANONYMIZED_ORIGIN_CONTRIBUTORS