D3011.id11085.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,11 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+tqdm
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/cli.py
@@ -0,0 +1,54 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import uuid
+
+from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.dataset.graph import export_edges, sort_graph_nodes
+
+
+@click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False),
+ help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+ '''Software Heritage Dataset Tools'''
+ ctx.ensure_object(dict)
+
+ conf = config.read(config_file)
+ ctx.obj['config'] = conf
+
+
+@cli.group('graph')
+@click.pass_context
+def graph(ctx):
+ """Manage graph edges export"""
+ pass
+
+
+@graph.command('export')
+@click.argument('export-path', type=click.Path())
+@click.option('--export-id', '-e', help="Unique ID of the export run.")
+@click.option('--processes', '-p', default=1,
+ help="Number of parallel processes")
+@click.pass_context
+def export_graph(ctx, export_path, export_id, processes):
+ """Export the Software Heritage graph as an edge dataset."""
+ config = ctx.obj['config']
+ if not export_id:
+ export_id = str(uuid.uuid4())
+
+ export_edges(config, export_path, export_id, processes)
+
+
+@graph.command('sort')
+@click.argument('export-path', type=click.Path())
+@click.pass_context
+def sort_graph(ctx, export_path):
+ config = ctx.obj['config']
+ sort_graph_nodes(export_path, config)
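
For reference, these commands are thin wrappers around `export_edges` and
`sort_graph_nodes`. A rough sketch of the same flow driven directly from
Python is shown below; the broker address, the export path, and the exact
keys expected under `journal` are placeholders, not part of this diff.

```
# Hypothetical driver mirroring the `graph export` / `graph sort` commands
# above. The journal settings and paths below are placeholders.
import uuid

from swh.dataset.graph import export_edges, sort_graph_nodes

config = {
    "journal": {
        "brokers": ["kafka.example.org:9092"],  # placeholder broker
    },
}
export_path = "/srv/dataset/export"  # placeholder path

# Export every edge type with 4 parallel consumers, then build the sorted
# node list from the resulting edge files.
export_edges(config, export_path, str(uuid.uuid4()), processes=4)
sort_graph_nodes(export_path, config)
```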
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporter.py
@@ -0,0 +1,232 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import concurrent.futures
+import multiprocessing
+import time
+import tqdm
+from typing import Mapping, Sequence, Tuple
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+from confluent_kafka import TopicPartition
+
+from swh.journal.client import JournalClient
+
+
+class JournalClientOffsetRanges(JournalClient):
+ """
+ A subclass of JournalClient reading only inside some specific offset
+ range. Partition assignments have to be manually given to the class.
+
+ This client can only read a single topic at a time.
+ """
+ def __init__(
+ self,
+ *args,
+ offset_ranges: Mapping[int, Tuple[int, int]] = None,
+ assignment: Sequence[int] = None,
+ progress_queue: multiprocessing.Queue = None,
+ refresh_every: int = 200,
+ **kwargs,
+ ):
+ """
+ Args:
+ offset_ranges: A mapping of partition_id -> (low, high) offsets
+ that define the boundaries of the messages to consume.
+ assignment: The list of partitions to assign to this client.
+ progress_queue: a multiprocessing.Queue where the current
+ progress will be reported.
+ refresh_every: the refreshing rate of the progress reporting.
+ """
+ self.offset_ranges = offset_ranges
+ self.progress_queue = progress_queue
+ self.refresh_every = refresh_every
+ self.assignment = assignment
+ self.count = None
+ self.topic_name = None
+ super().__init__(*args, **kwargs)
+
+ def subscribe(self):
+ self.topic_name = self.subscription[0]
+ time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
+ self.consumer.assign([
+ TopicPartition(self.topic_name, pid) for pid in self.assignment
+ ])
+
+ def process(self, *args, **kwargs):
+ self.count = 0
+ try:
+ self.handle_committed_offsets()
+ super().process(*args, **kwargs)
+ except EOFError:
+ self.progress_queue.put(None)
+
+ def handle_committed_offsets(self):
+ """
+ Handle already committed partition offsets before starting processing.
+ """
+ committed = self.consumer.committed([
+ TopicPartition(self.topic_name, pid) for pid in self.assignment
+ ])
+ for tp in committed:
+ self.handle_offset(tp.partition, tp.offset)
+
+ def handle_offset(self, partition_id, offset):
+ """
+ Check whether the client has reached the end of the current
+ partition, and trigger a reassignment if that is the case.
+ Raise EOFError if all the partitions have reached the end.
+ """
+ if offset < 0: # Uninitialized partition offset
+ return
+
+ if self.count % self.refresh_every == 0:
+ self.progress_queue.put({partition_id: offset})
+
+ if offset >= self.offset_ranges[partition_id][1] - 1:
+ self.assignment = [pid for pid in self.assignment
+ if pid != partition_id]
+ self.subscribe()
+
+ if not self.assignment:
+ raise EOFError
+
+ def deserialize_message(self, message):
+ """
+ Override of the message deserialization to hook the handling of the
+ message offset.
+ """
+ self.handle_offset(message.partition(), message.offset())
+ self.count += 1
+ return super().deserialize_message(message)
+
+
+class ParallelExporter:
+ """
+ Base class for all the Journal exporters.
+
+ Each exporter should override the `export_worker` function with an
+ implementation of how to run the message processing.
+ """
+ def __init__(self, config, export_id: str, obj_type, processes=1):
+ """
+ Args:
+ config: the exporter config, which should also include the
+ JournalClient configuration.
+ export_id: a unique identifier for the export that will be used
+ as part of a Kafka consumer group ID.
+ obj_type: The type of SWH object to export.
+ processes: The number of processes to run.
+ """
+ self.config = config
+ self.export_id = 'swh-dataset-export-{}'.format(export_id)
+ self.obj_type = obj_type
+ self.processes = processes
+ self.offsets = None
+
+ def get_offsets(self):
+ """
+ First pass to fetch all the current low and high offsets of each
+ partition to define the consumption boundaries.
+ """
+ if self.offsets is None:
+ client = JournalClient(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ )
+ topic_name = client.subscription[0]
+ topics = client.consumer.list_topics(topic_name).topics
+ partitions = topics[topic_name].partitions
+
+ self.offsets = {}
+ for partition_id in tqdm.tqdm(partitions.keys(),
+ desc=" - Partition offsets"):
+ tp = TopicPartition(topic_name, partition_id)
+ (lo, hi) = client.consumer.get_watermark_offsets(tp)
+ self.offsets[partition_id] = (lo, hi)
+ return self.offsets
+
+ def run(self, *args):
+ """
+ Run the parallel export.
+ """
+ offsets = self.get_offsets()
+ to_assign = list(offsets.keys())
+
+ manager = multiprocessing.Manager()
+ q = manager.Queue()
+
+ with ProcessPoolExecutor(self.processes + 1) as pool:
+ futures = []
+ for i in range(self.processes):
+ futures.append(pool.submit(
+ self.export_worker,
+ *args,
+ assignment=to_assign[i::self.processes],
+ queue=q
+ ))
+ futures.append(pool.submit(self.progress_worker, queue=q))
+
+ concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
+ for f in futures:
+ if f.running():
+ continue
+ exc = f.exception()
+ if exc:
+ pool.shutdown(wait=False)
+ f.result()
+ raise exc
+
+ def progress_worker(self, *args, queue=None):
+ """
+ An additional worker process that reports the current progress of the
+ export across all the parallel consumers and all the partitions, by
+ consuming the shared progress reporting queue.
+ """
+ d = {}
+ active_workers = self.processes
+ offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
+ with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
+ while active_workers:
+ item = queue.get()
+ if item is None:
+ active_workers -= 1
+ continue
+ d.update(item)
+ progress = sum(n - self.offsets[p][0] for p, n in d.items())
+ pbar.set_postfix(active_workers=active_workers,
+ total_workers=self.processes)
+ pbar.update(progress - pbar.n)
+
+ def process(self, callback, assignment=None, queue=None):
+ client = JournalClientOffsetRanges(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ debug='cgrp,broker',
+ offset_ranges=self.offsets,
+ assignment=assignment,
+ progress_queue=queue,
+ )
+ client.process(callback)
+
+ def export_worker(self, *args, **kwargs):
+ """
+ Override this with a custom implementation of a worker function.
+
+ A worker function should call `self.process(fn, **kwargs)` with `fn`
+ being a callback that will be called in the same fashion as with
+ `JournalClient.process()`.
+
+ A simple exporter to print all the objects in the log would look like
+ this:
+
+ ```
+ class PrintExporter(ParallelExporter):
+ def export_worker(self, **kwargs):
+ self.process(print, **kwargs)
+ ```
+ """
+ raise NotImplementedError
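
As a side note on how `ParallelExporter.run()` distributes work: each worker
receives every Nth partition through the `to_assign[i::self.processes]`
slice. A tiny illustration with made-up numbers:

```
# Round-robin partition assignment as done in ParallelExporter.run().
# Partition and process counts are made up for illustration.
partitions = list(range(8))
processes = 3
assignments = [partitions[i::processes] for i in range(processes)]
assert assignments == [[0, 3, 6], [1, 4, 7], [2, 5]]
```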
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/graph.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import os
+import os.path
+import pathlib
+import shlex
+import subprocess
+import tempfile
+import uuid
+
+from swh.dataset.exporter import ParallelExporter
+from swh.dataset.utils import ZSTFile
+from swh.model.identifiers import origin_identifier, persistent_identifier
+from swh.journal.fixer import fix_objects
+
+
+def process_messages(messages, config, node_writer, edge_writer):
+ """
+ Args:
+ messages: A sequence of messages to process
+ config: The exporter configuration
+ node_writer: A file-like object where to write nodes
+ edge_writer: A file-like object where to write edges
+ """
+ def write_node(node):
+ node_type, node_id = node
+ if node_id is None:
+ return
+ node_pid = persistent_identifier(object_type=node_type,
+ object_id=node_id)
+ node_writer.write('{}\n'.format(node_pid))
+
+ def write_edge(src, dst):
+ src_type, src_id = src
+ dst_type, dst_id = dst
+ if src_id is None or dst_id is None:
+ return
+ src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
+ dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
+ edge_writer.write('{} {}\n'.format(src_pid, dst_pid))
+
+ messages = {k: fix_objects(k, v) for k, v in messages.items()}
+
+ for visit in messages.get('origin_visit', []):
+ origin_id = origin_identifier({'url': visit['origin']})
+ write_node(('origin', origin_id))
+ write_edge(('origin', origin_id), ('snapshot', visit['snapshot']))
+
+ for snapshot in messages.get('snapshot', []):
+ write_node(('snapshot', snapshot['id']))
+ for branch_name, branch in snapshot['branches'].items():
+ while branch and branch.get('target_type') == 'alias':
+ branch_name = branch['target']
+ branch = snapshot['branches'][branch_name]
+ if branch is None or not branch_name:
+ continue
+ if (config.get('remove_pull_requests')
+ and (branch_name.startswith(b'refs/pull')
+ or branch_name.startswith(b'refs/merge-requests'))):
+ continue
+ write_edge(('snapshot', snapshot['id']),
+ (branch['target_type'], branch['target']))
+
+ for release in messages.get('release', []):
+ write_node(('release', release['id']))
+ write_edge(('release', release['id']),
+ (release['target_type'], release['target']))
+
+ for revision in messages.get('revision', []):
+ write_node(('revision', revision['id']))
+ write_edge(('revision', revision['id']),
+ ('directory', revision['directory']))
+ for parent in revision['parents']:
+ write_edge(('revision', revision['id']),
+ ('revision', parent))
+
+ for directory in messages.get('directory', []):
+ write_node(('directory', directory['id']))
+ for entry in directory['entries']:
+ entry_type_mapping = {
+ 'file': 'content',
+ 'dir': 'directory',
+ 'rev': 'revision'
+ }
+ write_edge(('directory', directory['id']),
+ (entry_type_mapping[entry['type']], entry['target']))
+
+ for content in messages.get('content', []):
+ write_node(('content', content['sha1_git']))
+
+
+class GraphEdgeExporter(ParallelExporter):
+ """
+ Implementation of ParallelExporter which writes all the graph edges
+ of a specific type in a Zstandard-compressed CSV file.
+
+ Each row of the CSV is in the format: `<SRC PID> <DST PID>`
+ """
+ def export_worker(self, export_path, **kwargs):
+ dataset_path = pathlib.Path(export_path)
+ dataset_path.mkdir(exist_ok=True, parents=True)
+ nodes_file = dataset_path / ('graph-{}.nodes.csv.zst'
+ .format(str(uuid.uuid4())))
+ edges_file = dataset_path / ('graph-{}.edges.csv.zst'
+ .format(str(uuid.uuid4())))
+
+ with \
+ ZSTFile(nodes_file, 'w') as nodes_writer, \
+ ZSTFile(edges_file, 'w') as edges_writer:
+ process_fn = functools.partial(
+ process_messages,
+ config=self.config,
+ node_writer=nodes_writer,
+ edge_writer=edges_writer
+ )
+ self.process(process_fn, **kwargs)
+
+
+def export_edges(config, export_path, export_id, processes):
+ """Run the edge exporter for each edge type."""
+ object_types = [
+ 'origin_visit',
+ 'snapshot',
+ 'release',
+ 'revision',
+ 'directory',
+ ]
+ for obj_type in object_types:
+ print('{} edges:'.format(obj_type))
+ exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
+ exporter.run(os.path.join(export_path, obj_type))
+
+
+def sort_graph_nodes(export_path, config):
+ """
+ Generate the node list from the edges files.
+
+ We cannot rely solely on the object IDs that are read from the journal,
+ as some nodes that are referred to as destinations in the edge files
+ might not be present in the archive (e.g., a "rev" directory entry
+ referring to a revision that we have not crawled yet).
+
+ The most efficient way of getting all the nodes that are mentioned in
+ the edges file is therefore to use sort(1) on the gigantic edge files
+ to get all the unique node IDs, while using the disk as a temporary
+ buffer.
+
+ This pipeline does, in order:
+
+ - concatenate all the compressed edge files and write them to
+ graph.edges.csv.zst (concatenated zstd frames decompress as a single
+ valid stream);
+ - decompress the edges;
+ - count the number of edges and write it to graph.edges.count.txt;
+ - concatenate all the (decompressed) node files from the export with the
+ destination column of the edges, and sort the output to get the list of
+ unique graph nodes;
+ - count the number of unique graph nodes and write it to
+ graph.nodes.count.txt;
+ - compress and write the resulting nodes to graph.nodes.csv.zst.
+ """
+ # Use bytes for the sorting algorithm (faster than being locale-specific)
+ env = {
+ **os.environ.copy(),
+ 'LC_ALL': 'C',
+ 'LC_COLLATE': 'C',
+ 'LANG': 'C',
+ }
+ sort_buffer_size = config.get('sort_buffer_size', '4G')
+ disk_buffer_dir = config.get('disk_buffer_dir', export_path)
+ with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
+ dir=disk_buffer_dir) as buffer_path:
+ subprocess.run(
+ [
+ "bash",
+ "-c",
+ ("pv {export_path}/*/*.edges.csv.zst | "
+ "tee {export_path}/graph.edges.csv.zst |"
+ "zstdcat |"
+ "tee >( wc -l > {export_path}/graph.edges.count.txt ) |"
+ "cut -d' ' -f2 | "
+ "cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | "
+ "sort -u -S{sort_buffer_size} -T{buffer_path} | "
+ "tee >( wc -l > {export_path}/graph.nodes.count.txt ) |"
+ "zstdmt > {export_path}/graph.nodes.csv.zst")
+ .format(
+ export_path=shlex.quote(str(export_path)),
+ buffer_path=shlex.quote(str(buffer_path)),
+ sort_buffer_size=shlex.quote(sort_buffer_size),
+ ),
+ ],
+ env=env,
+ )
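
To make the intent of the shell pipeline in `sort_graph_nodes` easier to
follow, here is a rough in-memory equivalent (for explanation only; the real
code streams through zstd, cut and sort(1) precisely to avoid holding
everything in memory):

```
# In-memory sketch of what the sort pipeline computes: the unique node set is
# the union of all exported nodes and all edge *destinations*, since some
# destinations may not have been exported as nodes themselves.
from typing import Iterable, List


def unique_nodes(edge_lines: Iterable[str], node_lines: Iterable[str]) -> List[str]:
    nodes = set(node_lines)
    for line in edge_lines:
        _src, dst = line.split()
        nodes.add(dst)
    return sorted(nodes)
```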
diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/test/test_graph.py
@@ -0,0 +1,437 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+from typing import Tuple
+
+import pytest
+from unittest.mock import Mock, call
+
+from swh.dataset.graph import process_messages, sort_graph_nodes
+from swh.dataset.utils import ZSTFile
+from swh.model.hashutil import MultiHash, hash_to_bytes
+
+DATE = {
+ "timestamp": {"seconds": 1234567891, "microseconds": 0,},
+ "offset": 120,
+ "negative_utc": False,
+}
+
+TEST_CONTENT = {
+ **MultiHash.from_data(b"foo").digest(),
+ "length": 3,
+ "status": "visible",
+}
+
+TEST_REVISION = {
+ "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
+ "message": b"hello",
+ "date": DATE,
+ "committer": {"fullname": b"foo", "name": b"foo", "email": b""},
+ "author": {"fullname": b"foo", "name": b"foo", "email": b""},
+ "committer_date": DATE,
+ "type": "git",
+ "directory": b"\x01" * 20,
+ "synthetic": False,
+ "metadata": None,
+ "parents": [],
+}
+
+TEST_RELEASE = {
+ "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ "name": b"v0.0.1",
+ "date": {
+ "timestamp": {"seconds": 1234567890, "microseconds": 0,},
+ "offset": 120,
+ "negative_utc": False,
+ },
+ "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}},
+ "target_type": "revision",
+ "target": b"\x04" * 20,
+ "message": b"foo",
+ "synthetic": False,
+}
+
+TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"}
+
+TEST_ORIGIN_VISIT = {
+ "origin": TEST_ORIGIN["url"],
+ "date": "2013-05-07 04:20:39.369271+00:00",
+ "snapshot": None, # TODO
+ "status": "ongoing", # TODO
+ "metadata": {"foo": "bar"},
+ "type": "git",
+}
+
+
+@pytest.fixture
+def exporter():
+ def wrapped(messages, config=None) -> Tuple[Mock, Mock]:
+ if config is None:
+ config = {}
+ node_writer = Mock()
+ edge_writer = Mock()
+ process_messages(
+ messages, config=config, node_writer=node_writer, edge_writer=edge_writer,
+ )
+ return node_writer.write, edge_writer.write
+
+ return wrapped
+
+
+def binhash(s):
+ return hashlib.sha1(s.encode()).digest()
+
+
+def hexhash(s):
+ return hashlib.sha1(s.encode()).hexdigest()
+
+
+def test_export_origin_visits(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "origin_visit": [
+ {
+ **TEST_ORIGIN_VISIT,
+ "origin": {"url": "ori1"},
+ "snapshot": binhash("snp1"),
+ },
+ {
+ **TEST_ORIGIN_VISIT,
+ "origin": {"url": "ori2"},
+ "snapshot": binhash("snp2"),
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:ori:{hexhash('ori1')}\n"),
+ call(f"swh:1:ori:{hexhash('ori2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
+ call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
+ ]
+
+
+def test_export_snapshot_simple(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "snapshot": [
+ {
+ "id": binhash("snp1"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"HEAD": {"target": binhash("rev1"), "target_type": "revision"},
+ },
+ },
+ {
+ "id": binhash("snp2"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"HEAD": {"target": binhash("rev2"), "target_type": "revision"},
+ b"bcnt": {"target": binhash("cnt1"), "target_type": "content"},
+ b"bdir": {
+ "target": binhash("dir1"),
+ "target_type": "directory",
+ },
+ b"brel": {"target": binhash("rel1"), "target_type": "release"},
+ b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"},
+ },
+ },
+ {"id": binhash("snp3"), "branches": {}},
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp3')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"),
+ ]
+
+
+def test_export_snapshot_aliases(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "snapshot": [
+ {
+ "id": binhash("snp1"),
+ "branches": {
+ b"origin_branch": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"alias1": {"target": b"origin_branch", "target_type": "alias"},
+ b"alias2": {"target": b"alias1", "target_type": "alias"},
+ b"alias3": {"target": b"alias2", "target_type": "alias"},
+ },
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]
+ assert edge_writer.mock_calls == (
+ [call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4
+ )
+
+
+def test_export_snapshot_no_pull_requests(exporter):
+ snp = {
+ "id": binhash("snp1"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"},
+ b"refs/merge-requests/lol": {
+ "target": binhash("rev3"),
+ "target_type": "revision",
+ },
+ },
+ }
+
+ node_writer, edge_writer = exporter({"snapshot": [snp]})
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"),
+ ]
+
+ node_writer, edge_writer = exporter(
+ {"snapshot": [snp]}, config={"remove_pull_requests": True}
+ )
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ ]
+
+
+def test_export_releases(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "release": [
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel1"),
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel2"),
+ "target": binhash("rel1"),
+ "target_type": "release",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel3"),
+ "target": binhash("dir1"),
+ "target_type": "directory",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel4"),
+ "target": binhash("cnt1"),
+ "target_type": "content",
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel2')}\n"),
+ call(f"swh:1:rel:{hexhash('rel3')}\n"),
+ call(f"swh:1:rel:{hexhash('rel4')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ ]
+
+
+def test_export_revision(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "revision": [
+ {
+ **TEST_REVISION,
+ "id": binhash("rev1"),
+ "directory": binhash("dir1"),
+ "parents": [
+ binhash("rev2"),
+ binhash("rev3"),
+ ],
+ },
+ {
+ **TEST_REVISION,
+ "id": binhash("rev2"),
+ "directory": binhash("dir2"),
+ "parents": [],
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:rev:{hexhash('rev2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"),
+
+ call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"),
+ ]
+
+
+def test_export_directory(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "directory": [
+ {
+ "id": binhash("dir1"),
+ "entries": [
+ {"type": "file", "target": binhash("cnt1")},
+ {"type": "dir", "target": binhash("dir2")},
+ {"type": "rev", "target": binhash("rev1")},
+ ],
+ },
+ {
+ "id": binhash("dir2"),
+ "entries": [],
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:dir:{hexhash('dir2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"),
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"),
+ ]
+
+
+def test_export_content(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "content": [
+ {
+ **TEST_CONTENT,
+ "sha1_git": binhash("cnt1"),
+ },
+ {
+ **TEST_CONTENT,
+ "sha1_git": binhash("cnt2"),
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:cnt:{hexhash('cnt2')}\n"),
+ ]
+ assert edge_writer.mock_calls == []
+
+
+def zstwrite(fp, lines):
+ with ZSTFile(fp, 'w') as writer:
+ for l in lines:
+ writer.write(l + "\n")
+
+
+def zstread(fp):
+ with ZSTFile(fp, 'r') as reader:
+ return reader.read()
+
+
+def test_sort_pipeline(tmp_path):
+ short_type_mapping = {
+ 'origin_visit': 'ori',
+ 'snapshot': 'snp',
+ 'release': 'rel',
+ 'revision': 'rev',
+ 'directory': 'dir',
+ 'content': 'cnt',
+ }
+
+ input_nodes = [
+ f"swh:1:{short}:{hexhash(short + str(x))}"
+ for short in short_type_mapping.values()
+ for x in range(4)
+ ]
+
+ input_edges = [
+ f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}",
+ f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
+ f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
+ f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest
+
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}",
+ f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}",
+
+ f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
+ f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
+ f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}",
+
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest
+ f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
+
+ f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}",
+ ]
+
+ for obj_type, short_obj_type in short_type_mapping.items():
+ p = (tmp_path / obj_type)
+ p.mkdir()
+ edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")]
+ zstwrite(p / '00.edges.csv.zst', edges[0::2])
+ zstwrite(p / '01.edges.csv.zst', edges[1::2])
+
+ nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")]
+ zstwrite(p / '00.nodes.csv.zst', nodes[0::2])
+ zstwrite(p / '01.nodes.csv.zst', nodes[1::2])
+
+ sort_graph_nodes(tmp_path, config={'sort_buffer_size': '1M'})
+
+ output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n")
+ output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n")
+ output_nodes = list(filter(bool, output_nodes))
+ output_edges = list(filter(bool, output_edges))
+
+ expected_nodes = set(input_nodes) | set(l.split()[1] for l in input_edges)
+ assert output_nodes == sorted(expected_nodes)
+ assert int((tmp_path / 'graph.nodes.count.txt').read_text()) == len(expected_nodes)
+
+ assert sorted(output_edges) == sorted(input_edges)
+ assert int((tmp_path / 'graph.edges.count.txt').read_text()) == len(input_edges)
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/utils.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import subprocess
+
+
+class ZSTFile:
+ def __init__(self, path, mode='r'):
+ if mode not in ('r', 'rb', 'w', 'wb'):
+ raise ValueError(f"ZSTFile mode {mode} is invalid.")
+ self.path = path
+ self.mode = mode
+
+ def __enter__(self):
+ is_text = not (self.mode in ('rb', 'wb'))
+ writing = self.mode in ('w', 'wb')
+ if writing:
+ cmd = ['zstd', '-q', '-o', self.path]
+ else:
+ cmd = ['zstdcat', self.path]
+ self.process = subprocess.Popen(
+ cmd,
+ text=is_text,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ )
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.process.stdin.close()
+ self.process.stdout.close()
+ self.process.wait()
+
+ def read(self, *args):
+ return self.process.stdout.read(*args)
+
+ def write(self, buf):
+ self.process.stdin.write(buf)
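
A minimal usage sketch of `ZSTFile`, assuming the `zstd` and `zstdcat`
binaries are available on the PATH (the file path is a placeholder):

```
from swh.dataset.utils import ZSTFile

path = "/tmp/example.csv.zst"  # placeholder path

# Write a line through a zstd subprocess, then read it back with zstdcat.
with ZSTFile(path, "w") as writer:
    writer.write("swh:1:cnt:0000000000000000000000000000000000000000\n")

with ZSTFile(path, "r") as reader:
    print(reader.read(), end="")
```

Note that the writer shells out to `zstd` without `-f`, so writing to a path
that already exists may fail silently.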
