Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/interface.py
- This file was added.
# Copyright (C) 2015-2022 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import Any, Dict, Iterable, Optional, TypeVar | |||||
from typing_extensions import Protocol, runtime_checkable | |||||
from swh.model.model import KeyType | |||||
TSelf = TypeVar("TSelf") | |||||
class ValueProtocol(Protocol): | |||||
def anonymize(self: TSelf) -> Optional[TSelf]: | |||||
... | |||||
def unique_key(self) -> KeyType: | |||||
... | |||||
def to_dict(self) -> Dict[str, Any]: | |||||
... | |||||
TValue = TypeVar("TValue", bound=ValueProtocol) | |||||
@runtime_checkable | |||||
class JournalWriterInterface(Protocol): | |||||
def write_addition( | |||||
self, object_type: str, object_: TValue, privileged: bool = False | |||||
) -> None: | |||||
"""Add a SWH object of type object_type in the journal.""" | |||||
... | |||||
def write_additions( | |||||
self, object_type: str, objects: Iterable[TValue], privileged: bool = False | |||||
) -> None: | |||||
"""Add a list of SWH objects of type object_type in the journal.""" | |||||
... | |||||
def flush(self) -> None: | |||||
"""Flush the pending object additions in the backend, if any.""" | |||||
... |