diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -66,8 +66,17 @@
     type=click.STRING,
     help="Comma-separated list of object types to exclude",
 )
+@click.option(
+    "--reset",
+    type=bool,
+    is_flag=True,
+    help=(
+        "Consume the kafka journal from the beginning instead of current "
+        "(committed) offsets"
+    ),
+)
 @click.pass_context
-def export_graph(ctx, export_path, export_id, formats, exclude, processes):
+def export_graph(ctx, export_path, export_id, formats, exclude, processes, reset):
     """Export the Software Heritage graph as an edge dataset."""
     import uuid
 
@@ -109,6 +118,7 @@
                 obj_type,
                 node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
                 processes=processes,
+                reset_offsets=reset,
             )
             print("Exporting {}:".format(obj_type))
             parallel_exporter.run()
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -14,7 +14,7 @@
 import time
 from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type
 
-from confluent_kafka import Message, TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition
 import tqdm
 
 from swh.dataset.exporter import Exporter
@@ -37,6 +37,7 @@
         self,
         *args,
         offset_ranges: Mapping[int, Tuple[int, int]] = None,
+        reset_offsets: bool = False,
         assignment: Sequence[int] = None,
         progress_queue: multiprocessing.Queue = None,
         refresh_every: int = 200,
@@ -58,6 +59,7 @@
         self._messages_to_commit: List[Message] = []
         self.count = None
         self.topic_name: Optional[str] = None
+        self.reset_offsets = reset_offsets
         kwargs["stop_on_eof"] = True  # Stop when the assignment is empty
         super().__init__(*args, **kwargs)
 
@@ -65,8 +67,9 @@
         self.topic_name = self.subscription[0]
         time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
         logging.debug("Changing assignment to %s", str(self.assignment))
+        offset = OFFSET_BEGINNING if self.reset_offsets else OFFSET_STORED
         self.consumer.assign(
-            [TopicPartition(self.topic_name, pid) for pid in self.assignment]
+            [TopicPartition(self.topic_name, pid, offset) for pid in self.assignment]
         )
 
     def unsubscribe(self, partitions: Container[int]):
@@ -150,6 +153,7 @@
         obj_type: str,
         node_sets_path: Path,
         processes: int = 1,
+        reset_offsets: bool = False,
     ):
         """
         Args:
@@ -169,6 +173,7 @@
         self.processes = processes
         self.node_sets_path = node_sets_path
         self.offsets = None
+        self.reset_offsets = reset_offsets
 
     def get_offsets(self):
         """
@@ -202,7 +207,7 @@
         def fetch_insert_partition_id(partition_id):
             tp = TopicPartition(topic_name, partition_id)
             (lo, hi) = client.consumer.get_watermark_offsets(tp)
-            if hi > lo:
+            if hi > lo and not self.reset_offsets:
                 # hi == low means there is nothing in the partition to consume.
                 # If the partition is not empty, retrieve the committed offset,
                 # if any, to use it at lo offset.
@@ -289,6 +294,7 @@
             self.group_id,
             self.obj_type,
             self.offsets,
+            self.reset_offsets,
             assignment,
             progress_queue,
             self.node_sets_path,
@@ -310,6 +316,7 @@
         group_id: str,
         obj_type: str,
         offsets: Dict[int, Tuple[int, int]],
+        reset_offsets: bool,
         assignment: Sequence[int],
         progress_queue: multiprocessing.Queue,
         node_sets_path: Path,
@@ -318,6 +325,7 @@
         self.group_id = group_id
         self.obj_type = obj_type
         self.offsets = offsets
+        self.reset_offsets = reset_offsets
         self.assignment = assignment
         self.progress_queue = progress_queue
@@ -377,6 +385,7 @@
             group_id=self.group_id,
             debug="cgrp,broker",
             offset_ranges=self.offsets,
+            reset_offsets=self.reset_offsets,
             assignment=self.assignment,
             progress_queue=self.progress_queue,
             **{"message.max.bytes": str(500 * 1024 * 1024)},