diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -8,6 +8,7 @@
 from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union
 
 from bson import ObjectId
+from pymongo import UpdateOne
 import pymongo.database
 
 from swh.model.model import Sha1Git
@@ -19,39 +20,65 @@
     RelationType,
     RevisionData,
 )
+from .entity import Entity
 
 
 class ProvenanceStorageMongoDb:
     def __init__(self, db: pymongo.database.Database):
         self.db = db
 
+    def _format_data(
+        self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
+    ) -> Dict[Sha1Git, Optional[datetime]]:
+        """Normalise the input of the ``*_add`` methods to a sha1 -> date mapping.
+
+        A plain iterable of sha1s becomes a dict whose dates are all None.
+        """
+        return data if isinstance(data, dict) else dict.fromkeys(data)
+
+    def _generate_date_upserts(self, sha1, date):
+        """Build the ordered bulk operations recording ``date`` for ``sha1``.
+
+        The first operation creates the document (with an unknown date) when it
+        is missing and leaves an existing one untouched.  The second one, only
+        emitted when a date is given, keeps the earliest known date.  Filtering
+        on the date in an upsert would instead insert a duplicate document
+        whenever the stored date is already the earliest one.
+        """
+        ts = datetime.timestamp(date) if date is not None else None
+        writes = [
+            UpdateOne({"sha1": sha1}, {"$setOnInsert": {"ts": None}}, upsert=True)
+        ]
+        if ts is not None:
+            # update only those with date either as None or later than the given one
+            writes.append(
+                UpdateOne(
+                    {"sha1": sha1, "$or": [{"ts": None}, {"ts": {"$gt": ts}}]},
+                    {"$set": {"ts": ts}},
+                )
+            )
+        return writes
+
     def content_add(
         self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
     ) -> bool:
-        data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
-        existing = {
-            x["sha1"]: x
-            for x in self.db.content.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
-            )
-        }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
-            if sha1 in existing:
-                cnt = existing[sha1]
-                if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
-                    self.db.content.update_one(
-                        {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
-                    )
-            else:
-                self.db.content.insert_one(
-                    {
-                        "sha1": sha1,
-                        "ts": ts,
-                        "revision": {},
-                        "directory": {},
-                    }
-                )
+        writes = [
+            write
+            for sha1, date in self._format_data(cnts).items()
+            for write in self._generate_date_upserts(sha1, date)
+        ]
+        Entity.factory("content").bulk_write(self.db, writes)
+        return True
+
+    def directory_add(
+        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
+    ) -> bool:
+        writes = [
+            write
+            for sha1, date in self._format_data(dirs).items()
+            for write in self._generate_date_upserts(sha1, date)
+        ]
+        Entity.factory("directory").bulk_write(self.db, writes)
         return True
 
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
@@ -109,7 +136,11 @@
                         )
                     )
             for directory in self.db.directory.find(
-                {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["directory"]]}}
+                {
+                    "_id": {
+                        "$in": [ObjectId(obj_id) for obj_id in content.get("directory", {})]
+                    }
+                }
             ):
                 for revision in self.db.revision.find(
                     {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}}
@@ -148,28 +179,6 @@
             )
         }
 
-    def directory_add(
-        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
-    ) -> bool:
-        data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs)
-        existing = {
-            x["sha1"]: x
-            for x in self.db.directory.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
-            )
-        }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
-            if sha1 in existing:
-                dir = existing[sha1]
-                if ts is not None and (dir["ts"] is None or ts < dir["ts"]):
-                    self.db.directory.update_one(
-                        {"_id": dir["_id"]}, {"$set": {"ts": ts}}
-                    )
-            else:
-                self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
-        return True
-
     def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
         return {
             x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -319,7 +328,7 @@
             if src_relation != "revision":
                 k = {
                     obj_id: list(set(paths + dsts.get(obj_id, [])))
-                    for obj_id, paths in src_objs[sha1][dst_relation].items()
+                    for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items()
                 }
                 self.db.get_collection(src_relation).update_one(
                     {"_id": src_objs[sha1]["_id"]},
@@ -388,7 +397,7 @@
             )
         }
         src_objs = {
-            x["sha1"]: x[dst]
+            x["sha1"]: x.get(dst, {})
             for x in self.db.get_collection(src).find(
                 {}, {"_id": 0, "sha1": 1, dst: 1}
             )
@@ -420,7 +429,7 @@
         src, *_, dst = relation.value.split("_")
         empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
         src_objs = {
-            x["sha1"]: x[dst]
+            x["sha1"]: x.get(dst, {})
             for x in self.db.get_collection(src).find(
                 {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
             )
diff --git a/swh/provenance/mongo/cache.py b/swh/provenance/mongo/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/cache.py
@@ -0,0 +1,8 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class Cache:
+    pass
diff --git a/swh/provenance/mongo/data_model.json b/swh/provenance/mongo/data_model.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/data_model.json
@@ -0,0 +1,2 @@
+{
+}
diff --git a/swh/provenance/mongo/entity.py b/swh/provenance/mongo/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/entity.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pymongo.errors import PyMongoError
+
+from .errors import DBError, EntityError
+
+
+class Entity:
+    """Base class of the objects stored in the MongoDB provenance collections.
+
+    Subclasses set ``collection`` to the name of the MongoDB collection that
+    holds their documents.
+    """
+
+    collection = ""
+    validate = True
+    data_model = {}
+
+    def __init__(self, data=None):
+        # always keep a mapping so that the subclasses' properties can rely on it
+        self.data = {} if data is None else data
+
+    def save(self):
+        # validate the model if self.validate is true
+        # FIXME, set the obj in the cache
+        pass
+
+    def get(self, qry):
+        # FIXME, get from cache
+        pass
+
+    def bulk_write(self, db, writes):
+        """Run the ``writes`` operations against this entity's collection.
+
+        Returns the pymongo bulk write result, or None when there is nothing to
+        write (pymongo rejects an empty list of operations).
+
+        Raises DBError, chained to the original exception, on any pymongo error.
+        """
+        if not writes:
+            return None
+        try:
+            return db.get_collection(self.collection).bulk_write(writes)
+        except PyMongoError as e:
+            # FIXME, log as mongo error
+            raise DBError(str(e)) from e
+
+    @staticmethod
+    def factory(entity):
+        """Return a new, empty instance of the entity class named ``entity``.
+
+        Raises EntityError when the name is not a known entity type.
+        """
+        mapping = {"content": Content, "directory": Directory}
+        if entity in mapping:
+            return mapping[entity]()
+        raise EntityError(f"invalid entity type {entity}")
+
+
+class Content(Entity):
+    collection = "content"
+    validate = False
+
+    @property
+    def revision(self):
+        return self.data.get("revision", {})
+
+    @property
+    def directory(self):
+        return self.data.get("directory", {})
+
+
+class Directory(Entity):
+    collection = "directory"
+    validate = False
+
+    @property
+    def revision(self):
+        return self.data.get("revision", {})
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class DBError(Exception):
+    # FIXME, add mongo specific logging
+    pass
+
+
+class EntityError(Exception):
+    pass