D6234: Add a --reset option to export_graph cli tool
D6234.id22582.diff
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -66,8 +66,17 @@
type=click.STRING,
help="Comma-separated list of object types to exclude",
)
+@click.option(
+ "--reset",
+ type=bool,
+ is_flag=True,
+ help=(
+ "Consume the kafka journal from the beginning instead of current "
+ "(committed) offsets"
+ ),
+)
@click.pass_context
-def export_graph(ctx, export_path, export_id, formats, exclude, processes):
+def export_graph(ctx, export_path, export_id, formats, exclude, processes, reset):
"""Export the Software Heritage graph as an edge dataset."""
import uuid
@@ -109,6 +118,7 @@
obj_type,
node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
processes=processes,
+ reset_offsets=reset,
)
print("Exporting {}:".format(obj_type))
parallel_exporter.run()
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -14,7 +14,7 @@
import time
from typing import Any, Dict, Mapping, Sequence, Tuple, Type
-from confluent_kafka import TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition
import tqdm
from swh.dataset.exporter import Exporter
@@ -37,6 +37,7 @@
self,
*args,
offset_ranges: Mapping[int, Tuple[int, int]] = None,
+ reset_offsets: bool = False,
assignment: Sequence[int] = None,
progress_queue: multiprocessing.Queue = None,
refresh_every: int = 200,
@@ -57,6 +58,7 @@
self.assignment = assignment
self.count = None
self.topic_name = None
+ self.reset_offsets = reset_offsets
kwargs["stop_on_eof"] = True # Stop when the assignment is empty
super().__init__(*args, **kwargs)
@@ -64,6 +66,13 @@
self.topic_name = self.subscription[0]
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
logging.debug("Changing assignment to %s", str(self.assignment))
+ offset = OFFSET_BEGINNING if self.reset_offsets else OFFSET_STORED
+ self.consumer.assign(
+ [TopicPartition(self.topic_name, pid, offset) for pid in self.assignment]
+ )
+
+ def unsubscribe(self, partitions):
+ self.assignment = [pid for pid in self.assignment if pid not in partitions]
self.consumer.assign(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
@@ -90,10 +99,7 @@
if offset >= self.offset_ranges[partition_id][1] - 1:
if partition_id in self.assignment:
self.progress_queue.put({partition_id: offset})
- self.assignment = [
- pid for pid in self.assignment if pid != partition_id
- ]
- self.subscribe() # Actually, unsubscribes from the partition_id
+ self.unsubscribe([partition_id])
def deserialize_message(self, message):
"""
@@ -121,6 +127,7 @@
obj_type: str,
node_sets_path: Path,
processes: int = 1,
+ reset_offsets: bool = False,
):
"""
Args:
@@ -140,6 +147,7 @@
self.processes = processes
self.node_sets_path = node_sets_path
self.offsets = None
+ self.reset_offsets = reset_offsets
def get_offsets(self):
"""
@@ -173,7 +181,7 @@
def fetch_insert_partition_id(partition_id):
tp = TopicPartition(topic_name, partition_id)
(lo, hi) = client.consumer.get_watermark_offsets(tp)
- if hi > lo:
+ if hi > lo and not self.reset_offsets:
# hi == low means there is nothing in the partition to consume.
# If the partition is not empty, retrieve the committed offset,
# if any, to use it at lo offset.
@@ -260,6 +268,7 @@
self.group_id,
self.obj_type,
self.offsets,
+ self.reset_offsets,
assignment,
progress_queue,
self.node_sets_path,
@@ -281,6 +290,7 @@
group_id: str,
obj_type: str,
offsets: Dict[int, Tuple[int, int]],
+ reset_offsets: bool,
assignment: Sequence[int],
progress_queue: multiprocessing.Queue,
node_sets_path: Path,
@@ -289,6 +299,7 @@
self.group_id = group_id
self.obj_type = obj_type
self.offsets = offsets
+ self.reset_offsets = reset_offsets
self.assignment = assignment
self.progress_queue = progress_queue
@@ -348,6 +359,7 @@
group_id=self.group_id,
debug="cgrp,broker",
offset_ranges=self.offsets,
+ reset_offsets=self.reset_offsets,
assignment=self.assignment,
progress_queue=self.progress_queue,
**{"message.max.bytes": str(500 * 1024 * 1024)},