Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/validate.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
from typing import Dict, Iterable, Iterator, Optional, Tuple, Type, TypeVar, Union | from typing import Dict, Iterable, Iterator, Optional, Tuple, Type, TypeVar, Union | ||||
from deprecated import deprecated | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
SkippedContent, | SkippedContent, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
▲ Show 20 Lines • Show All 119 Lines • ▼ Show 20 Lines | def snapshot_add(self, snapshots: Iterable[Union[Dict, Snapshot]]) -> Dict: | ||||
) | ) | ||||
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | ||||
return self.storage.origin_visit_add(visits) | return self.storage.origin_visit_add(visits) | ||||
def origin_add(self, origins: Iterable[Union[Dict, Origin]]) -> Dict[str, int]: | def origin_add(self, origins: Iterable[Union[Dict, Origin]]) -> Dict[str, int]: | ||||
return self.storage.origin_add([dict_converter(Origin, o) for o in origins]) | return self.storage.origin_add([dict_converter(Origin, o) for o in origins]) | ||||
@deprecated("Use origin_add([origin]) instead") | |||||
def origin_add_one(self, origin: Union[Dict, Origin]) -> int: | def origin_add_one(self, origin: Union[Dict, Origin]) -> int: | ||||
return self.storage.origin_add_one(dict_converter(Origin, origin)) | return self.storage.origin_add_one(dict_converter(Origin, origin)) | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | ||||
return self.storage.clear_buffers(object_types) | return self.storage.clear_buffers(object_types) | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | ||||
return self.storage.flush(object_types) | return self.storage.flush(object_types) |