diff --git a/java/src/main/java/org/softwareheritage/graph/SwhGraph.java b/java/src/main/java/org/softwareheritage/graph/SwhGraph.java index aee50cd..d95bc73 100644 --- a/java/src/main/java/org/softwareheritage/graph/SwhGraph.java +++ b/java/src/main/java/org/softwareheritage/graph/SwhGraph.java @@ -1,151 +1,156 @@ /* * Copyright (c) 2021-2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ package org.softwareheritage.graph; import java.io.IOException; /** * Common interface for SWH graph classes. * * This interface forwards all property loading/access methods to the SwhGraphProperties object * returned by the getProperties() method of the implementing class. This allows API users to write * graph.getNodeType() instead of graph.getProperties().getNodeType(). */ public interface SwhGraph { /** * Cleans up graph resources after use. */ void close() throws IOException; /** * Returns the SWH graph properties object of this graph. * * @return graph properties */ SwhGraphProperties getProperties(); /** @see SwhGraphProperties#getPath() */ default String getPath() { return getProperties().getPath(); } /** @see SwhGraphProperties#getNodeId(SWHID) */ default long getNodeId(SWHID swhid) { return getProperties().getNodeId(swhid); } /** @see SwhGraphProperties#getSWHID(long) */ default SWHID getSWHID(long nodeId) { return getProperties().getSWHID(nodeId); } /** @see SwhGraphProperties#getNodeType(long) */ default SwhType getNodeType(long nodeId) { return getProperties().getNodeType(nodeId); } /** @see SwhGraphProperties#loadContentLength() */ default void loadContentLength() throws IOException { getProperties().loadContentLength(); } /** @see SwhGraphProperties#getContentLength(long) */ default Long getContentLength(long nodeId) { return getProperties().getContentLength(nodeId); } /** @see SwhGraphProperties#loadPersonIds() */ default void loadPersonIds() throws IOException { getProperties().loadPersonIds(); } /** @see SwhGraphProperties#getAuthorId(long) */ default Long getAuthorId(long nodeId) { return getProperties().getAuthorId(nodeId); } /** @see SwhGraphProperties#getCommitterId(long) */ default Long getCommitterId(long nodeId) { return getProperties().getCommitterId(nodeId); } /** @see SwhGraphProperties#loadContentIsSkipped() */ default void loadContentIsSkipped() throws IOException { getProperties().loadContentIsSkipped(); } /** @see SwhGraphProperties#isContentSkipped(long) */ default boolean isContentSkipped(long nodeId) { return getProperties().isContentSkipped(nodeId); } /** @see SwhGraphProperties#loadAuthorTimestamps() */ default void loadAuthorTimestamps() throws IOException { getProperties().loadAuthorTimestamps(); } /** @see SwhGraphProperties#getAuthorTimestamp(long) */ default Long getAuthorTimestamp(long nodeId) { return getProperties().getAuthorTimestamp(nodeId); } /** @see SwhGraphProperties#getAuthorTimestampOffset(long) */ default Short getAuthorTimestampOffset(long nodeId) { return getProperties().getAuthorTimestampOffset(nodeId); } /** @see SwhGraphProperties#loadCommitterTimestamps() */ default void loadCommitterTimestamps() throws IOException { getProperties().loadCommitterTimestamps(); } /** @see SwhGraphProperties#getCommitterTimestamp(long) */ default Long getCommitterTimestamp(long nodeId) { return getProperties().getCommitterTimestamp(nodeId); } /** @see 
SwhGraphProperties#getCommitterTimestampOffset(long) */ default Short getCommitterTimestampOffset(long nodeId) { return getProperties().getCommitterTimestampOffset(nodeId); } /** @see SwhGraphProperties#loadMessages() */ default void loadMessages() throws IOException { getProperties().loadMessages(); } /** @see SwhGraphProperties#getMessage(long) */ default byte[] getMessage(long nodeId) { return getProperties().getMessage(nodeId); } + /** @see SwhGraphProperties#getMessageBase64(long) */ + default byte[] getMessageBase64(long nodeId) { + return getProperties().getMessageBase64(nodeId); + } + /** @see SwhGraphProperties#getUrl(long) */ default String getUrl(long nodeId) { return getProperties().getUrl(nodeId); } /** @see SwhGraphProperties#loadTagNames() */ default void loadTagNames() throws IOException { getProperties().loadTagNames(); } /** @see SwhGraphProperties#getTagName(long) */ default byte[] getTagName(long nodeId) { return getProperties().getTagName(nodeId); } /** @see SwhGraphProperties#loadLabelNames() */ default void loadLabelNames() throws IOException { getProperties().loadLabelNames(); } /** @see SwhGraphProperties#getLabelName(long) */ default byte[] getLabelName(long labelId) { return getProperties().getLabelName(labelId); } } diff --git a/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java b/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java index 3372947..064a7df 100644 --- a/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java +++ b/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java @@ -1,330 +1,335 @@ /* * Copyright (c) 2021-2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ package org.softwareheritage.graph; import it.unimi.dsi.big.util.MappedFrontCodedStringBigList; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.bytes.ByteBigList; import it.unimi.dsi.fastutil.bytes.ByteMappedBigList; import it.unimi.dsi.fastutil.ints.IntBigList; import it.unimi.dsi.fastutil.ints.IntMappedBigList; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.longs.LongMappedBigList; import it.unimi.dsi.fastutil.shorts.ShortBigList; import it.unimi.dsi.fastutil.shorts.ShortMappedBigList; import it.unimi.dsi.sux4j.util.EliasFanoLongBigList; import org.apache.commons.configuration2.ex.ConfigurationException; import org.softwareheritage.graph.maps.NodeIdMap; import org.softwareheritage.graph.maps.NodeTypesMap; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Base64; /** * This objects contains SWH graph properties such as node labels. * * Some property mappings are necessary because Software Heritage uses string based persistent * identifiers (SWHID) while WebGraph uses integers internally. * * The two node ID mappings (long id ↔ SWHID) are used for the input (users refer to the graph * using SWHID) and the output (convert back to SWHID for users results). * * Since graph traversal can be restricted depending on the node type (see {@link AllowedEdges}), a * long id → node type map is stored as well to avoid a full SWHID lookup. 
* * @see NodeIdMap * @see NodeTypesMap */ public class SwhGraphProperties { private final String path; private final NodeIdMap nodeIdMap; private final NodeTypesMap nodeTypesMap; private LongBigList authorTimestamp; private ShortBigList authorTimestampOffset; private LongBigList committerTimestamp; private ShortBigList committerTimestampOffset; private LongBigList contentLength; private LongArrayBitVector contentIsSkipped; private IntBigList authorId; private IntBigList committerId; private ByteBigList messageBuffer; private LongBigList messageOffsets; private ByteBigList tagNameBuffer; private LongBigList tagNameOffsets; private MappedFrontCodedStringBigList edgeLabelNames; protected SwhGraphProperties(String path, NodeIdMap nodeIdMap, NodeTypesMap nodeTypesMap) { this.path = path; this.nodeIdMap = nodeIdMap; this.nodeTypesMap = nodeTypesMap; } public static SwhGraphProperties load(String path) throws IOException { return new SwhGraphProperties(path, new NodeIdMap(path), new NodeTypesMap(path)); } /** * Cleans up resources after use. */ public void close() throws IOException { edgeLabelNames.close(); } /** Return the basename of the compressed graph */ public String getPath() { return path; } /** * Converts {@link SWHID} node to long. * * @param swhid node specified as a {@link SWHID} * @return internal long node id * @see SWHID */ public long getNodeId(SWHID swhid) { return nodeIdMap.getNodeId(swhid); } /** * Converts long id node to {@link SWHID}. * * @param nodeId node specified as a long id * @return external SWHID * @see SWHID */ public SWHID getSWHID(long nodeId) { return nodeIdMap.getSWHID(nodeId); } /** * Returns node type. * * @param nodeId node specified as a long id * @return corresponding node type * @see SwhType */ public SwhType getNodeType(long nodeId) { return nodeTypesMap.getType(nodeId); } private static LongBigList loadMappedLongs(String path) throws IOException { try (RandomAccessFile raf = new RandomAccessFile(path, "r")) { return LongMappedBigList.map(raf.getChannel()); } } private static IntBigList loadMappedInts(String path) throws IOException { try (RandomAccessFile raf = new RandomAccessFile(path, "r")) { return IntMappedBigList.map(raf.getChannel()); } } private static ShortBigList loadMappedShorts(String path) throws IOException { try (RandomAccessFile raf = new RandomAccessFile(path, "r")) { return ShortMappedBigList.map(raf.getChannel()); } } private static ByteBigList loadMappedBytes(String path) throws IOException { try (RandomAccessFile raf = new RandomAccessFile(path, "r")) { return ByteMappedBigList.map(raf.getChannel()); } } private static LongBigList loadEFLongs(String path) throws IOException { try { return (EliasFanoLongBigList) BinIO.loadObject(path); } catch (ClassNotFoundException e) { throw new IOException(e); } } private static byte[] getLine(ByteBigList byteArray, long start) { long end = start; while (end < byteArray.size64() && byteArray.getByte(end) != '\n') { end++; } int length = (int) (end - start); byte[] buffer = new byte[length]; byteArray.getElements(start, buffer, 0, length); return buffer; } /** Load the sizes of the content nodes */ public void loadContentLength() throws IOException { contentLength = loadMappedLongs(path + ".property.content.length.bin"); } /** Get the size (in bytes) of the given content node */ public Long getContentLength(long nodeId) { if (contentLength == null) { throw new IllegalStateException("Content lengths not loaded"); } long res = contentLength.getLong(nodeId); return (res >= 0) ? 
res : null; } /** Load the IDs of the persons (authors and committers) */ public void loadPersonIds() throws IOException { authorId = loadMappedInts(path + ".property.author_id.bin"); committerId = loadMappedInts(path + ".property.committer_id.bin"); } /** Get a unique integer ID representing the author of the given revision or release node */ public Long getAuthorId(long nodeId) { if (authorId == null) { throw new IllegalStateException("Author IDs not loaded"); } long res = authorId.getInt(nodeId); return (res >= 0) ? res : null; } /** Get a unique integer ID representing the committer of the given revision node */ public Long getCommitterId(long nodeId) { if (committerId == null) { throw new IllegalStateException("Committer IDs not loaded"); } long res = committerId.getInt(nodeId); return (res >= 0) ? res : null; } /** * Loads a boolean array indicating whether the given content node was skipped during archive * ingestion */ public void loadContentIsSkipped() throws IOException { try { contentIsSkipped = (LongArrayBitVector) BinIO.loadObject(path + ".property.content.is_skipped.bin"); } catch (ClassNotFoundException e) { throw new IOException(e); } } /** Returns whether the given content node was skipped during archive ingestion */ public boolean isContentSkipped(long nodeId) { if (contentIsSkipped == null) { throw new IllegalStateException("Skipped content array not loaded"); } return contentIsSkipped.getBoolean(nodeId); } /** Load the timestamps at which the releases and revisions were authored */ public void loadAuthorTimestamps() throws IOException { authorTimestamp = loadMappedLongs(path + ".property.author_timestamp.bin"); authorTimestampOffset = loadMappedShorts(path + ".property.author_timestamp_offset.bin"); } /** Return the timestamp at which the given revision or release was authored */ public Long getAuthorTimestamp(long nodeId) { if (authorTimestamp == null) { throw new IllegalStateException("Author timestamps not loaded"); } long res = authorTimestamp.getLong(nodeId); return (res > Long.MIN_VALUE) ? res : null; } /** Return the timestamp offset at which the given revision or release was authored */ public Short getAuthorTimestampOffset(long nodeId) { if (authorTimestampOffset == null) { throw new IllegalStateException("Author timestamp offsets not loaded"); } short res = authorTimestampOffset.getShort(nodeId); return (res > Short.MIN_VALUE) ? res : null; } /** Load the timestamps at which the releases and revisions were committed */ public void loadCommitterTimestamps() throws IOException { committerTimestamp = loadMappedLongs(path + ".property.committer_timestamp.bin"); committerTimestampOffset = loadMappedShorts(path + ".property.committer_timestamp_offset.bin"); } /** Return the timestamp at which the given revision was committed */ public Long getCommitterTimestamp(long nodeId) { if (committerTimestamp == null) { throw new IllegalStateException("Committer timestamps not loaded"); } long res = committerTimestamp.getLong(nodeId); return (res > Long.MIN_VALUE) ? res : null; } /** Return the timestamp offset at which the given revision was committed */ public Short getCommitterTimestampOffset(long nodeId) { if (committerTimestampOffset == null) { throw new IllegalStateException("Committer timestamp offsets not loaded"); } short res = committerTimestampOffset.getShort(nodeId); return (res > Short.MIN_VALUE) ? 
res : null; } /** Load the revision messages, the release messages and the origin URLs */ public void loadMessages() throws IOException { messageBuffer = loadMappedBytes(path + ".property.message.bin"); messageOffsets = loadMappedLongs(path + ".property.message.offset.bin"); } /** Get the message of the given revision or release node */ public byte[] getMessage(long nodeId) { + return Base64.getDecoder().decode(getMessageBase64(nodeId)); + } + + /** Get the message of the given revision or release node, encoded as a base64 byte array */ + public byte[] getMessageBase64(long nodeId) { if (messageBuffer == null || messageOffsets == null) { throw new IllegalStateException("Messages not loaded"); } long startOffset = messageOffsets.getLong(nodeId); if (startOffset == -1) { return null; } - return Base64.getDecoder().decode(getLine(messageBuffer, startOffset)); + return getLine(messageBuffer, startOffset); } /** Get the URL of the given origin node */ public String getUrl(long nodeId) { byte[] url = getMessage(nodeId); return (url != null) ? new String(url) : null; } /** Load the release names */ public void loadTagNames() throws IOException { tagNameBuffer = loadMappedBytes(path + ".property.tag_name.bin"); tagNameOffsets = loadMappedLongs(path + ".property.tag_name.offset.bin"); } /** Get the name of the given release node */ public byte[] getTagName(long nodeId) { if (tagNameBuffer == null || tagNameOffsets == null) { throw new IllegalStateException("Tag names not loaded"); } long startOffset = tagNameOffsets.getLong(nodeId); if (startOffset == -1) { return null; } return Base64.getDecoder().decode(getLine(tagNameBuffer, startOffset)); } /** Load the arc label names (directory entry names and snapshot branch names) */ public void loadLabelNames() throws IOException { try { edgeLabelNames = MappedFrontCodedStringBigList.load(path + ".labels.fcl"); } catch (ConfigurationException e) { throw new IOException(e); } } /** * Get the arc label name (either a directory entry name or snapshot branch name) associated with * the given label ID */ public byte[] getLabelName(long labelId) { if (edgeLabelNames == null) { throw new IllegalStateException("Label names not loaded"); } return Base64.getDecoder().decode(edgeLabelNames.getArray(labelId)); } } diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java index ba13af9..f7132af 100644 --- a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java +++ b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java @@ -1,151 +1,166 @@ /* * Copyright (c) 2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ /* For each origin and each person, outputs a line "origin_id,person_id", * if that person contributed to the origin. * + * A .csv table containing "origin_id,origin_url_base64" is also written + * to the given path. + * * This takes the output of TopoSort on stdin. 
* */ package org.softwareheritage.graph.utils; import it.unimi.dsi.big.webgraph.LazyLongIterator; import org.softwareheritage.graph.*; +import java.io.PrintWriter; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Scanner; public class ListOriginContributors { /* * For nodes with a single ancestor, reuses the ancestor's set of contributors instead of copying, * when that ancestor has no more pending successors. */ private static boolean optimizeReuse = true; public static void main(String[] args) throws IOException, ClassNotFoundException { - if (args.length != 1) { - System.err.println("Syntax: java org.softwareheritage.graph.utils.FindEarliestRevision "); + if (args.length != 2) { + System.err.println( + "Syntax: java org.softwareheritage.graph.utils.ListOriginContributors <path/to/graph/basename> <path/to/origin_urls.csv>"); System.exit(1); } String graphBasename = args[0]; + PrintWriter originUrlsFileWriter = new PrintWriter(args[1]); System.err.println("Loading graph " + graphBasename + " ..."); SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename); System.err.println("Loading person ids"); underlyingGraph.loadPersonIds(); + System.err.println("Loading messages"); + underlyingGraph.loadMessages(); System.err.println("Selecting subgraph."); Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori")); System.err.println("Graph loaded."); Scanner stdin = new Scanner(System.in); String firstLine = stdin.nextLine().strip(); if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) { System.err.format("Unexpected header: %s\n", firstLine); System.exit(2); } /* Map each node id to its set of contributor person ids */ HashMap<Long, HashSet<Long>> contributors = new HashMap<>(); /* * For each node, counts how many of its direct successors still need to be handled */ HashMap<Long, Long> pendingSuccessors = new HashMap<>(); System.out.println("origin_id,person_id"); + originUrlsFileWriter.println("origin_id,origin_url_base64"); while (stdin.hasNextLine()) { String cells[] = stdin.nextLine().strip().split(",", -1); SWHID nodeSWHID = new SWHID(cells[0]); long nodeId = graph.getNodeId(nodeSWHID); long ancestorCount = Long.parseLong(cells[1]); long successorCount = Long.parseLong(cells[2]); String sampleAncestor1SWHID = cells[3]; String sampleAncestor2SWHID = cells[4]; HashSet<Long> nodeContributors; boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1); if (reuseAncestorSet) { long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID)); if (pendingSuccessors.get(ancestorNodeId) == 1) { nodeContributors = contributors.remove(ancestorNodeId); pendingSuccessors.remove(ancestorNodeId); } else { /* Ancestor is not yet ready to be popped */ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1); nodeContributors = new HashSet<>(); } } else { nodeContributors = new HashSet<>(); } Long personId; if (nodeSWHID.getType() == SwhType.REV) { personId = underlyingGraph.getAuthorId(nodeId); if (personId != null) { nodeContributors.add(personId); } personId = underlyingGraph.getCommitterId(nodeId); if (personId != null) { nodeContributors.add(personId); } } else if (nodeSWHID.getType() == SwhType.REL) { personId = underlyingGraph.getAuthorId(nodeId); if (personId != null) { nodeContributors.add(personId); } } if (!reuseAncestorSet) { long computedAncestorCount = 0; LazyLongIterator it = graph.successors(nodeId); for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) { computedAncestorCount++; if (pendingSuccessors.get(ancestorNodeId) == 1) { /* * If this node is the last unhandled successor of the ancestor, pop the ancestor information, * as we won't need it anymore */ pendingSuccessors.remove(ancestorNodeId); nodeContributors.addAll(contributors.remove(ancestorNodeId)); } else { /* * The ancestor has remaining successors to handle; decrement the counter and copy its set of * contributors to the current set */ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1); nodeContributors.addAll(contributors.get(ancestorNodeId)); } } if (ancestorCount != computedAncestorCount) { System.err.format("Mismatched ancestor count: expected %d, found %d", ancestorCount, computedAncestorCount); System.exit(2); } } if (nodeSWHID.getType() == SwhType.ORI) { nodeContributors.forEach((contributorId) -> { System.out.format("%d,%d\n", nodeId, contributorId); }); + byte[] url = underlyingGraph.getMessageBase64(nodeId); + if (url != null) { + originUrlsFileWriter.format("%d,%s\n", nodeId, new String(url)); + } } if (successorCount > 0) { /* * If the node has any successor, store its set of contributors for later */ contributors.put(nodeId, nodeContributors); pendingSuccessors.put(nodeId, successorCount); } } + + originUrlsFileWriter.flush(); } }
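The origin_urls table written by this class keeps URLs exactly as they are stored on disk, i.e. base64-encoded (see getMessageBase64 above). A minimal sketch of how a consumer could decode one row, assuming nothing beyond the "origin_id,origin_url_base64" header printed by this class; the sample value is taken from the test data further down in this patch, and the snippet itself is illustrative rather than part of the change:

import base64
import csv
import io

# One header line plus one data row, as written by originUrlsFileWriter
sample = "origin_id,origin_url_base64\n2,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=\n"
for row in csv.DictReader(io.StringIO(sample)):
    origin_id = int(row["origin_id"])
    # The URL column is base64; decode it back to the original origin URL
    url = base64.b64decode(row["origin_url_base64"]).decode()
    print(origin_id, url)  # 2 https://example.com/swh/graph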
diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py index 96e5374..654a851 100644 --- a/swh/graph/luigi/origin_contributors.py +++ b/swh/graph/luigi/origin_contributors.py @@ -1,199 +1,209 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ Luigi tasks for contribution graph ================================== This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks driving the creation of the graph of contributions of people (pseudonymized by default). """
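# A hedged usage sketch (illustration only, not part of this module): the tasks
# below are normally driven by the luigi CLI, but they can also be wired up
# programmatically. The paths are made-up examples; the parameter names match
# the ones declared on ListOriginContributors below.
#
#     import luigi
#     from swh.graph.luigi.origin_contributors import ListOriginContributors
#
#     luigi.build(
#         [
#             ListOriginContributors(
#                 local_graph_path="/srv/graph/compressed",
#                 topological_order_path="/srv/graph/topo_order.csv.zst",
#                 origin_contributors_path="/srv/graph/origin_contributors.csv.zst",
#                 origin_urls_path="/srv/graph/origin_urls.csv.zst",
#                 graph_name="graph",
#             )
#         ],
#         local_scheduler=True,
#     )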
""" # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path from typing import Dict, Iterable, List, Tuple, cast import luigi from .compressed_graph import LocalGraph from .misc_datasets import TopoSort from .utils import run_script class ListOriginContributors(luigi.Task): """Creates a file that contains all SWHIDs in topological order from a compressed graph.""" local_graph_path = luigi.PathParameter() topological_order_path = luigi.PathParameter() origin_contributors_path = luigi.PathParameter() + origin_urls_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") def requires(self) -> List[luigi.Task]: """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` and :class:`swh.graph.luigi.misc_datasets.TopoSort`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), TopoSort( local_graph_path=self.local_graph_path, topological_order_path=self.topological_order_path, graph_name=self.graph_name, ), ] def output(self) -> luigi.Target: """.csv.zst file that contains the topological order.""" return luigi.LocalTarget(self.origin_contributors_path) def run(self) -> None: """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" + import tempfile + class_name = "org.softwareheritage.graph.utils.ListOriginContributors" - script = f""" - zstdcat {self.topological_order_path} \ - | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ - | pv --line-mode --wait \ - | zstdmt -19 - """ - run_script(script, self.origin_contributors_path) + with tempfile.NamedTemporaryFile( + prefix="origin_urls_", suffix=".csv" + ) as origin_urls_fd: + script = f""" + zstdcat {self.topological_order_path} \ + | java {class_name} '{self.local_graph_path}/{self.graph_name}' '{origin_urls_fd.name}' \ + | pv --line-mode --wait \ + | zstdmt -19 + """ # noqa + run_script(script, self.origin_contributors_path) + run_script( + f"pv '{origin_urls_fd.name}' | zstdmt -19", + self.origin_urls_path, + ) class ExportDeanonymizationTable(luigi.Task): """Exports (from swh-storage) a .csv.zst file that contains the columns: ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. The first column is the anonymized full name found in :file:`graph.persons.csv.zst` in the compressed graph, and the latter two are the original name.""" storage_dsn = luigi.Parameter( default="service=swh", description="postgresql DSN of the swh-storage database to read from.", ) deanonymization_table_path = luigi.PathParameter() def output(self) -> luigi.Target: """.csv.zst file that contains the table.""" return luigi.LocalTarget(self.deanonymization_table_path) def run(self) -> None: """Runs a postgresql query to compute the table.""" run_script( f""" psql '{self.storage_dsn}' -c "\ COPY ( SELECT encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \ encode(fullname, 'base64') as base64, \ encode(fullname, 'escape') as escaped \ FROM person \ ) TO STDOUT CSV HEADER \ " | zstdmt -19 """, # noqa self.deanonymization_table_path, ) class DeanonymizeOriginContributors(luigi.Task): """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, but with ``person_base64`` and ``person_escaped`` columns in addition to ``person_id``. This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names instead of names); which may not be true depending on how the swh-dataset export was configured. 
""" local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") origin_contributors_path = luigi.PathParameter() deanonymization_table_path = luigi.PathParameter() deanonymized_origin_contributors_path = luigi.PathParameter() def requires(self) -> List[luigi.Task]: """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, and :class:`ExportDeanonymizationTable`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), ListOriginContributors( local_graph_path=self.local_graph_path, origin_contributors_path=self.origin_contributors_path, ), ExportDeanonymizationTable( deanonymization_table_path=self.deanonymization_table_path, ), ] def output(self) -> luigi.Target: """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, but with ``person_base64`` and ``person_escaped`` columns in addition to ``person_id``""" return luigi.LocalTarget(self.deanonymized_origin_contributors_path) def run(self) -> None: """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset and the deanonymization table in memory, then uses them to map each row in the original (anonymized) contributors list to the deanonymized one.""" # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export # was configured to do so); this should add support for it. import base64 import csv import pyzstd # Load the deanonymization table, to map sha256(name) to base64(name) # and escape(name) sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} with pyzstd.open(self.deanonymization_table_path, "rt") as fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], fd)) header = next(csv_reader) assert header == ["sha256_base64", "base64", "escaped"], header for line in csv_reader: (base64_sha256_name, base64_name, escaped_name) = line sha256_name = base64.b64decode(base64_sha256_name) name = base64.b64decode(base64_name) sha256_to_names[sha256_name] = (name, escaped_name) # Combine with the list of sha256(name), to get the list of base64(name) # and escape(name) persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" with pyzstd.open(persons_path, "rb") as fd: person_id_to_names: List[Tuple[bytes, str]] = [ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) for line in fd ] tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") tmp_output_path.parent.mkdir(parents=True, exist_ok=True) # Finally, write a new table of origin_contributors, by reading the anonymized # table line-by-line and deanonymizing each id # Open temporary output for writes as CSV with pyzstd.open(tmp_output_path, "wt") as output_fd: csv_writer = csv.writer(output_fd, lineterminator="\n") # write header csv_writer.writerow(("origin_id", "person_base64", "person_escaped")) # Open input for reads as CSV with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], input_fd)) header = next(csv_reader) assert header == ["origin_id", "person_id"], header for (origin_id, person_id) in csv_reader: if person_id == "null": # FIXME: workaround for a bug in contribution graphs generated # before 2022-12-01. 
Those were only used in tests and never # published, so the conditional can be removed when this is # productionized continue (name, escaped_name) = person_id_to_names[int(person_id)] base64_name = base64.b64encode(name).decode("ascii") csv_writer.writerow((origin_id, base64_name, escaped_name)) tmp_output_path.replace(self.deanonymized_origin_contributors_path) diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py index 29aaf40..4cbe74e 100644 --- a/swh/graph/tests/test_origin_contributors.py +++ b/swh/graph/tests/test_origin_contributors.py @@ -1,186 +1,206 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import base64 import datetime from pathlib import Path import subprocess from swh.graph.luigi.origin_contributors import ( DeanonymizeOriginContributors, ExportDeanonymizationTable, ListOriginContributors, ) from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, TimestampWithTimezone, ) from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER DATA_DIR = Path(__file__).parents[0] / "dataset" # FIXME: do not hardcode ids here; they should be dynamically loaded # from the test graph ORIGIN_CONTRIBUTORS = """\ origin_id,person_id 2,0 2,2 0,0 0,1 0,2 """ +assert ( + base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=") + == b"https://example.com/swh/graph" +) +assert ( + base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy") + == b"https://example.com/swh/graph2" +) + +ORIGIN_URLS = """\ +origin_id,origin_url_base64 +2,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg= +0,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy +""" + DEANONYMIZATION_TABLE = """\ sha256_base64,base64,escaped 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org> aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com> UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net> """ # noqa PERSONS = """\ aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U= UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs= 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI= """ DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\ origin_id,person_base64,person_escaped 2,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com> 2,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org> 0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com> 0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net> 0,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org> """ # noqa def test_list_origin_contributors(tmpdir): tmpdir = Path(tmpdir) topological_order_path = tmpdir / "topo_order.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" + origin_urls_path = tmpdir / "origin_urls.csv.zst" subprocess.run( ["zstdmt", "-o", topological_order_path], input=TOPOLOGICAL_ORDER.encode(), check=True, ) task = ListOriginContributors( local_graph_path=DATA_DIR / "compressed", topological_order_path=topological_order_path, origin_contributors_path=origin_contributors_path, + origin_urls_path=origin_urls_path, graph_name="example", ) task.run() csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode() - assert csv_text == ORIGIN_CONTRIBUTORS + urls_text = subprocess.check_output(["zstdcat", origin_urls_path]).decode() + assert urls_text == ORIGIN_URLS +
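# A possible in-process check (sketch only, not used by these tests): the same
# origin_urls file can be read with pyzstd + csv instead of shelling out to
# zstdcat, mirroring what DeanonymizeOriginContributors does elsewhere in this
# patch. The helper name and path argument are illustrative.
#
#     import csv
#     import pyzstd
#
#     def read_origin_urls(origin_urls_path):
#         with pyzstd.open(origin_urls_path, "rt") as fd:
#             rows = list(csv.DictReader(fd))
#         # Decode the base64 column back to readable origin URLs
#         return {
#             int(row["origin_id"]): base64.b64decode(row["origin_url_base64"]).decode()
#             for row in rows
#         }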
def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage): tmpdir = Path(tmpdir) tstz = TimestampWithTimezone.from_datetime( datetime.datetime.now(tz=datetime.timezone.utc) ) swh_storage.release_add( [ Release( name=b"v1.0", message=b"first release", author=Person.from_fullname(b"John Doe <jdoe@example.org>"), target=b"\x00" * 20, target_type=ObjectType.REVISION, synthetic=True, ) ] ) swh_storage.revision_add( [ Revision( message=b"first commit", author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"), committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"), date=tstz, committer_date=tstz, directory=b"\x00" * 20, type=RevisionType.GIT, synthetic=True, ) ] ) deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" task = ExportDeanonymizationTable( storage_dsn=swh_storage_postgresql.dsn, deanonymization_table_path=deanonymization_table_path, ) task.run() csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode() (header, *rows) = csv_text.split("\n") (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n") assert header == expected_header assert rows.pop() == "", "Missing trailing newline" expected_rows.pop() assert set(rows) == set(expected_rows) def test_deanonymize_origin_contributors(tmpdir): tmpdir = Path(tmpdir) persons_path = tmpdir / "example.persons.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" deanonymized_origin_contributors_path = ( tmpdir / "sensitive" / "origin_contributors.deanonymized.csv.zst" ) subprocess.run( ["zstdmt", "-o", origin_contributors_path], input=ORIGIN_CONTRIBUTORS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", persons_path], input=PERSONS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", deanonymization_table_path], input=DEANONYMIZATION_TABLE.encode(), check=True, ) task = DeanonymizeOriginContributors( local_graph_path=tmpdir, origin_contributors_path=origin_contributors_path, deanonymization_table_path=deanonymization_table_path, deanonymized_origin_contributors_path=deanonymized_origin_contributors_path, graph_name="example", ) task.run() csv_text = subprocess.check_output( ["zstdcat", deanonymized_origin_contributors_path] ).decode() assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS
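The deanonymization fixtures above are tied together by SHA-256 and base64: the postgres expression encode(digest(fullname, 'sha256'), 'base64') used by ExportDeanonymizationTable produces the value stored in graph.persons.csv.zst, and the table maps it back to the readable columns. A small sketch of the same derivation in Python, for illustration only (standard-library hashlib/base64; the digest strings in the fixtures above are not re-derived here):

import base64
import hashlib

def deanonymization_row(fullname: bytes) -> str:
    """Builds one sha256_base64,base64,escaped row for a person fullname."""
    # Same as encode(digest(fullname, 'sha256'), 'base64') in the SQL query
    sha256_base64 = base64.b64encode(hashlib.sha256(fullname).digest()).decode("ascii")
    # Same as encode(fullname, 'base64')
    name_base64 = base64.b64encode(fullname).decode("ascii")
    # 'escape' encoding leaves plain-ASCII names unchanged
    escaped = fullname.decode("ascii")
    return f"{sha256_base64},{name_base64},{escaped}"

print(deanonymization_row(b"John Doe <jdoe@example.org>"))

This is also the join key used by DeanonymizeOriginContributors: it looks up each anonymized person id's sha256_base64 value and replaces it with the base64 and escaped forms.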