diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -83,9 +83,18 @@
     type=click.STRING,
     help="Comma-separated list of objects types to export",
 )
+@click.option(
+    "--margin",
+    type=click.FloatRange(0.0, 1.0),
+    help=(
+        "Offset margin to start consuming from. E.g. if set to '0.95', "
+        "consumers will start at 95% of the last committed offset; "
+        "in other words, start earlier than last committed position."
+    ),
+)
 @click.pass_context
 def export_graph(
-    ctx, export_path, export_id, formats, exclude, object_types, processes
+    ctx, export_path, export_id, formats, exclude, object_types, processes, margin
 ):
     """Export the Software Heritage graph as an edge dataset."""
     from importlib import import_module
@@ -94,6 +103,8 @@
     from swh.dataset.journalprocessor import ParallelJournalProcessor
 
     config = ctx.obj["config"]
+    if margin:
+        config["journal"]["offset_margin"] = margin
     if not export_id:
         export_id = str(uuid.uuid4())
 
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -52,6 +52,7 @@
         assignment: Sequence[int] = None,
         progress_queue: multiprocessing.Queue = None,
         refresh_every: int = 200,
+        offset_margin: float = 0.0,
         **kwargs,
     ):
         """
@@ -69,7 +70,7 @@
         self.assignment = assignment
         self._messages_to_commit: List[Message] = []
         self._partitions_to_unsubscribe: Set[int] = set()
-
+        self.offset_margin = offset_margin
         self.count = None
         self.topic_name: Optional[str] = None
         kwargs["stop_on_eof"] = True  # Stop when the assignment is empty
@@ -80,12 +81,33 @@
         time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
         logger.debug("Changing assignment to %s", str(self.assignment))
         self.consumer.assign(
-            [TopicPartition(self.topic_name, pid) for pid in self.assignment]
+            [self._topic_partition(pid) for pid in self.assignment]
         )
 
+    def _topic_partition(self, pid: int) -> TopicPartition:
+        """Build the TopicPartition used for the initial assignment of ``pid``.
+
+        Without an offset margin no explicit offset is passed, so the
+        assignment behaves exactly as it did before this option existed.
+        With a margin, consumption starts at ``offset_margin`` times the
+        start offset recorded in ``self.offset_ranges[pid]``, i.e. earlier
+        than that recorded position.
+        """
+        if not self.offset_margin:
+            return TopicPartition(self.topic_name, pid)
+        lo = self.offset_ranges[pid][0]
+        # min() keeps float rounding from ever pushing the start offset past
+        # ``lo`` for very large offsets.
+        start = min(lo, int(self.offset_margin * lo))
+        logger.debug(
+            "[%s] offset margin applied: start at %s instead of %s", pid, start, lo
+        )
+        return TopicPartition(self.topic_name, pid, start)
+
     def unsubscribe(self, partitions: Container[int]):
         assert self.assignment is not None
         self.assignment = [pid for pid in self.assignment if pid not in partitions]
+        logger.debug("Changing assignment to %s", str(self.assignment))
         self.consumer.assign(
             [TopicPartition(self.topic_name, pid) for pid in self.assignment]
         )
@@ -211,6 +233,7 @@
         cfg = self.config["journal"].copy()
         cfg["object_types"] = [self.obj_type]
         cfg["group_id"] = self.group_id
+        cfg.pop("offset_margin", None)
         client = JournalClient(**cfg)
         topic_name = client.subscription[0]
         topics = client.consumer.list_topics(topic_name).topics
@@ -230,11 +253,16 @@
             logger.debug("Fetching offset for partition %s", partition_id)
             tp = TopicPartition(topic_name, partition_id)
             (lo, hi) = client.consumer.get_watermark_offsets(tp)
-            logger.debug("[%s] (lo,hi)=(%s, %s)", partition_id, lo, hi)
+            logger.debug(
+                "[%s] watermark offset (lo,hi)=(%s, %s)", partition_id, lo, hi
+            )
             if hi > lo:
                 # hi == low means there is nothing in the partition to consume.
                 # If the partition is not empty, retrieve the committed offset,
                 # if any, to use it at lo offset.
+                logger.debug(
+                    "Fetching committed offset for partition %s", partition_id
+                )
                 committed = client.consumer.committed([tp])[0]
                 logger.debug(
                     "[%s] committed offset: %s", partition_id, committed.offset