Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/in_memory_backend.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict, List, Optional, Tuple, Union | from typing import Any, Dict, List, Optional, Tuple | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.identifiers import CoreSWHID | ||||
from .cache import VaultCache | from .cache import VaultCache | ||||
ObjectId = Union[str, bytes] | |||||
class InMemoryVaultBackend: | class InMemoryVaultBackend: | ||||
"""Stub vault backend, for use in the CLI.""" | """Stub vault backend, for use in the CLI.""" | ||||
def __init__(self): | def __init__(self): | ||||
self._cache = VaultCache(cls="memory") | self._cache = VaultCache(cls="memory") | ||||
def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: | def fetch(self, bundle_type: str, swhid: CoreSWHID) -> Optional[bytes]: | ||||
return self._cache.get(bundle_type, hash_to_bytes(obj_id)) | return self._cache.get(bundle_type, swhid) | ||||
def cook( | def cook( | ||||
self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None | self, bundle_type: str, swhid: CoreSWHID, email: Optional[str] = None | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
raise NotImplementedError("InMemoryVaultBackend.cook()") | raise NotImplementedError("InMemoryVaultBackend.cook()") | ||||
def progress(self, bundle_type: str, obj_id: ObjectId): | def progress(self, bundle_type: str, swhid: CoreSWHID): | ||||
raise NotImplementedError("InMemoryVaultBackend.progress()") | raise NotImplementedError("InMemoryVaultBackend.progress()") | ||||
# Cookers endpoints | # Cookers endpoints | ||||
def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: | def set_progress(self, bundle_type: str, swhid: CoreSWHID, progress: str) -> None: | ||||
pass | pass | ||||
def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> None: | def set_status(self, bundle_type: str, swhid: CoreSWHID, status: str) -> None: | ||||
pass | pass | ||||
def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: | def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: | ||||
self._cache.add(bundle_type, hash_to_bytes(obj_id), bundle) | self._cache.add(bundle_type, swhid, bundle) | ||||
return True | return True | ||||
def send_notif(self, bundle_type: str, obj_id: ObjectId): | def send_notif(self, bundle_type: str, swhid: CoreSWHID): | ||||
pass | pass | ||||
# Batch endpoints | # Batch endpoints | ||||
def batch_cook(self, batch: List[Tuple[str, str]]) -> int: | def batch_cook(self, batch: List[Tuple[str, str]]) -> int: | ||||
raise NotImplementedError("InMemoryVaultBackend.batch_cook()") | raise NotImplementedError("InMemoryVaultBackend.batch_cook()") | ||||
def batch_progress(self, batch_id: int) -> Dict[str, Any]: | def batch_progress(self, batch_id: int) -> Dict[str, Any]: | ||||
pass | pass |