Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/__init__.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict, Optional, TypeVar | from typing import Any, Dict, Type | ||||
import warnings | import warnings | ||||
from typing_extensions import Protocol | from .interface import JournalWriterInterface | ||||
from swh.model.model import KeyType | |||||
TSelf = TypeVar("TSelf")


class ValueProtocol(Protocol):
    """Structural type describing a value object.

    Any object providing the three methods below satisfies this protocol;
    no inheritance is required. Only the signatures are declared here, the
    bodies are intentionally empty.
    """

    def anonymize(self: TSelf) -> Optional[TSelf]:
        """Return an object of the same type as ``self``, or ``None``."""
        ...

    def unique_key(self) -> KeyType:
        """Return the key of this object, as a ``KeyType``."""
        ...

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation with string keys."""
        ...
def model_object_dict_sanitizer(
    object_type: str, object_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a shallow copy of ``object_dict``, minus ``"data"`` for contents.

    Args:
        object_type: name of the object type; only ``"content"`` triggers
            the removal of the ``"data"`` key.
        object_dict: dictionary to sanitize. It is never modified.

    Returns:
        A new dictionary with the same items as ``object_dict``; when
        ``object_type`` is ``"content"`` the ``"data"`` key is dropped if
        it was present.
    """
    # Work on a copy so the caller's dictionary is left untouched.
    sanitized = object_dict.copy()
    if object_type == "content":
        # A default of None makes the pop a no-op when "data" is absent.
        sanitized.pop("data", None)
    return sanitized
def get_journal_writer(cls, **kwargs) -> "JournalWriterInterface":
    """Instantiate the journal writer selected by ``cls``.

    Args:
        cls: name of the writer backend: ``"memory"``, ``"kafka"`` or
            ``"stream"``. ``"inmemory"`` is accepted as a deprecated alias
            of ``"memory"``.
        **kwargs: keyword arguments forwarded to the writer constructor.
            If a ``value_sanitizer`` is not given, it defaults to
            ``model_object_dict_sanitizer``. The deprecated form
            ``args={...}`` is still accepted; the content of ``args`` then
            replaces all other keyword arguments.

    Returns:
        An instance of the selected writer class.

    Raises:
        ValueError: if ``cls`` is not one of the known backend names.
        AssertionError: if ``cls`` is ``"stream"`` and no ``output_stream``
            keyword argument was provided.
    """
    if "args" in kwargs:
        warnings.warn(
            'Explicit "args" key is deprecated, use keys directly instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        # Copy so that the setdefault() below never mutates the caller's
        # configuration dictionary.
        kwargs = dict(kwargs["args"])

    kwargs.setdefault("value_sanitizer", model_object_dict_sanitizer)

    if cls == "inmemory":  # FIXME: Remove inmemory in due time
        warnings.warn(
            "cls = 'inmemory' is deprecated, use 'memory' instead",
            DeprecationWarning,
            stacklevel=2,
        )
        cls = "memory"

    # Backends are imported lazily so that only the selected one is loaded.
    JournalWriter: "Type[JournalWriterInterface]"
    if cls == "memory":
        from .inmemory import InMemoryJournalWriter

        JournalWriter = InMemoryJournalWriter
    elif cls == "kafka":
        from .kafka import KafkaJournalWriter

        JournalWriter = KafkaJournalWriter
    elif cls == "stream":
        from .stream import StreamJournalWriter

        JournalWriter = StreamJournalWriter
        assert "output_stream" in kwargs
    else:
        # Wrap in a 1-tuple so a tuple-valued cls cannot break formatting.
        raise ValueError("Unknown journal writer class `%s`" % (cls,))

    return JournalWriter(**kwargs)