Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/inmemory.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from multiprocessing import Manager | from multiprocessing import Manager | ||||
from typing import Any, Callable, Dict, Generic, List, Tuple, TypeVar | from typing import Any, Callable, Dict, Generic, Iterable, List, Tuple | ||||
from . import ValueProtocol | from .interface import TValue | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
TValue = TypeVar("TValue", bound=ValueProtocol) | |||||
class InMemoryJournalWriter(Generic[TValue]): | class InMemoryJournalWriter(Generic[TValue]): | ||||
objects: List[Tuple[str, TValue]] | objects: List[Tuple[str, TValue]] | ||||
privileged_objects: List[Tuple[str, TValue]] | privileged_objects: List[Tuple[str, TValue]] | ||||
def __init__( | def __init__( | ||||
self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] | self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] | ||||
): | ): | ||||
# Share the list of objects across processes, for RemoteAPI tests. | # Share the list of objects across processes, for RemoteAPI tests. | ||||
self.manager = Manager() | self.manager = Manager() | ||||
self.objects = self.manager.list() | self.objects = self.manager.list() | ||||
self.privileged_objects = self.manager.list() | self.privileged_objects = self.manager.list() | ||||
def write_addition( | def write_addition(self, object_type: str, object_: TValue) -> None: | ||||
self, object_type: str, object_: TValue, privileged: bool = False | |||||
) -> None: | |||||
object_.unique_key() # Check this does not error, to mimic the kafka writer | object_.unique_key() # Check this does not error, to mimic the kafka writer | ||||
if privileged: | anon_object_ = object_.anonymize() | ||||
if anon_object_ is not None: | |||||
self.privileged_objects.append((object_type, object_)) | self.privileged_objects.append((object_type, object_)) | ||||
self.objects.append((object_type, anon_object_)) | |||||
else: | else: | ||||
self.objects.append((object_type, object_)) | self.objects.append((object_type, object_)) | ||||
write_update = write_addition | def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: | ||||
def write_additions( | |||||
self, object_type: str, objects: List[TValue], privileged: bool = False | |||||
) -> None: | |||||
for object_ in objects: | for object_ in objects: | ||||
self.write_addition(object_type, object_, privileged) | self.write_addition(object_type, object_) | ||||
def flush(self) -> None: | |||||
pass |