Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/journalprocessor.py
Show First 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | ): | ||||
refresh_every: the refreshing rate of the progress reporting. | refresh_every: the refreshing rate of the progress reporting. | ||||
""" | """ | ||||
self.offset_ranges = offset_ranges | self.offset_ranges = offset_ranges | ||||
self.progress_queue = progress_queue | self.progress_queue = progress_queue | ||||
self.refresh_every = refresh_every | self.refresh_every = refresh_every | ||||
self.assignment = assignment | self.assignment = assignment | ||||
self.count = None | self.count = None | ||||
self.topic_name = None | self.topic_name = None | ||||
kwargs["stop_on_eof"] = True # Stop when the assignment is empty | |||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
def subscribe(self): | def subscribe(self): | ||||
self.topic_name = self.subscription[0] | self.topic_name = self.subscription[0] | ||||
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 | time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983 | ||||
logging.debug("Changing assignment to %s", str(self.assignment)) | logging.debug("Changing assignment to %s", str(self.assignment)) | ||||
self.consumer.assign( | self.consumer.assign( | ||||
[TopicPartition(self.topic_name, pid) for pid in self.assignment] | [TopicPartition(self.topic_name, pid) for pid in self.assignment] | ||||
) | ) | ||||
def process(self, *args, **kwargs): | def process(self, *args, **kwargs): | ||||
self.count = 0 | self.count = 0 | ||||
try: | try: | ||||
self.handle_committed_offsets() | self.handle_committed_offsets() | ||||
if self.assignment: | if self.assignment: | ||||
super().process(*args, **kwargs) | super().process(*args, **kwargs) | ||||
except EOFError: | |||||
pass | |||||
finally: | finally: | ||||
self.progress_queue.put(None) | self.progress_queue.put(None) | ||||
def handle_committed_offsets(self,): | def handle_committed_offsets(self,): | ||||
""" | """ | ||||
Handle already committed partition offsets before starting processing. | Handle already committed partition offsets before starting processing. | ||||
""" | """ | ||||
committed = self.consumer.committed( | committed = self.consumer.committed( | ||||
[TopicPartition(self.topic_name, pid) for pid in self.assignment] | [TopicPartition(self.topic_name, pid) for pid in self.assignment] | ||||
) | ) | ||||
for tp in committed: | for tp in committed: | ||||
self.handle_offset(tp.partition, tp.offset) | self.handle_offset(tp.partition, tp.offset) | ||||
def handle_offset(self, partition_id, offset): | def handle_offset(self, partition_id, offset): | ||||
""" | """ | ||||
Check whether the client has reached the end of the current | Check whether the client has reached the end of the current | ||||
partition, and trigger a reassignment if that is the case. | partition, and trigger a reassignment if that is the case. | ||||
Raise EOFError if all the partitions have reached the end. | |||||
""" | """ | ||||
if offset < 0: # Uninitialized partition offset | if offset < 0: # Uninitialized partition offset | ||||
return | return | ||||
if self.count % self.refresh_every == 0: | if self.count % self.refresh_every == 0: | ||||
self.progress_queue.put({partition_id: offset}) | self.progress_queue.put({partition_id: offset}) | ||||
if offset >= self.offset_ranges[partition_id][1] - 1: | if offset >= self.offset_ranges[partition_id][1] - 1: | ||||
if partition_id in self.assignment: | if partition_id in self.assignment: | ||||
self.assignment = [ | self.assignment = [ | ||||
pid for pid in self.assignment if pid != partition_id | pid for pid in self.assignment if pid != partition_id | ||||
] | ] | ||||
self.subscribe() # Actually, unsubscribes from the partition_id | self.subscribe() # Actually, unsubscribes from the partition_id | ||||
if not self.assignment: | |||||
raise EOFError | |||||
def deserialize_message(self, message): | def deserialize_message(self, message): | ||||
""" | """ | ||||
Override of the message deserialization to hook the handling of the | Override of the message deserialization to hook the handling of the | ||||
message offset. | message offset. | ||||
We also return the raw objects instead of deserializing them because we | We also return the raw objects instead of deserializing them because we | ||||
will need the partition ID later. | will need the partition ID later. | ||||
""" | """ | ||||
self.handle_offset(message.partition(), message.offset()) | self.handle_offset(message.partition(), message.offset()) | ||||
▲ Show 20 Lines • Show All 284 Lines • Show Last 20 Lines |