diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -3,12 +3,14 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timezone
+import logging
 import os
 from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union
 
 from bson import ObjectId
 import pymongo.database
+from pymongo import UpdateOne
 
 from swh.model.model import Sha1Git
 
@@ -19,41 +21,159 @@
     RelationType,
     RevisionData,
 )
+from .errors import DBError
+from .models import entity_factory
+
+logger = logging.getLogger(__name__)
 
 
 class ProvenanceStorageMongoDb:
     def __init__(self, db: pymongo.database.Database):
         self.db = db
 
-    def content_add(
-        self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
-    ) -> bool:
-        data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
+    def _format_data(
+        self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> Dict[Sha1Git, Optional[datetime]]:
+        """Normalize ``data`` into a ``{sha1: date}`` mapping (dates may be None)."""
+        return data if isinstance(data, dict) else dict.fromkeys(data)
+
+    def _generate_date_upserts(
+        self, sha1: Sha1Git, date: Optional[datetime], defaults: Dict[str, Any]
+    ) -> List[UpdateOne]:
+        """Build the bulk operations recording ``sha1`` with its earliest date.
+
+        The first operation creates the document, initialized with ``defaults``,
+        only if no document with this ``sha1`` exists yet. The second one, only
+        emitted when ``date`` is known, lowers the stored ``ts`` when it is unset
+        or later than ``date``. Filtering on ``ts`` within the upsert itself would
+        insert a duplicate document whenever the stored date is already earlier.
+        """
+        ts = datetime.timestamp(date) if date is not None else None
+        writes = [
+            UpdateOne(
+                {"sha1": sha1}, {"$setOnInsert": {"ts": ts, **defaults}}, upsert=True
+            )
+        ]
+        if ts is not None:
+            writes.append(
+                UpdateOne(
+                    {"sha1": sha1, "$or": [{"ts": None}, {"ts": {"$gt": ts}}]},
+                    {"$set": {"ts": ts}},
+                )
+            )
+        return writes
+
+    def _entity_date_add(
+        self,
+        entity: str,
+        data: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]],
+        defaults: Dict[str, Any],
+    ) -> bool:
+        """Add ``data`` to the ``entity`` collection, keeping the earliest dates.
+
+        Return False, after logging the error, if the database rejected the writes.
+        """
+        entries = self._format_data(data)
+        if not entries:
+            # pymongo refuses to execute an empty bulk write
+            return True
+        writes = [
+            write
+            for sha1, date in entries.items()
+            for write in self._generate_date_upserts(sha1, date, defaults)
+        ]
+        try:
+            # concurrent upserts of the same sha1 only stay deduplicated if the
+            # collection has a unique index on ``sha1``
+            entity_factory(entity, self.db).bulk_write(writes)
+        except DBError:
+            logger.exception("Failed to add %d %s entries", len(entries), entity)
+            return False
+        return True
+
+    def content_add(
+        self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        return self._entity_date_add("content", cnts, {"revision": {}, "directory": {}})
+
+    def directory_add(
+        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        return self._entity_date_add("directory", dirs, {"revision": {}})
+
+    def revision_add(
+        self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
+    ) -> bool:
+        data = (
+            revs
+            if isinstance(revs, dict)
+            else dict.fromkeys(revs, RevisionData(date=None, origin=None))
+        )
         existing = {
             x["sha1"]: x
-            for x in self.db.content.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
+            for x in self.db.revision.find(
+                {"sha1": {"$in": list(data)}},
+                {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
             )
         }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
+        for sha1, info in data.items():
+            ts = datetime.timestamp(info.date) if info.date is not None else None
+            preferred = info.origin
             if sha1 in existing:
-                cnt = existing[sha1]
-                if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
-                    self.db.content.update_one(
-                        {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
+                rev = existing[sha1]
+                if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
+                    ts = rev["ts"]
+                if preferred is None:
+                    preferred = rev["preferred"]
+                if ts != rev["ts"] or preferred != rev["preferred"]:
+                    self.db.revision.update_one(
+                        {"_id": rev["_id"]},
+                        {"$set": {"ts": ts, "preferred": preferred}},
                     )
             else:
-                self.db.content.insert_one(
+                self.db.revision.insert_one(
                     {
                         "sha1": sha1,
+                        "preferred": preferred,
+                        "origin": [],
+                        "revision": [],
                         "ts": ts,
-                        "revision": {},
-                        "directory": {},
                     }
                 )
         return True
 
+    def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        # FIXME, add an index on content sha1 and ts
+        # FIXME, do the timezone operation in mongo
+        return {
+            x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+            # FIXME try to avoid this loop and return the needed format from mongo
+            for x in self.db.content.find(
+                {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+                {"sha1": 1, "ts": 1, "_id": 0},
+            )
+        }
+
+    def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        # FIXME add ts to the directory index
+        return {
+            x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+            # FIXME try to avoid this loop and return the needed format from mongo
+            for x in self.db.directory.find(
+                {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+                {"sha1": 1, "ts": 1, "_id": 0},
+            )
+        }
+
+    def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+        # only meant for tests: distinct() returns its result in a single BSON
+        # document, so it fails on collections with more than 16MB of sha1s
+        return set(self.db.get_collection(entity.value).distinct("sha1"))
+
+    def location_add(self, paths: Iterable[bytes]) -> bool:
+        # TODO: implement this methods if path are to be stored in a separate collection
+        return True
+
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         # get all the revisions
         # iterate and find the earliest
@@ -139,58 +259,6 @@
         )
         yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
 
-    def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
-        return {
-            x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
-            for x in self.db.content.find(
-                {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
-                {"sha1": 1, "ts": 1, "_id": 0},
-            )
-        }
-
-    def directory_add(
-        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
-    ) -> bool:
-        data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs)
-        existing = {
-            x["sha1"]: x
-            for x in self.db.directory.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
-            )
-        }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
-            if sha1 in existing:
-                dir = existing[sha1]
-                if ts is not None and (dir["ts"] is None or ts < dir["ts"]):
-                    self.db.directory.update_one(
-                        {"_id": dir["_id"]}, {"$set": {"ts": ts}}
-                    )
-            else:
-                self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
-        return True
-
-    def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
-        return {
-            x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
-            for x in self.db.directory.find(
-                {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
-                {"sha1": 1, "ts": 1, "_id": 0},
-            )
-        }
-
-    def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
-        return {
-            x["sha1"]
-            for x in self.db.get_collection(entity.value).find(
-                {}, {"sha1": 1, "_id": 0}
-            )
-        }
-
-    def location_add(self, paths: Iterable[bytes]) -> bool:
-        # TODO: implement this methods if path are to be stored in a separate collection
-        return True
-
     def location_get_all(self) -> Set[bytes]:
         contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
         paths: List[Iterable[bytes]] = []
@@ -219,51 +287,11 @@
     def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
         return {
             x["sha1"]: x["url"]
+            # FIXME try to avoid this loop and return the needed format from mongo
             for x in self.db.origin.find(
                 {"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0}
             )
         }
-
-    def revision_add(
-        self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
-    ) -> bool:
-        data = (
-            revs
-            if isinstance(revs, dict)
-            else dict.fromkeys(revs, RevisionData(date=None, origin=None))
-        )
-        existing = {
-            x["sha1"]: x
-            for x in self.db.revision.find(
-                {"sha1": {"$in": list(data)}},
-                {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
-            )
-        }
-        for sha1, info in data.items():
-            ts = datetime.timestamp(info.date) if info.date is not None else None
-            preferred = info.origin
-            if sha1 in existing:
-                rev = existing[sha1]
-                if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
-                    ts = rev["ts"]
-                if preferred is None:
-                    preferred = rev["preferred"]
-                if ts != rev["ts"] or preferred != rev["preferred"]:
-                    self.db.revision.update_one(
-                        {"_id": rev["_id"]},
-                        {"$set": {"ts": ts, "preferred": preferred}},
-                    )
-            else:
-                self.db.revision.insert_one(
-                    {
-                        "sha1": sha1,
-                        "preferred": preferred,
-                        "origin": [],
-                        "revision": [],
-                        "ts": ts,
-                    }
-                )
-        return True
 
     def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
         return {
@@ -319,7 +347,7 @@
             if src_relation != "revision":
                 k = {
                     obj_id: list(set(paths + dsts.get(obj_id, [])))
-                    for obj_id, paths in src_objs[sha1][dst_relation].items()
+                    for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items()
                 }
                 self.db.get_collection(src_relation).update_one(
                     {"_id": src_objs[sha1]["_id"]},
@@ -388,7 +416,7 @@
             )
         }
         src_objs = {
-            x["sha1"]: x[dst]
+            x["sha1"]: x.get(dst, {})
             for x in self.db.get_collection(src).find(
                 {}, {"_id": 0, "sha1": 1, dst: 1}
             )
@@ -420,7 +448,7 @@
         src, *_, dst = relation.value.split("_")
         empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
         src_objs = {
-            x["sha1"]: x[dst]
+            x["sha1"]: x.get(dst, empty)
             for x in self.db.get_collection(src).find(
                 {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
             )
diff --git a/swh/provenance/mongo/bootstrap.py b/swh/provenance/mongo/bootstrap.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/bootstrap.py
@@ -0,0 +1,14 @@
+# FIXME, maybe this should not be part of the code
+
+from typing import Any, List
+
+# collections handled by bootstrap_db (none defined yet)
+collections: List[str] = []
+
+# indexes handled by bootstrap_db (none defined yet)
+indexes: List[Any] = []
+
+
+def bootstrap_db() -> None:
+    # FIXME, not implemented yet
+    pass
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,8 @@
+class DBError(Exception):
+    """Raised when a MongoDB operation of the provenance backend fails."""
+
+    # FIXME, add mongo specific logging
+
+
+class EntityError(Exception):
+    """Raised when an unknown entity type is requested."""
diff --git a/swh/provenance/mongo/models/__init__.py b/swh/provenance/mongo/models/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/__init__.py
@@ -0,0 +1,31 @@
+from typing import Dict, Type
+
+import pymongo.database
+
+from ..errors import EntityError
+from .content import Content
+from .directory import Directory
+from .entity import Entity
+from .origin import Origin
+from .revision import Revision
+
+# entity name -> model class; each model works on the collection of the same name
+_ENTITIES: Dict[str, Type[Entity]] = {
+    "content": Content,
+    "directory": Directory,
+    "origin": Origin,
+    "revision": Revision,
+}
+
+
+def entity_factory(entity: str, db: pymongo.database.Database) -> Entity:
+    """Return the ``entity`` model bound to its collection in ``db``.
+
+    Raises:
+        EntityError: if ``entity`` is not a known entity type
+    """
+    try:
+        model = _ENTITIES[entity]
+    except KeyError:
+        raise EntityError(f"invalid entity type {entity}") from None
+    return model(db)
diff --git a/swh/provenance/mongo/models/cache.py b/swh/provenance/mongo/models/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/cache.py
@@ -0,0 +1,39 @@
+class Cache:
+    """An object cache layer.
+
+    Simple in-memory implementation mapping the value of the ``key`` attribute
+    of each cached object to the object itself (can later be extended if needed).
+    """
+
+    def __init__(self, key):
+        self.key = key
+        self.data = {}
+
+    def add_data(self, data):
+        """Replace the whole cache content with ``data``."""
+        self.data = self._get_formatted_data(data)
+
+    def _clear(self):
+        self.data = dict()
+
+    def _get_formatted_data(self, data):
+        """Index ``data`` by ``self.key``; a dict is taken as already indexed."""
+        if isinstance(data, dict):
+            return dict(data)
+        return {getattr(obj, self.key): obj for obj in data}
+
+    def add_obj(self, obj):
+        self.data[getattr(obj, self.key)] = obj
+
+    def get(self, key):
+        return self.data.get(key, None)
+
+    def set(self, data):
+        """Add a dict, a list of objects or a single object to the cache."""
+        if isinstance(data, dict):
+            self.data.update(data)
+        elif isinstance(data, list):
+            for obj in data:
+                self.add_obj(obj)
+        else:
+            self.add_obj(data)
diff --git a/swh/provenance/mongo/models/content.py b/swh/provenance/mongo/models/content.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/content.py
@@ -0,0 +1,14 @@
+from .entity import Entity
+
+
+class Content(Entity):
+    collection = "content"
+    validate_model = False
+
+    def find_first(self):
+        # FIXME, not implemented yet
+        raise NotImplementedError
+
+    def find_all(self):
+        # FIXME, not implemented yet
+        raise NotImplementedError
diff --git a/swh/provenance/mongo/models/directory.py b/swh/provenance/mongo/models/directory.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/directory.py
@@ -0,0 +1,6 @@
+from .entity import Entity
+
+
+class Directory(Entity):
+    collection = "directory"
+    validate_model = False
diff --git a/swh/provenance/mongo/models/entity.py b/swh/provenance/mongo/models/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/entity.py
@@ -0,0 +1,42 @@
+import pymongo.database
+from pymongo.errors import PyMongoError
+
+from ..errors import DBError
+
+
+class Entity:
+    """
+    An object saved in the db
+    """
+
+    # name of the collection holding this entity, set by subclasses
+    collection: str
+    model: dict
+    validate_model: bool
+
+    def __init__(self, db: pymongo.database.Database):
+        self.db = db
+        self.db_collection = self.db.get_collection(self.collection)
+
+    def bulk_write(self, writes):
+        """Execute ``writes`` on the entity collection and return pymongo's result.
+
+        Raises:
+            DBError: if the database rejected the operations
+        """
+        try:
+            return self.db_collection.bulk_write(writes)
+        except PyMongoError as e:
+            raise DBError(f"bulk write on collection {self.collection} failed") from e
+
+    # FIXME: model validation (against schema.json), single document save/get and
+    # "add if older" helpers are still to be implemented
+
+
+class EntityList:
+    """
+    List or array of entities and their operations
+    Operate mostly on object cache
+    """
+
+    pass
diff --git a/swh/provenance/mongo/models/origin.py b/swh/provenance/mongo/models/origin.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/origin.py
@@ -0,0 +1,7 @@
+from .entity import Entity
+
+
+class Origin(Entity):
+    collection = "origin"
+    model = {}
+    validate_model = False
diff --git a/swh/provenance/mongo/models/revision.py b/swh/provenance/mongo/models/revision.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/revision.py
@@ -0,0 +1,7 @@
+from .entity import Entity
+
+
+class Revision(Entity):
+    collection = "revision"
+    model = {}
+    validate_model = False
diff --git a/swh/provenance/mongo/schema.json b/swh/provenance/mongo/schema.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/schema.json
@@ -0,0 +1,32 @@
+{
+    "content": {
+        "bsonType": "object",
+        "required": ["sha1"],
+        "properties": {
+            "sha1": {
+                "bsonType": "binData",
+                "description": "Sha1Git identifier of the content"
+            }
+        }
+    },
+    "directory": {
+        "bsonType": "object",
+        "required": ["sha1"],
+        "properties": {
+            "sha1": {
+                "bsonType": "binData",
+                "description": "Sha1Git identifier of the directory"
+            }
+        }
+    },
+    "revision": {
+        "bsonType": "object",
+        "required": ["sha1"],
+        "properties": {
+            "sha1": {
+                "bsonType": "binData",
+                "description": "Sha1Git identifier of the revision"
+            }
+        }
+    }
+}
diff --git a/swh/provenance/mongo/storage.py b/swh/provenance/mongo/storage.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/storage.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime
+from typing import Dict, Generator, Iterable, Optional, Set, Union
+
+import pymongo.database
+
+from swh.model.model import Sha1Git
+
+from ..interface import (
+    EntityType,
+    ProvenanceResult,
+    RelationData,
+    RelationType,
+    RevisionData,
+)
+
+
+class ProvenanceStorageMongoDb:
+    """Work-in-progress MongoDB provenance storage.
+
+    Every operation that is not implemented yet raises NotImplementedError rather
+    than silently returning None or reporting success.
+    """
+
+    def __init__(self, db: pymongo.database.Database):
+        self.db = db
+
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+        raise NotImplementedError
+
+    def content_find_all(
+        self, id: Sha1Git, limit: Optional[int] = None
+    ) -> Generator[ProvenanceResult, None, None]:
+        raise NotImplementedError
+
+    def content_add(
+        self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        raise NotImplementedError
+
+    def directory_add(
+        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        raise NotImplementedError
+
+    def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+        raise NotImplementedError
+
+    def location_add(self, paths: Iterable[bytes]) -> bool:
+        # TODO: implement this methods if path are to be stored in a separate collection
+        return True
+
+    def location_get_all(self) -> Set[bytes]:
+        raise NotImplementedError
+
+    def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
+        raise NotImplementedError
+
+    def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+        raise NotImplementedError
+
+    def revision_add(
+        self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
+        raise NotImplementedError
+
+    def relation_add(
+        self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
+    ) -> bool:
+        raise NotImplementedError
+
+    def relation_get(
+        self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+    ) -> Dict[Sha1Git, Set[RelationData]]:
+        raise NotImplementedError
+
+    def relation_get_all(
+        self, relation: RelationType
+    ) -> Dict[Sha1Git, Set[RelationData]]:
+        raise NotImplementedError
+
+    def with_path(self) -> bool:
+        return True
diff --git a/swh/provenance/mongo/tests/__init__.py b/swh/provenance/mongo/tests/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/tests/__init__.py
@@ -0,0 +1 @@
+pass
raise if needed + pass + return True + + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + try: + directories = Entity.factory('directory').get_all_in_list({'sha1': ids}) + except DBError as e: + # logging and returning None + # FIXME, add logging, raise if needed + return None + return Entity.factory('directory').dump_list_as_date_dict(contents) + + def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: + return Entity.factory(entity).get_all_in_list({'sha1': ids}) + + def location_add(self, paths: Iterable[bytes]) -> bool: + # TODO: implement this methods if path are to be stored in a separate collection + return True + + def location_get_all(self) -> Set[bytes]: + pass + + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: + return True + + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + pass + + def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool: + return True + + def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: + pass + + def relation_add(self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]) -> bool: + return True + + def relation_get(self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False) -> Dict[Sha1Git, Set[RelationData]]: + return pass + + def relation_get_all(self, relation: RelationType) -> Dict[Sha1Git, Set[RelationData]]: + pass + + def with_path(self) -> bool: + return True diff --git a/swh/provenance/mongo/tests/__init__.py b/swh/provenance/mongo/tests/__init__.py new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/tests/__init__.py @@ -0,0 +1 @@ +pass