diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -9,6 +9,7 @@
 
 from bson import ObjectId
 import pymongo.database
+from pymongo import InsertOne, UpdateOne
 
 from swh.model.model import Sha1Git
 
@@ -19,6 +20,7 @@
     RelationType,
     RevisionData,
 )
+from .models import entity_factory
 
 
 class ProvenanceStorageMongoDb:
@@ -28,30 +30,37 @@
     def content_add(
         self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
     ) -> bool:
+        """Add contents, keeping for each one its earliest known timestamp.
+
+        New contents are inserted with empty ``revision`` and ``directory``
+        maps; for already known ones, ``ts`` is only set when it was missing or
+        when the new date is earlier. All writes are sent in one ``bulk_write``.
+        """
         data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
         existing = {
             x["sha1"]: x
             for x in self.db.content.find(
                 {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
             )
         }
+        writes: List[Union[InsertOne, UpdateOne]] = []
         for sha1, date in data.items():
             ts = datetime.timestamp(date) if date is not None else None
             if sha1 in existing:
                 cnt = existing[sha1]
                 if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
-                    self.db.content.update_one(
-                        {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
+                    writes.append(
+                        UpdateOne({"_id": cnt["_id"]}, {"$set": {"ts": ts}})
                     )
             else:
-                self.db.content.insert_one(
-                    {
-                        "sha1": sha1,
-                        "ts": ts,
-                        "revision": {},
-                        "directory": {},
-                    }
-                )
+                writes.append(
+                    InsertOne(
+                        {"sha1": sha1, "ts": ts, "revision": {}, "directory": {}}
+                    )
+                )
+        # pymongo's bulk_write rejects an empty list of operations.
+        if writes:
+            entity_factory("content", self.db).bulk_write(writes)
         return True
 
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
@@ -140,11 +149,15 @@
         yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
 
     def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        # FIXME: add an index on (sha1, ts) in the content collection
+        # FIXME: do the timezone conversion in MongoDB
         return {
             x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+            # FIXME: avoid this Python-side loop by getting the needed format
+            # directly from MongoDB
             for x in self.db.content.find(
                 {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
                 {"sha1": 1, "ts": 1, "_id": 0},
             )
         }
 
@@ -173,6 +186,9 @@
     def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
         return {
             x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+            # FIXME: avoid this Python-side loop by getting the needed format
+            # directly from MongoDB
+            # FIXME: add ts to the index of the directory collection
             for x in self.db.directory.find(
                 {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
                 {"sha1": 1, "ts": 1, "_id": 0},
@@ -180,12 +196,9 @@
         }
 
     def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
-        return {
-            x["sha1"]
-            for x in self.db.get_collection(entity.value).find(
-                {}, {"sha1": 1, "_id": 0}
-            )
-        }
+        # Only used by tests. NOTE: the result of `distinct` must fit in a
+        # single BSON document (16MB), so this does not scale to big collections.
+        return set(self.db.get_collection(entity.value).distinct("sha1"))
 
     def location_add(self, paths: Iterable[bytes]) -> bool:
         # TODO: implement this methods if path are to be stored in a separate collection
@@ -219,6 +232,8 @@
     def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
         return {
             x["sha1"]: x["url"]
+            # FIXME: avoid this Python-side loop by getting the needed format
+            # directly from MongoDB
             for x in self.db.origin.find(
                 {"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0}
             )
diff --git a/swh/provenance/mongo/bootstrap.py b/swh/provenance/mongo/bootstrap.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/bootstrap.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# FIXME: maybe this should not be part of the code
+
+from typing import Any, List
+
+# TODO: fill in the collections and indexes bootstrap_db should create
+collections: List[str] = []
+
+indexes: List[Any] = []
+
+
+def bootstrap_db() -> None:
+    """Bootstrap the database (not implemented yet)."""
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,14 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class DBError(Exception):
+    """Raised when a MongoDB operation fails."""
+
+    # FIXME: add mongo specific logging
+
+
+class EntityError(Exception):
+    """Raised when an entity cannot be resolved."""
diff --git a/swh/provenance/mongo/models/__init__.py b/swh/provenance/mongo/models/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/__init__.py
@@ -0,0 +1,33 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Type
+
+import pymongo.database
+
+from ..errors import EntityError
+from .content import Content
+from .directory import Directory
+from .entity import Entity
+from .origin import Origin
+from .revision import Revision
+
+# Entity classes indexed by the name of the collection they manage.
+_ENTITIES: Dict[str, Type[Entity]] = {
+    cls.collection: cls for cls in (Content, Directory, Origin, Revision)
+}
+
+
+def entity_factory(name: str, db: pymongo.database.Database) -> Entity:
+    """Return the entity managing collection ``name``, bound to ``db``.
+
+    Raises:
+        EntityError: if no entity manages a collection called ``name``.
+    """
+    try:
+        entity_cls = _ENTITIES[name]
+    except KeyError:
+        raise EntityError(f"Unknown entity: {name}") from None
+    return entity_cls(db)
diff --git a/swh/provenance/mongo/models/cache.py b/swh/provenance/mongo/models/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/cache.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict
+
+
+class Cache:
+    """An in-memory object cache, indexing objects by one of their attributes.
+
+    Simple in memory implementation (can later be extended if needed).
+    """
+
+    def __init__(self, key: str) -> None:
+        # name of the attribute used as the cache key of each object
+        self.key = key
+        self.data: Dict[Any, Any] = {}
+
+    def add_data(self, data: Any) -> None:
+        """Replace the cache content with ``data``.
+
+        ``data`` is either a mapping or an iterable of objects.
+        """
+        self.data = self._get_formatted_data(data)
+
+    def _clear(self) -> None:
+        self.data = {}
+
+    def _get_formatted_data(self, data: Any) -> Dict[Any, Any]:
+        # a mapping is assumed to already be keyed on ``self.key``
+        if isinstance(data, dict):
+            return dict(data)
+        return {getattr(obj, self.key): obj for obj in data}
+
+    def add_obj(self, obj: Any) -> None:
+        self.data[getattr(obj, self.key)] = obj
+
+    def get(self, key: Any) -> Any:
+        return self.data.get(key)
+
+    def set(self, data: Any) -> None:
+        """Add ``data`` to the cache.
+
+        ``data`` is a mapping, a list/tuple/set of objects, or a single object.
+        """
+        if isinstance(data, (dict, list, tuple, set)):
+            self.data.update(self._get_formatted_data(data))
+        else:
+            self.add_obj(data)
diff --git a/swh/provenance/mongo/models/content.py b/swh/provenance/mongo/models/content.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/content.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .entity import Entity
+
+
+class Content(Entity):
+    collection = "content"
+    model: dict = {}
+    validate_model = False
+
+    def find_first(self):
+        raise NotImplementedError
+
+    def find_all(self):
+        raise NotImplementedError
diff --git a/swh/provenance/mongo/models/directory.py b/swh/provenance/mongo/models/directory.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/directory.py
@@ -0,0 +1,12 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .entity import Entity
+
+
+class Directory(Entity):
+    collection = "directory"
+    model: dict = {}
+    validate_model = False
diff --git a/swh/provenance/mongo/models/entity.py b/swh/provenance/mongo/models/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/entity.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pymongo.database
+from pymongo.errors import PyMongoError
+
+from ..errors import DBError
+
+
+class Entity:
+    """An object saved in the db, backed by one MongoDB collection.
+
+    Subclasses set ``collection`` to the name of that collection.
+    """
+
+    collection: str
+    model: dict
+    validate_model: bool
+
+    def __init__(self, db: pymongo.database.Database):
+        self.db = db
+        self.db_collection = self.db.get_collection(self.collection)
+
+    def bulk_write(self, writes):
+        """Send ``writes`` (pymongo write operations) in one ``bulk_write`` call.
+
+        Raises:
+            DBError: if MongoDB reports an error.
+        """
+        try:
+            return self.db_collection.bulk_write(writes)
+        except PyMongoError as e:
+            raise DBError(str(e)) from e
+
+    # TODO: model validation (against schema.json), save/get helpers and an
+    # "add if older" timestamp update.
+
+
+class EntityList:
+    """A list of entities and their operations, mostly on the object cache."""
diff --git a/swh/provenance/mongo/models/origin.py b/swh/provenance/mongo/models/origin.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/origin.py
@@ -0,0 +1,12 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .entity import Entity
+
+
+class Origin(Entity):
+    collection = "origin"
+    model: dict = {}
+    validate_model = False
diff --git a/swh/provenance/mongo/models/revision.py b/swh/provenance/mongo/models/revision.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/revision.py
@@ -0,0 +1,12 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .entity import Entity
+
+
+class Revision(Entity):
+    collection = "revision"
+    model: dict = {}
+    validate_model = False
diff --git a/swh/provenance/mongo/schema.json b/swh/provenance/mongo/schema.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/schema.json
@@ -0,0 +1,32 @@
+{
+  "content": {
+    "bsonType": "object",
+    "required": ["sha1"],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": "sha1_git identifier of the content"
+      }
+    }
+  },
+  "directory": {
+    "bsonType": "object",
+    "required": ["sha1"],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": "sha1_git identifier of the directory"
+      }
+    }
+  },
+  "revision": {
+    "bsonType": "object",
+    "required": ["sha1"],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": "sha1_git identifier of the revision"
+      }
+    }
+  }
+}
diff --git a/swh/provenance/mongo/storage.py b/swh/provenance/mongo/storage.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/storage.py
@@ -0,0 +1,100 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime
+from typing import Dict, Generator, Iterable, Optional, Set, Union
+
+import pymongo.database
+
+from swh.model.model import Sha1Git
+
+from ..interface import (
+    EntityType,
+    ProvenanceResult,
+    RelationData,
+    RelationType,
+    RevisionData,
+)
+
+
+class ProvenanceStorageMongoDb:
+    """Work-in-progress MongoDB provenance storage built on the entity models.
+
+    Only ``location_add`` and ``with_path`` are implemented so far; the other
+    methods raise ``NotImplementedError`` rather than silently succeeding.
+    """
+
+    def __init__(self, db: pymongo.database.Database):
+        self.db = db
+
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+        raise NotImplementedError
+
+    def content_find_all(
+        self, id: Sha1Git, limit: Optional[int] = None
+    ) -> Generator[ProvenanceResult, None, None]:
+        raise NotImplementedError
+
+    def content_add(
+        self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        # TODO: add each content through the "content" entity, only keeping
+        # its earliest timestamp.
+        raise NotImplementedError
+
+    def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        raise NotImplementedError
+
+    def directory_add(
+        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        # TODO: same as content_add, through the "directory" entity.
+        raise NotImplementedError
+
+    def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        raise NotImplementedError
+
+    def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+        raise NotImplementedError
+
+    def location_add(self, paths: Iterable[bytes]) -> bool:
+        # TODO: implement this method if paths are to be stored in a separate
+        # collection
+        return True
+
+    def location_get_all(self) -> Set[bytes]:
+        raise NotImplementedError
+
+    def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
+        raise NotImplementedError
+
+    def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+        raise NotImplementedError
+
+    def revision_add(
+        self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
+        raise NotImplementedError
+
+    def relation_add(
+        self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def relation_get(
+        self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+    ) -> Dict[Sha1Git, Set[RelationData]]:
+        raise NotImplementedError
+
+    def relation_get_all(
+        self, relation: RelationType
+    ) -> Dict[Sha1Git, Set[RelationData]]:
+        raise NotImplementedError
+
+    def with_path(self) -> bool:
+        return True
diff --git a/swh/provenance/mongo/tests/__init__.py b/swh/provenance/mongo/tests/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/tests/__init__.py
@@ -0,0 +1 @@
+pass