diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
index f27c84d..10c4e42 100644
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -1,44 +1,45 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import uuid

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.dataset.graph import export_edges, sort_graph_nodes


 @click.group(name='dataset', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     '''Software Heritage Dataset Tools'''
     ctx.ensure_object(dict)

     conf = config.read(config_file)
     ctx.obj['config'] = conf


 @cli.command('export-graph')
 @click.argument('export-path', type=click.Path())
 @click.option('--export-id', '-e', help="Unique ID of the export run.")
 @click.option('--processes', '-p', default=1,
               help="Number of parallel processes")
 @click.pass_context
 def export_graph(ctx, export_path, export_id, processes):
+    """Export the Software Heritage graph as an edge dataset."""
     config = ctx.obj['config']
     if not export_id:
         export_id = str(uuid.uuid4())

     print()
     print('== Edges export phase ==')
     export_edges(config, export_path, export_id, processes)

     print()
     print('== Sort phase ==')
     sort_graph_nodes(export_path, config)
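For context, a minimal sketch of driving the same export programmatically rather than through the CLI command above. The `journal` keys shown (`brokers`, `prefix`) are assumptions about the swh.journal client configuration; the export code in this patch only reads `journal`, `remove_pull_requests`, `sort_buffer_size` and `disk_buffer_dir`.

```
# Not part of this patch: a sketch of running the export from Python,
# assuming a config equivalent to what the CLI reads from --config-file.
import uuid

from swh.dataset.graph import export_edges, sort_graph_nodes

config = {
    'journal': {
        # Assumed swh.journal client settings; adjust to your deployment.
        'brokers': ['kafka01.example.org:9092'],
        'prefix': 'swh.journal.objects',
    },
    'remove_pull_requests': True,
    'sort_buffer_size': '4G',
}
export_path = '/srv/export/graph'          # hypothetical output directory
export_id = str(uuid.uuid4())

export_edges(config, export_path, export_id, processes=4)
sort_graph_nodes(export_path, config)
```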
+ """ self.offset_ranges = offset_ranges self.progress_queue = progress_queue self.refresh_every = refresh_every self.assignment = assignment + self.count = None + self.topic_name = None super().__init__(*args, **kwargs) def subscribe(self): - topic_name = self.subscription[0] + self.topic_name = self.subscription[0] time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 self.consumer.assign([ - TopicPartition(topic_name, pid) for pid in self.assignment + TopicPartition(self.topic_name, pid) for pid in self.assignment ]) def process(self, *args, **kwargs): self.count = 0 try: - # Handle already committed partition offsets - topic_name = self.subscription[0] - committed = self.consumer.committed([ - TopicPartition(topic_name, pid) for pid in self.assignment - ]) - for tp in committed: - self.handle_offset(tp.partition, tp.offset) - - if not self.assignment: - raise EOFError - - # Process the messages + self.handle_committed_offsets() super().process(*args, **kwargs) except EOFError: self.progress_queue.put(None) pass + def handle_committed_offsets(self, ): + """ + Handle already committed partition offsets before starting processing. + """ + committed = self.consumer.committed([ + TopicPartition(self.topic_name, pid) for pid in self.assignment + ]) + for tp in committed: + self.handle_offset(tp.partition, tp.offset) + def handle_offset(self, partition_id, offset): + """ + Check whether the client has reached the end of the current + partition, and trigger a reassignment if that is the case. + Raise EOFError if all the partitions have reached the end. + """ if offset < 0: # Uninitialized partition offset return if self.count % self.refresh_every == 0: self.progress_queue.put({partition_id: offset}) if offset >= self.offset_ranges[partition_id][1] - 1: self.assignment = [pid for pid in self.assignment if pid != partition_id] self.subscribe() - def deserialize_message(self, message): - self.handle_offset(message.partition(), message.offset()) - self.count += 1 - if not self.assignment: raise EOFError + def deserialize_message(self, message): + """ + Override of the message deserialization to hook the handling of the + message offset. + """ + self.handle_offset(message.partition(), message.offset()) + self.count += 1 return super().deserialize_message(message) class ParallelExporter: - def __init__(self, config, export_id, obj_type, processes=1): + """ + Base class for all the Journal exporters. + + Each exporter should override the `export_worker` function with an + implementation of how to run the message processing. + """ + def __init__(self, config, export_id: str, obj_type, processes=1): + """ + Args: + config: the exporter config, which should also include the + JournalClient configuration. + export_id: a unique identifier for the export that will be used + as part of a Kafka consumer group ID. + obj_type: The type of SWH object to export. + processes: The number of processes to run. + """ self.config = config self.export_id = 'swh-dataset-export-{}'.format(export_id) self.obj_type = obj_type self.processes = processes - self.offsets = None def get_offsets(self): + """ + First pass to fetch all the current low and high offsets of each + partition to define the consumption boundaries. 
+ """ if self.offsets is None: client = JournalClient( **self.config['journal'], object_types=[self.obj_type], group_id=self.export_id, ) topic_name = client.subscription[0] topics = client.consumer.list_topics(topic_name).topics partitions = topics[topic_name].partitions self.offsets = {} for partition_id in tqdm.tqdm(partitions.keys(), desc=" - Partition offsets"): tp = TopicPartition(topic_name, partition_id) (lo, hi) = client.consumer.get_watermark_offsets(tp) self.offsets[partition_id] = (lo, hi) return self.offsets def run(self, *args): + """ + Run the parallel export. + """ self.get_offsets() to_assign = list(self.offsets.keys()) manager = multiprocessing.Manager() q = manager.Queue() with ProcessPoolExecutor(self.processes + 1) as pool: futures = [] for i in range(self.processes): futures.append(pool.submit( self.export_worker, *args, assignment=to_assign[i::self.processes], queue=q )) futures.append(pool.submit(self.progress_worker, queue=q)) concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION) for f in futures: if f.running(): continue exc = f.exception() if exc: pool.shutdown(wait=False) - print(exc) f.result() raise exc def progress_worker(self, *args, queue=None): + """ + An additional worker process that reports the current progress of the + export between all the different parallel consumers and across all the + partitions, by consuming the shared progress reporting Queue. + """ d = {} active_workers = self.processes offset_diff = sum((hi - lo) for lo, hi in self.offsets.values()) with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar: while active_workers: item = queue.get() if item is None: active_workers -= 1 continue d.update(item) progress = sum(n - self.offsets[p][0] for p, n in d.items()) pbar.set_postfix(active_workers=active_workers, total_workers=self.processes) pbar.update(progress - pbar.n) def process(self, callback, assignment=None, queue=None): client = JournalClientOffsetRanges( **self.config['journal'], object_types=[self.obj_type], group_id=self.export_id, debug='cgrp,broker', offset_ranges=self.offsets, assignment=assignment, progress_queue=queue, ) client.process(callback) def export_worker(self, *args, **kwargs): """ Override this with a custom implementation of a worker function. A worker function should call `self.process(fn, **kwargs)` with `fn` being a callback that will be called in the same fashion as with `JournalClient.process()`. 
         A simple exporter to print all the objects in the log would look
         like this:

         ```
         class PrintExporter(ParallelExporter):
             def export_worker(self, **kwargs):
                 self.process(print, **kwargs)
         ```
         """
         raise NotImplementedError
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
index 304b543..81c7416 100644
--- a/swh/dataset/graph.py
+++ b/swh/dataset/graph.py
@@ -1,121 +1,148 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import functools
 import os
 import pathlib
 import shlex
 import subprocess
 import tempfile
 import uuid

 from swh.dataset.exporter import ParallelExporter
 from swh.dataset.utils import ZSTWriter
 from swh.model.identifiers import origin_identifier, persistent_identifier


-def process_messages(messages, writer, config):
+def process_messages(messages, writer, config) -> None:
+    """
+    Args:
+        messages: A sequence of messages to process
+        writer: A file-like object that can be written to
+        config: The exporter configuration
+    """
     def write(src, dst):
         src_type, src_id = src
         dst_type, dst_id = dst
         if src_id is None or dst_id is None:
             return
         src_pid = persistent_identifier(object_type=src_type,
                                         object_id=src_id)
         dst_pid = persistent_identifier(object_type=dst_type,
                                         object_id=dst_id)
         writer.write('{} {}\n'.format(src_pid, dst_pid))

     for visit in messages.get('origin_visit', []):
         write(('origin', origin_identifier({'url': visit['origin']['url']})),
               ('snapshot', visit['snapshot']))

     for snapshot in messages.get('snapshot', []):
         for branch_name, branch in snapshot['branches'].items():
             while branch and branch.get('target_type') == 'alias':
                 branch_name = branch['target']
                 branch = snapshot['branches'][branch_name]
             if branch is None or not branch_name:
                 continue
             if (config.get('remove_pull_requests')
-                    and branch_name.startswith(b'refs/pull')
-                    or branch_name.startswith(b'refs/merge-requests')):
+                    and (branch_name.startswith(b'refs/pull')
+                         or branch_name.startswith(b'refs/merge-requests'))):
                 continue
             write(('snapshot', snapshot['id']),
                   (branch['target_type'], branch['target']))

     for release in messages.get('release', []):
         write(('release', release['id']),
               (release['target_type'], release['target']))

     for revision in messages.get('revision', []):
         write(('revision', revision['id']),
               ('directory', revision['directory']))
         for parent in revision['parents']:
             write(('revision', revision['id']),
                   ('revision', parent))

     for directory in messages.get('directory', []):
         for entry in directory['entries']:
             entry_type_mapping = {
                 'file': 'content',
                 'dir': 'directory',
                 'rev': 'revision'
             }
             write(('directory', directory['id']),
                   (entry_type_mapping[entry['type']], entry['target']))


 class GraphEdgeExporter(ParallelExporter):
+    """
+    Implementation of ParallelExporter which writes all the graph edges
+    of a specific type in a Zstandard-compressed CSV file.
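To illustrate the edge format produced by `process_messages`, here is a small sketch that feeds it a hand-built `revision` batch through an in-memory writer. The message shape is an assumption based only on the fields the function reads (`id`, `directory`, `parents`); the point is the two resulting `<SRC PID> <DST PID>` lines.

```
# Illustrative sketch, not part of this patch.
import io

from swh.dataset.graph import process_messages

writer = io.StringIO()
messages = {
    'revision': [{
        'id': bytes.fromhex('aa' * 20),          # hypothetical hashes
        'directory': bytes.fromhex('bb' * 20),
        'parents': [bytes.fromhex('cc' * 20)],
    }],
}
process_messages(messages, writer=writer, config={})
print(writer.getvalue(), end='')
# Expected shape of the output (one edge per line):
#   swh:1:rev:aaaa... swh:1:dir:bbbb...
#   swh:1:rev:aaaa... swh:1:rev:cccc...
```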
+
+    Each row of the CSV is in the format: `<SRC PID> <DST PID>`
+    """
     def export_worker(self, export_path, **kwargs):
         dataset_path = pathlib.Path(export_path)
         dataset_path.mkdir(exist_ok=True, parents=True)
         dataset_file = dataset_path / ('graph-{}.edges.csv.zst'
                                        .format(str(uuid.uuid4())))

         with ZSTWriter(dataset_file) as writer:
             process_fn = functools.partial(
                 process_messages,
                 writer=writer,
                 config=self.config,
             )
             self.process(process_fn, **kwargs)


 def export_edges(config, export_path, export_id, processes):
+    """Run the edge exporter for each edge type."""
     object_types = [
         'origin_visit',
         'snapshot',
         'release',
         'revision',
         'directory',
     ]
     for obj_type in object_types:
         print('{} edges:'.format(obj_type))
         exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
         exporter.run(export_path)


 def sort_graph_nodes(export_path, config):
+    """
+    Generate the node list from the edges files.
+
+    We cannot solely rely on the object IDs that are read in the journal,
+    as some nodes that are referred to as destinations in the edge file
+    might not be present in the archive (e.g. a rev_entry referring to a
+    revision that we do not have crawled yet).
+
+    The most efficient way of getting all the nodes that are mentioned in
+    the edges file is therefore to use sort(1) on the gigantic edge files
+    to get all the unique node IDs, while using the disk as a temporary
+    buffer.
+    """
     # Use bytes for the sorting algorithm (faster than being locale-specific)
     env = {
         **os.environ.copy(),
         'LC_ALL': 'C',
         'LC_COLLATE': 'C',
         'LANG': 'C',
     }
     sort_buffer_size = config.get('sort_buffer_size', '4G')
     disk_buffer_dir = config.get('disk_buffer_dir', export_path)
     with tempfile.TemporaryDirectory(prefix='.graph_node_sort_',
                                      dir=disk_buffer_dir) as buffer_path:
         subprocess.run(
-            ("zstdcat {export_path}/*.edges.csv.zst | "
+            ("pv {export_path}/*.edges.csv.zst | "
+             "zstdcat | "
              "tr ' ' '\\n' | "
              "sort -u -S{sort_buffer_size} -T{buffer_path} | "
              "zstdmt > {export_path}/graph.nodes.csv.zst")
             .format(
                 export_path=shlex.quote(export_path),
                 buffer_path=shlex.quote(buffer_path),
                 sort_buffer_size=shlex.quote(sort_buffer_size),
             ),
             shell=True,
             env=env,
         )
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
index de8050a..7251b11 100644
--- a/swh/dataset/utils.py
+++ b/swh/dataset/utils.py
@@ -1,22 +1,23 @@
 import subprocess


 class ZSTWriter:
     def __init__(self, path, mode='w'):
         self.path = path
         self.mode = mode

     def __enter__(self):
         is_text = not (self.mode == 'wb')
         self.process = subprocess.Popen(
             ['zstd', '-q', '-o', self.path],
-            text=is_text, stdin=subprocess.PIPE
+            text=is_text,
+            stdin=subprocess.PIPE
         )
         return self

     def __exit__(self, exc_type, exc_value, tb):
         self.process.stdin.close()
         self.process.wait()

     def write(self, buf):
         self.process.stdin.write(buf)
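Finally, a usage sketch for `ZSTWriter`: the context manager spawns `zstd` and pipes everything passed to `write()` into the compressed target file. The output path is hypothetical, and `zstd` must be available on the machine.

```
# Usage sketch, not part of this patch.
from swh.dataset.utils import ZSTWriter

with ZSTWriter('/tmp/example.edges.csv.zst') as writer:
    writer.write('swh:1:rev:0000000000000000000000000000000000000000 '
                 'swh:1:dir:0000000000000000000000000000000000000000\n')
```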