diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py index 055ad8a..7dfeaf7 100644 --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -1,126 +1,123 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os import pathlib import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.dataset.exporters.edges import GraphEdgesExporter from swh.dataset.exporters.orc import ORCExporter from swh.dataset.journalprocessor import ParallelJournalProcessor @swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False), help="Configuration file.", ) @click.pass_context def dataset_cli_group(ctx, config_file): """Software Heritage Dataset Tools""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @dataset_cli_group.group("graph") @click.pass_context def graph(ctx): """Manage graph export""" pass AVAILABLE_EXPORTERS = { "edges": GraphEdgesExporter, "orc": ORCExporter, } @graph.command("export") @click.argument("export-path", type=click.Path()) @click.option("--export-id", "-e", help="Unique ID of the export run.") @click.option( "--formats", "-f", type=click.STRING, default=",".join(AVAILABLE_EXPORTERS.keys()), show_default=True, help="Formats to export.", ) @click.option("--processes", "-p", default=1, help="Number of parallel processes") @click.option( "--exclude", type=click.STRING, help="Comma-separated list of object types to exclude", ) @click.pass_context def export_graph(ctx, export_path, export_id, formats, exclude, processes): """Export the Software Heritage graph as an edge dataset.""" import uuid config = ctx.obj["config"] if not export_id: export_id = str(uuid.uuid4()) exclude_obj_types = {o.strip() for o in exclude.split(",")} export_formats = [c.strip() for c in formats.split(",")] for f in export_formats: if f not in AVAILABLE_EXPORTERS: raise click.BadOptionUsage( option_name="formats", message=f"{f} is not an available format." ) # Run the exporter for each edge type. object_types = [ "origin", "origin_visit", "origin_visit_status", "snapshot", "release", "revision", "directory", "content", "skipped_content", ] for obj_type in object_types: if obj_type in exclude_obj_types: continue exporters = [ - ( - AVAILABLE_EXPORTERS[f], - {"export_path": os.path.join(export_path, f, obj_type)}, - ) + (AVAILABLE_EXPORTERS[f], {"export_path": os.path.join(export_path, f)},) for f in export_formats ] parallel_exporter = ParallelJournalProcessor( config, exporters, export_id, obj_type, node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type, processes=processes, ) print("Exporting {}:".format(obj_type)) parallel_exporter.run() @graph.command("sort") @click.argument("export-path", type=click.Path()) @click.pass_context def sort_graph(ctx, export_path): config = ctx.obj["config"] from swh.dataset.exporters.edges import sort_graph_nodes sort_graph_nodes(export_path, config) diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py index d36e028..b0783f9 100644 --- a/swh/dataset/exporter.py +++ b/swh/dataset/exporter.py @@ -1,58 +1,66 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import contextlib +import pathlib from types import TracebackType from typing import Any, Dict, Optional, Type class Exporter: """ Base class for all the exporters. Each export can have multiple exporters, so we can read the journal a single time, then export the objects we read in different formats without having to re-read them every time. Override this class with the behavior for an export in a specific export format. You have to overwrite process_object() to make it write to the appropriate export files. You can also put setup and teardown logic in __enter__ and __exit__, and it will be called automatically. """ - def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any) -> None: + def __init__( + self, config: Dict[str, Any], export_path, *args: Any, **kwargs: Any + ) -> None: self.config: Dict[str, Any] = config + self.export_path = pathlib.Path(export_path) + self.exit_stack = contextlib.ExitStack() def __enter__(self) -> "Exporter": + self.export_path.mkdir(exist_ok=True, parents=True) + self.exit_stack.__enter__() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> Optional[bool]: - pass + return self.exit_stack.__exit__(exc_type, exc_value, traceback) def process_object(self, object_type: str, obj: Dict[str, Any]) -> None: """ Process a SWH object to export. Override this with your custom exporter. """ raise NotImplementedError class ExporterDispatch(Exporter): """ Like Exporter, but dispatches each object type to a different function (e.g you can override `process_origin(self, object)` to process origins.) """ def process_object(self, object_type: str, obj: Dict[str, Any]) -> None: method_name = "process_" + object_type if hasattr(self, method_name): getattr(self, method_name)(obj) diff --git a/swh/dataset/exporters/edges.py b/swh/dataset/exporters/edges.py index dca80f3..e888692 100644 --- a/swh/dataset/exporters/edges.py +++ b/swh/dataset/exporters/edges.py @@ -1,203 +1,206 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import os import os.path -import pathlib import shlex import subprocess import tempfile import uuid from swh.dataset.exporter import ExporterDispatch from swh.dataset.utils import ZSTFile, remove_pull_requests from swh.model.identifiers import origin_identifier, swhid class GraphEdgesExporter(ExporterDispatch): """ Implementation of an exporter which writes all the graph edges of a specific type to a Zstandard-compressed CSV file. Each row of the CSV is in the format: ` """ - def __init__(self, config, export_path, **kwargs): - super().__init__(config) - self.export_path = export_path - - def __enter__(self): - dataset_path = pathlib.Path(self.export_path) - dataset_path.mkdir(exist_ok=True, parents=True) - unique_id = str(uuid.uuid4()) - nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) - edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) - self.node_writer = ZSTFile(nodes_file, "w") - self.edge_writer = ZSTFile(edges_file, "w") - self.node_writer.__enter__() - self.edge_writer.__enter__() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.node_writer.__exit__(exc_type, exc_value, traceback) - self.edge_writer.__exit__(exc_type, exc_value, traceback) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.writers = {} + + def get_writers_for(self, obj_type: str): + if obj_type not in self.writers: + dataset_path = self.export_path / obj_type + dataset_path.mkdir(exist_ok=True) + unique_id = str(uuid.uuid4()) + nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) + edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) + node_writer = self.exit_stack.enter_context(ZSTFile(str(nodes_file), "w")) + edge_writer = self.exit_stack.enter_context(ZSTFile(str(edges_file), "w")) + self.writers[obj_type] = (node_writer, edge_writer) + return self.writers[obj_type] + + def get_node_writer_for(self, obj_type: str): + return self.get_writers_for(obj_type)[0] + + def get_edge_writer_for(self, obj_type: str): + return self.get_writers_for(obj_type)[1] def write_node(self, node): node_type, node_id = node if node_id is None: return node_swhid = swhid(object_type=node_type, object_id=node_id) - self.node_writer.write("{}\n".format(node_swhid)) + node_writer = self.get_node_writer_for(node_type) + node_writer.write("{}\n".format(node_swhid)) def write_edge(self, src, dst, *, labels=None): src_type, src_id = src dst_type, dst_id = dst if src_id is None or dst_id is None: return src_swhid = swhid(object_type=src_type, object_id=src_id) dst_swhid = swhid(object_type=dst_type, object_id=dst_id) edge_line = " ".join([src_swhid, dst_swhid] + (labels if labels else [])) - self.edge_writer.write("{}\n".format(edge_line)) + edge_writer = self.get_edge_writer_for(src_type) + edge_writer.write("{}\n".format(edge_line)) def process_origin(self, origin): origin_id = origin_identifier({"url": origin["url"]}) self.write_node(("origin", origin_id)) def process_origin_visit_status(self, visit_status): origin_id = origin_identifier({"url": visit_status["origin"]}) self.write_edge(("origin", origin_id), ("snapshot", visit_status["snapshot"])) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): remove_pull_requests(snapshot) self.write_node(("snapshot", snapshot["id"])) for branch_name, branch in snapshot["branches"].items(): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue self.write_edge( ("snapshot", snapshot["id"]), (branch["target_type"], branch["target"]), labels=[base64.b64encode(original_branch_name).decode(),], ) def process_release(self, release): self.write_node(("release", release["id"])) self.write_edge( ("release", release["id"]), (release["target_type"], release["target"]) ) def process_revision(self, revision): self.write_node(("revision", revision["id"])) self.write_edge( ("revision", revision["id"]), ("directory", revision["directory"]) ) for parent in revision["parents"]: self.write_edge(("revision", revision["id"]), ("revision", parent)) def process_directory(self, directory): self.write_node(("directory", directory["id"])) for entry in directory["entries"]: entry_type_mapping = { "file": "content", "dir": "directory", "rev": "revision", } self.write_edge( ("directory", directory["id"]), (entry_type_mapping[entry["type"]], entry["target"]), labels=[base64.b64encode(entry["name"]).decode(), str(entry["perms"])], ) def process_content(self, content): self.write_node(("content", content["sha1_git"])) def sort_graph_nodes(export_path, config): """ Generate the node list from the edges files. We cannot solely rely on the object IDs that are read in the journal, as some nodes that are referred to as destinations in the edge file might not be present in the archive (e.g a rev_entry referring to a revision that we do not have crawled yet). The most efficient way of getting all the nodes that are mentioned in the edges file is therefore to use sort(1) on the gigantic edge files to get all the unique node IDs, while using the disk as a temporary buffer. This pipeline does, in order: - concatenate and write all the compressed edges files in graph.edges.csv.zst (using the fact that ZST compression is an additive function) ; - deflate the edges ; - count the number of edges and write it in graph.edges.count.txt ; - count the number of occurrences of each edge type and write them in graph.edges.stats.txt ; - concatenate all the (deflated) nodes from the export with the destination edges, and sort the output to get the list of unique graph nodes ; - count the number of unique graph nodes and write it in graph.nodes.count.txt ; - count the number of occurrences of each node type and write them in graph.nodes.stats.txt ; - compress and write the resulting nodes in graph.nodes.csv.zst. """ # Use awk as a replacement of `sort | uniq -c` to avoid buffering everything # in memory counter_command = "awk '{ t[$0]++ } END { for (i in t) print i,t[i] }'" sort_script = """ pv {export_path}/*/*.edges.csv.zst | tee {export_path}/graph.edges.csv.zst | zstdcat | tee >( wc -l > {export_path}/graph.edges.count.txt ) | tee >( cut -d: -f3,6 | {counter_command} | sort \ > {export_path}/graph.edges.stats.txt ) | tee >( cut -d' ' -f3 | grep . | \ sort -u -S{sort_buffer_size} -T{buffer_path} | \ zstdmt > {export_path}/graph.labels.csv.zst ) | cut -d' ' -f2 | cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | sort -u -S{sort_buffer_size} -T{buffer_path} | tee >( wc -l > {export_path}/graph.nodes.count.txt ) | tee >( cut -d: -f3 | {counter_command} | sort \ > {export_path}/graph.nodes.stats.txt ) | zstdmt > {export_path}/graph.nodes.csv.zst """ # Use bytes for the sorting algorithm (faster than being locale-specific) env = { **os.environ.copy(), "LC_ALL": "C", "LC_COLLATE": "C", "LANG": "C", } sort_buffer_size = config.get("sort_buffer_size", "4G") disk_buffer_dir = config.get("disk_buffer_dir", export_path) with tempfile.TemporaryDirectory( prefix=".graph_node_sort_", dir=disk_buffer_dir ) as buffer_path: subprocess.run( [ "bash", "-c", sort_script.format( export_path=shlex.quote(str(export_path)), buffer_path=shlex.quote(str(buffer_path)), sort_buffer_size=shlex.quote(sort_buffer_size), counter_command=counter_command, ), ], env=env, ) diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py index a0d984c..6e03638 100644 --- a/swh/dataset/exporters/orc.py +++ b/swh/dataset/exporters/orc.py @@ -1,260 +1,248 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import contextlib from datetime import datetime, timezone -import pathlib import uuid from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer from swh.dataset.exporter import ExporterDispatch from swh.dataset.utils import remove_pull_requests from swh.model.hashutil import hash_to_hex # fmt: off EXPORT_SCHEMA = { 'origin': Struct( url=String() ), 'origin_visit': Struct( origin=String(), visit=BigInt(), date=Timestamp(), type=String(), ), 'origin_visit_status': Struct( origin=String(), visit=BigInt(), date=Timestamp(), status=String(), snapshot=String(), ), 'snapshot': Struct( id=String(), ), 'snapshot_branch': Struct( snapshot_id=String(), name=Binary(), target=String(), target_type=String(), ), 'release': Struct( id=String(), name=Binary(), message=Binary(), target=String(), target_type=String(), author=Binary(), date=Timestamp(), date_offset=SmallInt(), ), 'revision': Struct( id=String(), message=Binary(), author=Binary(), date=Timestamp(), date_offset=SmallInt(), committer=Binary(), committer_date=Timestamp(), committer_offset=SmallInt(), directory=String(), ), 'directory': Struct( id=String(), ), 'directory_entry': Struct( directory_id=String(), name=Binary(), type=String(), target=String(), perms=Int(), ), 'content': Struct( sha1=String(), sha1_git=String(), sha256=String(), blake2s256=String(), length=BigInt(), status=String(), ), 'skipped_content': Struct( sha1=String(), sha1_git=String(), sha256=String(), blake2s256=String(), length=BigInt(), status=String(), reason=String(), ), } # fmt: on def hash_to_hex_or_none(hash): return hash_to_hex(hash) if hash is not None else None def swh_date_to_datetime(obj): if obj is None or obj["timestamp"] is None: return None return datetime.fromtimestamp( obj["timestamp"]["seconds"] + (obj["timestamp"]["microseconds"] / 1e6) ).replace(tzinfo=timezone.utc) def swh_date_to_offset(obj): if obj is None: return None return obj["offset"] class ORCExporter(ExporterDispatch): """ Implementation of an exporter which writes the entire graph dataset as ORC files. Useful for large scale processing, notably on cloud instances (e.g BigQuery, Amazon Athena, Azure). """ - def __init__(self, config, export_path, **kwargs): - super().__init__(config) - self.export_path = pathlib.Path(export_path) - self.export_path.mkdir(exist_ok=True, parents=True) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.writers = {} - self.exit_stack = contextlib.ExitStack() - - def __enter__(self): - self.exit_stack.__enter__() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.exit_stack.__exit__(exc_type, exc_value, traceback) def get_writer_for(self, table_name: str): if table_name not in self.writers: object_type_dir = self.export_path / table_name object_type_dir.mkdir(exist_ok=True) unique_id = str(uuid.uuid4()) export_file = object_type_dir / ("graph-{}.orc".format(unique_id)) export_obj = self.exit_stack.enter_context(export_file.open("wb")) self.writers[table_name] = self.exit_stack.enter_context( Writer(export_obj, EXPORT_SCHEMA[table_name]) ) return self.writers[table_name] def process_origin(self, origin): origin_writer = self.get_writer_for("origin") origin_writer.write((origin["url"],)) def process_origin_visit(self, visit): origin_visit_writer = self.get_writer_for("origin_visit") origin_visit_writer.write( (visit["origin"], visit["visit"], visit["date"], visit["type"],) ) def process_origin_visit_status(self, visit_status): origin_visit_status_writer = self.get_writer_for("origin_visit_status") origin_visit_status_writer.write( ( visit_status["origin"], visit_status["visit"], visit_status["date"], visit_status["status"], hash_to_hex_or_none(visit_status["snapshot"]), ) ) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): remove_pull_requests(snapshot) snapshot_writer = self.get_writer_for("snapshot") snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),)) snapshot_branch_writer = self.get_writer_for("snapshot_branch") for branch_name, branch in snapshot["branches"].items(): if branch is None: continue snapshot_branch_writer.write( ( hash_to_hex_or_none(snapshot["id"]), branch_name, hash_to_hex_or_none(branch["target"]), branch["target_type"], ) ) def process_release(self, release): release_writer = self.get_writer_for("release") release_writer.write( ( hash_to_hex_or_none(release["id"]), release["name"], release["message"], hash_to_hex_or_none(release["target"]), release["target_type"], release["author"]["fullname"], swh_date_to_datetime(release["date"]), swh_date_to_offset(release["date"]), ) ) def process_revision(self, revision): release_writer = self.get_writer_for("revision") release_writer.write( ( hash_to_hex_or_none(revision["id"]), revision["message"], revision["author"]["fullname"], swh_date_to_datetime(revision["date"]), swh_date_to_offset(revision["date"]), revision["committer"]["fullname"], swh_date_to_datetime(revision["committer_date"]), swh_date_to_offset(revision["committer_date"]), hash_to_hex_or_none(revision["directory"]), ) ) def process_directory(self, directory): directory_writer = self.get_writer_for("directory") directory_writer.write((hash_to_hex_or_none(directory["id"]),)) directory_entry_writer = self.get_writer_for("directory_entry") for entry in directory["entries"]: directory_entry_writer.write( ( hash_to_hex_or_none(directory["id"]), entry["name"], entry["type"], hash_to_hex_or_none(entry["target"]), entry["perms"], ) ) def process_content(self, content): content_writer = self.get_writer_for("content") content_writer.write( ( hash_to_hex_or_none(content["sha1"]), hash_to_hex_or_none(content["sha1_git"]), hash_to_hex_or_none(content["sha256"]), hash_to_hex_or_none(content["blake2s256"]), content["length"], content["status"], ) ) def process_skipped_content(self, skipped_content): skipped_content_writer = self.get_writer_for("skipped_content") skipped_content_writer.write( ( hash_to_hex_or_none(skipped_content["sha1"]), hash_to_hex_or_none(skipped_content["sha1_git"]), hash_to_hex_or_none(skipped_content["sha256"]), hash_to_hex_or_none(skipped_content["blake2s256"]), skipped_content["length"], skipped_content["status"], skipped_content["reason"], ) ) diff --git a/swh/dataset/test/test_edges.py b/swh/dataset/test/test_edges.py index 5624284..5aeb64e 100644 --- a/swh/dataset/test/test_edges.py +++ b/swh/dataset/test/test_edges.py @@ -1,572 +1,573 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode import collections import hashlib from typing import Tuple from unittest.mock import Mock, call import pytest from swh.dataset.exporters.edges import GraphEdgesExporter, sort_graph_nodes from swh.dataset.utils import ZSTFile from swh.model.hashutil import MultiHash, hash_to_bytes DATE = { "timestamp": {"seconds": 1234567891, "microseconds": 0}, "offset": 120, "negative_utc": False, } TEST_CONTENT = { **MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible", } TEST_REVISION = { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATE, "committer": {"fullname": b"foo", "name": b"foo", "email": b""}, "author": {"fullname": b"foo", "name": b"foo", "email": b""}, "committer_date": DATE, "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], } TEST_RELEASE = { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0}, "offset": 120, "negative_utc": False, }, "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}}, "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, } TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"} TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"} TEST_ORIGIN_VISIT_STATUS = { "origin": TEST_ORIGIN["url"], "visit": 1, "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, } class FakeDiskSet(set): """ A set with an add() method that returns whether the item has been added or was already there. Used to replace SQLiteSet in unittests. """ def add(self, v): assert isinstance(v, bytes) r = True if v in self: r = False super().add(v) return r @pytest.fixture def exporter(): def wrapped(messages, config=None) -> Tuple[Mock, Mock]: if config is None: config = {} exporter = GraphEdgesExporter(config, "/dummy_path") - exporter.node_writer = Mock() - exporter.edge_writer = Mock() + node_writer = Mock() + edge_writer = Mock() + exporter.get_writers_for = lambda *a, **k: (node_writer, edge_writer) for object_type, objects in messages.items(): for obj in objects: exporter.process_object(object_type, obj) - return exporter.node_writer.write, exporter.edge_writer.write + return node_writer.write, edge_writer.write return wrapped def binhash(s): return hashlib.sha1(s.encode()).digest() def hexhash(s): return hashlib.sha1(s.encode()).hexdigest() def b64e(s: str) -> str: return b64encode(s.encode()).decode() def test_export_origin(exporter): node_writer, edge_writer = exporter({"origin": [{"url": "ori1"}, {"url": "ori2"},]}) assert node_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')}\n"), call(f"swh:1:ori:{hexhash('ori2')}\n"), ] assert edge_writer.mock_calls == [] def test_export_origin_visit_status(exporter): node_writer, edge_writer = exporter( { "origin_visit_status": [ { **TEST_ORIGIN_VISIT_STATUS, "origin": "ori1", "snapshot": binhash("snp1"), }, { **TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "snapshot": binhash("snp2"), }, ] } ) assert node_writer.mock_calls == [] assert edge_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"), ] def test_export_snapshot_simple(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev1"), "target_type": "revision"}, }, }, { "id": binhash("snp2"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev2"), "target_type": "revision"}, b"bcnt": {"target": binhash("cnt1"), "target_type": "content"}, b"bdir": { "target": binhash("dir1"), "target_type": "directory", }, b"brel": {"target": binhash("rel1"), "target_type": "release"}, b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"}, }, }, {"id": binhash("snp3"), "branches": {}}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:snp:{hexhash('snp2')}\n"), call(f"swh:1:snp:{hexhash('snp3')}\n"), ] assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('HEAD')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}" f" {b64e('HEAD')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}" f" {b64e('bcnt')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}" f" {b64e('bdir')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}" f" {b64e('brel')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}" f" {b64e('bsnp')}\n" ), ] def test_export_snapshot_aliases(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"origin_branch": { "target": binhash("rev1"), "target_type": "revision", }, b"alias1": {"target": b"origin_branch", "target_type": "alias"}, b"alias2": {"target": b"alias1", "target_type": "alias"}, b"alias3": {"target": b"alias2", "target_type": "alias"}, }, }, ] } ) assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")] assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('origin_branch')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias1')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias2')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias3')}\n" ), ] def test_export_snapshot_no_pull_requests(exporter): snp = { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"}, b"refs/merge-requests/lol": { "target": binhash("rev3"), "target_type": "revision", }, b"refs/tags/v1.0.0": { "target": binhash("rev4"), "target_type": "revision", }, b"refs/patch/123456abc": { "target": binhash("rev5"), "target_type": "revision", }, }, } node_writer, edge_writer = exporter({"snapshot": [snp]}) assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}" f" {b64e('refs/pull/42')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}" f" {b64e('refs/merge-requests/lol')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}" f" {b64e('refs/tags/v1.0.0')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}" f" {b64e('refs/patch/123456abc')}\n" ), ] node_writer, edge_writer = exporter( {"snapshot": [snp]}, config={"remove_pull_requests": True} ) assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}" f" {b64e('refs/tags/v1.0.0')}\n" ), ] def test_export_releases(exporter): node_writer, edge_writer = exporter( { "release": [ { **TEST_RELEASE, "id": binhash("rel1"), "target": binhash("rev1"), "target_type": "revision", }, { **TEST_RELEASE, "id": binhash("rel2"), "target": binhash("rel1"), "target_type": "release", }, { **TEST_RELEASE, "id": binhash("rel3"), "target": binhash("dir1"), "target_type": "directory", }, { **TEST_RELEASE, "id": binhash("rel4"), "target": binhash("cnt1"), "target_type": "content", }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel2')}\n"), call(f"swh:1:rel:{hexhash('rel3')}\n"), call(f"swh:1:rel:{hexhash('rel4')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"), ] def test_export_revision(exporter): node_writer, edge_writer = exporter( { "revision": [ { **TEST_REVISION, "id": binhash("rev1"), "directory": binhash("dir1"), "parents": [binhash("rev2"), binhash("rev3")], }, { **TEST_REVISION, "id": binhash("rev2"), "directory": binhash("dir2"), "parents": [], }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rev:{hexhash('rev2')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"), call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"), call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"), ] def test_export_directory(exporter): node_writer, edge_writer = exporter( { "directory": [ { "id": binhash("dir1"), "entries": [ { "type": "file", "target": binhash("cnt1"), "name": b"cnt1", "perms": 0o644, }, { "type": "dir", "target": binhash("dir2"), "name": b"dir2", "perms": 0o755, }, { "type": "rev", "target": binhash("rev1"), "name": b"rev1", "perms": 0o160000, }, ], }, {"id": binhash("dir2"), "entries": []}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:dir:{hexhash('dir2')}\n"), ] assert edge_writer.mock_calls == [ call( f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}" f" {b64e('cnt1')} {0o644}\n" ), call( f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}" f" {b64e('dir2')} {0o755}\n" ), call( f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('rev1')} {0o160000}\n" ), ] def test_export_content(exporter): node_writer, edge_writer = exporter( { "content": [ {**TEST_CONTENT, "sha1_git": binhash("cnt1")}, {**TEST_CONTENT, "sha1_git": binhash("cnt2")}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:cnt:{hexhash('cnt1')}\n"), call(f"swh:1:cnt:{hexhash('cnt2')}\n"), ] assert edge_writer.mock_calls == [] def zstwrite(fp, lines): with ZSTFile(fp, "w") as writer: for line in lines: writer.write(line + "\n") def zstread(fp): with ZSTFile(fp, "r") as reader: return reader.read() def test_sort_pipeline(tmp_path): short_type_mapping = { "origin_visit_status": "ori", "snapshot": "snp", "release": "rel", "revision": "rev", "directory": "dir", "content": "cnt", } input_nodes = [ f"swh:1:{short}:{hexhash(short + str(x))}" for short in short_type_mapping.values() for x in range(4) ] input_edges = [ f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}", f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}", f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}", f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup1')}", f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup2')}", f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')}", f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')} {b64e('r1')}", f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}", f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}", f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')} 42", f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')} {b64e('d1')} 1337", f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')} {b64e('r1')} 0", ] for obj_type, short_obj_type in short_type_mapping.items(): p = tmp_path / obj_type p.mkdir() edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.edges.csv.zst", edges[0::2]) zstwrite(p / "01.edges.csv.zst", edges[1::2]) nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.nodes.csv.zst", nodes[0::2]) zstwrite(p / "01.nodes.csv.zst", nodes[1::2]) sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"}) output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n") output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n") output_labels = zstread(tmp_path / "graph.labels.csv.zst").split("\n") output_nodes = list(filter(bool, output_nodes)) output_edges = list(filter(bool, output_edges)) output_labels = list(filter(bool, output_labels)) expected_nodes = set(input_nodes) | set(e.split()[1] for e in input_edges) assert output_nodes == sorted(expected_nodes) assert int((tmp_path / "graph.nodes.count.txt").read_text()) == len(expected_nodes) assert sorted(output_edges) == sorted(input_edges) assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges) expected_labels = set(e[2] for e in [e.split() for e in input_edges] if len(e) > 2) assert output_labels == sorted(expected_labels) actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip() expected_node_stats = "\n".join( sorted( "{} {}".format(k, v) for k, v in collections.Counter( node.split(":")[2] for node in expected_nodes ).items() ) ) assert actual_node_stats == expected_node_stats actual_edge_stats = (tmp_path / "graph.edges.stats.txt").read_text().strip() expected_edge_stats = "\n".join( sorted( "{} {}".format(k, v) for k, v in collections.Counter( "{}:{}".format(edge.split(":")[2], edge.split(":")[5]) for edge in input_edges ).items() ) ) assert actual_edge_stats == expected_edge_stats