Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/journalprocessor.py
Show All 12 Lines | |||||
from pathlib import Path | from pathlib import Path | ||||
import time | import time | ||||
from typing import Any, Dict, Mapping, Sequence, Tuple, Type | from typing import Any, Dict, Mapping, Sequence, Tuple, Type | ||||
from confluent_kafka import TopicPartition | from confluent_kafka import TopicPartition | ||||
import tqdm | import tqdm | ||||
from swh.dataset.exporter import Exporter | from swh.dataset.exporter import Exporter | ||||
from swh.dataset.utils import SQLiteSet | from swh.dataset.utils import LevelDBSet | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import kafka_to_value | from swh.journal.serializers import kafka_to_value | ||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
from swh.storage.fixer import fix_objects | from swh.storage.fixer import fix_objects | ||||
class JournalClientOffsetRanges(JournalClient): | class JournalClientOffsetRanges(JournalClient): | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 245 Lines • ▼ Show 20 Lines | ): | ||||
self.export_id = export_id | self.export_id = export_id | ||||
self.obj_type = obj_type | self.obj_type = obj_type | ||||
self.offsets = offsets | self.offsets = offsets | ||||
self.assignment = assignment | self.assignment = assignment | ||||
self.progress_queue = progress_queue | self.progress_queue = progress_queue | ||||
self.node_sets_path = node_sets_path | self.node_sets_path = node_sets_path | ||||
self.node_sets_path.mkdir(exist_ok=True, parents=True) | self.node_sets_path.mkdir(exist_ok=True, parents=True) | ||||
self.node_sets: Dict[Tuple[int, str], SQLiteSet] = {} | self.node_sets: Dict[Tuple[int, str], LevelDBSet] = {} | ||||
self.exporters = [ | self.exporters = [ | ||||
exporter_class(config, **kwargs) for exporter_class, kwargs in exporters | exporter_class(config, **kwargs) for exporter_class, kwargs in exporters | ||||
] | ] | ||||
self.exit_stack: contextlib.ExitStack = contextlib.ExitStack() | self.exit_stack: contextlib.ExitStack = contextlib.ExitStack() | ||||
def __enter__(self): | def __enter__(self): | ||||
self.exit_stack.__enter__() | self.exit_stack.__enter__() | ||||
Show All 19 Lines | def get_node_set_for_object(self, partition_id: int, object_id: bytes): | ||||
shard_id = (partition_id, obj_id_prefix) | shard_id = (partition_id, obj_id_prefix) | ||||
if shard_id not in self.node_sets: | if shard_id not in self.node_sets: | ||||
node_set_dir = ( | node_set_dir = ( | ||||
self.node_sets_path | self.node_sets_path | ||||
/ self.obj_type | / self.obj_type | ||||
/ ("part-{}".format(str(partition_id))) | / ("part-{}".format(str(partition_id))) | ||||
) | ) | ||||
node_set_dir.mkdir(exist_ok=True, parents=True) | node_set_dir.mkdir(exist_ok=True, parents=True) | ||||
node_set_file = node_set_dir / "nodes-{}.sqlite".format(obj_id_prefix) | node_set_file = node_set_dir / "nodes-{}.db".format(obj_id_prefix) | ||||
node_set = SQLiteSet(node_set_file) | node_set = LevelDBSet(node_set_file) | ||||
self.exit_stack.enter_context(node_set) | self.exit_stack.enter_context(node_set) | ||||
self.node_sets[shard_id] = node_set | self.node_sets[shard_id] = node_set | ||||
return self.node_sets[shard_id] | return self.node_sets[shard_id] | ||||
def run(self): | def run(self): | ||||
""" | """ | ||||
Start a Journal client on the given assignment and process all the | Start a Journal client on the given assignment and process all the | ||||
incoming messages. | incoming messages. | ||||
▲ Show 20 Lines • Show All 68 Lines • Show Last 20 Lines |