D3011: dataset: add graph export based on kafka
D3011.id10680.diff (13 KB)

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,11 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+tqdm
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/cli.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import uuid
+
+from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.dataset.graph import export_edges, sort_graph_nodes
+
+
+@click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False),
+ help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+ '''Software Heritage Dataset Tools'''
+ ctx.ensure_object(dict)
+
+ conf = config.read(config_file)
+ ctx.obj['config'] = conf
+
+
+@cli.command('export-graph')
+@click.argument('export-path', type=click.Path())
+@click.option('--export-id', '-e', help="Unique ID of the export run.")
+@click.option('--processes', '-p', default=1,
+ help="Number of parallel processes")
+@click.pass_context
+def export_graph(ctx, export_path, export_id, processes):
+ config = ctx.obj['config']
+ if not export_id:
+ export_id = str(uuid.uuid4())
+
+ print()
+ print('== Edges export phase ==')
+ export_edges(config, export_path, export_id, processes)
+
+ print()
+ print('== Sort phase ==')
+ sort_graph_nodes(export_path, config)
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporter.py
@@ -0,0 +1,171 @@
+import concurrent.futures
+import multiprocessing
+import tqdm
+import time
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+from confluent_kafka import TopicPartition
+
+from swh.journal.client import JournalClient
+
+
+class JournalClientOffsetRanges(JournalClient):
+ def __init__(self, *args, offset_ranges=None, assignment=None,
+ progress_queue=None, refresh_every=200, **kwargs):
+ self.offset_ranges = offset_ranges
+ self.progress_queue = progress_queue
+ self.refresh_every = refresh_every
+ self.assignment = assignment
+ super().__init__(*args, **kwargs)
+
+ def subscribe(self):
+ topic_name = self.subscription[0]
+ time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
+ self.consumer.assign([
+ TopicPartition(topic_name, pid) for pid in self.assignment
+ ])
+
+ def process(self, *args, **kwargs):
+ self.count = 0
+ try:
+ # Handle already committed partition offsets
+ topic_name = self.subscription[0]
+ committed = self.consumer.committed([
+ TopicPartition(topic_name, pid) for pid in self.assignment
+ ])
+ for tp in committed:
+ self.handle_offset(tp.partition, tp.offset)
+
+ if not self.assignment:
+ raise EOFError
+
+ # Process the messages
+ super().process(*args, **kwargs)
+        except EOFError:
+            # Tell the progress worker that this worker is done
+            self.progress_queue.put(None)
+
+ def handle_offset(self, partition_id, offset):
+ if offset < 0: # Uninitialized partition offset
+ return
+
+ if self.count % self.refresh_every == 0:
+ self.progress_queue.put({partition_id: offset})
+
+ if offset >= self.offset_ranges[partition_id][1] - 1:
+ self.assignment = [pid for pid in self.assignment
+ if pid != partition_id]
+ self.subscribe()
+
+ def deserialize_message(self, message):
+ self.handle_offset(message.partition(), message.offset())
+ self.count += 1
+
+ if not self.assignment:
+ raise EOFError
+
+ return super().deserialize_message(message)
+
+
+class ParallelExporter:
+ def __init__(self, config, export_id, obj_type, processes=1):
+ self.config = config
+ self.export_id = 'swh-dataset-export-{}'.format(export_id)
+ self.obj_type = obj_type
+ self.processes = processes
+
+ self.offsets = None
+
+ def get_offsets(self):
+ if self.offsets is None:
+ client = JournalClient(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ )
+ topic_name = client.subscription[0]
+ topics = client.consumer.list_topics(topic_name).topics
+ partitions = topics[topic_name].partitions
+
+ self.offsets = {}
+ for partition_id in tqdm.tqdm(partitions.keys(),
+ desc=" - Partition offsets"):
+ tp = TopicPartition(topic_name, partition_id)
+ (lo, hi) = client.consumer.get_watermark_offsets(tp)
+ self.offsets[partition_id] = (lo, hi)
+ return self.offsets
+
+ def run(self, *args):
+ self.get_offsets()
+ to_assign = list(self.offsets.keys())
+
+ manager = multiprocessing.Manager()
+ q = manager.Queue()
+
+ with ProcessPoolExecutor(self.processes + 1) as pool:
+ futures = []
+ for i in range(self.processes):
+ futures.append(pool.submit(
+ self.export_worker,
+ *args,
+ assignment=to_assign[i::self.processes],
+ queue=q
+ ))
+ futures.append(pool.submit(self.progress_worker, queue=q))
+
+ concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
+ for f in futures:
+ if f.running():
+ continue
+ exc = f.exception()
+ if exc:
+ pool.shutdown(wait=False)
+ print(exc)
+ f.result()
+ raise exc
+
+ def progress_worker(self, *args, queue=None):
+ d = {}
+ active_workers = self.processes
+ offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
+ with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
+ while active_workers:
+ item = queue.get()
+ if item is None:
+ active_workers -= 1
+ continue
+ d.update(item)
+ progress = sum(n - self.offsets[p][0] for p, n in d.items())
+ pbar.set_postfix(active_workers=active_workers,
+ total_workers=self.processes)
+ pbar.update(progress - pbar.n)
+
+ def process(self, callback, assignment=None, queue=None):
+ client = JournalClientOffsetRanges(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ debug='cgrp,broker',
+ offset_ranges=self.offsets,
+ assignment=assignment,
+ progress_queue=queue,
+ )
+ client.process(callback)
+
+ def export_worker(self, *args, **kwargs):
+ """
+ Override this with a custom implementation of a worker function.
+
+ A worker function should call `self.process(fn, **kwargs)` with `fn`
+ being a callback that will be called in the same fashion as with
+ `JournalClient.process()`.
+
+ A simple exporter to print all the objects in the log would look like
+ this:
+
+ ```
+ class PrintExporter(ParallelExporter):
+ def export_worker(self, **kwargs):
+ self.process(print, **kwargs)
+ ```
+ """
+ raise NotImplementedError
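To complement the docstring above, here is a minimal, hypothetical driver for the PrintExporter sketch it describes. The broker address and topic prefix are assumptions about a local journal deployment; they are forwarded unchanged to swh.journal.client.JournalClient through the `journal` config section.

```
# Hypothetical driver for the PrintExporter example from the docstring.
from swh.dataset.exporter import ParallelExporter


class PrintExporter(ParallelExporter):
    def export_worker(self, **kwargs):
        self.process(print, **kwargs)


config = {
    'journal': {
        'brokers': ['kafka1.example.org:9092'],  # assumed broker address
        'prefix': 'swh.journal.objects',         # assumed topic prefix
    },
}

exporter = PrintExporter(config, 'demo-export', 'revision', processes=2)
exporter.run()  # returns once every assigned partition reaches its high-water offset
```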
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/graph.py
@@ -0,0 +1,121 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import os
+import pathlib
+import shlex
+import subprocess
+import tempfile
+import uuid
+
+from swh.dataset.exporter import ParallelExporter
+from swh.dataset.utils import ZSTWriter
+from swh.model.identifiers import origin_identifier, persistent_identifier
+
+
+def process_messages(messages, writer, config):
+ def write(src, dst):
+ src_type, src_id = src
+ dst_type, dst_id = dst
+ if src_id is None or dst_id is None:
+ return
+ src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
+ dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
+ writer.write('{} {}\n'.format(src_pid, dst_pid))
+
+ for visit in messages.get('origin_visit', []):
+ write(('origin', origin_identifier({'url': visit['origin']['url']})),
+ ('snapshot', visit['snapshot']))
+
+ for snapshot in messages.get('snapshot', []):
+ for branch_name, branch in snapshot['branches'].items():
+ while branch and branch.get('target_type') == 'alias':
+ branch_name = branch['target']
+ branch = snapshot['branches'][branch_name]
+ if branch is None or not branch_name:
+ continue
+            if (config.get('remove_pull_requests')
+                    and (branch_name.startswith(b'refs/pull')
+                         or branch_name.startswith(b'refs/merge-requests'))):
+ continue
+ write(('snapshot', snapshot['id']),
+ (branch['target_type'], branch['target']))
+
+ for release in messages.get('release', []):
+ write(('release', release['id']),
+ (release['target_type'], release['target']))
+
+ for revision in messages.get('revision', []):
+ write(('revision', revision['id']),
+ ('directory', revision['directory']))
+ for parent in revision['parents']:
+ write(('revision', revision['id']),
+ ('revision', parent))
+
+ for directory in messages.get('directory', []):
+ for entry in directory['entries']:
+ entry_type_mapping = {
+ 'file': 'content',
+ 'dir': 'directory',
+ 'rev': 'revision'
+ }
+ write(('directory', directory['id']),
+ (entry_type_mapping[entry['type']], entry['target']))
+
+
+class GraphEdgeExporter(ParallelExporter):
+ def export_worker(self, export_path, **kwargs):
+ dataset_path = pathlib.Path(export_path)
+ dataset_path.mkdir(exist_ok=True, parents=True)
+ dataset_file = dataset_path / ('graph-{}.edges.csv.zst'
+ .format(str(uuid.uuid4())))
+
+ with ZSTWriter(dataset_file) as writer:
+ process_fn = functools.partial(
+ process_messages, writer=writer, config=self.config,
+ )
+ self.process(process_fn, **kwargs)
+
+
+def export_edges(config, export_path, export_id, processes):
+ object_types = [
+ 'origin_visit',
+ 'snapshot',
+ 'release',
+ 'revision',
+ 'directory',
+ ]
+ for obj_type in object_types:
+ print('{} edges:'.format(obj_type))
+ exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
+ exporter.run(export_path)
+
+
+def sort_graph_nodes(export_path, config):
+    # Sort in byte order (C locale); faster than locale-aware sorting
+ env = {
+ **os.environ.copy(),
+ 'LC_ALL': 'C',
+ 'LC_COLLATE': 'C',
+ 'LANG': 'C',
+ }
+ sort_buffer_size = config.get('sort_buffer_size', '4G')
+ disk_buffer_dir = config.get('disk_buffer_dir', export_path)
+ with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
+ dir=disk_buffer_dir) as buffer_path:
+ subprocess.run(
+ ("zstdcat {export_path}/*.edges.csv.zst | "
+ "tr ' ' '\\n' | "
+ "sort -u -S{sort_buffer_size} -T{buffer_path} | "
+ "zstdmt > {export_path}/graph.nodes.csv.zst")
+ .format(
+ export_path=shlex.quote(export_path),
+ buffer_path=shlex.quote(buffer_path),
+ sort_buffer_size=shlex.quote(sort_buffer_size),
+ ),
+ shell=True,
+ env=env,
+ )
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/utils.py
@@ -0,0 +1,22 @@
+import subprocess
+
+
+class ZSTWriter:
+ def __init__(self, path, mode='w'):
+ self.path = path
+ self.mode = mode
+
+ def __enter__(self):
+        is_text = self.mode != 'wb'
+ self.process = subprocess.Popen(
+ ['zstd', '-q', '-o', self.path],
+ text=is_text, stdin=subprocess.PIPE
+ )
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.process.stdin.close()
+ self.process.wait()
+
+ def write(self, buf):
+ self.process.stdin.write(buf)
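A small usage sketch for ZSTWriter (not part of the patch): everything written to it is piped through the external `zstd` binary, so zstd has to be installed. The output path and the written lines are made up.

```
# Hypothetical ZSTWriter usage; requires the `zstd` binary on $PATH.
from swh.dataset.utils import ZSTWriter

with ZSTWriter('/tmp/example.csv.zst') as writer:  # made-up output path
    writer.write('first line\n')
    writer.write('second line\n')

# The result can be inspected with, e.g.: zstdcat /tmp/example.csv.zst
```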