diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py index f0e032e..a1aa8b3 100644 --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -1,64 +1,116 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control +import os +import pathlib + import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group +from swh.dataset.exporters.edges import GraphEdgesExporter +from swh.dataset.journalprocessor import ParallelJournalProcessor @swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False), help="Configuration file.", ) @click.pass_context def dataset_cli_group(ctx, config_file): """Software Heritage Dataset Tools""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @dataset_cli_group.group("graph") @click.pass_context def graph(ctx): - """Manage graph edges export""" + """Manage graph export""" pass +AVAILABLE_EXPORTERS = { + "edges": GraphEdgesExporter, +} + + @graph.command("export") @click.argument("export-path", type=click.Path()) @click.option("--export-id", "-e", help="Unique ID of the export run.") +@click.option( + "--formats", + "-f", + type=click.STRING, + default=",".join(AVAILABLE_EXPORTERS.keys()), + show_default=True, + help="Formats to export.", +) @click.option("--processes", "-p", default=1, help="Number of parallel processes") @click.pass_context -def export_graph(ctx, export_path, export_id, processes): +def export_graph(ctx, export_path, export_id, formats, processes): """Export the Software Heritage graph as an edge dataset.""" import uuid config = ctx.obj["config"] if not export_id: export_id = str(uuid.uuid4()) - from swh.dataset.graph import export_edges + export_formats = [c.strip() for c in formats.split(",")] + for f in export_formats: + if f not in AVAILABLE_EXPORTERS: + raise click.BadOptionUsage( + option_name="formats", message=f"{f} is not an available format." + ) - export_edges(config, export_path, export_id, processes) + # Run the exporter for each edge type. 
+ object_types = [ + "origin", + "origin_visit", + "origin_visit_status", + "snapshot", + "release", + "revision", + "directory", + "content", + "skipped_content", + ] + for obj_type in object_types: + exporters = [ + ( + AVAILABLE_EXPORTERS[f], + {"export_path": os.path.join(export_path, f, obj_type)}, + ) + for f in export_formats + ] + parallel_exporter = ParallelJournalProcessor( + config, + exporters, + export_id, + obj_type, + node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type, + processes=processes, + ) + print("Exporting {}:".format(obj_type)) + parallel_exporter.run() @graph.command("sort") @click.argument("export-path", type=click.Path()) @click.pass_context def sort_graph(ctx, export_path): config = ctx.obj["config"] - from swh.dataset.graph import sort_graph_nodes + from swh.dataset.exporters.edges import sort_graph_nodes sort_graph_nodes(export_path, config) diff --git a/swh/dataset/exporters/__init__.py b/swh/dataset/exporters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/dataset/graph.py b/swh/dataset/exporters/edges.py similarity index 91% rename from swh/dataset/graph.py rename to swh/dataset/exporters/edges.py index 189d592..c3c9bbd 100644 --- a/swh/dataset/graph.py +++ b/swh/dataset/exporters/edges.py @@ -1,250 +1,222 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import os import os.path import pathlib import shlex import subprocess import tempfile import uuid from swh.dataset.exporter import ExporterDispatch -from swh.dataset.journalprocessor import ParallelJournalProcessor from swh.dataset.utils import ZSTFile from swh.model.identifiers import origin_identifier, swhid class GraphEdgesExporter(ExporterDispatch): """ Implementation of an exporter which writes all the graph edges of a specific type to a Zstandard-compressed CSV file. 
Each row of the CSV is in the format: ` """ def __init__(self, config, export_path, **kwargs): super().__init__(config) self.export_path = export_path def __enter__(self): dataset_path = pathlib.Path(self.export_path) dataset_path.mkdir(exist_ok=True, parents=True) unique_id = str(uuid.uuid4()) nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) self.node_writer = ZSTFile(nodes_file, "w") self.edge_writer = ZSTFile(edges_file, "w") self.node_writer.__enter__() self.edge_writer.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): self.node_writer.__exit__(exc_type, exc_value, traceback) self.edge_writer.__exit__(exc_type, exc_value, traceback) def write_node(self, node): node_type, node_id = node if node_id is None: return node_swhid = swhid(object_type=node_type, object_id=node_id) self.node_writer.write("{}\n".format(node_swhid)) def write_edge(self, src, dst, *, labels=None): src_type, src_id = src dst_type, dst_id = dst if src_id is None or dst_id is None: return src_swhid = swhid(object_type=src_type, object_id=src_id) dst_swhid = swhid(object_type=dst_type, object_id=dst_id) edge_line = " ".join([src_swhid, dst_swhid] + (labels if labels else [])) self.edge_writer.write("{}\n".format(edge_line)) def process_origin(self, origin): origin_id = origin_identifier({"url": origin["url"]}) self.write_node(("origin", origin_id)) def process_origin_visit_status(self, visit_status): origin_id = origin_identifier({"url": visit_status["origin"]}) self.write_edge(("origin", origin_id), ("snapshot", visit_status["snapshot"])) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): self.remove_pull_requests(snapshot) self.write_node(("snapshot", snapshot["id"])) for branch_name, branch in snapshot["branches"].items(): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"][branch_name] if branch is None or not branch_name: continue self.write_edge( ("snapshot", snapshot["id"]), (branch["target_type"], branch["target"]), labels=[base64.b64encode(original_branch_name).decode(),], ) def process_release(self, release): self.write_node(("release", release["id"])) self.write_edge( ("release", release["id"]), (release["target_type"], release["target"]) ) def process_revision(self, revision): self.write_node(("revision", revision["id"])) self.write_edge( ("revision", revision["id"]), ("directory", revision["directory"]) ) for parent in revision["parents"]: self.write_edge(("revision", revision["id"]), ("revision", parent)) def process_directory(self, directory): self.write_node(("directory", directory["id"])) for entry in directory["entries"]: entry_type_mapping = { "file": "content", "dir": "directory", "rev": "revision", } self.write_edge( ("directory", directory["id"]), (entry_type_mapping[entry["type"]], entry["target"]), - labels=[base64.b64encode(entry["name"]).decode(), str(entry["perms"]),], + labels=[base64.b64encode(entry["name"]).decode(), str(entry["perms"])], ) def process_content(self, content): self.write_node(("content", content["sha1_git"])) def remove_pull_requests(self, snapshot): """ Heuristic to filter out pull requests in snapshots: remove all branches that start with refs/ but do not start with refs/heads or refs/tags. 
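
        For example, branches named refs/pull/42 or refs/merge-requests/lol are
        dropped, while refs/heads/master and refs/tags/v1.0.0 are kept (these
        cases are exercised by test_export_snapshot_no_pull_requests).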
""" # Copy the items with list() to remove items during iteration for branch_name, branch in list(snapshot["branches"].items()): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"][branch_name] if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not ( branch_name.startswith(b"refs/heads") or branch_name.startswith(b"refs/tags") ): snapshot["branches"].pop(original_branch_name) -def export_edges(config, export_path, export_id, processes): - """Run the edge exporter for each edge type.""" - object_types = [ - "origin", - "origin_visit_status", - "snapshot", - "release", - "revision", - "directory", - "content", - ] - for obj_type in object_types: - print("{} edges:".format(obj_type)) - exporters = [ - (GraphEdgesExporter, {"export_path": os.path.join(export_path, obj_type)}), - ] - parallel_exporter = ParallelJournalProcessor( - config, - exporters, - export_id, - obj_type, - node_sets_path=pathlib.Path(export_path) / ".node_sets", - processes=processes, - ) - parallel_exporter.run() - - def sort_graph_nodes(export_path, config): """ Generate the node list from the edges files. We cannot solely rely on the object IDs that are read in the journal, as some nodes that are referred to as destinations in the edge file might not be present in the archive (e.g a rev_entry referring to a revision that we do not have crawled yet). The most efficient way of getting all the nodes that are mentioned in the edges file is therefore to use sort(1) on the gigantic edge files to get all the unique node IDs, while using the disk as a temporary buffer. This pipeline does, in order: - concatenate and write all the compressed edges files in graph.edges.csv.zst (using the fact that ZST compression is an additive function) ; - deflate the edges ; - count the number of edges and write it in graph.edges.count.txt ; - count the number of occurrences of each edge type and write them in graph.edges.stats.txt ; - concatenate all the (deflated) nodes from the export with the destination edges, and sort the output to get the list of unique graph nodes ; - count the number of unique graph nodes and write it in graph.nodes.count.txt ; - count the number of occurrences of each node type and write them in graph.nodes.stats.txt ; - compress and write the resulting nodes in graph.nodes.csv.zst. """ # Use awk as a replacement of `sort | uniq -c` to avoid buffering everything # in memory counter_command = "awk '{ t[$0]++ } END { for (i in t) print i,t[i] }'" sort_script = """ pv {export_path}/*/*.edges.csv.zst | tee {export_path}/graph.edges.csv.zst | zstdcat | tee >( wc -l > {export_path}/graph.edges.count.txt ) | tee >( cut -d: -f3,6 | {counter_command} | sort \ > {export_path}/graph.edges.stats.txt ) | tee >( cut -d' ' -f3 | grep . 
| \ sort -u -S{sort_buffer_size} -T{buffer_path} | \ zstdmt > {export_path}/graph.labels.csv.zst ) | cut -d' ' -f2 | cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | sort -u -S{sort_buffer_size} -T{buffer_path} | tee >( wc -l > {export_path}/graph.nodes.count.txt ) | tee >( cut -d: -f3 | {counter_command} | sort \ > {export_path}/graph.nodes.stats.txt ) | zstdmt > {export_path}/graph.nodes.csv.zst """ # Use bytes for the sorting algorithm (faster than being locale-specific) env = { **os.environ.copy(), "LC_ALL": "C", "LC_COLLATE": "C", "LANG": "C", } sort_buffer_size = config.get("sort_buffer_size", "4G") disk_buffer_dir = config.get("disk_buffer_dir", export_path) with tempfile.TemporaryDirectory( prefix=".graph_node_sort_", dir=disk_buffer_dir ) as buffer_path: subprocess.run( [ "bash", "-c", sort_script.format( export_path=shlex.quote(str(export_path)), buffer_path=shlex.quote(str(buffer_path)), sort_buffer_size=shlex.quote(sort_buffer_size), counter_command=counter_command, ), ], env=env, ) diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py index 075b772..6675dad 100644 --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -1,388 +1,388 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import concurrent.futures from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor import contextlib from hashlib import sha1 import multiprocessing from pathlib import Path import time from typing import Any, Dict, Mapping, Sequence, Tuple, Type from confluent_kafka import TopicPartition import tqdm from swh.dataset.exporter import Exporter from swh.dataset.utils import SQLiteSet from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value from swh.model.identifiers import origin_identifier from swh.storage.fixer import fix_objects class JournalClientOffsetRanges(JournalClient): """ A subclass of JournalClient reading only inside some specific offset range. Partition assignments have to be manually given to the class. This client can only read a single topic at a time. """ def __init__( self, *args, offset_ranges: Mapping[int, Tuple[int, int]] = None, assignment: Sequence[int] = None, progress_queue: multiprocessing.Queue = None, refresh_every: int = 200, **kwargs, ): """ Args: offset_ranges: A mapping of partition_id -> (low, high) offsets that define the boundaries of the messages to consume. assignment: The list of partitions to assign to this client. progress_queue: a multiprocessing.Queue where the current progress will be reported. refresh_every: the refreshing rate of the progress reporting. 
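
        For example (illustrative values): offset_ranges={0: (0, 1000), 1: (0, 500)}
        together with assignment=[0, 1] makes this client consume only partitions
        0 and 1, unassigning each partition once its high offset (1000 or 500) is
        reached.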
""" self.offset_ranges = offset_ranges self.progress_queue = progress_queue self.refresh_every = refresh_every self.assignment = assignment self.count = None self.topic_name = None super().__init__(*args, **kwargs) def subscribe(self): self.topic_name = self.subscription[0] time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 self.consumer.assign( [TopicPartition(self.topic_name, pid) for pid in self.assignment] ) def process(self, *args, **kwargs): self.count = 0 try: self.handle_committed_offsets() super().process(*args, **kwargs) except EOFError: pass finally: self.progress_queue.put(None) def handle_committed_offsets(self,): """ Handle already committed partition offsets before starting processing. """ committed = self.consumer.committed( [TopicPartition(self.topic_name, pid) for pid in self.assignment] ) for tp in committed: self.handle_offset(tp.partition, tp.offset) def handle_offset(self, partition_id, offset): """ Check whether the client has reached the end of the current partition, and trigger a reassignment if that is the case. Raise EOFError if all the partitions have reached the end. """ if offset < 0: # Uninitialized partition offset return if self.count % self.refresh_every == 0: self.progress_queue.put({partition_id: offset}) if offset >= self.offset_ranges[partition_id][1] - 1: self.assignment = [pid for pid in self.assignment if pid != partition_id] self.subscribe() if not self.assignment: raise EOFError def deserialize_message(self, message): """ Override of the message deserialization to hook the handling of the message offset. We also return the raw objects instead of deserializing them because we will need the partition ID later. """ self.handle_offset(message.partition(), message.offset()) self.count += 1 return message class ParallelJournalProcessor: """ Reads the given object type from the journal in parallel. It creates one JournalExportWorker per process. """ def __init__( self, config, exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]], export_id: str, obj_type: str, node_sets_path: Path, processes: int = 1, ): """ Args: config: the exporter config, which should also include the JournalClient configuration. exporters: a list of Exporter to process the objects export_id: a unique identifier for the export that will be used as part of a Kafka consumer group ID. obj_type: The type of SWH object to export. node_sets_path: A directory where to store the node sets. processes: The number of processes to run. """ self.config = config self.exporters = exporters self.export_id = "swh-dataset-export-{}".format(export_id) self.obj_type = obj_type self.processes = processes self.node_sets_path = node_sets_path self.offsets = None def get_offsets(self): """ First pass to fetch all the current low and high offsets of each partition to define the consumption boundaries. 
""" if self.offsets is None: client = JournalClient( **self.config["journal"], object_types=[self.obj_type], group_id=self.export_id, ) topic_name = client.subscription[0] topics = client.consumer.list_topics(topic_name).topics partitions = topics[topic_name].partitions self.offsets = {} def fetch_insert_partition_id(partition_id): tp = TopicPartition(topic_name, partition_id) (lo, hi) = client.consumer.get_watermark_offsets(tp) self.offsets[partition_id] = (lo, hi) with concurrent.futures.ThreadPoolExecutor( max_workers=self.processes ) as executor: list( tqdm.tqdm( executor.map(fetch_insert_partition_id, partitions.keys()), total=len(partitions), desc=" - Partition offsets", ) ) return self.offsets def run(self): """ Run the parallel export. """ offsets = self.get_offsets() to_assign = list(offsets.keys()) manager = multiprocessing.Manager() q = manager.Queue() with ProcessPoolExecutor(self.processes + 1) as pool: futures = [] for i in range(self.processes): futures.append( pool.submit( self.export_worker, assignment=to_assign[i :: self.processes], progress_queue=q, ) ) futures.append(pool.submit(self.progress_worker, queue=q)) concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION) for f in futures: if f.running(): continue exc = f.exception() if exc: pool.shutdown(wait=False) f.result() raise exc def progress_worker(self, queue=None): """ An additional worker process that reports the current progress of the export between all the different parallel consumers and across all the partitions, by consuming the shared progress reporting Queue. """ d = {} active_workers = self.processes offset_diff = sum((hi - lo) for lo, hi in self.offsets.values()) with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar: while active_workers: item = queue.get() if item is None: active_workers -= 1 continue d.update(item) progress = sum(n - self.offsets[p][0] for p, n in d.items()) pbar.set_postfix( active_workers=active_workers, total_workers=self.processes ) pbar.update(progress - pbar.n) def export_worker(self, assignment, progress_queue): worker = JournalProcessorWorker( self.config, self.exporters, self.export_id, self.obj_type, self.offsets, assignment, progress_queue, self.node_sets_path, ) with worker: worker.run() class JournalProcessorWorker: """ Worker process that processes all the messages and calls the given exporters for each object read from the journal. """ def __init__( self, config, exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]], export_id: str, obj_type: str, offsets: Dict[int, Tuple[int, int]], assignment: Sequence[int], progress_queue: multiprocessing.Queue, node_sets_path: Path, ): self.config = config self.export_id = export_id self.obj_type = obj_type self.offsets = offsets self.assignment = assignment self.progress_queue = progress_queue self.node_sets_path = node_sets_path self.node_sets_path.mkdir(exist_ok=True, parents=True) self.node_sets: Dict[Tuple[int, str], SQLiteSet] = {} self.exporters = [ exporter_class(config, **kwargs) for exporter_class, kwargs in exporters ] self.exit_stack: contextlib.ExitStack = contextlib.ExitStack() def __enter__(self): self.exit_stack.__enter__() for exporter in self.exporters: self.exit_stack.enter_context(exporter) return self def __exit__(self, exc_type, exc_value, traceback): self.exit_stack.__exit__(exc_type, exc_value, traceback) def get_node_set_for_object(self, partition_id: int, object_id: bytes): """ Return an on-disk set object, which stores the nodes that have already been processed. 
Node sets are sharded by partition ID (as each object is guaranteed to be assigned to a deterministic Kafka partition) then by object ID prefix. The sharding path of each file looks like: .node_sets/{origin..content}/part-{0..256}/nodes-{0..f}.sqlite """ obj_id_prefix = "{:x}".format(object_id[0] % 16) shard_id = (partition_id, obj_id_prefix) if shard_id not in self.node_sets: node_set_dir = ( self.node_sets_path / self.obj_type / ("part-{}".format(str(partition_id))) ) node_set_dir.mkdir(exist_ok=True, parents=True) node_set_file = node_set_dir / "nodes-{}.sqlite".format(obj_id_prefix) node_set = SQLiteSet(node_set_file) self.exit_stack.enter_context(node_set) self.node_sets[shard_id] = node_set return self.node_sets[shard_id] def run(self): """ Start a Journal client on the given assignment and process all the incoming messages. """ client = JournalClientOffsetRanges( **self.config["journal"], object_types=[self.obj_type], group_id=self.export_id, debug="cgrp,broker", offset_ranges=self.offsets, assignment=self.assignment, progress_queue=self.progress_queue, **{"message.max.bytes": str(500 * 1024 * 1024)}, ) client.process(self.process_messages) def process_messages(self, messages): """ Process the incoming Kafka messages. """ for object_type, message_list in messages.items(): fixed_objects_by_partition = collections.defaultdict(list) for message in message_list: fixed_objects_by_partition[message.partition()].extend( fix_objects(object_type, [kafka_to_value(message.value())]) ) for partition, objects in fixed_objects_by_partition.items(): for obj in objects: self.process_message(object_type, partition, obj) def process_message(self, object_type, partition, obj): """ Process a single incoming Kafka message if the object it refers to has not been processed yet. It uses an on-disk set to make sure that each object is only ever processed once. """ if object_type == "origin_visit": origin_id = origin_identifier({"url": obj["origin"]}) visit = obj["visit"] node_id = sha1(f"{origin_id}:{visit}".encode()).digest() elif object_type == "origin_visit_status": if obj["status"] not in ("partial", "full"): # Temporary visit object, not useful for the exports return origin_id = origin_identifier({"url": obj["origin"]}) visit = obj["visit"] ts = obj["date"].timestamp() node_id = sha1(f"{origin_id}:{visit}:{ts}".encode()).digest() elif object_type == "origin": node_id = sha1(obj["url"].encode()).digest() - elif object_type == "content": + elif object_type in ("content", "skipped_content"): node_id = obj["sha1_git"] else: node_id = obj["id"] node_set = self.get_node_set_for_object(partition, node_id) if not node_set.add(node_id): # Node already processed, skipping. 
return for exporter in self.exporters: exporter.process_object(object_type, obj) diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_edges.py similarity index 98% rename from swh/dataset/test/test_graph.py rename to swh/dataset/test/test_edges.py index d6ce2e1..5624284 100644 --- a/swh/dataset/test/test_graph.py +++ b/swh/dataset/test/test_edges.py @@ -1,572 +1,572 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode import collections import hashlib from typing import Tuple from unittest.mock import Mock, call import pytest -from swh.dataset.graph import GraphEdgesExporter, sort_graph_nodes +from swh.dataset.exporters.edges import GraphEdgesExporter, sort_graph_nodes from swh.dataset.utils import ZSTFile from swh.model.hashutil import MultiHash, hash_to_bytes DATE = { "timestamp": {"seconds": 1234567891, "microseconds": 0}, "offset": 120, "negative_utc": False, } TEST_CONTENT = { **MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible", } TEST_REVISION = { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATE, "committer": {"fullname": b"foo", "name": b"foo", "email": b""}, "author": {"fullname": b"foo", "name": b"foo", "email": b""}, "committer_date": DATE, "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], } TEST_RELEASE = { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { - "timestamp": {"seconds": 1234567890, "microseconds": 0,}, + "timestamp": {"seconds": 1234567890, "microseconds": 0}, "offset": 120, "negative_utc": False, }, "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}}, "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, } TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"} TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"} TEST_ORIGIN_VISIT_STATUS = { "origin": TEST_ORIGIN["url"], "visit": 1, "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, } class FakeDiskSet(set): """ A set with an add() method that returns whether the item has been added or was already there. Used to replace SQLiteSet in unittests. 
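
    As with SQLiteSet.add(), add() returns True when a value is seen for the
    first time and False when it was already present.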
""" def add(self, v): assert isinstance(v, bytes) r = True if v in self: r = False super().add(v) return r @pytest.fixture def exporter(): def wrapped(messages, config=None) -> Tuple[Mock, Mock]: if config is None: config = {} exporter = GraphEdgesExporter(config, "/dummy_path") exporter.node_writer = Mock() exporter.edge_writer = Mock() for object_type, objects in messages.items(): for obj in objects: exporter.process_object(object_type, obj) return exporter.node_writer.write, exporter.edge_writer.write return wrapped def binhash(s): return hashlib.sha1(s.encode()).digest() def hexhash(s): return hashlib.sha1(s.encode()).hexdigest() def b64e(s: str) -> str: return b64encode(s.encode()).decode() def test_export_origin(exporter): node_writer, edge_writer = exporter({"origin": [{"url": "ori1"}, {"url": "ori2"},]}) assert node_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')}\n"), call(f"swh:1:ori:{hexhash('ori2')}\n"), ] assert edge_writer.mock_calls == [] def test_export_origin_visit_status(exporter): node_writer, edge_writer = exporter( { "origin_visit_status": [ { **TEST_ORIGIN_VISIT_STATUS, "origin": "ori1", "snapshot": binhash("snp1"), }, { **TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "snapshot": binhash("snp2"), }, ] } ) assert node_writer.mock_calls == [] assert edge_writer.mock_calls == [ call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"), ] def test_export_snapshot_simple(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev1"), "target_type": "revision"}, }, }, { "id": binhash("snp2"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"HEAD": {"target": binhash("rev2"), "target_type": "revision"}, b"bcnt": {"target": binhash("cnt1"), "target_type": "content"}, b"bdir": { "target": binhash("dir1"), "target_type": "directory", }, b"brel": {"target": binhash("rel1"), "target_type": "release"}, b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"}, }, }, {"id": binhash("snp3"), "branches": {}}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:snp:{hexhash('snp1')}\n"), call(f"swh:1:snp:{hexhash('snp2')}\n"), call(f"swh:1:snp:{hexhash('snp3')}\n"), ] assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('HEAD')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}" f" {b64e('HEAD')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}" f" {b64e('bcnt')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}" f" {b64e('bdir')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}" f" {b64e('brel')}\n" ), call( f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}" f" {b64e('bsnp')}\n" ), ] def test_export_snapshot_aliases(exporter): node_writer, edge_writer = exporter( { "snapshot": [ { "id": binhash("snp1"), "branches": { b"origin_branch": { "target": binhash("rev1"), "target_type": "revision", }, b"alias1": {"target": b"origin_branch", "target_type": "alias"}, b"alias2": {"target": b"alias1", "target_type": "alias"}, b"alias3": {"target": b"alias2", 
"target_type": "alias"}, }, }, ] } ) assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")] assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('origin_branch')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias1')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias2')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('alias3')}\n" ), ] def test_export_snapshot_no_pull_requests(exporter): snp = { "id": binhash("snp1"), "branches": { b"refs/heads/master": { "target": binhash("rev1"), "target_type": "revision", }, b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"}, b"refs/merge-requests/lol": { "target": binhash("rev3"), "target_type": "revision", }, b"refs/tags/v1.0.0": { "target": binhash("rev4"), "target_type": "revision", }, b"refs/patch/123456abc": { "target": binhash("rev5"), "target_type": "revision", }, }, } node_writer, edge_writer = exporter({"snapshot": [snp]}) assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}" f" {b64e('refs/pull/42')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}" f" {b64e('refs/merge-requests/lol')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}" f" {b64e('refs/tags/v1.0.0')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}" f" {b64e('refs/patch/123456abc')}\n" ), ] node_writer, edge_writer = exporter( {"snapshot": [snp]}, config={"remove_pull_requests": True} ) assert edge_writer.mock_calls == [ call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('refs/heads/master')}\n" ), call( f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}" f" {b64e('refs/tags/v1.0.0')}\n" ), ] def test_export_releases(exporter): node_writer, edge_writer = exporter( { "release": [ { **TEST_RELEASE, "id": binhash("rel1"), "target": binhash("rev1"), "target_type": "revision", }, { **TEST_RELEASE, "id": binhash("rel2"), "target": binhash("rel1"), "target_type": "release", }, { **TEST_RELEASE, "id": binhash("rel3"), "target": binhash("dir1"), "target_type": "directory", }, { **TEST_RELEASE, "id": binhash("rel4"), "target": binhash("cnt1"), "target_type": "content", }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel2')}\n"), call(f"swh:1:rel:{hexhash('rel3')}\n"), call(f"swh:1:rel:{hexhash('rel4')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"), call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"), ] def test_export_revision(exporter): node_writer, edge_writer = exporter( { "revision": [ { **TEST_REVISION, "id": binhash("rev1"), "directory": binhash("dir1"), - "parents": [binhash("rev2"), binhash("rev3"),], + "parents": [binhash("rev2"), binhash("rev3")], }, { **TEST_REVISION, "id": binhash("rev2"), "directory": binhash("dir2"), "parents": [], }, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')}\n"), call(f"swh:1:rev:{hexhash('rev2')}\n"), ] assert edge_writer.mock_calls == [ call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"), 
call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"), call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"), call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"), ] def test_export_directory(exporter): node_writer, edge_writer = exporter( { "directory": [ { "id": binhash("dir1"), "entries": [ { "type": "file", "target": binhash("cnt1"), "name": b"cnt1", "perms": 0o644, }, { "type": "dir", "target": binhash("dir2"), "name": b"dir2", "perms": 0o755, }, { "type": "rev", "target": binhash("rev1"), "name": b"rev1", "perms": 0o160000, }, ], }, - {"id": binhash("dir2"), "entries": [],}, + {"id": binhash("dir2"), "entries": []}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:dir:{hexhash('dir1')}\n"), call(f"swh:1:dir:{hexhash('dir2')}\n"), ] assert edge_writer.mock_calls == [ call( f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}" f" {b64e('cnt1')} {0o644}\n" ), call( f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}" f" {b64e('dir2')} {0o755}\n" ), call( f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}" f" {b64e('rev1')} {0o160000}\n" ), ] def test_export_content(exporter): node_writer, edge_writer = exporter( { "content": [ - {**TEST_CONTENT, "sha1_git": binhash("cnt1"),}, - {**TEST_CONTENT, "sha1_git": binhash("cnt2"),}, + {**TEST_CONTENT, "sha1_git": binhash("cnt1")}, + {**TEST_CONTENT, "sha1_git": binhash("cnt2")}, ] } ) assert node_writer.mock_calls == [ call(f"swh:1:cnt:{hexhash('cnt1')}\n"), call(f"swh:1:cnt:{hexhash('cnt2')}\n"), ] assert edge_writer.mock_calls == [] def zstwrite(fp, lines): with ZSTFile(fp, "w") as writer: for line in lines: writer.write(line + "\n") def zstread(fp): with ZSTFile(fp, "r") as reader: return reader.read() def test_sort_pipeline(tmp_path): short_type_mapping = { "origin_visit_status": "ori", "snapshot": "snp", "release": "rel", "revision": "rev", "directory": "dir", "content": "cnt", } input_nodes = [ f"swh:1:{short}:{hexhash(short + str(x))}" for short in short_type_mapping.values() for x in range(4) ] input_edges = [ f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}", f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}", f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}", f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup1')}", f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup2')}", f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')}", f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')} {b64e('r1')}", f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}", f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}", f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}", f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}", f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')} 42", f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')} {b64e('d1')} 1337", f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')} {b64e('r1')} 0", ] for obj_type, short_obj_type in short_type_mapping.items(): p = tmp_path / obj_type 
p.mkdir() edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.edges.csv.zst", edges[0::2]) zstwrite(p / "01.edges.csv.zst", edges[1::2]) nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")] zstwrite(p / "00.nodes.csv.zst", nodes[0::2]) zstwrite(p / "01.nodes.csv.zst", nodes[1::2]) sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"}) output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n") output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n") output_labels = zstread(tmp_path / "graph.labels.csv.zst").split("\n") output_nodes = list(filter(bool, output_nodes)) output_edges = list(filter(bool, output_edges)) output_labels = list(filter(bool, output_labels)) expected_nodes = set(input_nodes) | set(e.split()[1] for e in input_edges) assert output_nodes == sorted(expected_nodes) assert int((tmp_path / "graph.nodes.count.txt").read_text()) == len(expected_nodes) assert sorted(output_edges) == sorted(input_edges) assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges) expected_labels = set(e[2] for e in [e.split() for e in input_edges] if len(e) > 2) assert output_labels == sorted(expected_labels) actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip() expected_node_stats = "\n".join( sorted( "{} {}".format(k, v) for k, v in collections.Counter( node.split(":")[2] for node in expected_nodes ).items() ) ) assert actual_node_stats == expected_node_stats actual_edge_stats = (tmp_path / "graph.edges.stats.txt").read_text().strip() expected_edge_stats = "\n".join( sorted( "{} {}".format(k, v) for k, v in collections.Counter( "{}:{}".format(edge.split(":")[2], edge.split(":")[5]) for edge in input_edges ).items() ) ) assert actual_edge_stats == expected_edge_stats diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py index 0101f93..39bf829 100644 --- a/swh/dataset/utils.py +++ b/swh/dataset/utils.py @@ -1,85 +1,105 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import sqlite3 import subprocess class ZSTFile: """ Object-like wrapper around a ZST file. Uses a subprocess of the "zstd" command to compress and deflate the objects. """ def __init__(self, path: str, mode: str = "r"): if mode not in ("r", "rb", "w", "wb"): raise ValueError(f"ZSTFile mode {mode} is invalid.") self.path = path self.mode = mode def __enter__(self) -> "ZSTFile": is_text = not (self.mode in ("rb", "wb")) writing = self.mode in ("w", "wb") if writing: cmd = ["zstd", "-q", "-o", self.path] else: cmd = ["zstdcat", self.path] self.process = subprocess.Popen( cmd, text=is_text, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) return self def __exit__(self, exc_type, exc_value, tb): self.process.stdin.close() self.process.stdout.close() self.process.wait() def read(self, *args): return self.process.stdout.read(*args) def write(self, buf): self.process.stdin.write(buf) class SQLiteSet: """ On-disk Set object for hashes using SQLite as an indexer backend. Used to deduplicate objects when processing large queues with duplicates. 
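
    A minimal usage sketch (the file path is illustrative):

        with SQLiteSet("/tmp/nodes.sqlite") as node_set:
            node_set.add(b"\x01" * 20)  # True: first time this value is seen
            node_set.add(b"\x01" * 20)  # False: value is already in the set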
""" def __init__(self, db_path): self.db_path = db_path def __enter__(self): self.db = sqlite3.connect(str(self.db_path)) self.db.execute( "CREATE TABLE IF NOT EXISTS" " tmpset (val TEXT NOT NULL PRIMARY KEY)" " WITHOUT ROWID" ) self.db.execute("PRAGMA synchronous = OFF") self.db.execute("PRAGMA journal_mode = OFF") return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.commit() self.db.close() def add(self, v: bytes) -> bool: """ Add an item to the set. Args: v: The value to add to the set. Returns: True if the value was added to the set, False if it was already present. """ try: self.db.execute("INSERT INTO tmpset(val) VALUES (?)", (v.hex(),)) except sqlite3.IntegrityError: return False else: return True + + +def remove_pull_requests(snapshot): + """ + Heuristic to filter out pull requests in snapshots: remove all branches + that start with refs/ but do not start with refs/heads or refs/tags. + """ + # Copy the items with list() to remove items during iteration + for branch_name, branch in list(snapshot["branches"].items()): + original_branch_name = branch_name + while branch and branch.get("target_type") == "alias": + branch_name = branch["target"] + branch = snapshot["branches"][branch_name] + if branch is None or not branch_name: + continue + if branch_name.startswith(b"refs/") and not ( + branch_name.startswith(b"refs/heads") + or branch_name.startswith(b"refs/tags") + ): + snapshot["branches"].pop(original_branch_name)