Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/mongo/storage.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from datetime import datetime, timezone | |||||
import os | |||||
from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union | |||||
from bson import ObjectId | |||||
import pymongo.database | |||||
from swh.model.model import Sha1Git | |||||
from ..interface import ( | |||||
EntityType, | |||||
ProvenanceResult, | |||||
RelationData, | |||||
RelationType, | |||||
RevisionData, | |||||
) | |||||
class ProvenanceStorageMongoDb: | |||||
def __init__(self, db: pymongo.database.Database): | |||||
self.db = db | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | |||||
pass | |||||
def content_find_all(self, id: Sha1Git, limit: Optional[int] = None) -> Generator[ProvenanceResult, None, None]: | |||||
pass | |||||
def content_add(self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool: | |||||
for each_cnt in cnts: | |||||
try: | |||||
Entity.factory('content')(each_cnt).add_if_older() | |||||
except DBError as e: | |||||
# logging and skipping this item | |||||
# FIXME, add logging, raise if needed | |||||
pass | |||||
return True | |||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | |||||
try: | |||||
contents = Entity.factory('content').get_all_in_list({'sha1': ids}) | |||||
except DBError as e: | |||||
# logging and returning None | |||||
# FIXME, add logging, raise if needed | |||||
return None | |||||
return Entity.factory('content').dump_list_as_date_dict(contents) | |||||
def directory_add(self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool: | |||||
for each_cnt in cnts: | |||||
try: | |||||
Entity.factory('directory')(each_cnt).add_if_older() | |||||
except DBError as e: | |||||
# logging and skipping this item | |||||
# FIXME, add logging, raise if needed | |||||
pass | |||||
return True | |||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | |||||
try: | |||||
directories = Entity.factory('directory').get_all_in_list({'sha1': ids}) | |||||
except DBError as e: | |||||
# logging and returning None | |||||
# FIXME, add logging, raise if needed | |||||
return None | |||||
return Entity.factory('directory').dump_list_as_date_dict(contents) | |||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | |||||
return Entity.factory(entity).get_all_in_list({'sha1': ids}) | |||||
def location_add(self, paths: Iterable[bytes]) -> bool: | |||||
# TODO: implement this methods if path are to be stored in a separate collection | |||||
return True | |||||
def location_get_all(self) -> Set[bytes]: | |||||
pass | |||||
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: | |||||
return True | |||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | |||||
pass | |||||
def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool: | |||||
return True | |||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | |||||
pass | |||||
def relation_add(self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]) -> bool: | |||||
return True | |||||
def relation_get(self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False) -> Dict[Sha1Git, Set[RelationData]]: | |||||
return pass | |||||
def relation_get_all(self, relation: RelationType) -> Dict[Sha1Git, Set[RelationData]]: | |||||
pass | |||||
def with_path(self) -> bool: | |||||
return True |