Changeset View
Standalone View
swh/dataset/journalprocessor.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import collections | import collections | ||||
import concurrent.futures | import concurrent.futures | ||||
from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor | from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor | ||||
import contextlib | import contextlib | ||||
from hashlib import sha1 | from hashlib import sha1 | ||||
import logging | import logging | ||||
import multiprocessing | import multiprocessing | ||||
from pathlib import Path | from pathlib import Path | ||||
import time | import time | ||||
from typing import Any, Dict, Mapping, Sequence, Tuple, Type | from typing import Any, Dict, List, Mapping, Sequence, Tuple, Type | ||||
from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition | from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition | ||||
import tqdm | import tqdm | ||||
from swh.dataset.exporter import Exporter | from swh.dataset.exporter import Exporter | ||||
from swh.dataset.utils import LevelDBSet | from swh.dataset.utils import LevelDBSet | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import kafka_to_value | from swh.journal.serializers import kafka_to_value | ||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
from swh.storage.fixer import fix_objects | from swh.storage.fixer import fix_objects | ||||
Show All 25 Lines | ): | ||||
progress_queue: a multiprocessing.Queue where the current | progress_queue: a multiprocessing.Queue where the current | ||||
progress will be reported. | progress will be reported. | ||||
refresh_every: the refreshing rate of the progress reporting. | refresh_every: the refreshing rate of the progress reporting. | ||||
""" | """ | ||||
self.offset_ranges = offset_ranges | self.offset_ranges = offset_ranges | ||||
self.progress_queue = progress_queue | self.progress_queue = progress_queue | ||||
self.refresh_every = refresh_every | self.refresh_every = refresh_every | ||||
self.assignment = assignment | self.assignment = assignment | ||||
self._messages_to_commit: List[Message] = [] | |||||
self.count = None | self.count = None | ||||
self.topic_name = None | self.topic_name = None | ||||
self.reset_offsets = reset_offsets | self.reset_offsets = reset_offsets | ||||
kwargs["stop_on_eof"] = True # Stop when the assignment is empty | kwargs["stop_on_eof"] = True # Stop when the assignment is empty | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
def subscribe(self): | def subscribe(self): | ||||
self.topic_name = self.subscription[0] | self.topic_name = self.subscription[0] | ||||
Show All 13 Lines | class JournalClientOffsetRanges(JournalClient): | ||||
def process(self, worker_fn):
    """
    Run the journal client on the current partition assignment, making
    sure the parent process is notified when this worker is done.

    Args:
        worker_fn: callback invoked by the journal client on each batch
            of consumed objects.
    """
    # Number of messages seen so far; incremented in deserialize_message()
    # and used by handle_offset() to throttle progress reports.
    self.count = 0
    try:
        # An empty assignment means every partition in our offset range is
        # already exhausted, so there is nothing to consume at all.
        if self.assignment:
            super().process(worker_fn)
    finally:
        # Always push the None sentinel, even on error, so the process
        # reading the progress queue knows this worker has terminated.
        self.progress_queue.put(None)
def handle_offset(self, message):
    """
    Check whether the client has reached the end of the current
    partition, and trigger a reassignment if that is the case.

    Also reports the current offset of the message's partition to the
    progress queue every ``refresh_every`` processed messages.

    Args:
        message: the consumed Kafka message; only its partition and
            offset are inspected here.
    """
    offset = message.offset()
    partition_id = message.partition()
    if offset < 0:  # Uninitialized partition offset
        return
    if self.count % self.refresh_every == 0:
        self.progress_queue.put({partition_id: offset})
    # offset_ranges maps partition id -> (start, end); reaching end - 1
    # means this partition is fully consumed for our assigned range.
    if offset >= self.offset_ranges[partition_id][1] - 1:
        if partition_id in self.assignment:
            self.progress_queue.put({partition_id: offset})
            # unsubscribe from partition but make sure current message's
            # offset will be committed after executing the worker_fn in
            # process(); see handle_messages() below
            self._messages_to_commit.append(message)
            self.unsubscribe([partition_id])
def deserialize_message(self, message):
    """
    Deserialization hook: intercept every consumed message to track its
    offset, then hand the message back untouched.

    Returning the raw Kafka message instead of a decoded object is
    deliberate: downstream processing still needs the partition ID.
    """
    # Offset bookkeeping must run before the counter is bumped, because
    # handle_offset() throttles its progress reports on the current count.
    self.handle_offset(message)
    self.count = self.count + 1
    return message
def handle_messages(self, messages, worker_fn):
    """
    Process a batch of messages, then commit the offsets of the messages
    recorded by handle_offset() when it drained their partitions.

    Committing is deferred until after ``worker_fn`` has run on the
    batch, so a message is never marked as consumed before it has
    actually been processed.

    Args:
        messages: the batch of consumed Kafka messages.
        worker_fn: callback applied to the batch by the parent class.

    Returns:
        the ``(nb_processed, at_eof)`` tuple from the parent class.
    """
    nb_processed, at_eof = super().handle_messages(messages, worker_fn)
    # These are the final messages of partitions we unsubscribed from in
    # handle_offset(); commit them now that the worker has processed them.
    for msg in self._messages_to_commit:
        self.consumer.commit(message=msg)
    self._messages_to_commit.clear()
    return nb_processed, at_eof
class ParallelJournalProcessor: | class ParallelJournalProcessor: | ||||
""" | """ | ||||
Reads the given object type from the journal in parallel. | Reads the given object type from the journal in parallel. | ||||
It creates one JournalExportWorker per process. | It creates one JournalExportWorker per process. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
▲ Show 20 Lines • Show All 301 Lines • Show Last 20 Lines |
needs to be updated