Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/journalprocessor.py
| Show All 12 Lines | |||||
| from pathlib import Path | from pathlib import Path | ||||
| import time | import time | ||||
| from typing import Any, Dict, Mapping, Sequence, Tuple, Type | from typing import Any, Dict, Mapping, Sequence, Tuple, Type | ||||
| from confluent_kafka import TopicPartition | from confluent_kafka import TopicPartition | ||||
| import tqdm | import tqdm | ||||
| from swh.dataset.exporter import Exporter | from swh.dataset.exporter import Exporter | ||||
| from swh.dataset.utils import SQLiteSet | from swh.dataset.utils import LevelDBSet | ||||
| from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
| from swh.journal.serializers import kafka_to_value | from swh.journal.serializers import kafka_to_value | ||||
| from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
| from swh.storage.fixer import fix_objects | from swh.storage.fixer import fix_objects | ||||
| class JournalClientOffsetRanges(JournalClient): | class JournalClientOffsetRanges(JournalClient): | ||||
| """ | """ | ||||
| ▲ Show 20 Lines • Show All 245 Lines • ▼ Show 20 Lines | ): | ||||
| self.export_id = export_id | self.export_id = export_id | ||||
| self.obj_type = obj_type | self.obj_type = obj_type | ||||
| self.offsets = offsets | self.offsets = offsets | ||||
| self.assignment = assignment | self.assignment = assignment | ||||
| self.progress_queue = progress_queue | self.progress_queue = progress_queue | ||||
| self.node_sets_path = node_sets_path | self.node_sets_path = node_sets_path | ||||
| self.node_sets_path.mkdir(exist_ok=True, parents=True) | self.node_sets_path.mkdir(exist_ok=True, parents=True) | ||||
| self.node_sets: Dict[Tuple[int, str], SQLiteSet] = {} | self.node_sets: Dict[Tuple[int, str], LevelDBSet] = {} | ||||
| self.exporters = [ | self.exporters = [ | ||||
| exporter_class(config, **kwargs) for exporter_class, kwargs in exporters | exporter_class(config, **kwargs) for exporter_class, kwargs in exporters | ||||
| ] | ] | ||||
| self.exit_stack: contextlib.ExitStack = contextlib.ExitStack() | self.exit_stack: contextlib.ExitStack = contextlib.ExitStack() | ||||
| def __enter__(self): | def __enter__(self): | ||||
| self.exit_stack.__enter__() | self.exit_stack.__enter__() | ||||
| Show All 19 Lines | def get_node_set_for_object(self, partition_id: int, object_id: bytes): | ||||
| shard_id = (partition_id, obj_id_prefix) | shard_id = (partition_id, obj_id_prefix) | ||||
| if shard_id not in self.node_sets: | if shard_id not in self.node_sets: | ||||
| node_set_dir = ( | node_set_dir = ( | ||||
| self.node_sets_path | self.node_sets_path | ||||
| / self.obj_type | / self.obj_type | ||||
| / ("part-{}".format(str(partition_id))) | / ("part-{}".format(str(partition_id))) | ||||
| ) | ) | ||||
| node_set_dir.mkdir(exist_ok=True, parents=True) | node_set_dir.mkdir(exist_ok=True, parents=True) | ||||
| node_set_file = node_set_dir / "nodes-{}.sqlite".format(obj_id_prefix) | node_set_file = node_set_dir / "nodes-{}.db".format(obj_id_prefix) | ||||
| node_set = SQLiteSet(node_set_file) | node_set = LevelDBSet(node_set_file) | ||||
| self.exit_stack.enter_context(node_set) | self.exit_stack.enter_context(node_set) | ||||
| self.node_sets[shard_id] = node_set | self.node_sets[shard_id] = node_set | ||||
| return self.node_sets[shard_id] | return self.node_sets[shard_id] | ||||
| def run(self): | def run(self): | ||||
| """ | """ | ||||
| Start a Journal client on the given assignment and process all the | Start a Journal client on the given assignment and process all the | ||||
| incoming messages. | incoming messages. | ||||
| ▲ Show 20 Lines • Show All 68 Lines • Show Last 20 Lines | |||||