diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -8,6 +8,7 @@
 from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union
 
 from bson import ObjectId
+from pymongo import UpdateOne
 import pymongo.database
 
 from swh.model.model import Sha1Git
@@ -19,39 +20,67 @@
     RelationType,
     RevisionData,
 )
+from .entity import Entity
 
 
 class ProvenanceStorageMongoDb:
     def __init__(self, db: pymongo.database.Database):
         self.db = db
 
+    def _format_data(
+        self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
+    ) -> Dict[Sha1Git, Optional[datetime]]:
+        """Return `data` as a sha1 -> date mapping; bare ids are mapped to None."""
+        return data if isinstance(data, dict) else dict.fromkeys(data)
+
+    def _generate_date_upserts(self, sha1, date, inserts=None):
+        """Return the ordered write operations recording `date` for `sha1`.
+
+        The first operation creates the document when `sha1` is not stored yet,
+        adding the `inserts` default fields, and leaves an existing document
+        untouched. The second one, only generated for an actual date, replaces a
+        stored date that is either None or later than the given one. Both steps
+        cannot be folded in a single filtered upsert: its filter would not match
+        a document holding an earlier date and a duplicate would be inserted.
+        """
+        ts = datetime.timestamp(date) if date is not None else None
+        writes = [
+            UpdateOne(
+                {"sha1": sha1},
+                {"$setOnInsert": {"ts": ts, **(inserts or {})}},
+                upsert=True,
+            )
+        ]
+        if ts is not None:
+            # update only those with date either as None or later than the given one
+            writes.append(
+                UpdateOne(
+                    {"sha1": sha1, "$or": [{"ts": None}, {"ts": {"$gt": ts}}]},
+                    {"$set": {"ts": ts}},
+                )
+            )
+        return writes
+
     def content_add(
         self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
     ) -> bool:
-        data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
-        existing = {
-            x["sha1"]: x
-            for x in self.db.content.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
-            )
-        }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
-            if sha1 in existing:
-                cnt = existing[sha1]
-                if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
-                    self.db.content.update_one(
-                        {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
-                    )
-            else:
-                self.db.content.insert_one(
-                    {
-                        "sha1": sha1,
-                        "ts": ts,
-                        "revision": {},
-                        "directory": {},
-                    }
-                )
+        """Insert unknown contents and keep the earliest date of known ones."""
+        defaults = {"revision": {}, "directory": {}}
+        writes = []
+        for sha1, date in self._format_data(cnts).items():
+            writes.extend(self._generate_date_upserts(sha1, date, defaults))
+        Entity.factory("content").bulk_write(self.db, writes)
+        return True
+
+    def directory_add(
+        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
+    ) -> bool:
+        """Insert unknown directories and keep the earliest date of known ones."""
+        defaults = {"revision": {}}
+        writes = []
+        for sha1, date in self._format_data(dirs).items():
+            writes.extend(self._generate_date_upserts(sha1, date, defaults))
+        Entity.factory("directory").bulk_write(self.db, writes)
         return True
 
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
@@ -109,7 +138,11 @@
                     )
                 )
         for directory in self.db.directory.find(
-            {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["directory"]]}}
+            {
+                "_id": {
+                    "$in": [ObjectId(obj_id) for obj_id in content.get("directory", {})]
+                }
+            }
         ):
             for revision in self.db.revision.find(
                 {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}}
@@ -148,28 +181,6 @@
             )
         }
 
-    def directory_add(
-        self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
-    ) -> bool:
-        data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs)
-        existing = {
-            x["sha1"]: x
-            for x in self.db.directory.find(
-                {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
-            )
-        }
-        for sha1, date in data.items():
-            ts = datetime.timestamp(date) if date is not None else None
-            if sha1 in existing:
-                dir = existing[sha1]
-                if ts is not None and (dir["ts"] is None or ts < dir["ts"]):
-                    self.db.directory.update_one(
-                        {"_id": dir["_id"]}, {"$set": {"ts": ts}}
-                    )
-            else:
-                self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
-        return True
-
     def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
         return {
             x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -319,7 +330,7 @@
             if src_relation != "revision":
                 k = {
                     obj_id: list(set(paths + dsts.get(obj_id, [])))
-                    for obj_id, paths in src_objs[sha1][dst_relation].items()
+                    for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items()
                 }
                 self.db.get_collection(src_relation).update_one(
                     {"_id": src_objs[sha1]["_id"]},
@@ -388,7 +399,7 @@
                 )
             }
             src_objs = {
-                x["sha1"]: x[dst]
+                x["sha1"]: x.get(dst, {})
                 for x in self.db.get_collection(src).find(
                     {}, {"_id": 0, "sha1": 1, dst: 1}
                 )
@@ -420,7 +431,7 @@
         src, *_, dst = relation.value.split("_")
         empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
         src_objs = {
-            x["sha1"]: x[dst]
+            x["sha1"]: x.get(dst, {})
             for x in self.db.get_collection(src).find(
                 {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
             )
diff --git a/swh/provenance/mongo/cache.py b/swh/provenance/mongo/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/cache.py
@@ -0,0 +1,8 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class Cache:
+    pass
diff --git a/swh/provenance/mongo/data_model.json b/swh/provenance/mongo/data_model.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/data_model.json
@@ -0,0 +1,2 @@
+{
+}
diff --git a/swh/provenance/mongo/entity.py b/swh/provenance/mongo/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/entity.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from pymongo.errors import PyMongoError
+
+from .errors import EntityError
+
+logger = logging.getLogger(__name__)
+
+
+class Entity:
+    """Base class of the objects stored in a mongo collection."""
+
+    collection = ""
+    validate = True
+    data_model = {}
+
+    def __init__(self, data=None):
+        self.data = data
+
+    def save(self):
+        # validate the model if self.validate is true
+        # FIXME, set the obj in the cache
+        pass
+
+    def get(self, qry):
+        # FIXME, get from cache
+        pass
+
+    def bulk_write(self, db, writes):
+        """Run the `writes` operations on the collection of this entity.
+
+        Return the result of the bulk write, or None when there is nothing
+        to write. Errors raised by the driver are logged and re-raised as is.
+        """
+        if not writes:
+            return None
+        try:
+            return db.get_collection(self.collection).bulk_write(writes)
+        except PyMongoError:
+            logger.exception("bulk write failed on collection %s", self.collection)
+            raise
+
+    @staticmethod
+    def factory(entity):
+        """Return a new entity of the given type ("content" or "directory")."""
+        mapping = {"content": Content, "directory": Directory}
+        if entity in mapping:
+            return mapping[entity]()
+        raise EntityError(f"invalid entity type {entity}")
+
+
+class Content(Entity):
+    collection = "content"
+    validate = False
+
+    @property
+    def revision(self):
+        return (self.data or {}).get("revision", {})
+
+    @property
+    def directory(self):
+        return (self.data or {}).get("directory", {})
+
+
+class Directory(Entity):
+    collection = "directory"
+    validate = False
+
+    @property
+    def revision(self):
+        return (self.data or {}).get("revision", {})
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class DBError(Exception):
+    # FIXME, add mongo specific logging
+    pass
+
+
+class EntityError(Exception):
+    pass
diff --git a/swh/provenance/tests/mongo/test_backend.py b/swh/provenance/tests/mongo/test_backend.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/mongo/test_backend.py
@@ -0,0 +1,238 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+Unit tests for the mongo backend
+"""
+
+from datetime import datetime
+
+import pymongo.database
+import pytest
+
+from swh.provenance.mongo.backend import ProvenanceStorageMongoDb
+
+
+class TestBackend:
+    @pytest.fixture
+    def backend(self, mongodb: pymongo.database.Database):
+        return ProvenanceStorageMongoDb(mongodb)
+
+    # add content tests
+
+    def test_add_content_empty(self, backend, mongodb):
+        assert backend.content_add({}) is True
+        assert mongodb.content.count_documents({}) == 0
+
+        assert backend.content_add([]) is True
+        assert mongodb.content.count_documents({}) == 0
+
+    def test_add_content_with_insert(self, backend, mongodb):
+        # add data using content_add
+        # get data from mongo and compare
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        data = {sha1: None}
+        assert backend.content_add(data) is True
+
+        assert mongodb.content.count_documents({}) == 1
+        cnt = mongodb.content.find_one({"sha1": sha1})
+        assert cnt["ts"] is None
+        assert cnt["revision"] == {}
+        assert cnt["directory"] == {}
+
+    def test_add_content_with_update_later_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.content.insert_one(
+            {"sha1": sha1, "ts": 1631881748, "revision": revision, "directory": {}}
+        )
+
+        new_date = datetime(2010, 10, 10)
+        data = {sha1: new_date}
+        # data has a date before the one in the db
+        assert backend.content_add(data) is True
+        cnt = mongodb.content.find_one({"sha1": sha1})
+        assert cnt["ts"] == new_date.timestamp()
+        assert cnt["revision"] == revision
+        assert cnt["directory"] == {}
+
+    def test_add_content_with_update_none_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.content.insert_one(
+            {"sha1": sha1, "ts": None, "revision": revision, "directory": {}}
+        )
+
+        new_date = datetime(2010, 10, 10)
+        data = {sha1: new_date}
+        # data has a date while the one in the db is None
+        assert backend.content_add(data) is True
+        cnt = mongodb.content.find_one({"sha1": sha1})
+        assert cnt["ts"] == new_date.timestamp()
+        assert cnt["revision"] == revision
+        assert cnt["directory"] == {}
+
+    def test_add_content_do_not_update_older_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.content.insert_one(
+            {"sha1": sha1, "ts": 1286661600, "revision": revision, "directory": {}}
+        )
+
+        new_date = datetime(2020, 10, 10)
+        data = {sha1: new_date}
+        # data has a date after the one in the db
+        assert backend.content_add(data) is True
+        assert mongodb.content.count_documents({}) == 1
+        cnt = mongodb.content.find_one({"sha1": sha1})
+        assert cnt["ts"] == 1286661600
+
+    def test_add_content_multiple(self, backend, mongodb):
+        sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3"
+        sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f"
+        sha1_4 = "3ebfa301dc59196f18593c45e519287a23297589"
+        revision = {"test": "test"}
+        mongodb.content.insert_one(
+            {"sha1": sha1_1, "ts": 1286661600, "revision": revision, "directory": {}}
+        )
+        mongodb.content.insert_one(
+            {"sha1": sha1_2, "ts": None, "revision": revision, "directory": {}}
+        )
+        mongodb.content.insert_one(
+            {"sha1": sha1_3, "ts": 1631889655, "revision": revision, "directory": {}}
+        )
+
+        data = {
+            sha1_1: datetime(2020, 10, 10),  # given date is in future, no update
+            sha1_2: datetime(2020, 10, 10),  # will update None date
+            sha1_3: datetime(2010, 10, 10),  # date in the past, will update
+            sha1_4: datetime(2010, 10, 10),  # new rcd, will insert
+        }
+
+        assert backend.content_add(data) is True
+        assert mongodb.content.count_documents({}) == 4
+        cnt = mongodb.content.find_one({"sha1": sha1_1})
+        assert cnt["ts"] == 1286661600
+        assert cnt["revision"] == revision
+        assert cnt["directory"] == {}
+
+        cnt = mongodb.content.find_one({"sha1": sha1_2})
+        assert cnt["ts"] == datetime(2020, 10, 10).timestamp()
+        assert cnt["revision"] == revision
+        assert cnt["directory"] == {}
+
+        cnt = mongodb.content.find_one({"sha1": sha1_3})
+        assert cnt["ts"] == datetime(2010, 10, 10).timestamp()
+        assert cnt["revision"] == revision
+        assert cnt["directory"] == {}
+
+        cnt = mongodb.content.find_one({"sha1": sha1_4})
+        assert cnt["ts"] == datetime(2010, 10, 10).timestamp()
+        assert cnt["revision"] == {}
+        assert cnt["directory"] == {}
+
+    # add directory tests
+
+    def test_add_directory_empty(self, backend, mongodb):
+        assert backend.directory_add({}) is True
+        assert mongodb.directory.count_documents({}) == 0
+
+        assert backend.directory_add([]) is True
+        assert mongodb.directory.count_documents({}) == 0
+
+    def test_add_directory_with_insert(self, backend, mongodb):
+        # add data using directory_add
+        # get data from mongo and compare
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        data = {sha1: None}
+        assert backend.directory_add(data) is True
+
+        assert mongodb.directory.count_documents({}) == 1
+        cnt = mongodb.directory.find_one({"sha1": sha1})
+        assert cnt["ts"] is None
+        assert cnt["revision"] == {}
+
+    def test_add_directory_with_update_later_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.directory.insert_one(
+            {"sha1": sha1, "ts": 1631881748, "revision": revision}
+        )
+
+        new_date = datetime(2010, 10, 10)
+        data = {sha1: new_date}
+        # data has a date before the one in the db
+        assert backend.directory_add(data) is True
+        diy = mongodb.directory.find_one({"sha1": sha1})
+        assert diy["ts"] == new_date.timestamp()
+        assert diy["revision"] == revision
+
+    def test_add_directory_with_update_none_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.directory.insert_one({"sha1": sha1, "ts": None, "revision": revision})
+
+        new_date = datetime(2010, 10, 10)
+        data = {sha1: new_date}
+        # data has a date while the one in the db is None
+        assert backend.directory_add(data) is True
+        diy = mongodb.directory.find_one({"sha1": sha1})
+        assert diy["ts"] == new_date.timestamp()
+        assert diy["revision"] == revision
+
+    def test_add_directory_do_not_update_older_date(self, backend, mongodb):
+        sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        revision = {"test": "test"}
+        mongodb.directory.insert_one(
+            {"sha1": sha1, "ts": 1286661600, "revision": revision}
+        )
+
+        new_date = datetime(2020, 10, 10)
+        data = {sha1: new_date}
+        # data has a date after the one in the db
+        assert backend.directory_add(data) is True
+        assert mongodb.directory.count_documents({}) == 1
+        cnt = mongodb.directory.find_one({"sha1": sha1})
+        assert cnt["ts"] == 1286661600
+
+    def test_add_directory_multiple(self, backend, mongodb):
+        sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94"
+        sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3"
+        sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f"
+        sha1_4 = "3ebfa301dc59196f18593c45e519287a23297589"
+        revision = {"test": "test"}
+        mongodb.directory.insert_one(
+            {"sha1": sha1_1, "ts": 1286661600, "revision": revision}
+        )
+        mongodb.directory.insert_one({"sha1": sha1_2, "ts": None, "revision": revision})
+        mongodb.directory.insert_one(
+            {"sha1": sha1_3, "ts": 1631889655, "revision": revision}
+        )
+
+        data = {
+            sha1_1: datetime(2020, 10, 10),  # given date is in future, no update
+            sha1_2: datetime(2020, 10, 10),  # will update None date
+            sha1_3: datetime(2010, 10, 10),  # date in the past, will update
+            sha1_4: datetime(2010, 10, 10),  # new rcd, will insert
+        }
+
+        assert backend.directory_add(data) is True
+        assert mongodb.directory.count_documents({}) == 4
+        dry = mongodb.directory.find_one({"sha1": sha1_1})
+        assert dry["ts"] == 1286661600
+        assert dry["revision"] == revision
+
+        dry = mongodb.directory.find_one({"sha1": sha1_2})
+        assert dry["ts"] == datetime(2020, 10, 10).timestamp()
+        assert dry["revision"] == revision
+
+        dry = mongodb.directory.find_one({"sha1": sha1_3})
+        assert dry["ts"] == datetime(2010, 10, 10).timestamp()
+        assert dry["revision"] == revision
+
+        dry = mongodb.directory.find_one({"sha1": sha1_4})
+        assert dry["ts"] == datetime(2010, 10, 10).timestamp()
+        assert dry["revision"] == {}