Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/luigi/compressed_graph.py
- This file was moved from swh/graph/luigi.py.
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
Luigi tasks | Luigi tasks for compression | ||||
=========== | =========================== | ||||
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks, | This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks, | ||||
as an alternative to the CLI that can be composed with other tasks, | as an alternative to the CLI that can be composed with other tasks, | ||||
such as swh-dataset's. | such as swh-dataset's. | ||||
Unlike the CLI, this requires the graph to be named `graph`. | Unlike the CLI, this requires the graph to be named `graph`. | ||||
File layout | File layout | ||||
----------- | ----------- | ||||
In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`, | In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`, | ||||
:file:`graph.mph`, ...), tasks in this module produce this directory structure:: | :file:`graph.mph`, ...), tasks in this module produce this directory structure:: | ||||
swh_<date>[_<flavor>]/ | base_dir/ | ||||
<date>[_<flavor>]/ | |||||
compressed/ | |||||
graph.graph | graph.graph | ||||
graph.mph | graph.mph | ||||
... | ... | ||||
meta/ | meta/ | ||||
export.json | export.json | ||||
compression.json | compression.json | ||||
``graph.meta/export.json`` is copied from the ORC dataset exported by | ``graph.meta/export.json`` is copied from the ORC dataset exported by | ||||
:mod:`swh.dataset.luigi`. | :mod:`swh.dataset.luigi`. | ||||
``graph.meta/compression.json`` contains information about the compression itself, | ``graph.meta/compression.json`` contains information about the compression itself, | ||||
for provenance tracking. | for provenance tracking. | ||||
For example: | For example: | ||||
Show All 31 Lines | |||||
Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed | Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed | ||||
when downloading. | when downloading. | ||||
The layout is otherwise the same as the file layout. | The layout is otherwise the same as the file layout. | ||||
""" | """ | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
from pathlib import Path | from typing import List | ||||
from typing import Dict, List, Tuple | |||||
import luigi | import luigi | ||||
from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | ||||
class CompressGraph(luigi.Task): | class CompressGraph(luigi.Task): | ||||
local_export_path = luigi.PathParameter(significant=False) | local_export_path = luigi.PathParameter(significant=False) | ||||
▲ Show 20 Lines • Show All 344 Lines • ▼ Show 20 Lines | def requires(self) -> List[luigi.Task]: | ||||
) | ) | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on the local filesystem.""" | """Returns stamp and meta paths on the local filesystem.""" | ||||
return [self._meta()] | return [self._meta()] | ||||
def _meta(self): | def _meta(self): | ||||
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | ||||
def _run_script(script: str, output_path: Path) -> None:
    """Runs a shell script with bash and atomically writes its standard output
    to ``output_path``.

    The script runs with ``JAVA_TOOL_OPTIONS`` and ``CLASSPATH`` set from the
    configuration returned by :func:`check_config`, on top of the current
    environment.

    Its output is first written to ``<output_path>.tmp``, which is renamed to
    ``output_path`` only if the script succeeded; so ``output_path`` never contains
    partial results.

    Args:
        script: bash code to run (typically a pipeline); surrounding whitespace
            is ignored
        output_path: file that receives the standard output of the script

    Raises:
        subprocess.CalledProcessError: if any command of the pipeline exits with
            a non-zero status
    """
    import os
    import subprocess

    from .config import check_config

    conf: Dict = {}  # TODO: configurable
    conf = check_config(conf)
    env = {
        **os.environ,
        "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
        "CLASSPATH": conf["classpath"],
    }

    tmp_output_path = Path(f"{output_path}.tmp")

    try:
        # Redirect from Python rather than with a shell '>', so the path does not
        # need any shell quoting.
        with tmp_output_path.open("wb") as tmp_fd:
            subprocess.run(
                # Without pipefail, bash only reports the exit status of the last
                # command of a pipeline (usually the compressor), and a crash of
                # an earlier command would be silently ignored.
                ["bash", "-o", "pipefail", "-c", script.strip()],
                env=env,
                stdout=tmp_fd,
                check=True,
            )
    except BaseException:
        # Do not leave a partial file behind
        try:
            tmp_output_path.unlink()
        except FileNotFoundError:
            pass
        raise

    # Atomically write the output file
    tmp_output_path.replace(output_path)
class TopoSort(luigi.Task):
    """Luigi task writing the SWHIDs of a compressed graph to a single file,
    in topological order."""

    local_graph_path = luigi.PathParameter()
    topological_order_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")

    def requires(self) -> List[luigi.Task]:
        """Depends on a single :class:`LocalGraph` task, for ``local_graph_path``."""
        local_graph = LocalGraph(local_graph_path=self.local_graph_path)
        return [local_graph]

    def output(self) -> luigi.Target:
        """Local target for the .csv.zst file holding the topological order."""
        return luigi.LocalTarget(self.topological_order_path)

    def run(self) -> None:
        """Pipes the output of org.softwareheritage.graph.utils.TopoSort through
        ``pv`` and ``zstdmt``, into ``topological_order_path``."""
        java_class = "org.softwareheritage.graph.utils.TopoSort"
        node_types = "rev,rel,snp,ori"
        graph_basename = f"{self.local_graph_path}/{self.graph_name}"
        pipeline = f"""
        java {java_class} '{graph_basename}' '{node_types}' \
            | pv --line-mode --wait \
            | zstdmt -19
        """
        _run_script(pipeline, self.topological_order_path)
class ListOriginContributors(luigi.Task):
    """Creates a file that lists origins along with the ids of their contributors,
    computed from a compressed graph and its topological order (as produced by
    :class:`TopoSort`)."""

    local_graph_path = luigi.PathParameter()
    topological_order_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            TopoSort(
                local_graph_path=self.local_graph_path,
                topological_order_path=self.topological_order_path,
                graph_name=self.graph_name,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the list of origin contributors."""
        return luigi.LocalTarget(self.origin_contributors_path)

    def run(self) -> None:
        """Feeds the decompressed topological order to
        org.softwareheritage.graph.utils.ListOriginContributors and compresses
        its output."""
        class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
        # The input path is quoted like every other path passed to the shell, so
        # that paths containing spaces are not split into several arguments.
        script = f"""
        zstdcat '{self.topological_order_path}' \
            | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
            | pv --line-mode --wait \
            | zstdmt -19
        """
        _run_script(script, self.origin_contributors_path)
class ExportDeanonymizationTable(luigi.Task):
    """Dumps the ``person`` table of a swh-storage database to a .csv.zst file with
    three columns: ``base64(sha256(full_name))``, ``base64(full_name)``, and
    ``escape(full_name)``.

    The first column is the anonymized full name, as found in
    :file:`graph.persons.csv.zst` in the compressed graph; the other two are
    encodings of the original name."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """Local target for the .csv.zst file holding the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Computes the table with a single postgresql ``COPY`` query, and
        compresses its output."""
        script = f"""
        psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
        """  # noqa
        _run_script(script, self.deanonymization_table_path)
class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``person_base64`` and ``person_escaped`` columns instead of
    ``person_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
    instead of names); which may not be true depending on how the swh-dataset export
    was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        and :class:`ExportDeanonymizationTable`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``person_base64`` and ``person_escaped`` columns instead of
        ``person_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
        and the deanonymization table in memory, then uses them to map each row
        in the original (anonymized) contributors list to the deanonymized one.

        The result is written to a temporary file, which is renamed to the final
        path only once it is complete; the temporary file is removed on failure."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
        # was configured to do so); this should add support for it.

        import base64
        import csv

        import pyzstd

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name)
        sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
            csv_reader = csv.reader(fd)
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for line in csv_reader:
                (base64_sha256_name, base64_name, escaped_name) = line
                sha256_name = base64.b64decode(base64_sha256_name)
                name = base64.b64decode(base64_name)
                sha256_to_names[sha256_name] = (name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name). The position of a person in this list is its id;
        # persons missing from the table get empty names.
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        with pyzstd.open(persons_path, "rb") as fd:
            person_id_to_names: List[Tuple[bytes, str]] = [
                sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
                for line in fd
            ]

        # Entries left in the table were not referenced by the persons file and
        # are not needed anymore; release them before the streaming phase.
        del sha256_to_names

        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")

        # Finally, write a new table of origin_contributors, by reading the anonymized
        # table line-by-line and deanonymizing each id
        try:
            # Open temporary output for writes as CSV
            with pyzstd.open(tmp_output_path, "wt") as output_fd:
                csv_writer = csv.writer(output_fd, lineterminator="\n")
                # write header
                csv_writer.writerow(
                    ("origin_SWHID", "person_base64", "person_escaped")
                )

                # Open input for reads as CSV
                with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
                    csv_reader = csv.reader(input_fd)
                    header = next(csv_reader)
                    assert header == ["origin_SWHID", "person_id"], header
                    for (origin_swhid, person_id) in csv_reader:
                        if person_id == "null":
                            # FIXME: workaround for a bug in contribution graphs
                            # generated before 2022-12-01. Those were only used in
                            # tests and never published, so the conditional can be
                            # removed when this is productionized
                            continue
                        (name, escaped_name) = person_id_to_names[int(person_id)]
                        base64_name = base64.b64encode(name).decode("ascii")
                        csv_writer.writerow(
                            (origin_swhid, base64_name, escaped_name)
                        )
        except BaseException:
            # Do not leave a partial file behind
            try:
                tmp_output_path.unlink()
            except FileNotFoundError:
                pass
            raise

        # Atomically write the output file
        tmp_output_path.replace(self.deanonymized_origin_contributors_path)