Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/stream.py
# Copyright (C) 2021-2022 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from typing import Any, BinaryIO, Callable, Dict, Iterable | from typing import Any, BinaryIO, Callable, Dict, Iterable | ||||
from swh.journal.serializers import value_to_kafka | from swh.journal.serializers import value_to_kafka | ||||
from .interface import ValueProtocol | from .interface import KeyType, ValueProtocol | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class StreamJournalWriter: | class StreamJournalWriter: | ||||
"""A simple JournalWriter which serializes objects in a stream | """A simple JournalWriter which serializes objects in a stream | ||||
Might be used to serialize a storage in a file to generate a test dataset. | Might be used to serialize a storage in a file to generate a test dataset. | ||||
Show All 14 Lines | def write_addition(self, object_type: str, object_: ValueProtocol) -> None: | ||||
self.output.write(value_to_kafka((object_type, dict_))) | self.output.write(value_to_kafka((object_type, dict_))) | ||||
def write_additions( | def write_additions( | ||||
self, object_type: str, objects: Iterable[ValueProtocol] | self, object_type: str, objects: Iterable[ValueProtocol] | ||||
) -> None: | ) -> None: | ||||
for object_ in objects: | for object_ in objects: | ||||
self.write_addition(object_type, object_) | self.write_addition(object_type, object_) | ||||
def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None: | |||||
pass | |||||
def flush(self) -> None: | def flush(self) -> None: | ||||
self.output.flush() | self.output.flush() |