diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,11 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
 # [mypy-add_your_lib_here.*]
 # ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 click
+tqdm
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/cli.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import uuid
+
+from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.dataset.graph import export_edges, sort_graph_nodes
+
+
+@click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False),
+              help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+    '''Software Heritage Dataset Tools'''
+    ctx.ensure_object(dict)
+
+    conf = config.read(config_file)
+    ctx.obj['config'] = conf
+
+
+@cli.command('export-graph')
+@click.argument('export-path', type=click.Path())
+@click.option('--export-id', '-e', help="Unique ID of the export run.")
+@click.option('--processes', '-p', default=1,
+              help="Number of parallel processes")
+@click.pass_context
+def export_graph(ctx, export_path, export_id, processes):
+    config = ctx.obj['config']
+    if not export_id:
+        export_id = str(uuid.uuid4())
+
+    print()
+    print('== Edges export phase ==')
+    export_edges(config, export_path, export_id, processes)
+
+    print()
+    print('== Sort phase ==')
+    sort_graph_nodes(export_path, config)
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporter.py
@@ -0,0 +1,171 @@
+import concurrent.futures
+import multiprocessing
+import tqdm
+import time
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+from confluent_kafka import TopicPartition
+
+from swh.journal.client import JournalClient
+
+
+class JournalClientOffsetRanges(JournalClient):
+    def __init__(self, *args, offset_ranges=None, assignment=None,
+                 progress_queue=None, refresh_every=200, **kwargs):
+        self.offset_ranges = offset_ranges
+        self.progress_queue = progress_queue
+        self.refresh_every = refresh_every
+        self.assignment = assignment
+        super().__init__(*args, **kwargs)
+
+    def subscribe(self):
+        topic_name = self.subscription[0]
+        time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
+        self.consumer.assign([
+            TopicPartition(topic_name, pid) for pid in self.assignment
+        ])
+
+    def process(self, *args, **kwargs):
+        self.count = 0
+        try:
+            # Handle already committed partition offsets
+            topic_name = self.subscription[0]
+            committed = self.consumer.committed([
+                TopicPartition(topic_name, pid) for pid in self.assignment
+            ])
+            for tp in committed:
+                self.handle_offset(tp.partition, tp.offset)
+
+            if not self.assignment:
+                raise EOFError
+
+            # Process the messages
+            super().process(*args, **kwargs)
+        except EOFError:
+            self.progress_queue.put(None)
+            pass
+
+    def handle_offset(self, partition_id, offset):
+        if offset < 0:  # Uninitialized partition offset
+            return
+
+        if self.count % self.refresh_every == 0:
+            self.progress_queue.put({partition_id: offset})
+
+        if offset >= self.offset_ranges[partition_id][1] - 1:
+            self.assignment = [pid for pid in self.assignment
+                               if pid != partition_id]
+            self.subscribe()
+
+    def deserialize_message(self, message):
+        self.handle_offset(message.partition(), message.offset())
+        self.count += 1
+
+        if not self.assignment:
+            raise EOFError
+
+        return super().deserialize_message(message)
+
+
+class ParallelExporter:
+    def __init__(self, config, export_id, obj_type, processes=1):
+        self.config = config
+        self.export_id = 'swh-dataset-export-{}'.format(export_id)
+        self.obj_type = obj_type
+        self.processes = processes
+
+        self.offsets = None
+
+    def get_offsets(self):
+        if self.offsets is None:
+            client = JournalClient(
+                **self.config['journal'],
+                object_types=[self.obj_type],
+                group_id=self.export_id,
+            )
+            topic_name = client.subscription[0]
+            topics = client.consumer.list_topics(topic_name).topics
+            partitions = topics[topic_name].partitions
+
+            self.offsets = {}
+            for partition_id in tqdm.tqdm(partitions.keys(),
+                                          desc=" - Partition offsets"):
+                tp = TopicPartition(topic_name, partition_id)
+                (lo, hi) = client.consumer.get_watermark_offsets(tp)
+                self.offsets[partition_id] = (lo, hi)
+        return self.offsets
+
+    def run(self, *args):
+        self.get_offsets()
+        to_assign = list(self.offsets.keys())
+
+        manager = multiprocessing.Manager()
+        q = manager.Queue()
+
+        with ProcessPoolExecutor(self.processes + 1) as pool:
+            futures = []
+            for i in range(self.processes):
+                futures.append(pool.submit(
+                    self.export_worker,
+                    *args,
+                    assignment=to_assign[i::self.processes],
+                    queue=q
+                ))
+            futures.append(pool.submit(self.progress_worker, queue=q))
+
+            concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
+            for f in futures:
+                if f.running():
+                    continue
+                exc = f.exception()
+                if exc:
+                    pool.shutdown(wait=False)
+                    print(exc)
+                    f.result()
+                    raise exc
+
+    def progress_worker(self, *args, queue=None):
+        d = {}
+        active_workers = self.processes
+        offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
+        with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
+            while active_workers:
+                item = queue.get()
+                if item is None:
+                    active_workers -= 1
+                    continue
+                d.update(item)
+                progress = sum(n - self.offsets[p][0] for p, n in d.items())
+                pbar.set_postfix(active_workers=active_workers,
+                                 total_workers=self.processes)
+                pbar.update(progress - pbar.n)
+
+    def process(self, callback, assignment=None, queue=None):
+        client = JournalClientOffsetRanges(
+            **self.config['journal'],
+            object_types=[self.obj_type],
+            group_id=self.export_id,
+            debug='cgrp,broker',
+            offset_ranges=self.offsets,
+            assignment=assignment,
+            progress_queue=queue,
+        )
+        client.process(callback)
+
+    def export_worker(self, *args, **kwargs):
+        """
+        Override this with a custom implementation of a worker function.
+
+        A worker function should call `self.process(fn, **kwargs)` with `fn`
+        being a callback that will be called in the same fashion as with
+        `JournalClient.process()`.
+
+        A simple exporter to print all the objects in the log would look like
+        this:
+
+        ```
+        class PrintExporter(ParallelExporter):
+            def export_worker(self, **kwargs):
+                self.process(print, **kwargs)
+        ```
+        """
+        raise NotImplementedError
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/graph.py
@@ -0,0 +1,121 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import os
+import pathlib
+import shlex
+import subprocess
+import tempfile
+import uuid
+
+from swh.dataset.exporter import ParallelExporter
+from swh.dataset.utils import ZSTWriter
+from swh.model.identifiers import origin_identifier, persistent_identifier
+
+
+def process_messages(messages, writer, config):
+    def write(src, dst):
+        src_type, src_id = src
+        dst_type, dst_id = dst
+        if src_id is None or dst_id is None:
+            return
+        src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
+        dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
+        writer.write('{} {}\n'.format(src_pid, dst_pid))
+
+    for visit in messages.get('origin_visit', []):
+        write(('origin', origin_identifier({'url': visit['origin']['url']})),
+              ('snapshot', visit['snapshot']))
+
+    for snapshot in messages.get('snapshot', []):
+        for branch_name, branch in snapshot['branches'].items():
+            while branch and branch.get('target_type') == 'alias':
+                branch_name = branch['target']
+                branch = snapshot['branches'][branch_name]
+            if branch is None or not branch_name:
+                continue
+            if (config.get('remove_pull_requests')
+                    and (branch_name.startswith(b'refs/pull')
+                         or branch_name.startswith(b'refs/merge-requests'))):
+                continue
+            write(('snapshot', snapshot['id']),
+                  (branch['target_type'], branch['target']))
+
+    for release in messages.get('release', []):
+        write(('release', release['id']),
+              (release['target_type'], release['target']))
+
+    for revision in messages.get('revision', []):
+        write(('revision', revision['id']),
+              ('directory', revision['directory']))
+        for parent in revision['parents']:
+            write(('revision', revision['id']),
+                  ('revision', parent))
+
+    for directory in messages.get('directory', []):
+        for entry in directory['entries']:
+            entry_type_mapping = {
+                'file': 'content',
+                'dir': 'directory',
+                'rev': 'revision'
+            }
+            write(('directory', directory['id']),
+                  (entry_type_mapping[entry['type']], entry['target']))
+
+
+class GraphEdgeExporter(ParallelExporter):
+    def export_worker(self, export_path, **kwargs):
+        dataset_path = pathlib.Path(export_path)
+        dataset_path.mkdir(exist_ok=True, parents=True)
+        dataset_file = dataset_path / ('graph-{}.edges.csv.zst'
+                                       .format(str(uuid.uuid4())))
+
+        with ZSTWriter(dataset_file) as writer:
+            process_fn = functools.partial(
+                process_messages, writer=writer, config=self.config,
+            )
+            self.process(process_fn, **kwargs)
+
+
+def export_edges(config, export_path, export_id, processes):
+    object_types = [
+        'origin_visit',
+        'snapshot',
+        'release',
+        'revision',
+        'directory',
+    ]
+    for obj_type in object_types:
+        print('{} edges:'.format(obj_type))
+        exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
+        exporter.run(export_path)
+
+
+def sort_graph_nodes(export_path, config):
+    # Use bytes for the sorting algorithm (faster than being locale-specific)
+    env = {
+        **os.environ.copy(),
+        'LC_ALL': 'C',
+        'LC_COLLATE': 'C',
+        'LANG': 'C',
+    }
+    sort_buffer_size = config.get('sort_buffer_size', '4G')
+    disk_buffer_dir = config.get('disk_buffer_dir', export_path)
+    with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
+                                     dir=disk_buffer_dir) as buffer_path:
+        subprocess.run(
+            ("zstdcat {export_path}/*.edges.csv.zst | "
+             "tr ' ' '\\n' | "
+             "sort -u -S{sort_buffer_size} -T{buffer_path} | "
+             "zstdmt > {export_path}/graph.nodes.csv.zst")
+            .format(
+                export_path=shlex.quote(export_path),
+                buffer_path=shlex.quote(buffer_path),
+                sort_buffer_size=shlex.quote(sort_buffer_size),
+            ),
+            shell=True,
+            env=env,
+        )
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/utils.py
@@ -0,0 +1,22 @@
+import subprocess
+
+
+class ZSTWriter:
+    def __init__(self, path, mode='w'):
+        self.path = path
+        self.mode = mode
+
+    def __enter__(self):
+        is_text = not (self.mode == 'wb')
+        self.process = subprocess.Popen(
+            ['zstd', '-q', '-o', self.path],
+            text=is_text, stdin=subprocess.PIPE
+        )
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.process.stdin.close()
+        self.process.wait()
+
+    def write(self, buf):
+        self.process.stdin.write(buf)
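
A minimal usage sketch of the pieces introduced here: the two phases can also be driven programmatically instead of through the `swh dataset export-graph` command. The `journal` section is assumed to carry the usual `swh.journal.client.JournalClient` connection arguments (`brokers`, `prefix`); the broker address, export path and option values below are placeholders, and `remove_pull_requests` / `sort_buffer_size` / `disk_buffer_dir` are the optional top-level keys read by `process_messages` and `sort_graph_nodes`.

```
import uuid

from swh.dataset.graph import export_edges, sort_graph_nodes

# Placeholder configuration: the 'journal' section is forwarded verbatim to
# swh.journal.client.JournalClient (group_id is set by the exporter itself).
config = {
    'journal': {
        'brokers': ['kafka1.example.org:9092'],  # placeholder broker address
        'prefix': 'swh.journal.objects',         # assumed default topic prefix
    },
    'remove_pull_requests': True,  # skip refs/pull and refs/merge-requests branches
    'sort_buffer_size': '4G',      # forwarded to sort -S during the node sort phase
    # 'disk_buffer_dir': '/srv/tmp',  # optional: where sort spills to disk
}

export_path = '/srv/dataset/graph-export'  # placeholder output directory
export_id = str(uuid.uuid4())

# Same two phases as the export-graph CLI command above.
export_edges(config, export_path, export_id, processes=4)
sort_graph_nodes(export_path, config)
```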