diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py index cb92c56..170e6cb 100644 --- a/swh/dataset/graph.py +++ b/swh/dataset/graph.py @@ -1,219 +1,220 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import functools import os import os.path import pathlib import shlex import subprocess import tempfile import uuid from swh.dataset.exporter import ParallelExporter from swh.dataset.utils import ZSTFile, SQLiteSet from swh.model.identifiers import origin_identifier, persistent_identifier from swh.storage.fixer import fix_objects def process_messages(messages, config, node_writer, edge_writer, node_set): """ Args: messages: A sequence of messages to process config: The exporter configuration node_writer: A file-like object where to write nodes edge_writer: A file-like object where to write edges """ def write_node(node): node_type, node_id = node if node_id is None: return node_pid = persistent_identifier(object_type=node_type, object_id=node_id) node_writer.write("{}\n".format(node_pid)) def write_edge(src, dst): src_type, src_id = src dst_type, dst_id = dst if src_id is None or dst_id is None: return src_pid = persistent_identifier(object_type=src_type, object_id=src_id) dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id) edge_writer.write("{} {}\n".format(src_pid, dst_pid)) messages = {k: fix_objects(k, v) for k, v in messages.items()} for visit in messages.get("origin_visit", []): origin_id = origin_identifier({"url": visit["origin"]}) - if not node_set.add(origin_id): + visit_id = visit["visit"] + if not node_set.add("{}:{}".format(origin_id, visit_id).encode()): continue write_node(("origin", origin_id)) write_edge(("origin", origin_id), ("snapshot", visit["snapshot"])) for snapshot in messages.get("snapshot", []): if not node_set.add(snapshot["id"]): continue write_node(("snapshot", snapshot["id"])) for branch_name, branch in snapshot["branches"].items(): while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"][branch_name] if branch is None or not branch_name: continue if config.get("remove_pull_requests") and ( branch_name.startswith(b"refs/pull") or branch_name.startswith(b"refs/merge-requests") ): continue write_edge( ("snapshot", snapshot["id"]), (branch["target_type"], branch["target"]) ) for release in messages.get("release", []): if not node_set.add(release["id"]): continue write_node(("release", release["id"])) write_edge( ("release", release["id"]), (release["target_type"], release["target"]) ) for revision in messages.get("revision", []): if not node_set.add(revision["id"]): continue write_node(("revision", revision["id"])) write_edge(("revision", revision["id"]), ("directory", revision["directory"])) for parent in revision["parents"]: write_edge(("revision", revision["id"]), ("revision", parent)) for directory in messages.get("directory", []): if not node_set.add(directory["id"]): continue write_node(("directory", directory["id"])) for entry in directory["entries"]: entry_type_mapping = { "file": "content", "dir": "directory", "rev": "revision", } write_edge( ("directory", directory["id"]), (entry_type_mapping[entry["type"]], entry["target"]), ) for content in messages.get("content", []): if not node_set.add(content["sha1_git"]): continue write_node(("content", content["sha1_git"])) class GraphEdgeExporter(ParallelExporter): """ Implementation of ParallelExporter which writes all the graph edges of a specific type in a Zstandard-compressed CSV file. Each row of the CSV is in the format: ` """ def export_worker(self, export_path, **kwargs): dataset_path = pathlib.Path(export_path) dataset_path.mkdir(exist_ok=True, parents=True) unique_id = str(uuid.uuid4()) nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) node_set_file = dataset_path / (".set-nodes-{}.sqlite3".format(unique_id)) with contextlib.ExitStack() as stack: nodes_writer = stack.enter_context(ZSTFile(nodes_file, "w")) edges_writer = stack.enter_context(ZSTFile(edges_file, "w")) node_set = stack.enter_context(SQLiteSet(node_set_file)) process_fn = functools.partial( process_messages, config=self.config, nodes_writer=nodes_writer, edges_writer=edges_writer, node_set=node_set, ) self.process(process_fn, **kwargs) def export_edges(config, export_path, export_id, processes): """Run the edge exporter for each edge type.""" object_types = [ "origin_visit", "snapshot", "release", "revision", "directory", ] for obj_type in object_types: print("{} edges:".format(obj_type)) exporter = GraphEdgeExporter(config, export_id, obj_type, processes) exporter.run(os.path.join(export_path, obj_type)) def sort_graph_nodes(export_path, config): """ Generate the node list from the edges files. We cannot solely rely on the object IDs that are read in the journal, as some nodes that are referred to as destinations in the edge file might not be present in the archive (e.g a rev_entry referring to a revision that we do not have crawled yet). The most efficient way of getting all the nodes that are mentioned in the edges file is therefore to use sort(1) on the gigantic edge files to get all the unique node IDs, while using the disk as a temporary buffer. This pipeline does, in order: - concatenate and write all the compressed edges files in graph.edges.csv.zst (using the fact that ZST compression is an additive function) ; - deflate the edges ; - count the number of edges and write it in graph.edges.count.txt ; - concatenate all the (deflated) nodes from the export with the destination edges, and sort the output to get the list of unique graph nodes ; - count the number of unique graph nodes and write it in graph.nodes.count.txt ; - compress and write the resulting nodes in graph.nodes.csv.zst. """ # Use bytes for the sorting algorithm (faster than being locale-specific) env = { **os.environ.copy(), "LC_ALL": "C", "LC_COLLATE": "C", "LANG": "C", } sort_buffer_size = config.get("sort_buffer_size", "4G") disk_buffer_dir = config.get("disk_buffer_dir", export_path) with tempfile.TemporaryDirectory( prefix=".graph_node_sort_", dir=disk_buffer_dir ) as buffer_path: subprocess.run( [ "bash", "-c", ( "pv {export_path}/*/*.edges.csv.zst | " "tee {export_path}/graph.edges.csv.zst |" "zstdcat |" "tee >( wc -l > {export_path}/graph.edges.count.txt ) |" "cut -d' ' -f2 | " "cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | " "sort -u -S{sort_buffer_size} -T{buffer_path} | " "tee >( wc -l > {export_path}/graph.nodes.count.txt ) |" "zstdmt > {export_path}/graph.nodes.csv.zst" ).format( export_path=shlex.quote(str(export_path)), buffer_path=shlex.quote(str(buffer_path)), sort_buffer_size=shlex.quote(sort_buffer_size), ), ], env=env, ) diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py index fe1121d..75ac22a 100644 --- a/swh/dataset/test/test_graph.py +++ b/swh/dataset/test/test_graph.py @@ -1,438 +1,481 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib from typing import Tuple import pytest from unittest.mock import Mock, call from swh.dataset.graph import process_messages, sort_graph_nodes from swh.dataset.utils import ZSTFile from swh.model.hashutil import MultiHash, hash_to_bytes DATE = { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, } TEST_CONTENT = { **MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible", } TEST_REVISION = { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATE, "committer": {"fullname": b"foo", "name": b"foo", "email": b""}, "author": {"fullname": b"foo", "name": b"foo", "email": b""}, "committer_date": DATE, "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], } TEST_RELEASE = { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}}, "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, } TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"} +TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"} TEST_ORIGIN_VISIT = { "origin": TEST_ORIGIN["url"], + "visit": 1, "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, "type": "git", } +class FakeDiskSet(set): + """ + A set with an add() method that returns whether the item has been added + or was already there. Used to replace SQLiteSet in unittests. + """ + + def add(self, v): + assert isinstance(v, bytes) + r = True + if v in self: + r = False + super().add(v) + return r + + @pytest.fixture def exporter(): - def wrapped(messages, config=None, nodes_already_there=False) -> Tuple[Mock, Mock]: + def wrapped(messages, config=None) -> Tuple[Mock, Mock]: if config is None: config = {} node_writer = Mock() edge_writer = Mock() - node_set = Mock() - node_set.add.return_value = not nodes_already_there + node_set = FakeDiskSet() process_messages( messages, config=config, node_writer=node_writer, edge_writer=edge_writer, node_set=node_set, ) return node_writer.write, edge_writer.write return wrapped def binhash(s): return hashlib.sha1(s.encode()).digest() def hexhash(s): return hashlib.sha1(s.encode()).hexdigest() def test_export_origin_visits(exporter): node_writer, edge_writer = exporter( { "origin_visit": [ { **TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "snapshot": binhash("snp1"), }, { **TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "snapshot": binhash("snp2"), }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')}\n"), call(f"swh:1:ori:{hexhash('ori2')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"), ] def test_export_snapshot_simple(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev1"), "target_type": "revision"}, }, }, { "id": binhash("snp2"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev2"), "target_type": "revision"}, b"bcnt": {"target": binhash("cnt1"), "target_type": "content"}, b"bdir": { "target": binhash("dir1"), "target_type": "directory", }, b"brel": {"target": binhash("rel1"), "target_type": "release"}, b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"}, }, }, {"id": binhash("snp3"), "branches": {}}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:snp:{hexhash('snp2')}\n"), call(f"swh:1:snp:{hexhash('snp3')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"), ] def test_export_snapshot_aliases(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"origin_branch": { "target": binhash("rev1"), "target_type": "revision", }, b"alias1": {"target": b"origin_branch", "target_type": "alias"}, b"alias2": {"target": b"alias1", "target_type": "alias"}, b"alias3": {"target": b"alias2", "target_type": "alias"}, }, }, ] } ) assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")] assert edge_writer.mock_calls == ( [call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4 ) def test_export_snapshot_no_pull_requests(exporter): snp = { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"}, b"refs/merge-requests/lol": { "target": binhash("rev3"), "target_type": "revision", }, }, } node_writer, edge_writer = exporter({"snapshot": [snp]}) assert edge_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"), call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"), ] node_writer, edge_writer = exporter( {"snapshot": [snp]}, config={"remove_pull_requests": True} ) assert edge_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), ] def test_export_releases(exporter): node_writer, edge_writer = exporter( { "release": [ { **TEST_RELEASE, "id": binhash("rel1"), "target": binhash("rev1"), "target_type": "revision", }, { **TEST_RELEASE, "id": binhash("rel2"), "target": binhash("rel1"), "target_type": "release", }, { **TEST_RELEASE, "id": binhash("rel3"), "target": binhash("dir1"), "target_type": "directory", }, { **TEST_RELEASE, "id": binhash("rel4"), "target": binhash("cnt1"), "target_type": "content", }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel2')}\n"), call(f"swh:1:rel:{hexhash('rel3')}\n"), call(f"swh:1:rel:{hexhash('rel4')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"), ] def test_export_revision(exporter): node_writer, edge_writer = exporter( { "revision": [ { **TEST_REVISION, "id": binhash("rev1"), "directory": binhash("dir1"), "parents": [binhash("rev2"), binhash("rev3"),], }, { **TEST_REVISION, "id": binhash("rev2"), "directory": binhash("dir2"), "parents": [], }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rev:{hexhash('rev2')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"), call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"), call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"), ] def test_export_directory(exporter): node_writer, edge_writer = exporter( { "directory": [ { "id": binhash("dir1"), "entries": [ {"type": "file", "target": binhash("cnt1")}, {"type": "dir", "target": binhash("dir2")}, {"type": "rev", "target": binhash("rev1")}, ], }, {"id": binhash("dir2"), "entries": [],}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:dir:{hexhash('dir2')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"), call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"), call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"), ] def test_export_content(exporter): node_writer, edge_writer = exporter( { "content": [ {**TEST_CONTENT, "sha1_git": binhash("cnt1"),}, {**TEST_CONTENT, "sha1_git": binhash("cnt2"),}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:cnt:{hexhash('cnt1')}\n"), call(f"swh:1:cnt:{hexhash('cnt2')}\n"), ] assert edge_writer.mock_calls == [] -def test_export_already_there_nodes(exporter): +def test_export_duplicate_node(exporter): node_writer, edge_writer = exporter( { - "content": [{**TEST_CONTENT, "sha1_git": binhash("cnt1")}], - "directory": [{"id": binhash("dir2"), "entries": []}], + "content": [ + {**TEST_CONTENT, "sha1_git": binhash("cnt1")}, + {**TEST_CONTENT, "sha1_git": binhash("cnt1")}, + {**TEST_CONTENT, "sha1_git": binhash("cnt1")}, + ], }, - nodes_already_there=True, ) - assert node_writer.mock_calls == [] + assert node_writer.mock_calls == [ + call(f"swh:1:cnt:{hexhash('cnt1')}\n"), + ] + assert edge_writer.mock_calls == [] + + +def test_export_duplicate_visit(exporter): + node_writer, edge_writer = exporter( + { + "origin_visit": [ + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 1}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 1}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 1}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 1}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 2}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 2}, + {**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 2}, + ], + }, + ) + assert node_writer.mock_calls == [ + call(f"swh:1:ori:{hexhash('ori1')}\n"), + call(f"swh:1:ori:{hexhash('ori2')}\n"), + call(f"swh:1:ori:{hexhash('ori1')}\n"), + call(f"swh:1:ori:{hexhash('ori2')}\n"), + ] assert edge_writer.mock_calls == [] def zstwrite(fp, lines): with ZSTFile(fp, "w") as writer: for l in lines: writer.write(l + "\n") def zstread(fp): with ZSTFile(fp, "r") as reader: return reader.read() def test_sort_pipeline(tmp_path): short_type_mapping = { "origin_visit": "ori", "snapshot": "snp", "release": "rel", "revision": "rev", "directory": "dir", "content": "cnt", } input_nodes = [ f"swh:1:{short}:{hexhash(short + str(x))}" for short in short_type_mapping.values() for x in range(4) ] input_edges = [ f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}", f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}", f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}", f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}", f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}", f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}", f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}", f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}", f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}", f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}", ] for obj_type, short_obj_type in short_type_mapping.items(): p = tmp_path / obj_type p.mkdir() edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.edges.csv.zst", edges[0::2]) zstwrite(p / "01.edges.csv.zst", edges[1::2]) nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.nodes.csv.zst", nodes[0::2]) zstwrite(p / "01.nodes.csv.zst", nodes[1::2]) sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"}) output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n") output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n") output_nodes = list(filter(bool, output_nodes)) output_edges = list(filter(bool, output_edges)) expected_nodes = set(input_nodes) | set(l.split()[1] for l in input_edges) assert output_nodes == sorted(expected_nodes) assert int((tmp_path / "graph.nodes.count.txt").read_text()) == len(expected_nodes) assert sorted(output_edges) == sorted(input_edges) assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges)