Page MenuHomeSoftware Heritage

D3011.id10680.diff
No OneTemporary

D3011.id10680.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,11 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
# [mypy-add_your_lib_here.*]
# ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+tqdm
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/cli.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import uuid
+
+from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.dataset.graph import export_edges, sort_graph_nodes
+
+
+@click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False),
+              help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+    '''Software Heritage Dataset Tools'''
+    # Make sure the click context carries a dict, then store the parsed
+    # configuration in it so that subcommands can read ctx.obj['config'].
+    ctx.ensure_object(dict)
+    ctx.obj['config'] = config.read(config_file)
+
+
+@cli.command('export-graph')
+@click.argument('export-path', type=click.Path())
+@click.option('--export-id', '-e', help="Unique ID of the export run.")
+@click.option('--processes', '-p', default=1,
+              help="Number of parallel processes")
+@click.pass_context
+def export_graph(ctx, export_path, export_id, processes):
+    # Two sequential phases: export the graph edges found in the journal,
+    # then derive the sorted, de-duplicated node list from the edge files.
+    conf = ctx.obj['config']
+
+    # Fall back to a random UUID when no (or an empty) export ID is given.
+    export_id = export_id or str(uuid.uuid4())
+
+    print()
+    print('== Edges export phase ==')
+    export_edges(conf, export_path, export_id, processes)
+
+    print()
+    print('== Sort phase ==')
+    sort_graph_nodes(export_path, conf)
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporter.py
@@ -0,0 +1,171 @@
+import concurrent.futures
+import multiprocessing
+import tqdm
+import time
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+from confluent_kafka import TopicPartition
+
+from swh.journal.client import JournalClient
+
+
+class JournalClientOffsetRanges(JournalClient):
+ """Journal client that reads explicit partitions up to a fixed end offset.
+
+ Instead of a regular subscription, the consumer is manually assigned the
+ partitions listed in ``assignment``. A partition is dropped from the
+ assignment once its offset reaches the end of its range in
+ ``offset_ranges``; when no partition is left, consumption is stopped by
+ raising ``EOFError`` (caught in ``process()``).
+ """
+ def __init__(self, *args, offset_ranges=None, assignment=None,
+ progress_queue=None, refresh_every=200, **kwargs):
+ # offset_ranges: indexed as offset_ranges[partition_id][1], i.e. a
+ # mapping from partition id to a pair whose second item is the end
+ # offset of the range.
+ # assignment: iterable of partition ids this client has to consume.
+ # progress_queue: queue receiving {partition_id: offset} progress
+ # reports, then None once this client is done.
+ # refresh_every: a progress report is sent every that many messages.
+ self.offset_ranges = offset_ranges
+ self.progress_queue = progress_queue
+ self.refresh_every = refresh_every
+ self.assignment = assignment
+ # NOTE(review): the attributes are set before calling the base
+ # constructor, presumably because it calls subscribe(), which reads
+ # self.assignment -- confirm against JournalClient.
+ super().__init__(*args, **kwargs)
+
+ def subscribe(self):
+ """Assign the consumer to the partitions still left in the assignment."""
+ # Only the first subscribed topic is used.
+ topic_name = self.subscription[0]
+ time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
+ self.consumer.assign([
+ TopicPartition(topic_name, pid) for pid in self.assignment
+ ])
+
+ def process(self, *args, **kwargs):
+ """Consume the assigned partitions until all of them are exhausted.
+
+ Arguments are forwarded to the base class ``process()``. ``None`` is
+ put on the progress queue when consumption ends with ``EOFError``.
+ """
+ # Number of messages seen so far; used to throttle progress reports.
+ self.count = 0
+ try:
+ # Handle already committed partition offsets
+ topic_name = self.subscription[0]
+ committed = self.consumer.committed([
+ TopicPartition(topic_name, pid) for pid in self.assignment
+ ])
+ for tp in committed:
+ self.handle_offset(tp.partition, tp.offset)
+
+ # Nothing (left) to consume: every assigned partition was already
+ # at the end of its range, or the assignment was empty to begin with.
+ if not self.assignment:
+ raise EOFError
+
+ # Process the messages
+ super().process(*args, **kwargs)
+ except EOFError:
+ # Tell the progress consumer that this client is finished.
+ self.progress_queue.put(None)
+ # NOTE(review): this `pass` is redundant.
+ pass
+
+ def handle_offset(self, partition_id, offset):
+ """Report progress for a partition and retire it once it is exhausted."""
+ if offset < 0: # Uninitialized partition offset
+ return
+
+ if self.count % self.refresh_every == 0:
+ self.progress_queue.put({partition_id: offset})
+
+ # The end offset minus one is treated as the last offset to read: once
+ # it is reached, drop the partition and re-assign the remaining ones.
+ if offset >= self.offset_ranges[partition_id][1] - 1:
+ self.assignment = [pid for pid in self.assignment
+ if pid != partition_id]
+ self.subscribe()
+
+ def deserialize_message(self, message):
+ """Track the offset of ``message`` before deserializing it.
+
+ Raises ``EOFError`` when handling this offset emptied the assignment.
+ NOTE(review): in that case the message itself is not deserialized, so
+ the message sitting at the last offset of the last partition appears
+ to be skipped -- confirm this is intended.
+ """
+ self.handle_offset(message.partition(), message.offset())
+ self.count += 1
+
+ if not self.assignment:
+ raise EOFError
+
+ return super().deserialize_message(message)
+
+
+class ParallelExporter:
+ """Export one journal object type using several worker processes.
+
+ The partitions of the topic are distributed among ``processes`` workers;
+ an additional process displays a progress bar fed through a shared queue.
+ Subclasses provide the actual export logic by overriding
+ ``export_worker()``.
+ """
+ def __init__(self, config, export_id, obj_type, processes=1):
+ # config: mapping whose 'journal' entry holds the keyword arguments
+ # passed to the journal clients.
+ # export_id: run identifier, used (prefixed) as the journal group id.
+ # obj_type: the single object type to export.
+ # processes: number of export worker processes.
+ self.config = config
+ self.export_id = 'swh-dataset-export-{}'.format(export_id)
+ self.obj_type = obj_type
+ self.processes = processes
+
+ # Cache for get_offsets(): {partition_id: (lo, hi)}.
+ self.offsets = None
+
+ def get_offsets(self):
+ """Return, computing it on first call, the (lo, hi) offsets per partition."""
+ if self.offsets is None:
+ client = JournalClient(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ )
+ topic_name = client.subscription[0]
+ topics = client.consumer.list_topics(topic_name).topics
+ partitions = topics[topic_name].partitions
+
+ self.offsets = {}
+ for partition_id in tqdm.tqdm(partitions.keys(),
+ desc=" - Partition offsets"):
+ tp = TopicPartition(topic_name, partition_id)
+ (lo, hi) = client.consumer.get_watermark_offsets(tp)
+ self.offsets[partition_id] = (lo, hi)
+ return self.offsets
+
+ def run(self, *args):
+ """Run the export; ``args`` are forwarded to every ``export_worker()``."""
+ self.get_offsets()
+ to_assign = list(self.offsets.keys())
+
+ # Queue shared between the export workers and the progress worker.
+ manager = multiprocessing.Manager()
+ q = manager.Queue()
+
+ # One extra slot in the pool for the progress worker.
+ with ProcessPoolExecutor(self.processes + 1) as pool:
+ futures = []
+ for i in range(self.processes):
+ futures.append(pool.submit(
+ self.export_worker,
+ *args,
+ # Round-robin split of the partitions among the workers.
+ assignment=to_assign[i::self.processes],
+ queue=q
+ ))
+ futures.append(pool.submit(self.progress_worker, queue=q))
+
+ # Returns as soon as one future raises, or when all are done.
+ concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
+ for f in futures:
+ if f.running():
+ continue
+ exc = f.exception()
+ if exc:
+ pool.shutdown(wait=False)
+ print(exc)
+ # NOTE(review): f.result() re-raises the exception of a failed
+ # future, so the `raise exc` below looks unreachable.
+ f.result()
+ raise exc
+
+ def progress_worker(self, *args, queue=None):
+ """Display a progress bar from the reports received on ``queue``.
+
+ Items are either ``{partition_id: offset}`` dicts or ``None``, the
+ latter meaning that one export worker is done. The loop ends once
+ ``self.processes`` ``None`` items have been received.
+ NOTE(review): if an export worker exits without sending ``None``,
+ this loop appears to wait forever -- confirm.
+ """
+ # Latest known offset of each partition.
+ d = {}
+ active_workers = self.processes
+ offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
+ with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
+ while active_workers:
+ item = queue.get()
+ if item is None:
+ active_workers -= 1
+ continue
+ d.update(item)
+ # Overall progress: distance of each partition from its low offset.
+ progress = sum(n - self.offsets[p][0] for p, n in d.items())
+ pbar.set_postfix(active_workers=active_workers,
+ total_workers=self.processes)
+ # tqdm's update() takes an increment, hence the difference.
+ pbar.update(progress - pbar.n)
+
+ def process(self, callback, assignment=None, queue=None):
+ """Consume the ``assignment`` partitions, passing messages to ``callback``.
+
+ Meant to be called from ``export_worker()`` with the keyword
+ arguments that worker received.
+ """
+ client = JournalClientOffsetRanges(
+ **self.config['journal'],
+ object_types=[self.obj_type],
+ group_id=self.export_id,
+ debug='cgrp,broker',
+ offset_ranges=self.offsets,
+ assignment=assignment,
+ progress_queue=queue,
+ )
+ client.process(callback)
+
+ def export_worker(self, *args, **kwargs):
+ """
+ Override this with a custom implementation of a worker function.
+
+ A worker function should call `self.process(fn, **kwargs)` with `fn`
+ being a callback that will be called in the same fashion as with
+ `JournalClient.process()`.
+
+ A simple exporter to print all the objects in the log would look like
+ this:
+
+ ```
+ class PrintExporter(ParallelExporter):
+ def export_worker(self, **kwargs):
+ self.process(print, **kwargs)
+ ```
+ """
+ raise NotImplementedError
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/graph.py
@@ -0,0 +1,121 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import os
+import pathlib
+import shlex
+import subprocess
+import tempfile
+import uuid
+
+from swh.dataset.exporter import ParallelExporter
+from swh.dataset.utils import ZSTWriter
+from swh.model.identifiers import origin_identifier, persistent_identifier
+
+
+def process_messages(messages, writer, config):
+    """Write the graph edges found in a batch of journal messages.
+
+    Each edge is written to ``writer`` as one ``"<src_pid> <dst_pid>\\n"``
+    line, both ends being persistent identifiers.
+
+    Args:
+        messages: dict mapping an object type ('origin_visit', 'snapshot',
+            'release', 'revision', 'directory') to a list of objects.
+        writer: object with a ``write(str)`` method receiving the edges.
+        config: dict of options; when ``remove_pull_requests`` is truthy,
+            snapshot branches under ``refs/pull`` and
+            ``refs/merge-requests`` are not exported.
+    """
+    # Directory entry type -> object type of the entry target.
+    entry_type_mapping = {
+        'file': 'content',
+        'dir': 'directory',
+        'rev': 'revision'
+    }
+    pull_request_prefixes = (b'refs/pull', b'refs/merge-requests')
+    remove_pull_requests = config.get('remove_pull_requests')
+
+    def write(src, dst):
+        src_type, src_id = src
+        dst_type, dst_id = dst
+        # Edges with a missing end (e.g. a visit without snapshot) are
+        # skipped.
+        if src_id is None or dst_id is None:
+            return
+        src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
+        dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
+        writer.write('{} {}\n'.format(src_pid, dst_pid))
+
+    for visit in messages.get('origin_visit', []):
+        write(('origin', origin_identifier({'url': visit['origin']['url']})),
+              ('snapshot', visit['snapshot']))
+
+    for snapshot in messages.get('snapshot', []):
+        branches = snapshot['branches']
+        for branch_name, branch in branches.items():
+            # Follow alias chains down to the branch they designate.
+            seen = {branch_name}
+            while branch and branch.get('target_type') == 'alias':
+                branch_name = branch['target']
+                if branch_name in seen:
+                    # Alias cycle: there is no real target to export.
+                    branch = None
+                    break
+                seen.add(branch_name)
+                # An alias may point to a name missing from the snapshot;
+                # treat it like a dangling branch instead of crashing.
+                branch = branches.get(branch_name)
+            if branch is None or not branch_name:
+                continue
+            # Both prefixes are filtered only when the option is enabled.
+            # The previous `a and b or c` form parsed as `(a and b) or c`
+            # and always dropped merge requests.
+            if (remove_pull_requests
+                    and branch_name.startswith(pull_request_prefixes)):
+                continue
+            write(('snapshot', snapshot['id']),
+                  (branch['target_type'], branch['target']))
+
+    for release in messages.get('release', []):
+        write(('release', release['id']),
+              (release['target_type'], release['target']))
+
+    for revision in messages.get('revision', []):
+        write(('revision', revision['id']),
+              ('directory', revision['directory']))
+        for parent in revision['parents']:
+            write(('revision', revision['id']),
+                  ('revision', parent))
+
+    for directory in messages.get('directory', []):
+        for entry in directory['entries']:
+            write(('directory', directory['id']),
+                  (entry_type_mapping[entry['type']], entry['target']))
+
+
+class GraphEdgeExporter(ParallelExporter):
+    """Parallel exporter writing graph edges to compressed CSV files."""
+
+    def export_worker(self, export_path, **kwargs):
+        """Export this worker's share of the journal to its own edge file.
+
+        The file is created in ``export_path`` (created if missing) under a
+        random name, so that concurrent workers never write to the same
+        file. ``kwargs`` are forwarded to ``self.process()``.
+        """
+        out_dir = pathlib.Path(export_path)
+        out_dir.mkdir(exist_ok=True, parents=True)
+        file_name = 'graph-{}.edges.csv.zst'.format(str(uuid.uuid4()))
+
+        with ZSTWriter(out_dir / file_name) as writer:
+            self.process(
+                functools.partial(
+                    process_messages, writer=writer, config=self.config,
+                ),
+                **kwargs
+            )
+
+
+def export_edges(config, export_path, export_id, processes):
+    """Export the graph edges of every supported object type, one at a time.
+
+    For each type, a ``GraphEdgeExporter`` is run with ``processes`` workers
+    and writes its files to ``export_path``.
+    """
+    for obj_type in ('origin_visit', 'snapshot', 'release', 'revision',
+                     'directory'):
+        print('{} edges:'.format(obj_type))
+        GraphEdgeExporter(config, export_id, obj_type, processes).run(
+            export_path)
+
+
+def sort_graph_nodes(export_path, config):
+    """Build the sorted, de-duplicated node list from the exported edges.
+
+    Every ``*.edges.csv.zst`` file of ``export_path`` is decompressed, each
+    edge is split into its two nodes, and the unique sorted nodes are
+    written to ``graph.nodes.csv.zst`` in the same directory.
+
+    Args:
+        export_path: directory containing the edge files.
+        config: dict of options; ``sort_buffer_size`` (default ``'4G'``) is
+            passed to ``sort -S`` and ``disk_buffer_dir`` (default:
+            ``export_path``) hosts the temporary directory for ``sort -T``.
+
+    Raises:
+        subprocess.CalledProcessError: if the shell pipeline exits with a
+            non-zero status.
+    """
+    # Use bytes for the sorting algorithm (faster than being locale-specific)
+    env = {
+        **os.environ,
+        'LC_ALL': 'C',
+        'LC_COLLATE': 'C',
+        'LANG': 'C',
+    }
+    # shlex.quote() only accepts strings, whereas a configuration file may
+    # well yield an integer size and callers may pass a path object.
+    export_path = str(export_path)
+    sort_buffer_size = str(config.get('sort_buffer_size', '4G'))
+    disk_buffer_dir = config.get('disk_buffer_dir', export_path)
+    with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
+                                     dir=disk_buffer_dir) as buffer_path:
+        subprocess.run(
+            ("zstdcat {export_path}/*.edges.csv.zst | "
+             "tr ' ' '\\n' | "
+             "sort -u -S{sort_buffer_size} -T{buffer_path} | "
+             "zstdmt > {export_path}/graph.nodes.csv.zst")
+            .format(
+                export_path=shlex.quote(export_path),
+                buffer_path=shlex.quote(buffer_path),
+                sort_buffer_size=shlex.quote(sort_buffer_size),
+            ),
+            shell=True,
+            env=env,
+            # Do not silently leave a truncated node file behind. Note that
+            # the shell reports the status of the last pipeline stage only.
+            check=True,
+        )
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/utils.py
@@ -0,0 +1,22 @@
+import subprocess
+
+
+class ZSTWriter:
+    """Context manager writing to a file through a ``zstd`` subprocess.
+
+    Args:
+        path: path of the compressed file to create.
+        mode: ``'wb'`` to write bytes; any other value (default ``'w'``)
+            to write text.
+
+    On exit, the pipe to ``zstd`` is closed and the process is awaited;
+    ``subprocess.CalledProcessError`` is raised if it failed, unless
+    another exception is already propagating.
+    """
+
+    def __init__(self, path, mode='w'):
+        self.path = path
+        self.mode = mode
+
+    def __enter__(self):
+        is_text = not (self.mode == 'wb')
+        self.process = subprocess.Popen(
+            ['zstd', '-q', '-o', str(self.path)],
+            text=is_text, stdin=subprocess.PIPE
+        )
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        try:
+            self.process.stdin.close()
+        finally:
+            # Always reap the child, even if closing the pipe failed.
+            returncode = self.process.wait()
+        # A failed zstd means a missing or truncated output file: report
+        # it, but never mask an exception raised inside the `with` body.
+        if returncode != 0 and exc_type is None:
+            raise subprocess.CalledProcessError(returncode,
+                                                self.process.args)
+
+    def write(self, buf):
+        """Write ``buf`` (str or bytes, depending on the mode) to zstd."""
+        return self.process.stdin.write(buf)

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 5:13 AM (11 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232986

Event Timeline