Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/inmemory.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from multiprocessing import Manager | from multiprocessing import Manager | ||||
from typing import Any, Callable, Dict, Generic, List, Tuple, TypeVar | from typing import Any, Callable, Dict, Iterable, List, Tuple | ||||
from . import ValueProtocol | from .interface import ValueProtocol | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
TValue = TypeVar("TValue", bound=ValueProtocol) | class InMemoryJournalWriter: | ||||
objects: List[Tuple[str, ValueProtocol]] | |||||
privileged_objects: List[Tuple[str, ValueProtocol]] | |||||
class InMemoryJournalWriter(Generic[TValue]): | |||||
objects: List[Tuple[str, TValue]] | |||||
privileged_objects: List[Tuple[str, TValue]] | |||||
def __init__( | def __init__( | ||||
self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] | self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] | ||||
): | ): | ||||
# Share the list of objects across processes, for RemoteAPI tests. | # Share the list of objects across processes, for RemoteAPI tests. | ||||
self.manager = Manager() | self.manager = Manager() | ||||
self.objects = self.manager.list() | self.objects = self.manager.list() | ||||
self.privileged_objects = self.manager.list() | self.privileged_objects = self.manager.list() | ||||
def write_addition( | def write_addition(self, object_type: str, object_: ValueProtocol) -> None: | ||||
self, object_type: str, object_: TValue, privileged: bool = False | |||||
) -> None: | |||||
object_.unique_key() # Check this does not error, to mimic the kafka writer | object_.unique_key() # Check this does not error, to mimic the kafka writer | ||||
if privileged: | anon_object_ = object_.anonymize() | ||||
if anon_object_ is not None: | |||||
self.privileged_objects.append((object_type, object_)) | self.privileged_objects.append((object_type, object_)) | ||||
self.objects.append((object_type, anon_object_)) | |||||
else: | else: | ||||
self.objects.append((object_type, object_)) | self.objects.append((object_type, object_)) | ||||
write_update = write_addition | |||||
def write_additions( | def write_additions( | ||||
self, object_type: str, objects: List[TValue], privileged: bool = False | self, object_type: str, objects: Iterable[ValueProtocol] | ||||
) -> None: | ) -> None: | ||||
for object_ in objects: | for object_ in objects: | ||||
self.write_addition(object_type, object_, privileged) | self.write_addition(object_type, object_) | ||||
def flush(self) -> None: | |||||
pass |