diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -83,9 +83,18 @@
     type=click.STRING,
     help="Comma-separated list of objects types to export",
 )
+@click.option(
+    "--margin",
+    type=click.FloatRange(0, 1),
+    help=(
+        "Offset margin to start consuming from. E.g. if set to '0.95', "
+        "consumers will start at 95% of the last committed offset; "
+        "in other words, start earlier than the last committed position."
+    ),
+)
 @click.pass_context
 def export_graph(
-    ctx, export_path, export_id, formats, exclude, object_types, processes
+    ctx, export_path, export_id, formats, exclude, object_types, processes, margin
 ):
     """Export the Software Heritage graph as an edge dataset."""
     from importlib import import_module
@@ -137,6 +146,7 @@
             obj_type,
             node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
             processes=processes,
+            offset_margin=margin,
         )
         print("Exporting {}:".format(obj_type))
         parallel_exporter.run()
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -69,7 +69,6 @@
         self.assignment = assignment
         self._messages_to_commit: List[Message] = []
         self._partitions_to_unsubscribe: Set[int] = set()
-        self.count = None
         self.topic_name: Optional[str] = None
         kwargs["stop_on_eof"] = True  # Stop when the assignment is empty
 
@@ -80,12 +79,25 @@
         time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
         logger.debug("Changing assignment to %s", str(self.assignment))
         self.consumer.assign(
-            [TopicPartition(self.topic_name, pid) for pid in self.assignment]
+            [
+                # Start at the low end of the partition's offset range rather
+                # than at the committed offset (they differ with a margin).
+                TopicPartition(
+                    self.topic_name,
+                    pid,
+                    self.offset_ranges[pid][0],
+                )
+                for pid in self.assignment
+            ]
         )
 
     def unsubscribe(self, partitions: Container[int]):
         assert self.assignment is not None
         self.assignment = [pid for pid in self.assignment if pid not in partitions]
+        logger.debug("Changing assignment to %s", str(self.assignment))
+        # NOTE(review): partitions re-assigned without an explicit offset resume
+        # from their committed offset rather than offset_ranges[pid][0]; confirm
+        # this cannot skip the offset-margin window of a not-yet-read partition.
         self.consumer.assign(
             [TopicPartition(self.topic_name, pid) for pid in self.assignment]
         )
@@ -177,6 +189,7 @@
         obj_type: str,
         node_sets_path: Path,
         processes: int = 1,
+        offset_margin: Optional[float] = None,
     ):
         """
         Args:
@@ -197,6 +210,9 @@
         self.processes = processes
         self.node_sets_path = node_sets_path
         self.offsets = None
+        # Optional fraction (expected in [0, 1]) of each partition's committed
+        # offset to start consuming from (see get_offsets); None disables it.
+        self.offset_margin = offset_margin
 
     def get_offsets(self):
         """
@@ -230,16 +246,33 @@
                 logger.debug("Fetching offset for partition %s", partition_id)
                 tp = TopicPartition(topic_name, partition_id)
                 (lo, hi) = client.consumer.get_watermark_offsets(tp)
-                logger.debug("[%s] (lo,hi)=(%s, %s)", partition_id, lo, hi)
+                logger.debug(
+                    "[%s] watermark offset (lo,hi)=(%s, %s)", partition_id, lo, hi
+                )
                 if hi > lo:
                     # hi == low means there is nothing in the partition to consume.
                     # If the partition is not empty, retrieve the committed offset,
                     # if any, to use it at lo offset.
+                    logger.debug(
+                        "Fetching committed offset for partition %s", partition_id
+                    )
                     committed = client.consumer.committed([tp])[0]
                     logger.debug(
                         "[%s] committed offset: %s", partition_id, committed.offset
                     )
+                    watermark_lo = lo  # oldest offset still available
                     lo = max(lo, committed.offset)
+                    if self.offset_margin is not None:
+                        # Using min() in case of precision loss when self.offset_margin
+                        # is close to 1.0 and lo is very large; using max() so we
+                        # never rewind past the oldest offset still available in
+                        # the partition (e.g. when no offset was committed yet).
+                        newlo = max(watermark_lo, min(lo, int(self.offset_margin * lo)))
+                        logger.debug(
+                            "Apply offset margin: reducing lo from %s to %s", lo, newlo
+                        )
+                        lo = newlo
+
                 if hi > lo:
                     # do only process the partition is there are actually new
                     # messages to process (partition not empty and committed