Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123346
D3011.id11085.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
35 KB
Subscribers
None
D3011.id11085.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,11 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+tqdm
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/cli.py
@@ -0,0 +1,54 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import uuid
+
+from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.dataset.graph import export_edges, sort_graph_nodes
+
+
+@click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False),
+ help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+ '''Software Heritage Dataset Tools'''
+ ctx.ensure_object(dict)
+
+ conf = config.read(config_file)
+ ctx.obj['config'] = conf
+
+
+@cli.group('graph')
+@click.pass_context
+def graph(ctx):
+ """Manage graph edges export"""
+ pass
+
+
+@graph.command('export')
+@click.argument('export-path', type=click.Path())
+@click.option('--export-id', '-e', help="Unique ID of the export run.")
+@click.option('--processes', '-p', default=1,
+ help="Number of parallel processes")
+@click.pass_context
+def export_graph(ctx, export_path, export_id, processes):
+ """Export the Software Heritage graph as an edge dataset."""
+ config = ctx.obj['config']
+ if not export_id:
+ export_id = str(uuid.uuid4())
+
+ export_edges(config, export_path, export_id, processes)
+
+
+@graph.command('sort')
+@click.argument('export-path', type=click.Path())
+@click.pass_context
+def sort_graph(ctx, export_path):
+ config = ctx.obj['config']
+ sort_graph_nodes(export_path, config)
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporter.py
@@ -0,0 +1,232 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import concurrent.futures
+import multiprocessing
+import time
+import tqdm
+from typing import Mapping, Sequence, Tuple
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+from confluent_kafka import TopicPartition
+
+from swh.journal.client import JournalClient
+
+
+class JournalClientOffsetRanges(JournalClient):
+ """
+ A subclass of JournalClient reading only inside some specific offset
+ range. Partition assignments have to be manually given to the class.
+
+ This client can only read a single topic at a time.
+ """
+ def __init__(
+ self,
+ *args,
+ offset_ranges: Mapping[int, Tuple[int, int]] = None,
+ assignment: Sequence[int] = None,
+ progress_queue: multiprocessing.Queue = None,
+ refresh_every: int = 200,
+ **kwargs,
+ ):
+ """
+ Args:
+ offset_ranges: A mapping of partition_id -> (low, high) offsets
+ that define the boundaries of the messages to consume.
+ assignment: The list of partitions to assign to this client.
+ progress_queue: a multiprocessing.Queue where the current
+ progress will be reported.
+ refresh_every: number of messages between two progress reports.
+ """
+ self.offset_ranges = offset_ranges
+ self.progress_queue = progress_queue
+ self.refresh_every = refresh_every
+ self.assignment = assignment
+ self.count = None
+ self.topic_name = None
+ super().__init__(*args, **kwargs)
+
+ def subscribe(self):
+ self.topic_name = self.subscription[0]
+ time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
+ self.consumer.assign([
+ TopicPartition(self.topic_name, pid) for pid in self.assignment
+ ])
+
+ def process(self, *args, **kwargs):
+ self.count = 0
+ try:
+ self.handle_committed_offsets()
+ super().process(*args, **kwargs)
+ except EOFError:
+ self.progress_queue.put(None)
+
+ def handle_committed_offsets(self, ):
+ """
+ Handle already committed partition offsets before starting processing.
+ """
+ committed = self.consumer.committed([
+ TopicPartition(self.topic_name, pid) for pid in self.assignment
+ ])
+ for tp in committed:
+ self.handle_offset(tp.partition, tp.offset)
+
+ def handle_offset(self, partition_id, offset):
+ """
+ Check whether the client has reached the end of the current
+ partition, and trigger a reassignment if that is the case.
+ Raise EOFError if all the partitions have reached the end.
+ """
+ if offset < 0: # Uninitialized partition offset
+ return
+
+ if self.count % self.refresh_every == 0:
+ self.progress_queue.put({partition_id: offset})
+
+ if offset >= self.offset_ranges[partition_id][1] - 1:
+ self.assignment = [pid for pid in self.assignment
+ if pid != partition_id]
+ self.subscribe()
+
+ if not self.assignment:
+ raise EOFError
+
+ def deserialize_message(self, message):
+ """
+ Override of the message deserialization to hook the handling of the
+ message offset.
+ """
+ self.handle_offset(message.partition(), message.offset())
+ self.count += 1
+ return super().deserialize_message(message)
+
+
+class ParallelExporter:
+ """
+ Base class for all the Journal exporters.
+
+ Each exporter should override the `export_worker` function with an
+ implementation of how to run the message processing.
+ """
+ def __init__(self, config, export_id: str, obj_type, processes=1):
+ """
+ Args:
+ config: the exporter config, which should also include the
+ JournalClient configuration.
+ export_id: a unique identifier for the export that will be used
+ as part of a Kafka consumer group ID.
+ obj_type: The type of SWH object to export.
+ processes: The number of processes to run.
+ """
+ self.config = config
+ self.export_id = 'swh-dataset-export-{}'.format(export_id)
+ self.obj_type = obj_type
+ self.processes = processes
+ self.offsets = None
+
+ def get_offsets(self):
+ """
+ First pass to fetch all the current low and high offsets of each
+ partition to define the consumption boundaries.
+ """
+ if self.offsets is None:
+ client = JournalClient(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ )
+ topic_name = client.subscription[0]
+ topics = client.consumer.list_topics(topic_name).topics
+ partitions = topics[topic_name].partitions
+
+ self.offsets = {}
+ for partition_id in tqdm.tqdm(partitions.keys(),
+ desc=" - Partition offsets"):
+ tp = TopicPartition(topic_name, partition_id)
+ (lo, hi) = client.consumer.get_watermark_offsets(tp)
+ self.offsets[partition_id] = (lo, hi)
+ return self.offsets
+
+ def run(self, *args):
+ """
+ Run the parallel export.
+ """
+ offsets = self.get_offsets()
+ to_assign = list(offsets.keys())
+
+ manager = multiprocessing.Manager()
+ q = manager.Queue()
+
+ with ProcessPoolExecutor(self.processes + 1) as pool:
+ futures = []
+ for i in range(self.processes):
+ futures.append(pool.submit(
+ self.export_worker,
+ *args,
+ assignment=to_assign[i::self.processes],
+ queue=q
+ ))
+ futures.append(pool.submit(self.progress_worker, queue=q))
+
+ concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
+ for f in futures:
+ if f.running():
+ continue
+ exc = f.exception()
+ if exc:
+ pool.shutdown(wait=False)
+ f.result()
+ raise exc
+
+ def progress_worker(self, *args, queue=None):
+ """
+ An additional worker process that reports the current progress of the
+ export between all the different parallel consumers and across all the
+ partitions, by consuming the shared progress reporting Queue.
+ """
+ d = {}
+ active_workers = self.processes
+ offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
+ with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
+ while active_workers:
+ item = queue.get()
+ if item is None:
+ active_workers -= 1
+ continue
+ d.update(item)
+ progress = sum(n - self.offsets[p][0] for p, n in d.items())
+ pbar.set_postfix(active_workers=active_workers,
+ total_workers=self.processes)
+ pbar.update(progress - pbar.n)
+
+ def process(self, callback, assignment=None, queue=None):
+ client = JournalClientOffsetRanges(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ debug='cgrp,broker',
+ offset_ranges=self.offsets,
+ assignment=assignment,
+ progress_queue=queue,
+ )
+ client.process(callback)
+
+ def export_worker(self, *args, **kwargs):
+ """
+ Override this with a custom implementation of a worker function.
+
+ A worker function should call `self.process(fn, **kwargs)` with `fn`
+ being a callback that will be called in the same fashion as with
+ `JournalClient.process()`.
+
+ A simple exporter to print all the objects in the log would look like
+ this:
+
+ ```
+ class PrintExporter(ParallelExporter):
+ def export_worker(self, **kwargs):
+ self.process(print, **kwargs)
+ ```
+ """
+ raise NotImplementedError
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/graph.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import os
+import os.path
+import pathlib
+import shlex
+import subprocess
+import tempfile
+import uuid
+
+from swh.dataset.exporter import ParallelExporter
+from swh.dataset.utils import ZSTFile
+from swh.model.identifiers import origin_identifier, persistent_identifier
+from swh.journal.fixer import fix_objects
+
+
+def process_messages(messages, config, node_writer, edge_writer):
+ """
+ Args:
+ messages: A sequence of messages to process
+ config: The exporter configuration
+ node_writer: A file-like object where to write nodes
+ edge_writer: A file-like object where to write edges
+ """
+ def write_node(node):
+ node_type, node_id = node
+ if node_id is None:
+ return
+ node_pid = persistent_identifier(object_type=node_type,
+ object_id=node_id)
+ node_writer.write('{}\n'.format(node_pid))
+
+ def write_edge(src, dst):
+ src_type, src_id = src
+ dst_type, dst_id = dst
+ if src_id is None or dst_id is None:
+ return
+ src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
+ dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
+ edge_writer.write('{} {}\n'.format(src_pid, dst_pid))
+
+ messages = {k: fix_objects(k, v) for k, v in messages.items()}
+
+ for visit in messages.get('origin_visit', []):
+ origin_id = origin_identifier({'url': visit['origin']})
+ write_node(('origin', origin_id))
+ write_edge(('origin', origin_id), ('snapshot', visit['snapshot']))
+
+ for snapshot in messages.get('snapshot', []):
+ write_node(('snapshot', snapshot['id']))
+ for branch_name, branch in snapshot['branches'].items():
+ while branch and branch.get('target_type') == 'alias':
+ branch_name = branch['target']
+ branch = snapshot['branches'][branch_name]
+ if branch is None or not branch_name:
+ continue
+ if (config.get('remove_pull_requests')
+ and (branch_name.startswith(b'refs/pull')
+ or branch_name.startswith(b'refs/merge-requests'))):
+ continue
+ write_edge(('snapshot', snapshot['id']),
+ (branch['target_type'], branch['target']))
+
+ for release in messages.get('release', []):
+ write_node(('release', release['id']))
+ write_edge(('release', release['id']),
+ (release['target_type'], release['target']))
+
+ for revision in messages.get('revision', []):
+ write_node(('revision', revision['id']))
+ write_edge(('revision', revision['id']),
+ ('directory', revision['directory']))
+ for parent in revision['parents']:
+ write_edge(('revision', revision['id']),
+ ('revision', parent))
+
+ for directory in messages.get('directory', []):
+ write_node(('directory', directory['id']))
+ for entry in directory['entries']:
+ entry_type_mapping = {
+ 'file': 'content',
+ 'dir': 'directory',
+ 'rev': 'revision'
+ }
+ write_edge(('directory', directory['id']),
+ (entry_type_mapping[entry['type']], entry['target']))
+
+ for content in messages.get('content', []):
+ write_node(('content', content['sha1_git']))
+
+
+class GraphEdgeExporter(ParallelExporter):
+ """
+ Implementation of ParallelExporter which writes all the graph edges
+ of a specific type in a Zstandard-compressed CSV file.
+
+ Each row of the CSV is in the format: `<SRC PID> <DST PID>`
+ """
+ def export_worker(self, export_path, **kwargs):
+ dataset_path = pathlib.Path(export_path)
+ dataset_path.mkdir(exist_ok=True, parents=True)
+ nodes_file = dataset_path / ('graph-{}.nodes.csv.zst'
+ .format(str(uuid.uuid4())))
+ edges_file = dataset_path / ('graph-{}.edges.csv.zst'
+ .format(str(uuid.uuid4())))
+
+ with \
+ ZSTFile(nodes_file, 'w') as nodes_writer, \
+ ZSTFile(edges_file, 'w') as edges_writer:
+ process_fn = functools.partial(
+ process_messages,
+ config=self.config,
+ node_writer=nodes_writer,  # keywords must match process_messages()
+ edge_writer=edges_writer
+ )
+ self.process(process_fn, **kwargs)
+
+
+def export_edges(config, export_path, export_id, processes):
+ """Run the edge exporter for each edge type."""
+ object_types = [
+ 'origin_visit',
+ 'snapshot',
+ 'release',
+ 'revision',
+ 'directory',
+ ]
+ for obj_type in object_types:
+ print('{} edges:'.format(obj_type))
+ exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
+ exporter.run(os.path.join(export_path, obj_type))
+
+
+def sort_graph_nodes(export_path, config):
+ """
+ Generate the node list from the edges files.
+
+ We cannot solely rely on the object IDs that are read in the journal,
+ as some nodes that are referred to as destinations in the edge file
+ might not be present in the archive (e.g. a rev_entry referring to a
+ revision that we have not crawled yet).
+
+ The most efficient way of getting all the nodes that are mentioned in
+ the edges file is therefore to use sort(1) on the gigantic edge files
+ to get all the unique node IDs, while using the disk as a temporary
+ buffer.
+
+ This pipeline does, in order:
+
+ - concatenate and write all the compressed edges files in
+ graph.edges.csv.zst (relying on the fact that concatenated ZST frames
+ still form a valid ZST stream) ;
+ - decompress the edges ;
+ - count the number of edges and write it in graph.edges.count.txt ;
+ - concatenate all the (decompressed) nodes from the export with the
+ destination nodes of the edges, and sort the output to get the list of
+ unique graph nodes ;
+ - count the number of unique graph nodes and write it in
+ graph.nodes.count.txt ;
+ - compress and write the resulting nodes in graph.nodes.csv.zst.
+ """
+ # Use the C locale so sort(1) compares raw bytes (faster than collation)
+ env = {
+ **os.environ.copy(),
+ 'LC_ALL': 'C',
+ 'LC_COLLATE': 'C',
+ 'LANG': 'C',
+ }
+ sort_buffer_size = config.get('sort_buffer_size', '4G')
+ disk_buffer_dir = config.get('disk_buffer_dir', export_path)
+ with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
+ dir=disk_buffer_dir) as buffer_path:
+ subprocess.run(
+ [
+ "bash",
+ "-c",
+ ("pv {export_path}/*/*.edges.csv.zst | "
+ "tee {export_path}/graph.edges.csv.zst |"
+ "zstdcat |"
+ "tee >( wc -l > {export_path}/graph.edges.count.txt ) |"
+ "cut -d' ' -f2 | "
+ "cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | "
+ "sort -u -S{sort_buffer_size} -T{buffer_path} | "
+ "tee >( wc -l > {export_path}/graph.nodes.count.txt ) |"
+ "zstdmt > {export_path}/graph.nodes.csv.zst")
+ .format(
+ export_path=shlex.quote(str(export_path)),
+ buffer_path=shlex.quote(str(buffer_path)),
+ sort_buffer_size=shlex.quote(sort_buffer_size),
+ ),
+ ],
+ env=env,
+ )
diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/test/test_graph.py
@@ -0,0 +1,437 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+from typing import Tuple
+
+import pytest
+from unittest.mock import Mock, call
+
+from swh.dataset.graph import process_messages, sort_graph_nodes
+from swh.dataset.utils import ZSTFile
+from swh.model.hashutil import MultiHash, hash_to_bytes
+
+DATE = {
+ "timestamp": {"seconds": 1234567891, "microseconds": 0,},
+ "offset": 120,
+ "negative_utc": False,
+}
+
+TEST_CONTENT = {
+ **MultiHash.from_data(b"foo").digest(),
+ "length": 3,
+ "status": "visible",
+}
+
+TEST_REVISION = {
+ "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
+ "message": b"hello",
+ "date": DATE,
+ "committer": {"fullname": b"foo", "name": b"foo", "email": b""},
+ "author": {"fullname": b"foo", "name": b"foo", "email": b""},
+ "committer_date": DATE,
+ "type": "git",
+ "directory": b"\x01" * 20,
+ "synthetic": False,
+ "metadata": None,
+ "parents": [],
+}
+
+TEST_RELEASE = {
+ "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ "name": b"v0.0.1",
+ "date": {
+ "timestamp": {"seconds": 1234567890, "microseconds": 0,},
+ "offset": 120,
+ "negative_utc": False,
+ },
+ "author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}},
+ "target_type": "revision",
+ "target": b"\x04" * 20,
+ "message": b"foo",
+ "synthetic": False,
+}
+
+TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"}
+
+TEST_ORIGIN_VISIT = {
+ "origin": TEST_ORIGIN["url"],
+ "date": "2013-05-07 04:20:39.369271+00:00",
+ "snapshot": None, # TODO
+ "status": "ongoing", # TODO
+ "metadata": {"foo": "bar"},
+ "type": "git",
+}
+
+
+@pytest.fixture
+def exporter():
+ def wrapped(messages, config=None) -> Tuple[Mock, Mock]:
+ if config is None:
+ config = {}
+ node_writer = Mock()
+ edge_writer = Mock()
+ process_messages(
+ messages, config=config, node_writer=node_writer, edge_writer=edge_writer,
+ )
+ return node_writer.write, edge_writer.write
+
+ return wrapped
+
+
+def binhash(s):
+ return hashlib.sha1(s.encode()).digest()
+
+
+def hexhash(s):
+ return hashlib.sha1(s.encode()).hexdigest()
+
+
+def test_export_origin_visits(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "origin_visit": [
+ {
+ **TEST_ORIGIN_VISIT,
+ "origin": {"url": "ori1"},
+ "snapshot": binhash("snp1"),
+ },
+ {
+ **TEST_ORIGIN_VISIT,
+ "origin": {"url": "ori2"},
+ "snapshot": binhash("snp2"),
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:ori:{hexhash('ori1')}\n"),
+ call(f"swh:1:ori:{hexhash('ori2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
+ call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
+ ]
+
+
+def test_export_snapshot_simple(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "snapshot": [
+ {
+ "id": binhash("snp1"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"HEAD": {"target": binhash("rev1"), "target_type": "revision"},
+ },
+ },
+ {
+ "id": binhash("snp2"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"HEAD": {"target": binhash("rev2"), "target_type": "revision"},
+ b"bcnt": {"target": binhash("cnt1"), "target_type": "content"},
+ b"bdir": {
+ "target": binhash("dir1"),
+ "target_type": "directory",
+ },
+ b"brel": {"target": binhash("rel1"), "target_type": "release"},
+ b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"},
+ },
+ },
+ {"id": binhash("snp3"), "branches": {}},
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp3')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"),
+ ]
+
+
+def test_export_snapshot_aliases(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "snapshot": [
+ {
+ "id": binhash("snp1"),
+ "branches": {
+ b"origin_branch": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"alias1": {"target": b"origin_branch", "target_type": "alias"},
+ b"alias2": {"target": b"alias1", "target_type": "alias"},
+ b"alias3": {"target": b"alias2", "target_type": "alias"},
+ },
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]
+ assert edge_writer.mock_calls == (
+ [call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4
+ )
+
+
+def test_export_snapshot_no_pull_requests(exporter):
+ snp = {
+ "id": binhash("snp1"),
+ "branches": {
+ b"refs/heads/master": {
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"},
+ b"refs/merge-requests/lol": {
+ "target": binhash("rev3"),
+ "target_type": "revision",
+ },
+ },
+ }
+
+ node_writer, edge_writer = exporter({"snapshot": [snp]})
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"),
+ ]
+
+ node_writer, edge_writer = exporter(
+ {"snapshot": [snp]}, config={"remove_pull_requests": True}
+ )
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
+ ]
+
+
+def test_export_releases(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "release": [
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel1"),
+ "target": binhash("rev1"),
+ "target_type": "revision",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel2"),
+ "target": binhash("rel1"),
+ "target_type": "release",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel3"),
+ "target": binhash("dir1"),
+ "target_type": "directory",
+ },
+ {
+ **TEST_RELEASE,
+ "id": binhash("rel4"),
+ "target": binhash("cnt1"),
+ "target_type": "content",
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel2')}\n"),
+ call(f"swh:1:rel:{hexhash('rel3')}\n"),
+ call(f"swh:1:rel:{hexhash('rel4')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ ]
+
+
+def test_export_revision(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "revision": [
+ {
+ **TEST_REVISION,
+ "id": binhash("rev1"),
+ "directory": binhash("dir1"),
+ "parents": [
+ binhash("rev2"),
+ binhash("rev3"),
+ ],
+ },
+ {
+ **TEST_REVISION,
+ "id": binhash("rev2"),
+ "directory": binhash("dir2"),
+ "parents": [],
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:rev:{hexhash('rev1')}\n"),
+ call(f"swh:1:rev:{hexhash('rev2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"),
+ call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"),
+
+ call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"),
+ ]
+
+
+def test_export_directory(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "directory": [
+ {
+ "id": binhash("dir1"),
+ "entries": [
+ {"type": "file", "target": binhash("cnt1")},
+ {"type": "dir", "target": binhash("dir2")},
+ {"type": "rev", "target": binhash("rev1")},
+ ],
+ },
+ {
+ "id": binhash("dir2"),
+ "entries": [],
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:dir:{hexhash('dir1')}\n"),
+ call(f"swh:1:dir:{hexhash('dir2')}\n"),
+ ]
+ assert edge_writer.mock_calls == [
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"),
+ call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"),
+ ]
+
+
+def test_export_content(exporter):
+ node_writer, edge_writer = exporter(
+ {
+ "content": [
+ {
+ **TEST_CONTENT,
+ "sha1_git": binhash("cnt1"),
+ },
+ {
+ **TEST_CONTENT,
+ "sha1_git": binhash("cnt2"),
+ },
+ ]
+ }
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
+ call(f"swh:1:cnt:{hexhash('cnt2')}\n"),
+ ]
+ assert edge_writer.mock_calls == []
+
+
+def zstwrite(fp, lines):
+ with ZSTFile(fp, 'w') as writer:
+ for l in lines:
+ writer.write(l + "\n")
+
+
+def zstread(fp):
+ with ZSTFile(fp, 'r') as reader:
+ return reader.read()
+
+
+def test_sort_pipeline(tmp_path):
+ short_type_mapping = {
+ 'origin_visit': 'ori',
+ 'snapshot': 'snp',
+ 'release': 'rel',
+ 'revision': 'rev',
+ 'directory': 'dir',
+ 'content': 'cnt',
+ }
+
+ input_nodes = [
+ f"swh:1:{short}:{hexhash(short + str(x))}"
+ for short in short_type_mapping.values()
+ for x in range(4)
+ ]
+
+ input_edges = [
+ f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}",
+ f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
+ f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
+ f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest
+
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}",
+ f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}",
+
+ f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
+ f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
+ f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}",
+
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
+ f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest
+ f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
+ f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
+
+ f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}",
+ ]
+
+ for obj_type, short_obj_type in short_type_mapping.items():
+ p = (tmp_path / obj_type)
+ p.mkdir()
+ edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")]
+ zstwrite(p / '00.edges.csv.zst', edges[0::2])
+ zstwrite(p / '01.edges.csv.zst', edges[1::2])
+
+ nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")]
+ zstwrite(p / '00.nodes.csv.zst', nodes[0::2])
+ zstwrite(p / '01.nodes.csv.zst', nodes[1::2])
+
+ sort_graph_nodes(tmp_path, config={'sort_buffer_size': '1M'})
+
+ output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n")
+ output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n")
+ output_nodes = list(filter(bool, output_nodes))
+ output_edges = list(filter(bool, output_edges))
+
+ expected_nodes = set(input_nodes) | set(l.split()[1] for l in input_edges)
+ assert output_nodes == sorted(expected_nodes)
+ assert int((tmp_path / 'graph.nodes.count.txt').read_text()) == len(expected_nodes)
+
+ assert sorted(output_edges) == sorted(input_edges)
+ assert int((tmp_path / 'graph.edges.count.txt').read_text()) == len(input_edges)
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/utils.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import subprocess
+
+
+class ZSTFile:
+ def __init__(self, path, mode='r'):
+ if mode not in ('r', 'rb', 'w', 'wb'):
+ raise ValueError(f"ZSTFile mode {mode} is invalid.")
+ self.path = path
+ self.mode = mode
+
+ def __enter__(self):
+ is_text = not (self.mode in ('rb', 'wb'))
+ writing = self.mode in ('w', 'wb')
+ if writing:
+ cmd = ['zstd', '-q', '-o', self.path]
+ else:
+ cmd = ['zstdcat', self.path]
+ self.process = subprocess.Popen(
+ cmd,
+ text=is_text,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ )
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.process.stdin.close()
+ self.process.stdout.close()
+ self.process.wait()
+
+ def read(self, *args):
+ return self.process.stdout.read(*args)
+
+ def write(self, buf):
+ self.process.stdin.write(buf)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 1:50 AM (4 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221811
Attached To
D3011: dataset: add graph export based on kafka
Event Timeline
Log In to Comment