Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/stream.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from typing import Any, BinaryIO, Callable, Dict, Generic, List, TypeVar | from typing import Any, BinaryIO, Callable, Dict, Generic, Iterable | ||||
from swh.journal.serializers import value_to_kafka | from swh.journal.serializers import value_to_kafka | ||||
from . import ValueProtocol | from .interface import TValue | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
TValue = TypeVar("TValue", bound=ValueProtocol) | |||||
class StreamJournalWriter(Generic[TValue]): | class StreamJournalWriter(Generic[TValue]): | ||||
"""A simple JournalWriter which serializes objects in a stream | """A simple JournalWriter which serializes objects in a stream | ||||
Might be used to serialize a storage in a file to generate a test dataset. | Might be used to serialize a storage in a file to generate a test dataset. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
Show All 9 Lines | class StreamJournalWriter(Generic[TValue]): | ||||
) -> None: | ) -> None: | ||||
object_.unique_key() # Check this does not error, to mimic the kafka writer | object_.unique_key() # Check this does not error, to mimic the kafka writer | ||||
dict_ = self.value_sanitizer(object_type, object_.to_dict()) | dict_ = self.value_sanitizer(object_type, object_.to_dict()) | ||||
self.output.write(value_to_kafka((object_type, dict_))) | self.output.write(value_to_kafka((object_type, dict_))) | ||||
write_update = write_addition | write_update = write_addition | ||||
def write_additions( | def write_additions( | ||||
self, object_type: str, objects: List[TValue], privileged: bool = False | self, object_type: str, objects: Iterable[TValue], privileged: bool = False | ||||
) -> None: | ) -> None: | ||||
for object_ in objects: | for object_ in objects: | ||||
self.write_addition(object_type, object_, privileged) | self.write_addition(object_type, object_, privileged) | ||||
def flush(self) -> None: | |||||
self.output.flush() |