Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/luigi.py
Show First 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | |||||
when downloading. | when downloading. | ||||
The layout is otherwise the same as the file layout. | The layout is otherwise the same as the file layout. | ||||
""" | """ | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Dict, List | from typing import Dict, List, Tuple | ||||
import luigi | import luigi | ||||
from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | ||||
class CompressGraph(luigi.Task): | class CompressGraph(luigi.Task): | ||||
local_export_path = luigi.PathParameter(significant=False) | local_export_path = luigi.PathParameter(significant=False) | ||||
▲ Show 20 Lines • Show All 346 Lines • ▼ Show 20 Lines | class LocalGraph(luigi.Task): | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on the local filesystem.""" | """Returns stamp and meta paths on the local filesystem.""" | ||||
return [self._meta()] | return [self._meta()] | ||||
def _meta(self): | def _meta(self): | ||||
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | ||||
def _run_script(script: str, output_path: Path) -> None: | |||||
import os | |||||
import subprocess | |||||
from .config import check_config | |||||
conf: Dict = {} # TODO: configurable | |||||
conf = check_config(conf) | |||||
env = { | |||||
**os.environ.copy(), | |||||
"JAVA_TOOL_OPTIONS": conf["java_tool_options"], | |||||
"CLASSPATH": conf["classpath"], | |||||
} | |||||
tmp_output_path = Path(f"{output_path}.tmp") | |||||
subprocess.run( | |||||
["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True | |||||
) | |||||
anlambert: ```lang=python
with open(tmp_output_path, "wb") as tmp_output:
subprocess.run… | |||||
vlorentzAuthorUnsubmitted Done Inline ActionsYou're right, it works and has same throughput and CPU usage. vlorentz: You're right, it works and has same throughput and CPU usage. | |||||
# Atomically write the output file | |||||
tmp_output_path.replace(output_path) | |||||
class TopoSort(luigi.Task): | class TopoSort(luigi.Task): | ||||
"""Creates a file that contains all SWHIDs in topological order from a compressed | """Creates a file that contains all SWHIDs in topological order from a compressed | ||||
graph.""" | graph.""" | ||||
local_graph_path = luigi.PathParameter() | local_graph_path = luigi.PathParameter() | ||||
topological_order_path = luigi.PathParameter() | topological_order_path = luigi.PathParameter() | ||||
graph_name = luigi.Parameter(default="graph") | graph_name = luigi.Parameter(default="graph") | ||||
def requires(self) -> List[luigi.Task]: | def requires(self) -> List[luigi.Task]: | ||||
"""Returns an instance of :class:`LocalGraph`.""" | """Returns an instance of :class:`LocalGraph`.""" | ||||
return [LocalGraph(local_graph_path=self.local_graph_path)] | return [LocalGraph(local_graph_path=self.local_graph_path)] | ||||
def output(self) -> luigi.Target: | def output(self) -> luigi.Target: | ||||
""".csv.zst file that contains the topological order.""" | """.csv.zst file that contains the topological order.""" | ||||
return luigi.LocalTarget(self.topological_order_path) | return luigi.LocalTarget(self.topological_order_path) | ||||
def run(self) -> None: | def run(self) -> None: | ||||
"""Runs org.softwareheritage.graph.utils.TopoSort and compresses""" | """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" | ||||
import os | object_types = "rev,rel,snp,ori" | ||||
import subprocess | class_name = "org.softwareheritage.graph.utils.TopoSort" | ||||
script = f""" | |||||
java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ | |||||
| pv --line-mode --wait \ | |||||
| zstdmt -19 | |||||
""" | |||||
_run_script(script, self.topological_order_path) | |||||
from .config import check_config | |||||
conf: Dict = {} # TODO: configurable | class ListOriginContributors(luigi.Task): | ||||
"""Creates a file that contains all SWHIDs in topological order from a compressed | |||||
graph.""" | |||||
conf = check_config(conf) | local_graph_path = luigi.PathParameter() | ||||
env = { | topological_order_path = luigi.PathParameter() | ||||
**os.environ.copy(), | origin_contributors_path = luigi.PathParameter() | ||||
"JAVA_TOOL_OPTIONS": conf["java_tool_options"], | graph_name = luigi.Parameter(default="graph") | ||||
"CLASSPATH": conf["classpath"], | |||||
} | def requires(self) -> List[luigi.Task]: | ||||
"""Returns an instance of :class:`LocalGraph` and :class:`TopoSort`.""" | |||||
return [ | |||||
LocalGraph(local_graph_path=self.local_graph_path), | |||||
TopoSort( | |||||
local_graph_path=self.local_graph_path, | |||||
topological_order_path=self.topological_order_path, | |||||
graph_name=self.graph_name, | |||||
), | |||||
] | |||||
tmp_path = Path(f"{self.topological_order_path}.tmp") | def output(self) -> luigi.Target: | ||||
""".csv.zst file that contains the topological order.""" | |||||
return luigi.LocalTarget(self.origin_contributors_path) | |||||
object_types = "rev,rel,snp,ori" | def run(self) -> None: | ||||
class_name = "org.softwareheritage.graph.utils.TopoSort" | """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" | ||||
class_name = "org.softwareheritage.graph.utils.ListOriginContributors" | |||||
script = f""" | script = f""" | ||||
java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ | zstdcat {self.topological_order_path} \ | ||||
| java {class_name} '{self.local_graph_path}/{self.graph_name}' \ | |||||
| pv --line-mode --wait \ | | pv --line-mode --wait \ | ||||
| zstdmt -19 \ | | zstdmt -19 | ||||
> '{tmp_path}' | |||||
""" | """ | ||||
subprocess.run(["bash", "-c", script], env=env, check=True) | _run_script(script, self.origin_contributors_path) | ||||
# Atomically write the output file | |||||
tmp_path.replace(self.topological_order_path) | class ExportDeanonymizationTable(luigi.Task): | ||||
"""Exports (from swh-storage) a .csv.zst file that contains the columns: | |||||
``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. | |||||
The first column is the anonymized full name found in :file:`graph.persons.csv.zst` | |||||
in the compressed graph, and the latter two are the original name.""" | |||||
storage_dsn = luigi.Parameter( | |||||
default="service=swh", | |||||
description="postgresql DSN of the swh-storage database to read from.", | |||||
) | |||||
deanonymization_table_path = luigi.PathParameter() | |||||
def output(self) -> luigi.Target: | |||||
""".csv.zst file that contains the table.""" | |||||
return luigi.LocalTarget(self.deanonymization_table_path) | |||||
def run(self) -> None: | |||||
"""Runs a postgresql query to compute the table.""" | |||||
_run_script( | |||||
f""" | |||||
psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19 | |||||
""", # noqa | |||||
self.deanonymization_table_path, | |||||
) | |||||
anlambertUnsubmitted Done Inline ActionsMore readable by wrapping lines imho. _run_script( f""" psql '{self.storage_dsn}' -c "\ COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64,\ encode(fullname, 'base64') as base64,\ encode(fullname, 'escape') as escaped from person)\ TO STDOUT CSV HEADER\ " | zstdmt -19 """, self.deanonymization_table_path, ) anlambert: More readable by wrapping lines imho.
```lang=python
_run_script(
f"""
psql '{self. | |||||
class DeanonymizeOriginContributors(luigi.Task): | |||||
"""Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, | |||||
but with ``person_base64`` and ``person_escaped`` columns in addition to | |||||
``person_id``. | |||||
This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names | |||||
instead of names); which may not be true depending on how the swh-dataset export | |||||
cas configured. | |||||
anlambertUnsubmitted Done Inline Actionss/cas/was/ anlambert: s/cas/was/ | |||||
""" | |||||
local_graph_path = luigi.PathParameter() | |||||
graph_name = luigi.Parameter(default="graph") | |||||
origin_contributors_path = luigi.PathParameter() | |||||
deanonymization_table_path = luigi.PathParameter() | |||||
deanonymized_origin_contributors_path = luigi.PathParameter() | |||||
def requires(self) -> List[luigi.Task]: | |||||
"""Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, | |||||
and :class:`ExportDeanonymizationTable`.""" | |||||
return [ | |||||
LocalGraph(local_graph_path=self.local_graph_path), | |||||
ListOriginContributors( | |||||
local_graph_path=self.local_graph_path, | |||||
origin_contributors_path=self.origin_contributors_path, | |||||
), | |||||
ExportDeanonymizationTable( | |||||
deanonymization_table_path=self.deanonymization_table_path, | |||||
), | |||||
] | |||||
def output(self) -> luigi.Target: | |||||
""".csv.zst file similar to :meth:`ListOriginContributors.output`'s, | |||||
but with ``person_base64`` and ``person_escaped`` columns in addition to | |||||
``person_id``""" | |||||
return luigi.LocalTarget(self.deanonymized_origin_contributors_path) | |||||
def run(self) -> None: | |||||
"""Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset | |||||
and the deanonymization table in memory, then uses them to map each row | |||||
in the original (anonymized) contributors list to the deanonymized one.""" | |||||
# TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export | |||||
# was configured to do so); this should add support for it. | |||||
import base64 | |||||
import csv | |||||
import pyzstd | |||||
# Load the deanonymization table, to map sha256(name) to base64(name) | |||||
# and escape(name) | |||||
sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} | |||||
with pyzstd.open(self.deanonymization_table_path, "rt") as fd: | |||||
csv_reader = csv.reader(fd) | |||||
header = next(csv_reader) | |||||
assert header == ["sha256_base64", "base64", "escaped"], header | |||||
for line in csv_reader: | |||||
(base64_sha256_name, base64_name, escaped_name) = line | |||||
sha256_name = base64.b64decode(base64_sha256_name) | |||||
name = base64.b64decode(base64_name) | |||||
sha256_to_names[sha256_name] = (name, escaped_name) | |||||
# Combine with the list of sha256(name), to get the list of base64(name) | |||||
# and escape(name) | |||||
persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" | |||||
with pyzstd.open(persons_path, "rb") as fd: | |||||
person_id_to_names: List[Tuple[bytes, str]] = [ | |||||
sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) | |||||
for line in fd | |||||
] | |||||
tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") | |||||
# Finally, write a new table of origin_contributors, by reading the anonymized | |||||
# table line-by-line and deanonymizing each id | |||||
# Open temporary output for writes as CSV | |||||
with pyzstd.open(tmp_output_path, "wt") as output_fd: | |||||
csv_writer = csv.writer(output_fd, lineterminator="\n") | |||||
# write header | |||||
csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) | |||||
# Open input for reads as CSV | |||||
with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: | |||||
csv_reader = csv.reader(input_fd) | |||||
header = next(csv_reader) | |||||
assert header == ["origin_SWHID", "person_id"], header | |||||
for (origin_swhid, person_id) in csv_reader: | |||||
(name, escaped_name) = person_id_to_names[int(person_id)] | |||||
base64_name = base64.b64encode(name).decode("ascii") | |||||
csv_writer.writerow((origin_swhid, base64_name, escaped_name)) | |||||
tmp_output_path.replace(self.deanonymized_origin_contributors_path) |
I find it more readable this way but maybe there is a reason you cannot use it.