diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -11,5 +11,11 @@ [mypy-pytest.*] ignore_missing_imports = True +[mypy-tqdm.*] +ignore_missing_imports = True + +[mypy-confluent_kafka.*] +ignore_missing_imports = True + # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click +tqdm diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py new file mode 100644 --- /dev/null +++ b/swh/dataset/cli.py @@ -0,0 +1,54 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import uuid + +from swh.core import config +from swh.core.cli import CONTEXT_SETTINGS +from swh.dataset.graph import export_edges, sort_graph_nodes + + +@click.group(name='dataset', context_settings=CONTEXT_SETTINGS) +@click.option('--config-file', '-C', default=None, + type=click.Path(exists=True, dir_okay=False), + help="Configuration file.") +@click.pass_context +def cli(ctx, config_file): + '''Software Heritage Dataset Tools''' + ctx.ensure_object(dict) + + conf = config.read(config_file) + ctx.obj['config'] = conf + + +@cli.group('graph') +@click.pass_context +def graph(ctx): + """Manage graph edges export""" + pass + + +@graph.command('export') +@click.argument('export-path', type=click.Path()) +@click.option('--export-id', '-e', help="Unique ID of the export run.") +@click.option('--processes', '-p', default=1, + help="Number of parallel processes") +@click.pass_context +def export_graph(ctx, export_path, export_id, processes): + """Export the Software Heritage graph as an edge dataset.""" + config = ctx.obj['config'] + if not export_id: + export_id = str(uuid.uuid4()) + + export_edges(config, export_path, export_id, processes) + + +@graph.command('sort') +@click.argument('export-path', type=click.Path()) +@click.pass_context +def sort_graph(ctx, export_path): + config = ctx.obj['config'] + sort_graph_nodes(export_path, config) diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py new file mode 100644 --- /dev/null +++ b/swh/dataset/exporter.py @@ -0,0 +1,232 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import concurrent.futures +import multiprocessing +import time +import tqdm +from typing import Mapping, Sequence, Tuple +from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor +from confluent_kafka import TopicPartition + +from swh.journal.client import JournalClient + + +class JournalClientOffsetRanges(JournalClient): + """ + A subclass of JournalClient reading only inside some specific offset + range. Partition assignments have to be manually given to the class. + + This client can only read a single topic at a time. 
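+
+    A minimal usage sketch (the journal connection keyword arguments below
+    are illustrative assumptions; they are simply forwarded to the parent
+    JournalClient):
+
+        queue = multiprocessing.Manager().Queue()
+        client = JournalClientOffsetRanges(
+            brokers=['kafka:9092'],  # hypothetical broker address
+            group_id='swh-dataset-export-example',
+            object_types=['revision'],
+            offset_ranges={0: (0, 10000), 1: (0, 10000)},
+            assignment=[0, 1],
+            progress_queue=queue,
+        )
+        client.process(print)  # stops once every assigned partition is done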
+ """ + def __init__( + self, + *args, + offset_ranges: Mapping[int, Tuple[int, int]] = None, + assignment: Sequence[int] = None, + progress_queue: multiprocessing.Queue = None, + refresh_every: int = 200, + **kwargs, + ): + """ + Args: + offset_ranges: A mapping of partition_id -> (low, high) offsets + that define the boundaries of the messages to consume. + assignment: The list of partitions to assign to this client. + progress_queue: a multiprocessing.Queue where the current + progress will be reported. + refresh_every: the refreshing rate of the progress reporting. + """ + self.offset_ranges = offset_ranges + self.progress_queue = progress_queue + self.refresh_every = refresh_every + self.assignment = assignment + self.count = None + self.topic_name = None + super().__init__(*args, **kwargs) + + def subscribe(self): + self.topic_name = self.subscription[0] + time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 + self.consumer.assign([ + TopicPartition(self.topic_name, pid) for pid in self.assignment + ]) + + def process(self, *args, **kwargs): + self.count = 0 + try: + self.handle_committed_offsets() + super().process(*args, **kwargs) + except EOFError: + self.progress_queue.put(None) + + def handle_committed_offsets(self, ): + """ + Handle already committed partition offsets before starting processing. + """ + committed = self.consumer.committed([ + TopicPartition(self.topic_name, pid) for pid in self.assignment + ]) + for tp in committed: + self.handle_offset(tp.partition, tp.offset) + + def handle_offset(self, partition_id, offset): + """ + Check whether the client has reached the end of the current + partition, and trigger a reassignment if that is the case. + Raise EOFError if all the partitions have reached the end. + """ + if offset < 0: # Uninitialized partition offset + return + + if self.count % self.refresh_every == 0: + self.progress_queue.put({partition_id: offset}) + + if offset >= self.offset_ranges[partition_id][1] - 1: + self.assignment = [pid for pid in self.assignment + if pid != partition_id] + self.subscribe() + + if not self.assignment: + raise EOFError + + def deserialize_message(self, message): + """ + Override of the message deserialization to hook the handling of the + message offset. + """ + self.handle_offset(message.partition(), message.offset()) + self.count += 1 + return super().deserialize_message(message) + + +class ParallelExporter: + """ + Base class for all the Journal exporters. + + Each exporter should override the `export_worker` function with an + implementation of how to run the message processing. + """ + def __init__(self, config, export_id: str, obj_type, processes=1): + """ + Args: + config: the exporter config, which should also include the + JournalClient configuration. + export_id: a unique identifier for the export that will be used + as part of a Kafka consumer group ID. + obj_type: The type of SWH object to export. + processes: The number of processes to run. + """ + self.config = config + self.export_id = 'swh-dataset-export-{}'.format(export_id) + self.obj_type = obj_type + self.processes = processes + self.offsets = None + + def get_offsets(self): + """ + First pass to fetch all the current low and high offsets of each + partition to define the consumption boundaries. 
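+
+        The result is cached on the instance. It maps each partition id to
+        its (low, high) watermark offsets, e.g. (illustrative values):
+
+            {0: (0, 152000), 1: (3, 149387)}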
+ """ + if self.offsets is None: + client = JournalClient( + **self.config['journal'], + object_types=[self.obj_type], + group_id=self.export_id, + ) + topic_name = client.subscription[0] + topics = client.consumer.list_topics(topic_name).topics + partitions = topics[topic_name].partitions + + self.offsets = {} + for partition_id in tqdm.tqdm(partitions.keys(), + desc=" - Partition offsets"): + tp = TopicPartition(topic_name, partition_id) + (lo, hi) = client.consumer.get_watermark_offsets(tp) + self.offsets[partition_id] = (lo, hi) + return self.offsets + + def run(self, *args): + """ + Run the parallel export. + """ + offsets = self.get_offsets() + to_assign = list(offsets.keys()) + + manager = multiprocessing.Manager() + q = manager.Queue() + + with ProcessPoolExecutor(self.processes + 1) as pool: + futures = [] + for i in range(self.processes): + futures.append(pool.submit( + self.export_worker, + *args, + assignment=to_assign[i::self.processes], + queue=q + )) + futures.append(pool.submit(self.progress_worker, queue=q)) + + concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION) + for f in futures: + if f.running(): + continue + exc = f.exception() + if exc: + pool.shutdown(wait=False) + f.result() + raise exc + + def progress_worker(self, *args, queue=None): + """ + An additional worker process that reports the current progress of the + export between all the different parallel consumers and across all the + partitions, by consuming the shared progress reporting Queue. + """ + d = {} + active_workers = self.processes + offset_diff = sum((hi - lo) for lo, hi in self.offsets.values()) + with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar: + while active_workers: + item = queue.get() + if item is None: + active_workers -= 1 + continue + d.update(item) + progress = sum(n - self.offsets[p][0] for p, n in d.items()) + pbar.set_postfix(active_workers=active_workers, + total_workers=self.processes) + pbar.update(progress - pbar.n) + + def process(self, callback, assignment=None, queue=None): + client = JournalClientOffsetRanges( + **self.config['journal'], + object_types=[self.obj_type], + group_id=self.export_id, + debug='cgrp,broker', + offset_ranges=self.offsets, + assignment=assignment, + progress_queue=queue, + ) + client.process(callback) + + def export_worker(self, *args, **kwargs): + """ + Override this with a custom implementation of a worker function. + + A worker function should call `self.process(fn, **kwargs)` with `fn` + being a callback that will be called in the same fashion as with + `JournalClient.process()`. 
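+
+        In particular, the callback receives a mapping of object type to
+        the list of deserialized messages in the current batch, e.g.
+        (illustrative shape):
+
+            {'revision': [{'id': ..., 'directory': ..., 'parents': [...]}]}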
+ + A simple exporter to print all the objects in the log would look like + this: + + ``` + class PrintExporter(ParallelExporter): + def export_worker(self, **kwargs): + self.process(print, **kwargs) + ``` + """ + raise NotImplementedError diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py new file mode 100644 --- /dev/null +++ b/swh/dataset/graph.py @@ -0,0 +1,197 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools +import os +import os.path +import pathlib +import shlex +import subprocess +import tempfile +import uuid + +from swh.dataset.exporter import ParallelExporter +from swh.dataset.utils import ZSTFile +from swh.model.identifiers import origin_identifier, persistent_identifier +from swh.journal.fixer import fix_objects + + +def process_messages(messages, config, node_writer, edge_writer): + """ + Args: + messages: A sequence of messages to process + config: The exporter configuration + node_writer: A file-like object where to write nodes + edge_writer: A file-like object where to write edges + """ + def write_node(node): + node_type, node_id = node + if node_id is None: + return + node_pid = persistent_identifier(object_type=node_type, + object_id=node_id) + node_writer.write('{}\n'.format(node_pid)) + + def write_edge(src, dst): + src_type, src_id = src + dst_type, dst_id = dst + if src_id is None or dst_id is None: + return + src_pid = persistent_identifier(object_type=src_type, object_id=src_id) + dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id) + edge_writer.write('{} {}\n'.format(src_pid, dst_pid)) + + messages = {k: fix_objects(k, v) for k, v in messages.items()} + + for visit in messages.get('origin_visit', []): + origin_id = origin_identifier({'url': visit['origin']}) + write_node(('origin', origin_id)) + write_edge(('origin', origin_id), ('snapshot', visit['snapshot'])) + + for snapshot in messages.get('snapshot', []): + write_node(('snapshot', snapshot['id'])) + for branch_name, branch in snapshot['branches'].items(): + while branch and branch.get('target_type') == 'alias': + branch_name = branch['target'] + branch = snapshot['branches'][branch_name] + if branch is None or not branch_name: + continue + if (config.get('remove_pull_requests') + and (branch_name.startswith(b'refs/pull') + or branch_name.startswith(b'refs/merge-requests'))): + continue + write_edge(('snapshot', snapshot['id']), + (branch['target_type'], branch['target'])) + + for release in messages.get('release', []): + write_node(('release', release['id'])) + write_edge(('release', release['id']), + (release['target_type'], release['target'])) + + for revision in messages.get('revision', []): + write_node(('revision', revision['id'])) + write_edge(('revision', revision['id']), + ('directory', revision['directory'])) + for parent in revision['parents']: + write_edge(('revision', revision['id']), + ('revision', parent)) + + for directory in messages.get('directory', []): + write_node(('directory', directory['id'])) + for entry in directory['entries']: + entry_type_mapping = { + 'file': 'content', + 'dir': 'directory', + 'rev': 'revision' + } + write_edge(('directory', directory['id']), + (entry_type_mapping[entry['type']], entry['target'])) + + for content in messages.get('content', []): + write_node(('content', content['sha1_git'])) + + +class 
GraphEdgeExporter(ParallelExporter):
+    """
+    Implementation of ParallelExporter which writes all the graph edges
+    of a specific type in a Zstandard-compressed CSV file.
+
+    Each row of the CSV is in the format: `<src PID> <dst PID>`
+    """
+    def export_worker(self, export_path, **kwargs):
+        dataset_path = pathlib.Path(export_path)
+        dataset_path.mkdir(exist_ok=True, parents=True)
+        nodes_file = dataset_path / ('graph-{}.nodes.csv.zst'
+                                     .format(str(uuid.uuid4())))
+        edges_file = dataset_path / ('graph-{}.edges.csv.zst'
+                                     .format(str(uuid.uuid4())))
+
+        with \
+                ZSTFile(nodes_file, 'w') as nodes_writer, \
+                ZSTFile(edges_file, 'w') as edges_writer:
+            process_fn = functools.partial(
+                process_messages,
+                config=self.config,
+                node_writer=nodes_writer,
+                edge_writer=edges_writer
+            )
+            self.process(process_fn, **kwargs)
+
+
+def export_edges(config, export_path, export_id, processes):
+    """Run the edge exporter for each edge type."""
+    object_types = [
+        'origin_visit',
+        'snapshot',
+        'release',
+        'revision',
+        'directory',
+    ]
+    for obj_type in object_types:
+        print('{} edges:'.format(obj_type))
+        exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
+        exporter.run(os.path.join(export_path, obj_type))
+
+
+def sort_graph_nodes(export_path, config):
+    """
+    Generate the node list from the edges files.
+
+    We cannot solely rely on the object IDs that are read in the journal,
+    as some nodes that are referred to as destinations in the edge file
+    might not be present in the archive (e.g. a rev_entry referring to a
+    revision that we have not crawled yet).
+
+    The most efficient way of getting all the nodes that are mentioned in
+    the edges file is therefore to use sort(1) on the gigantic edge files
+    to get all the unique node IDs, while using the disk as a temporary
+    buffer.
+
+    This pipeline does, in order:
+
+    - concatenate and write all the compressed edges files in
+      graph.edges.csv.zst (using the fact that the concatenation of
+      zstd-compressed files is itself a valid zstd file);
+    - decompress the edges;
+    - count the number of edges and write it in graph.edges.count.txt;
+    - concatenate all the (decompressed) nodes from the export with the
+      destination edges, and sort the output to get the list of unique
+      graph nodes;
+    - count the number of unique graph nodes and write it in
+      graph.nodes.count.txt;
+    - compress and write the resulting nodes in graph.nodes.csv.zst.
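+
+    After a successful run, the top level of the export directory therefore
+    contains, alongside the per-object-type subdirectories:
+
+        graph.edges.csv.zst
+        graph.edges.count.txt
+        graph.nodes.csv.zst
+        graph.nodes.count.txt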
+ """ + # Use bytes for the sorting algorithm (faster than being locale-specific) + env = { + **os.environ.copy(), + 'LC_ALL': 'C', + 'LC_COLLATE': 'C', + 'LANG': 'C', + } + sort_buffer_size = config.get('sort_buffer_size', '4G') + disk_buffer_dir = config.get('disk_buffer_dir', export_path) + with tempfile.TemporaryDirectory(prefix='.graph_node_sort_', + dir=disk_buffer_dir) as buffer_path: + subprocess.run( + [ + "bash", + "-c", + ("pv {export_path}/*/*.edges.csv.zst | " + "tee {export_path}/graph.edges.csv.zst |" + "zstdcat |" + "tee >( wc -l > {export_path}/graph.edges.count.txt ) |" + "cut -d' ' -f2 | " + "cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | " + "sort -u -S{sort_buffer_size} -T{buffer_path} | " + "tee >( wc -l > {export_path}/graph.nodes.count.txt ) |" + "zstdmt > {export_path}/graph.nodes.csv.zst") + .format( + export_path=shlex.quote(str(export_path)), + buffer_path=shlex.quote(str(buffer_path)), + sort_buffer_size=shlex.quote(sort_buffer_size), + ), + ], + env=env, + ) diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py new file mode 100644 --- /dev/null +++ b/swh/dataset/test/test_graph.py @@ -0,0 +1,437 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import hashlib +from typing import Tuple + +import pytest +from unittest.mock import Mock, call + +from swh.dataset.graph import process_messages, sort_graph_nodes +from swh.dataset.utils import ZSTFile +from swh.model.hashutil import MultiHash, hash_to_bytes + +DATE = { + "timestamp": {"seconds": 1234567891, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, +} + +TEST_CONTENT = { + **MultiHash.from_data(b"foo").digest(), + "length": 3, + "status": "visible", +} + +TEST_REVISION = { + "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), + "message": b"hello", + "date": DATE, + "committer": {"fullname": b"foo", "name": b"foo", "email": b""}, + "author": {"fullname": b"foo", "name": b"foo", "email": b""}, + "committer_date": DATE, + "type": "git", + "directory": b"\x01" * 20, + "synthetic": False, + "metadata": None, + "parents": [], +} + +TEST_RELEASE = { + "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + "name": b"v0.0.1", + "date": { + "timestamp": {"seconds": 1234567890, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, + }, + "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}}, + "target_type": "revision", + "target": b"\x04" * 20, + "message": b"foo", + "synthetic": False, +} + +TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"} + +TEST_ORIGIN_VISIT = { + "origin": TEST_ORIGIN["url"], + "date": "2013-05-07 04:20:39.369271+00:00", + "snapshot": None, # TODO + "status": "ongoing", # TODO + "metadata": {"foo": "bar"}, + "type": "git", +} + + +@pytest.fixture +def exporter(): + def wrapped(messages, config=None) -> Tuple[Mock, Mock]: + if config is None: + config = {} + node_writer = Mock() + edge_writer = Mock() + process_messages( + messages, config=config, node_writer=node_writer, edge_writer=edge_writer, + ) + return node_writer.write, edge_writer.write + + return wrapped + + +def binhash(s): + return hashlib.sha1(s.encode()).digest() + + +def hexhash(s): + return hashlib.sha1(s.encode()).hexdigest() + + +def test_export_origin_visits(exporter): + node_writer, edge_writer = exporter( + { + "origin_visit": 
[ + { + **TEST_ORIGIN_VISIT, + "origin": {"url": "ori1"}, + "snapshot": binhash("snp1"), + }, + { + **TEST_ORIGIN_VISIT, + "origin": {"url": "ori2"}, + "snapshot": binhash("snp2"), + }, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:ori:{hexhash('ori1')}\n"), + call(f"swh:1:ori:{hexhash('ori2')}\n"), + ] + assert edge_writer.mock_calls == [ + call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"), + call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"), + ] + + +def test_export_snapshot_simple(exporter): + node_writer, edge_writer = exporter( + { + "snapshot": [ + { + "id": binhash("snp1"), + "branches": { + b"refs/heads/master": { + "target": binhash("rev1"), + "target_type": "revision", + }, + b"HEAD": {"target": binhash("rev1"), "target_type": "revision"}, + }, + }, + { + "id": binhash("snp2"), + "branches": { + b"refs/heads/master": { + "target": binhash("rev1"), + "target_type": "revision", + }, + b"HEAD": {"target": binhash("rev2"), "target_type": "revision"}, + b"bcnt": {"target": binhash("cnt1"), "target_type": "content"}, + b"bdir": { + "target": binhash("dir1"), + "target_type": "directory", + }, + b"brel": {"target": binhash("rel1"), "target_type": "release"}, + b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"}, + }, + }, + {"id": binhash("snp3"), "branches": {}}, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:snp:{hexhash('snp1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')}\n"), + call(f"swh:1:snp:{hexhash('snp3')}\n"), + ] + assert edge_writer.mock_calls == [ + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"), + call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"), + ] + + +def test_export_snapshot_aliases(exporter): + node_writer, edge_writer = exporter( + { + "snapshot": [ + { + "id": binhash("snp1"), + "branches": { + b"origin_branch": { + "target": binhash("rev1"), + "target_type": "revision", + }, + b"alias1": {"target": b"origin_branch", "target_type": "alias"}, + b"alias2": {"target": b"alias1", "target_type": "alias"}, + b"alias3": {"target": b"alias2", "target_type": "alias"}, + }, + }, + ] + } + ) + assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")] + assert edge_writer.mock_calls == ( + [call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4 + ) + + +def test_export_snapshot_no_pull_requests(exporter): + snp = { + "id": binhash("snp1"), + "branches": { + b"refs/heads/master": { + "target": binhash("rev1"), + "target_type": "revision", + }, + b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"}, + b"refs/merge-requests/lol": { + "target": binhash("rev3"), + "target_type": "revision", + }, + }, + } + + node_writer, edge_writer = exporter({"snapshot": [snp]}) + assert edge_writer.mock_calls == [ + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"), + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"), + ] + + node_writer, edge_writer = exporter( + {"snapshot": [snp]}, 
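+        # With remove_pull_requests enabled, the refs/pull/* and
+        # refs/merge-requests/* branches are skipped, so only the
+        # refs/heads/master edge is expected.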
config={"remove_pull_requests": True} + ) + assert edge_writer.mock_calls == [ + call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"), + ] + + +def test_export_releases(exporter): + node_writer, edge_writer = exporter( + { + "release": [ + { + **TEST_RELEASE, + "id": binhash("rel1"), + "target": binhash("rev1"), + "target_type": "revision", + }, + { + **TEST_RELEASE, + "id": binhash("rel2"), + "target": binhash("rel1"), + "target_type": "release", + }, + { + **TEST_RELEASE, + "id": binhash("rel3"), + "target": binhash("dir1"), + "target_type": "directory", + }, + { + **TEST_RELEASE, + "id": binhash("rel4"), + "target": binhash("cnt1"), + "target_type": "content", + }, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:rel:{hexhash('rel1')}\n"), + call(f"swh:1:rel:{hexhash('rel2')}\n"), + call(f"swh:1:rel:{hexhash('rel3')}\n"), + call(f"swh:1:rel:{hexhash('rel4')}\n"), + ] + assert edge_writer.mock_calls == [ + call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"), + call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"), + call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"), + ] + + +def test_export_revision(exporter): + node_writer, edge_writer = exporter( + { + "revision": [ + { + **TEST_REVISION, + "id": binhash("rev1"), + "directory": binhash("dir1"), + "parents": [ + binhash("rev2"), + binhash("rev3"), + ], + }, + { + **TEST_REVISION, + "id": binhash("rev2"), + "directory": binhash("dir2"), + "parents": [], + }, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:rev:{hexhash('rev1')}\n"), + call(f"swh:1:rev:{hexhash('rev2')}\n"), + ] + assert edge_writer.mock_calls == [ + call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"), + call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"), + call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"), + + call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"), + ] + + +def test_export_directory(exporter): + node_writer, edge_writer = exporter( + { + "directory": [ + { + "id": binhash("dir1"), + "entries": [ + {"type": "file", "target": binhash("cnt1")}, + {"type": "dir", "target": binhash("dir2")}, + {"type": "rev", "target": binhash("rev1")}, + ], + }, + { + "id": binhash("dir2"), + "entries": [], + }, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:dir:{hexhash('dir1')}\n"), + call(f"swh:1:dir:{hexhash('dir2')}\n"), + ] + assert edge_writer.mock_calls == [ + call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"), + call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"), + call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"), + ] + + +def test_export_content(exporter): + node_writer, edge_writer = exporter( + { + "content": [ + { + **TEST_CONTENT, + "sha1_git": binhash("cnt1"), + }, + { + **TEST_CONTENT, + "sha1_git": binhash("cnt2"), + }, + ] + } + ) + assert node_writer.mock_calls == [ + call(f"swh:1:cnt:{hexhash('cnt1')}\n"), + call(f"swh:1:cnt:{hexhash('cnt2')}\n"), + ] + assert edge_writer.mock_calls == [] + + +def zstwrite(fp, lines): + with ZSTFile(fp, 'w') as writer: + for l in lines: + writer.write(l + "\n") + + +def zstread(fp): + with ZSTFile(fp, 'r') as reader: + return reader.read() + + +def test_sort_pipeline(tmp_path): + short_type_mapping = { + 'origin_visit': 'ori', + 'snapshot': 'snp', + 'release': 'rel', + 'revision': 'rev', + 'directory': 'dir', + 'content': 'cnt', + } + + 
input_nodes = [ + f"swh:1:{short}:{hexhash(short + str(x))}" + for short in short_type_mapping.values() + for x in range(4) + ] + + input_edges = [ + f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}", + f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}", + f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}", + f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest + + f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup + f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup + f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}", + f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}", + + f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}", + f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}", + f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}", + f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}", + + f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup + f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup + f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}", + f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest + f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}", + f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}", + + f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}", + f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}", + f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}", + ] + + for obj_type, short_obj_type in short_type_mapping.items(): + p = (tmp_path / obj_type) + p.mkdir() + edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")] + zstwrite(p / '00.edges.csv.zst', edges[0::2]) + zstwrite(p / '01.edges.csv.zst', edges[1::2]) + + nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")] + zstwrite(p / '00.nodes.csv.zst', nodes[0::2]) + zstwrite(p / '01.nodes.csv.zst', nodes[1::2]) + + sort_graph_nodes(tmp_path, config={'sort_buffer_size': '1M'}) + + output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n") + output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n") + output_nodes = list(filter(bool, output_nodes)) + output_edges = list(filter(bool, output_edges)) + + expected_nodes = set(input_nodes) | set(l.split()[1] for l in input_edges) + assert output_nodes == sorted(expected_nodes) + assert int((tmp_path / 'graph.nodes.count.txt').read_text()) == len(expected_nodes) + + assert sorted(output_edges) == sorted(input_edges) + assert int((tmp_path / 'graph.edges.count.txt').read_text()) == len(input_edges) diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py new file mode 100644 --- /dev/null +++ b/swh/dataset/utils.py @@ -0,0 +1,40 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import subprocess + + +class ZSTFile: + def __init__(self, path, mode='r'): + if mode not in ('r', 'rb', 'w', 'wb'): + raise ValueError(f"ZSTFile mode {mode} is invalid.") + self.path = path + self.mode = mode + + def __enter__(self): + is_text = not (self.mode in ('rb', 'wb')) + writing = self.mode in ('w', 'wb') + if writing: + cmd = ['zstd', '-q', '-o', self.path] + else: + cmd = ['zstdcat', self.path] + self.process = subprocess.Popen( + cmd, + text=is_text, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + return self 
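+
+    # Note: closing stdin below tells the zstd/zstdcat child process that
+    # its input is complete; waiting for it ensures the output is fully
+    # written before the context manager returns.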
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.process.stdin.close()
+        self.process.stdout.close()
+        self.process.wait()
+
+    def read(self, *args):
+        return self.process.stdout.read(*args)
+
+    def write(self, buf):
+        self.process.stdin.write(buf)
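+
+
+# A minimal usage sketch of ZSTFile (the file name is illustrative); this is
+# essentially how the graph exporter and the zstwrite/zstread test helpers
+# use it:
+#
+#     with ZSTFile('graph.nodes.csv.zst', 'w') as writer:
+#         writer.write('swh:1:cnt:' + '00' * 20 + '\n')
+#
+#     with ZSTFile('graph.nodes.csv.zst', 'r') as reader:
+#         print(reader.read())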