Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/journalprocessor.py
Show First 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | class JournalClientOffsetRanges(JournalClient): | ||||
def subscribe(self): | def subscribe(self): | ||||
self.topic_name = self.subscription[0] | self.topic_name = self.subscription[0] | ||||
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 | time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 | ||||
logging.debug("Changing assignment to %s", str(self.assignment)) | logging.debug("Changing assignment to %s", str(self.assignment)) | ||||
self.consumer.assign( | self.consumer.assign( | ||||
[TopicPartition(self.topic_name, pid) for pid in self.assignment] | [TopicPartition(self.topic_name, pid) for pid in self.assignment] | ||||
) | ) | ||||
def process(self, *args, **kwargs): | def process(self, worker_fn): | ||||
self.count = 0 | self.count = 0 | ||||
try: | try: | ||||
self.handle_committed_offsets() | self.handle_committed_offsets() | ||||
if self.assignment: | if self.assignment: | ||||
super().process(*args, **kwargs) | super().process(worker_fn) | ||||
finally: | finally: | ||||
self.progress_queue.put(None) | self.progress_queue.put(None) | ||||
def handle_committed_offsets(self,): | def handle_committed_offsets(self,): | ||||
""" | """ | ||||
Handle already committed partition offsets before starting processing. | Handle already committed partition offsets before starting processing. | ||||
""" | """ | ||||
committed = self.consumer.committed( | committed = self.consumer.committed( | ||||
▲ Show 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | def get_offsets(self): | ||||
topics = client.consumer.list_topics(topic_name).topics | topics = client.consumer.list_topics(topic_name).topics | ||||
partitions = topics[topic_name].partitions | partitions = topics[topic_name].partitions | ||||
self.offsets = {} | self.offsets = {} | ||||
def fetch_insert_partition_id(partition_id): | def fetch_insert_partition_id(partition_id): | ||||
tp = TopicPartition(topic_name, partition_id) | tp = TopicPartition(topic_name, partition_id) | ||||
(lo, hi) = client.consumer.get_watermark_offsets(tp) | (lo, hi) = client.consumer.get_watermark_offsets(tp) | ||||
if lo != hi: | if lo != hi: | ||||
olasd: spurious comment :-) | |||||
self.offsets[partition_id] = (lo, hi) | self.offsets[partition_id] = (lo, hi) | ||||
with concurrent.futures.ThreadPoolExecutor( | with concurrent.futures.ThreadPoolExecutor( | ||||
max_workers=self.processes | max_workers=self.processes | ||||
Not Done Inline ActionsMaybe worth a comment, saying that you're only processing the partition if there's been new messages since your last commit, as the condition is the same as the previous one olasd: Maybe worth a comment, saying that you're only processing the partition if there's been new… | |||||
) as executor: | ) as executor: | ||||
list( | list( | ||||
tqdm.tqdm( | tqdm.tqdm( | ||||
executor.map(fetch_insert_partition_id, partitions.keys()), | executor.map(fetch_insert_partition_id, partitions.keys()), | ||||
total=len(partitions), | total=len(partitions), | ||||
desc=" - Partition offsets", | desc=" - Partition offsets", | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 213 Lines • Show Last 20 Lines |
spurious comment :-)