swh/dataset/journalprocessor.py
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import collections
import concurrent.futures
from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
import contextlib
from hashlib import sha1
import logging
import multiprocessing
from pathlib import Path
import time
from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type

-from confluent_kafka import Message, TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition
import tqdm

from swh.dataset.exporter import Exporter
from swh.dataset.utils import LevelDBSet
from swh.journal.client import JournalClient
from swh.journal.serializers import kafka_to_value
from swh.model.identifiers import origin_identifier
from swh.storage.fixer import fix_objects
class JournalClientOffsetRanges(JournalClient):
    """
    A subclass of JournalClient reading only inside some specific offset
    range. Partition assignments have to be manually given to the class.

    This client can only read a single topic at a time.
    """

    def __init__(
        self,
        *args,
        offset_ranges: Mapping[int, Tuple[int, int]] = None,
+       reset_offsets: bool = False,
        assignment: Sequence[int] = None,
        progress_queue: multiprocessing.Queue = None,
        refresh_every: int = 200,
        **kwargs,
    ):
        """
        Args:
            offset_ranges: A mapping of partition_id -> (low, high) offsets
                that define the boundaries of the messages to consume.
            assignment: The list of partitions to assign to this client.
            progress_queue: a multiprocessing.Queue where the current
                progress will be reported.
            refresh_every: the refreshing rate of the progress reporting.
        """
        self.offset_ranges = offset_ranges
        self.progress_queue = progress_queue
        self.refresh_every = refresh_every
        self.assignment = assignment
        self._messages_to_commit: List[Message] = []
        self.count = None
        self.topic_name: Optional[str] = None
+       self.reset_offsets = reset_offsets

        kwargs["stop_on_eof"] = True  # Stop when the assignment is empty

        super().__init__(*args, **kwargs)
    def subscribe(self):
        self.topic_name = self.subscription[0]
        time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
        logging.debug("Changing assignment to %s", str(self.assignment))
+       offset = OFFSET_BEGINNING if self.reset_offsets else OFFSET_STORED
        self.consumer.assign(
-           [TopicPartition(self.topic_name, pid) for pid in self.assignment]
+           [TopicPartition(self.topic_name, pid, offset) for pid in self.assignment]
        )

    def unsubscribe(self, partitions: Container[int]):
        assert self.assignment is not None
        self.assignment = [pid for pid in self.assignment if pid not in partitions]
        self.consumer.assign(
            [TopicPartition(self.topic_name, pid) for pid in self.assignment]
        )
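Note: for context on the two constants this change imports, here is a minimal, standalone sketch of what an explicit offset in consumer.assign() does. The broker address, group id and topic name below are illustrative, not taken from this diff.

from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Consumer, TopicPartition

# All connection values here are made up for illustration.
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "demo"})

# OFFSET_STORED resumes from the offset last committed by this consumer
# group; OFFSET_BEGINNING ignores any committed offset and replays the
# partition from its first available message.
reset_offsets = True
offset = OFFSET_BEGINNING if reset_offsets else OFFSET_STORED
consumer.assign([TopicPartition("swh.journal.objects.revision", 0, offset)])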
[... 67 lines collapsed; context: class ParallelJournalProcessor ...]
    def __init__(
        self,
        config,
        exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]],
        export_id: str,
        obj_type: str,
        node_sets_path: Path,
        processes: int = 1,
+       reset_offsets: bool = False,
    ):
        """
        Args:
            config: the exporter config, which should also include the
                JournalClient configuration.
            exporters: a list of Exporter to process the objects
            export_id: a unique identifier for the export that will be used
                as part of a Kafka consumer group ID.
            obj_type: The type of SWH object to export.
            node_sets_path: A directory where to store the node sets.
            processes: The number of processes to run.
        """
        self.config = config
        self.exporters = exporters
        self.group_id = "swh-dataset-export-{}".format(export_id)
        self.obj_type = obj_type
        self.processes = processes
        self.node_sets_path = node_sets_path
        self.offsets = None
+       self.reset_offsets = reset_offsets
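Note: a hypothetical call site for the new flag. Every value below is made up for illustration, and the exact journal config keys depend on the deployment.

processor = ParallelJournalProcessor(
    config={"journal": {"brokers": ["kafka1:9092"], "prefix": "swh.journal.objects"}},
    exporters=[],  # (Exporter subclass, config dict) pairs would go here
    export_id="2020-07-01",
    obj_type="revision",
    node_sets_path=Path("/srv/export/node_sets"),
    processes=4,
    reset_offsets=True,  # ignore committed offsets and re-export everything
)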
    def get_offsets(self):
        """
        Compute the (lo, hi) offset boundaries for all partitions.

        First pass to fetch all the current low and high watermark offsets
        of each partition to define the consumption boundaries.
[... 17 lines collapsed; context: def get_offsets ...]
        # be 0 if one message has been written to the partition (with
        # offset 0).
        # HIGH watermark offset: the offset of the latest message in the
        # topic/partition available for consumption + 1

        def fetch_insert_partition_id(partition_id):
            tp = TopicPartition(topic_name, partition_id)
            (lo, hi) = client.consumer.get_watermark_offsets(tp)
-           if hi > lo:
+           if hi > lo and not self.reset_offsets:
                # hi == lo means there is nothing in the partition to consume.
                # If the partition is not empty, retrieve the committed
                # offset, if any, to use it as the lo offset.
                committed = client.consumer.committed([tp])[0]
                lo = max(lo, committed.offset)
            if hi > lo:
                # only process the partition if there are actually new
                # messages to process (partition not empty and committed
[... 70 lines collapsed; context: class ParallelJournalProcessor ...]
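Note: to make the boundary logic above concrete, a worked example with made-up numbers.

# Say the watermarks report that partition 0 holds offsets [0, 1000), and
# this consumer group last committed offset 400 on it.
lo, hi = 0, 1000         # from client.consumer.get_watermark_offsets(tp)
committed_offset = 400   # from client.consumer.committed([tp])[0].offset
reset_offsets = False

if hi > lo and not reset_offsets:
    # Resume after the committed offset instead of replaying 0..399. A
    # partition with no committed offset reports a negative sentinel value,
    # which max() discards.
    lo = max(lo, committed_offset)

assert (lo, hi) == (400, 1000)  # with reset_offsets=True it stays (0, 1000)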
    def export_worker(self, assignment, progress_queue):
        worker = JournalProcessorWorker(
            self.config,
            self.exporters,
            self.group_id,
            self.obj_type,
            self.offsets,
+           self.reset_offsets,
            assignment,
            progress_queue,
            self.node_sets_path,
        )
        with worker:
            worker.run()
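Note: ParallelJournalProcessor.run() is collapsed in this view. Going only by the module's imports (ProcessPoolExecutor, FIRST_EXCEPTION, multiprocessing) and the export_worker signature above, the fan-out plausibly looks like the sketch below; it is not the code under review.

def run(self):  # sketch of the collapsed method, under the assumptions above
    # A bare multiprocessing.Queue cannot be passed to ProcessPoolExecutor
    # workers, so a Manager-backed queue carries progress reports.
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()
    with ProcessPoolExecutor(self.processes) as pool:
        futures = [
            pool.submit(
                self.export_worker,
                # round-robin the partitions across worker processes
                list(self.offsets.keys())[i :: self.processes],
                progress_queue,
            )
            for i in range(self.processes)
        ]
        concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
        for f in futures:
            f.result()  # re-raise any worker exception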
class JournalProcessorWorker:
    """
    Worker process that processes all the messages and calls the given
    exporters for each object read from the journal.
    """

    def __init__(
        self,
        config,
        exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]],
        group_id: str,
        obj_type: str,
        offsets: Dict[int, Tuple[int, int]],
+       reset_offsets: bool,
        assignment: Sequence[int],
        progress_queue: multiprocessing.Queue,
        node_sets_path: Path,
    ):
        self.config = config
        self.group_id = group_id
        self.obj_type = obj_type
        self.offsets = offsets
+       self.reset_offsets = reset_offsets
        self.assignment = assignment
        self.progress_queue = progress_queue

        self.node_sets_path = node_sets_path
        self.node_sets_path.mkdir(exist_ok=True, parents=True)
        self.node_sets: Dict[Tuple[int, str], LevelDBSet] = {}

        self.exporters = [
[... 43 lines collapsed; context: def run ...]
        incoming messages.
        """
        client = JournalClientOffsetRanges(
            **self.config["journal"],
            object_types=[self.obj_type],
            group_id=self.group_id,
            debug="cgrp,broker",
            offset_ranges=self.offsets,
+           reset_offsets=self.reset_offsets,
            assignment=self.assignment,
            progress_queue=self.progress_queue,
            **{"message.max.bytes": str(500 * 1024 * 1024)},
        )
        client.process(self.process_messages)
    def process_messages(self, messages):
        """
[... 53 lines collapsed ...]
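Note: the body of process_messages is collapsed as well. Given that the module imports kafka_to_value and fix_objects, each batch is presumably decoded along the lines sketched below; this is an assumption, not the actual implementation.

def process_messages(self, messages):  # sketch, assuming this message shape
    # `messages` is assumed to map an object type to a batch of raw Kafka
    # messages still carrying their partition and offset metadata.
    for object_type, message_list in messages.items():
        for message in message_list:
            # kafka_to_value decodes the msgpack payload; fix_objects
            # normalizes legacy objects before they are exported.
            (obj,) = fix_objects(object_type, [kafka_to_value(message.value())])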