diff --git a/swh/provenance/mongo/README.md b/swh/provenance/mongo/README.md --- a/swh/provenance/mongo/README.md +++ b/swh/provenance/mongo/README.md @@ -42,3 +42,36 @@ path: str } ``` + +Flat model +---------- + +content { +} + +directory { +} + +revision { +} + +origin { +} + +content_in_revision { +} + +content_in_directory { +} + +directory_in_revision { +} + +revision_before_revision { +} + +revision_in_origin { +} + +location { +} diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py --- a/swh/provenance/mongo/backend.py +++ b/swh/provenance/mongo/backend.py @@ -13,6 +13,7 @@ from bson import ObjectId import mongomock import pymongo +from pymongo import UpdateOne, InsertOne from swh.core.statsd import statsd from swh.model.model import Sha1Git @@ -25,6 +26,7 @@ RelationType, RevisionData, ) +from .entity import Entity STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds" @@ -51,65 +53,78 @@ def close(self) -> None: self.db.client.close() + def _format_data(self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]): + return data if isinstance(data, dict) else dict.fromkeys(data) + + def _generate_date_upserts(self, sha1, date, inserts={}): + ts = datetime.timestamp(date) if date is not None else None + # update only those with date either as None or later than the given one + return UpdateOne( + {"sha1": sha1}, + { + "$set": { + "ts": ts, # FIXME, check set condition ts < db.ts + # version restrictions, aggregate + }, + "$setOnInsert": inserts, + }, + upsert=True + ) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) def content_add( self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] ) -> bool: - data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) - existing = { - x["sha1"]: x - for x in self.db.content.find( - {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} - ) - } - for sha1, date in data.items(): - ts = datetime.timestamp(date) if 
date is not None else None - if sha1 in existing: - cnt = existing[sha1] - if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]): - self.db.content.update_one( - {"_id": cnt["_id"]}, {"$set": {"ts": ts}} - ) - else: - self.db.content.insert_one( - { - "sha1": sha1, - "ts": ts, - "revision": {}, - "directory": {}, - } - ) + + default_inserts = {"revisions": {}, "directories": {}} + writes = [ + self._generate_date_upserts(sha1, date, default_inserts) + for sha1, date in self._format_data(cnts).items() + ] + Entity.factory("content").bulk_write(self.db, writes) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] + ) -> bool: + writes = [ + self._generate_date_upserts(sha1, date) + for sha1, date in self._format_data(dirs).items() + ] + Entity.factory("directory").bulk_write(self.db, writes) + return True + + def _get_oldest_revision_from_content(self, content): + # FIXME, returning with the assumption that content has all the revisions in the array + # Change to separate collection content_in_revision if gets too big + return self.db.revision.find_one( + { + "_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}, + "ts": content["ts"], + } + ) + + def _get_preferred_origin(self, revision): + return self.db.origin.find_one({"sha1": revision["preferred"]}) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - # get all the revisions - # iterate and find the earliest content = self.db.content.find_one({"sha1": id}) if not content: return None - occurs = [] - for revision in self.db.revision.find( - {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}} - ): - if revision["preferred"] is not None: - origin = self.db.origin.find_one({"sha1": revision["preferred"]}) - else: - origin = {"url": None} 
- - for path in content["revision"][str(revision["_id"])]: - occurs.append( - ProvenanceResult( - content=id, - revision=revision["sha1"], - date=datetime.fromtimestamp(revision["ts"], timezone.utc), - origin=origin["url"], - path=path, - ) - ) - return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] + oldest_revision = self._get_oldest_revision_from_content(content) + origin = self._get_preferred_origin(oldest_revision) + return ProvenanceResult( + content=id, + revision=oldest_revision["sha1"], + date=datetime.fromtimestamp(oldest_revision["ts"], timezone.utc), + origin=origin["url"] if origin else None, + path="", # FIXME, find the right path + ) + # FIXME, refactor this @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None @@ -138,7 +153,11 @@ ) ) for directory in self.db.directory.find( - {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["directory"]]}} + { + "_id": { + "$in": [ObjectId(obj_id) for obj_id in content.get("directory", {})] + } + } ): for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}} @@ -178,29 +197,6 @@ ) } - @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) - def directory_add( - self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] - ) -> bool: - data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) - existing = { - x["sha1"]: x - for x in self.db.directory.find( - {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} - ) - } - for sha1, date in data.items(): - ts = datetime.timestamp(date) if date is not None else None - if sha1 in existing: - dir = existing[sha1] - if ts is not None and (dir["ts"] is None or ts < dir["ts"]): - self.db.directory.update_one( - {"_id": dir["_id"]}, {"$set": {"ts": ts}} - ) - else: - self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) - return 
True - @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return { @@ -215,7 +211,8 @@ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: return { x["sha1"] - for x in self.db.get_collection(entity.value).find( + # for x in self.db.get_collection(entity.value).find( + for x in self.db.get_collection(entity).find( # Temp fix for test {}, {"sha1": 1, "_id": 0} ) } @@ -248,16 +245,10 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: - existing = { - x["sha1"]: x - for x in self.db.origin.find( - {"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1} - ) - } - for sha1, url in orgs.items(): - if sha1 not in existing: - # add new origin - self.db.origin.insert_one({"sha1": sha1, "url": url}) + writes = [ + InsertOne({"sha1": sha1, "url": url}) for (sha1, url) in orgs.items() + ] + Entity.factory("origin").bulk_write(self.db, writes) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) @@ -367,7 +358,7 @@ if src_relation != "revision": k = { obj_id: list(set(paths + dsts.get(obj_id, []))) - for obj_id, paths in src_objs[sha1][dst_relation].items() + for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items() } self.db.get_collection(src_relation).update_one( {"_id": src_objs[sha1]["_id"]}, @@ -437,7 +428,7 @@ ) } src_objs = { - x["sha1"]: x[dst] + x["sha1"]: x.get(dst, {}) for x in self.db.get_collection(src).find( {}, {"_id": 0, "sha1": 1, dst: 1} ) @@ -470,7 +461,7 @@ src, *_, dst = relation.value.split("_") empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] src_objs = { - x["sha1"]: x[dst] + x["sha1"]: x.get(dst, {}) for x in self.db.get_collection(src).find( {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1} ) @@ -509,3 +500,23 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": 
"with_path"}) def with_path(self) -> bool: return True + + def entity_get(self, entity, filters=None): + """ + A method used only by tests + Returns a list of entities after applying the filter + """ + # FIXME, this should be added to the interface to make + # tests backend agnostic. Adding this in SQL could be tricky + # FIXME, change to entity factory + return list(self.db.get_collection(entity).find(filters)) + + def entities_add(self, entity, data): + """ + A method used only by tests + """ + # FIXME, this should be added to the interface to make + # tests backend agnostic. Adding this in SQL could be tricky + # FIXME, change to entity factory + for each_entity in data: + self.db.get_collection(entity).insert_one(each_entity) diff --git a/swh/provenance/mongo/cache.py b/swh/provenance/mongo/cache.py new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/cache.py @@ -0,0 +1,8 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +class Cache: + pass diff --git a/swh/provenance/mongo/data_model.json b/swh/provenance/mongo/data_model.json new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/data_model.json @@ -0,0 +1,2 @@ +{ +} diff --git a/swh/provenance/mongo/entity.py b/swh/provenance/mongo/entity.py new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/entity.py @@ -0,0 +1,69 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from .errors import EntityError, DBError + + +class Entity: + collection = "" + validate = True + data_model = {} + + def __init__(self, data=None): + self.data = data + + def save(self): + # validate the model if 
self.validate is true + # FIXME, set the obj in the cache + pass + + def get(self, qry): + # FIXME, get from cache + pass + + def bulk_write(self, db, writes): + if not writes: + return None + try: + result = db.get_collection(self.collection).bulk_write(writes) + except DBError as e: + # Log as mongo error + raise e + else: + return result + + @staticmethod + def factory(entity): + mapping = {"content": Content, "directory": Directory, "origin": Origin} + if entity in mapping: + return mapping[entity]() + raise EntityError(f"invalid entity type {entity}") + + +class Content(Entity): + collection = "content" + validate = False + + @property + def revision(self): + return self.data.get("revision", {}) + + @property + def directory(self): + return self.data.get("directory", {}) + + +class Directory(Entity): + collection = "directory" + validate = False + + @property + def revision(self): + return self.data.get("revision", {}) + + +class Origin(Entity): + collection = "origin" + validate = False diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/errors.py @@ -0,0 +1,13 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +class DBError(Exception): + # FIXME, add mongo specific logging + pass + + +class EntityError(Exception): + pass