Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/journalprocessor.py
Show First 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | def handle_offset(self, partition_id, offset): | ||||
if offset < 0: # Uninitialized partition offset | if offset < 0: # Uninitialized partition offset | ||||
return | return | ||||
if self.count % self.refresh_every == 0: | if self.count % self.refresh_every == 0: | ||||
self.progress_queue.put({partition_id: offset}) | self.progress_queue.put({partition_id: offset}) | ||||
if offset >= self.offset_ranges[partition_id][1] - 1: | if offset >= self.offset_ranges[partition_id][1] - 1: | ||||
if partition_id in self.assignment: | if partition_id in self.assignment: | ||||
self.progress_queue.put({partition_id: offset}) | |||||
self.assignment = [ | self.assignment = [ | ||||
pid for pid in self.assignment if pid != partition_id | pid for pid in self.assignment if pid != partition_id | ||||
] | ] | ||||
self.subscribe() # Actually, unsubscribes from the partition_id | self.subscribe() # Actually, unsubscribes from the partition_id | ||||
def deserialize_message(self, message): | def deserialize_message(self, message): | ||||
""" | """ | ||||
Override of the message deserialization to hook the handling of the | Override of the message deserialization to hook the handling of the | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def progress_worker(self, queue=None): | ||||
desc = f" - Journal export ({self.obj_type})" | desc = f" - Journal export ({self.obj_type})" | ||||
with tqdm.tqdm(total=offset_diff, desc=desc) as pbar: | with tqdm.tqdm(total=offset_diff, desc=desc) as pbar: | ||||
while active_workers: | while active_workers: | ||||
item = queue.get() | item = queue.get() | ||||
if item is None: | if item is None: | ||||
active_workers -= 1 | active_workers -= 1 | ||||
continue | continue | ||||
d.update(item) | d.update(item) | ||||
progress = sum(n - self.offsets[p][0] for p, n in d.items()) | progress = sum(n + 1 - self.offsets[p][0] for p, n in d.items()) | ||||
pbar.set_postfix( | pbar.set_postfix( | ||||
active_workers=active_workers, total_workers=self.processes | active_workers=active_workers, total_workers=self.processes | ||||
) | ) | ||||
pbar.update(progress - pbar.n) | pbar.update(progress - pbar.n) | ||||
def export_worker(self, assignment, progress_queue): | def export_worker(self, assignment, progress_queue): | ||||
worker = JournalProcessorWorker( | worker = JournalProcessorWorker( | ||||
self.config, | self.config, | ||||
▲ Show 20 Lines • Show All 153 Lines • Show Last 20 Lines |