Page Menu · Home · Software Heritage

D6234.id22582.diff
No One · Temporary

D6234.id22582.diff

diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -66,8 +66,17 @@
type=click.STRING,
help="Comma-separated list of object types to exclude",
)
+@click.option(
+ "--reset",
+ type=bool,
+ is_flag=True,
+ help=(
+ "Consume the kafka journal from the beginning instead of current "
+ "(committed) offsets"
+ ),
+)
@click.pass_context
-def export_graph(ctx, export_path, export_id, formats, exclude, processes):
+def export_graph(ctx, export_path, export_id, formats, exclude, processes, reset):
"""Export the Software Heritage graph as an edge dataset."""
import uuid
@@ -109,6 +118,7 @@
obj_type,
node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
processes=processes,
+ reset_offsets=reset,
)
print("Exporting {}:".format(obj_type))
parallel_exporter.run()
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -14,7 +14,7 @@
import time
from typing import Any, Dict, Mapping, Sequence, Tuple, Type
-from confluent_kafka import TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition
import tqdm
from swh.dataset.exporter import Exporter
@@ -37,6 +37,7 @@
self,
*args,
offset_ranges: Mapping[int, Tuple[int, int]] = None,
+ reset_offsets: bool = False,
assignment: Sequence[int] = None,
progress_queue: multiprocessing.Queue = None,
refresh_every: int = 200,
@@ -57,6 +58,7 @@
self.assignment = assignment
self.count = None
self.topic_name = None
+ self.reset_offsets = reset_offsets
kwargs["stop_on_eof"] = True # Stop when the assignment is empty
super().__init__(*args, **kwargs)
@@ -64,6 +66,13 @@
self.topic_name = self.subscription[0]
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
logging.debug("Changing assignment to %s", str(self.assignment))
+ offset = OFFSET_BEGINNING if self.reset_offsets else OFFSET_STORED
+ self.consumer.assign(
+ [TopicPartition(self.topic_name, pid, offset) for pid in self.assignment]
+ )
+
+ def unsubscribe(self, partitions):
+ self.assignment = [pid for pid in self.assignment if pid not in partitions]
self.consumer.assign(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
@@ -90,10 +99,7 @@
if offset >= self.offset_ranges[partition_id][1] - 1:
if partition_id in self.assignment:
self.progress_queue.put({partition_id: offset})
- self.assignment = [
- pid for pid in self.assignment if pid != partition_id
- ]
- self.subscribe() # Actually, unsubscribes from the partition_id
+ self.unsubscribe([partition_id])
def deserialize_message(self, message):
"""
@@ -121,6 +127,7 @@
obj_type: str,
node_sets_path: Path,
processes: int = 1,
+ reset_offsets: bool = False,
):
"""
Args:
@@ -140,6 +147,7 @@
self.processes = processes
self.node_sets_path = node_sets_path
self.offsets = None
+ self.reset_offsets = reset_offsets
def get_offsets(self):
"""
@@ -173,7 +181,7 @@
def fetch_insert_partition_id(partition_id):
tp = TopicPartition(topic_name, partition_id)
(lo, hi) = client.consumer.get_watermark_offsets(tp)
- if hi > lo:
+ if hi > lo and not self.reset_offsets:
# hi == low means there is nothing in the partition to consume.
# If the partition is not empty, retrieve the committed offset,
# if any, to use it at lo offset.
@@ -260,6 +268,7 @@
self.group_id,
self.obj_type,
self.offsets,
+ self.reset_offsets,
assignment,
progress_queue,
self.node_sets_path,
@@ -281,6 +290,7 @@
group_id: str,
obj_type: str,
offsets: Dict[int, Tuple[int, int]],
+ reset_offsets: bool,
assignment: Sequence[int],
progress_queue: multiprocessing.Queue,
node_sets_path: Path,
@@ -289,6 +299,7 @@
self.group_id = group_id
self.obj_type = obj_type
self.offsets = offsets
+ self.reset_offsets = reset_offsets
self.assignment = assignment
self.progress_queue = progress_queue
@@ -348,6 +359,7 @@
group_id=self.group_id,
debug="cgrp,broker",
offset_ranges=self.offsets,
+ reset_offsets=self.reset_offsets,
assignment=self.assignment,
progress_queue=self.progress_queue,
**{"message.max.bytes": str(500 * 1024 * 1024)},

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:30 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226392

Event Timeline