diff --git a/swh/provenance/mongo/README.md b/swh/provenance/mongo/README.md --- a/swh/provenance/mongo/README.md +++ b/swh/provenance/mongo/README.md @@ -42,3 +42,36 @@ path: str } ``` + +Flat model +---------- + +content { +} + +directory { +} + +revision { +} + +origin { +} + +content_in_reviosn { +} + +content_in_directory { +} + +directory_in_revision { +} + +revision_before_revision { +} + +revision_in_origin { +} + +location { +} diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py --- a/swh/provenance/mongo/backend.py +++ b/swh/provenance/mongo/backend.py @@ -13,6 +13,7 @@ from bson import ObjectId import mongomock import pymongo +from pymongo import InsertOne, UpdateOne from swh.core.statsd import statsd from swh.model.model import Sha1Git @@ -25,6 +26,7 @@ RelationType, RevisionData, ) +from .entity import Entity STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds" @@ -51,44 +53,121 @@ def close(self) -> None: self.db.client.close() + def _generate_date_upserts(self, sha1, date, inserts={}, upsert=True): + ts = datetime.timestamp(date) if date is not None else None + # update only those with date either as None or later than the given one + return UpdateOne( + {"sha1": sha1, "$or": [{"ts": {"$gte": ts}}, {"ts": None}]}, + { + "$set": {"ts": ts}, + "$setOnInsert": inserts, + }, + upsert=upsert, + ) + + def _generate_date_upserts_alternate(self, sha1, date, inserts={}, upsert=True): + # FIXME, Compare the performance and decide which one to keep + ts = datetime.timestamp(date) if date is not None else None + # update only those with date either as None or later than the given one + return UpdateOne( + {"sha1": sha1}, + [ + { + "$set": { + "ts": { + "$cond": { + "if": {"$gt": ["$ts", ts]}, + "then": ts, + "else": None, + } + } + }, + }, + ], + upsert=upsert, + ) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) def content_add( self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, 
Optional[datetime]]] ) -> bool: - data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) - existing = { - x["sha1"]: x - for x in self.db.content.find( - {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} - ) - } - for sha1, date in data.items(): - ts = datetime.timestamp(date) if date is not None else None - if sha1 in existing: - cnt = existing[sha1] - if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]): - self.db.content.update_one( - {"_id": cnt["_id"]}, {"$set": {"ts": ts}} - ) - else: - self.db.content.insert_one( - { - "sha1": sha1, - "ts": ts, - "revision": {}, - "directory": {}, - } - ) + default_inserts = {"revision": {}, "directory": {}} + if isinstance(cnts, dict): + writes = [ + self._generate_date_upserts(sha1, date, default_inserts) + for sha1, date in cnts.items() + ] + else: + writes = [ + self._generate_date_upserts(sha1, None, default_inserts) + for sha1 in cnts + ] + statsd.gauge("provenance.content-list.size", len(writes)) + Entity.factory(self.db, "content").bulk_write(writes) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] + ) -> bool: + default_inserts = {"revision": {}} + if isinstance(dirs, dict): + writes = [ + self._generate_date_upserts(sha1, date, default_inserts) + for sha1, date in dirs.items() + ] + else: + writes = [ + self._generate_date_upserts(sha1, None, default_inserts) + for sha1 in dirs + ] + statsd.gauge("provenance.directory-list.size", len(writes)) + Entity.factory(self.db, "directory").bulk_write(writes) + return True + + def _get_oldest_revision_from_content(self, content): + # FIXME, returing with the assumption that content has all the + # revisons in the array + # Change to seperate collection content_in_revision if gets too big + return Entity.factory(self.db, "revision").find_one( + { + "_id": {"$in": [ObjectId(obj_id) for obj_id in 
content["revision"]]}, + "ts": content["ts"], + } + ) + + def _get_preferred_origin(self, revision): + if revision.get("preferred"): + return Entity.factory(self.db, "origin").find_one( + {"sha1": revision["preferred"]} + ) + return None + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + content = Entity.factory(self.db, "content").find_one({"sha1": id}) + if not content: + return None + + oldest_revision = self._get_oldest_revision_from_content(content) + origin = self._get_preferred_origin(oldest_revision) + + return ProvenanceResult( + content=id, + revision=oldest_revision["sha1"], + date=datetime.fromtimestamp(oldest_revision["ts"], timezone.utc), + origin=origin["url"] if origin else None, + path=sorted(content["revision"][str(oldest_revision["_id"])])[0] + # FIXME, this is likely to be wrong, tests pass + ) + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) + def content_find_first_old(self, id: Sha1Git) -> Optional[ProvenanceResult]: # get all the revisions # iterate and find the earliest content = self.db.content.find_one({"sha1": id}) if not content: return None - occurs = [] for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}} @@ -97,7 +176,6 @@ origin = self.db.origin.find_one({"sha1": revision["preferred"]}) else: origin = {"url": None} - for path in content["revision"][str(revision["_id"])]: occurs.append( ProvenanceResult( @@ -110,6 +188,7 @@ ) return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] + # FIXME, refactor this @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None @@ -138,7 +217,11 @@ ) ) for directory in self.db.directory.find( - {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["directory"]]}} + { + "_id": { 
+ "$in": [ObjectId(obj_id) for obj_id in content.get("directory", {})] + } + } ): for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}} @@ -171,41 +254,18 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return { - x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) - for x in self.db.content.find( + cnt["sha1"]: datetime.fromtimestamp(cnt["ts"], timezone.utc) + for cnt in Entity.factory(self.db, "content").find( {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, {"sha1": 1, "ts": 1, "_id": 0}, ) } - @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) - def directory_add( - self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] - ) -> bool: - data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) - existing = { - x["sha1"]: x - for x in self.db.directory.find( - {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} - ) - } - for sha1, date in data.items(): - ts = datetime.timestamp(date) if date is not None else None - if sha1 in existing: - dir = existing[sha1] - if ts is not None and (dir["ts"] is None or ts < dir["ts"]): - self.db.directory.update_one( - {"_id": dir["_id"]}, {"$set": {"ts": ts}} - ) - else: - self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) - return True - @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return { - x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) - for x in self.db.directory.find( + dry["sha1"]: datetime.fromtimestamp(dry["ts"], timezone.utc) + for dry in Entity.factory(self.db, "directory").find( {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, {"sha1": 1, "ts": 1, "_id": 0}, ) @@ -213,6 +273,7 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": 
"entity_get_all"}) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: + # test only method; not using entity factory or cache return { x["sha1"] for x in self.db.get_collection(entity.value).find( @@ -227,6 +288,7 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) def location_get_all(self) -> Set[bytes]: + # test only method; not using entity factory or cache contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1}) paths: List[Iterable[bytes]] = [] for content in contents: @@ -244,35 +306,51 @@ self.db = mongomock.MongoClient(**self.conn_args).get_database(self.dbname) else: # assume real MongoDB server by default - self.db = pymongo.MongoClient(**self.conn_args).get_database(self.dbname) + self.db = pymongo.MongoClient(host="127.0.0.1", port=27017, connect=False).get_database(self.dbname) @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: - existing = { - x["sha1"]: x - for x in self.db.origin.find( - {"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1} - ) - } - for sha1, url in orgs.items(): - if sha1 not in existing: - # add new origin - self.db.origin.insert_one({"sha1": sha1, "url": url}) + writes = [InsertOne({"sha1": sha1, "url": url}) for (sha1, url) in orgs.items()] + Entity.factory(self.db, "origin").bulk_write(writes) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: return { - x["sha1"]: x["url"] - for x in self.db.origin.find( + ori["sha1"]: ori["url"] + for ori in Entity.factory(self.db, "origin").find( {"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0} ) } + def _generate_revision_upserts(self, sha1, date=None, preferred=None, upsert=True): + ts = datetime.timestamp(date) if date is not None else None + return UpdateOne( + {"sha1": sha1, "$or": [{"ts": {"$gte": ts}}, {"ts": 
None}]}, + { + "$set": {"ts": ts, "preferred": preferred}, + "$setOnInsert": {"origin": [], "revision": []}, + }, + upsert=upsert, + ) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) def revision_add( - self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] + self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] ) -> bool: + # if isinstance(revs, dict): + # writes = [ + # self._generate_revision_upserts(sha1, data.date, data.origin) + # for sha1, data in revs.items() + # ] + # else: + # writes = [ + # self._generate_revision_upserts(sha1) + # for sha1 in cnts + # ] + # Entity.factory(self.db, "revision").bulk_write(writes) + # return True + data = ( revs if isinstance(revs, dict) @@ -285,6 +363,9 @@ {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1}, ) } + + statsd.gauge("provenance.revision-cache.size", len(existing)) + for sha1, info in data.items(): ts = datetime.timestamp(info.date) if info.date is not None else None preferred = info.origin @@ -314,11 +395,13 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: return { - x["sha1"]: RevisionData( - date=datetime.fromtimestamp(x["ts"], timezone.utc) if x["ts"] else None, - origin=x["preferred"], + rev["sha1"]: RevisionData( + date=datetime.fromtimestamp(rev["ts"], timezone.utc) + if rev["ts"] + else None, + origin=rev["preferred"], ) - for x in self.db.revision.find( + for rev in Entity.factory(self.db, "revision").find( { "sha1": {"$in": list(ids)}, "$or": [{"preferred": {"$ne": None}}, {"ts": {"$ne": None}}], @@ -362,12 +445,15 @@ ) } + statsd.gauge("provenance.relation-src-cache.size", len(src_objs)) + statsd.gauge("provenance.relation-dst-cache.size", len(dst_objs)) + for sha1, dsts in denorm.items(): # update if src_relation != "revision": k = { obj_id: list(set(paths + dsts.get(obj_id, []))) - for obj_id, paths in 
src_objs[sha1][dst_relation].items() + for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items() } self.db.get_collection(src_relation).update_one( {"_id": src_objs[sha1]["_id"]}, @@ -437,7 +523,7 @@ ) } src_objs = { - x["sha1"]: x[dst] + x["sha1"]: x.get(dst, {}) for x in self.db.get_collection(src).find( {}, {"_id": 0, "sha1": 1, dst: 1} ) @@ -467,10 +553,11 @@ def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: + # test only method; not using entity factory or cache src, *_, dst = relation.value.split("_") empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] src_objs = { - x["sha1"]: x[dst] + x["sha1"]: x.get(dst, {}) for x in self.db.get_collection(src).find( {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1} ) @@ -509,3 +596,25 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"}) def with_path(self) -> bool: return True + + def entity_get(self, entity, filters=None): + """ + A method used only by tests + Returns a list of entities after applying the filter + """ + # FIXME, this should be added to the interface to make + # tests backend agnostic. Adding this in SQL could be tricky + # FIXME, change to entity factory + return list(Entity.factory(self.db, entity).find(filters)) + + def entity_add(self, entity, data): + """ + A method used only by tests + """ + # FIXME, this should be added to the interface to make + # tests backend agnostic. 
Adding this in SQL could be tricky + # FIXME, change to entity factory + return Entity.factory(self.db, entity).insert(data) + + def create_indexes(self, entity, fields, unique=False): + Entity.factory(self.db, entity).create_index(fields, unique) diff --git a/swh/provenance/mongo/backend_normalized.py b/swh/provenance/mongo/backend_normalized.py new file mode 100644 --- /dev/null +++ b/swh/provenance/mongo/backend_normalized.py @@ -0,0 +1,91 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from .backend import ProvenanceStorageMongoDb + +class MongoNormalized(ProvenanceStorageMongoDb): + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) + def content_add( + self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] + ) -> bool: + return True + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] + ) -> bool: + return True + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": 
"entity_get_all"}) + def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"}) + def location_add(self, paths: Iterable[bytes]) -> bool: + # TODO: implement this methods if path are to be stored in a separate collection + return True + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) + def location_get_all(self) -> Set[bytes]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) + def revision_add( + self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] + ) -> bool: + return True + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) + def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"}) + def relation_add( + self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] + ) -> bool: + return True + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"}) + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Dict[Sha1Git, Set[RelationData]]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"}) + def relation_get_all( + self, relation: RelationType + ) -> Dict[Sha1Git, Set[RelationData]]: + pass + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"}) + def with_path(self) -> bool: + return True diff --git a/swh/provenance/mongo/cache.py b/swh/provenance/mongo/cache.py new file mode 100644 --- /dev/null +++ 
b/swh/provenance/mongo/cache.py
@@ -0,0 +1,8 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class Cache:
+    pass
diff --git a/swh/provenance/mongo/data_model.json b/swh/provenance/mongo/data_model.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/data_model.json
@@ -0,0 +1,2 @@
+{
+}
diff --git a/swh/provenance/mongo/entity.py b/swh/provenance/mongo/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/entity.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import logging
+
+from .errors import DBError, EntityError
+
+logger = logging.getLogger(__name__)
+
+
+def cache_get(func):
+    """Wrap a read method; placeholder hook for a future cache layer."""
+
+    @functools.wraps(func)
+    def wrapper(*args, **kw):
+        # implement cache as needed
+        return func(*args, **kw)
+
+    return wrapper
+
+
+class Entity:
+    """Access layer for a single MongoDB collection.
+
+    Subclasses name their collection through ``collection``; instances are
+    normally obtained with :meth:`factory`.
+    """
+
+    collection = ""
+    validate = True
+    data_model = {}
+
+    def __init__(self, db, data=None):
+        self.db = db
+        # Document read by the subclass properties (``revision``, ...);
+        # defaults to empty so those properties never hit a missing attribute.
+        self.data = {} if data is None else data
+
+    def _get_collection(self):
+        # FIXME, add raise as needed
+        return self.db.get_collection(self.collection)
+
+    @cache_get
+    def find_one(self, filters):
+        """Return the first document matching ``filters`` or None."""
+        # validate the model if self.validate is true
+        return self._get_collection().find_one(filters)
+
+    @cache_get
+    def find(self, *args, **kw):
+        """Forward to the collection's ``find`` and return its cursor."""
+        # validate the model if self.validate is true
+        # checking inside the list could be a bad idea, so better to skip
+        return self._get_collection().find(*args, **kw)
+
+    def insert(self, data):
+        """Insert the documents in ``data`` as one unordered batch.
+
+        Returns the ``insert_many`` result, or None when ``data`` is empty or
+        the write failed. Failures are logged, not raised (best effort).
+        """
+        if not data:
+            return None
+        try:
+            return self._get_collection().insert_many(data, ordered=False)
+        except Exception:
+            # Deliberately non-fatal, but no longer silent.
+            logger.warning("insert_many failed on %r", self.collection, exc_info=True)
+            return None
+
+    def insert_one(self, obj):
+        """Insert ``obj`` and return the result; None when ``obj`` is empty."""
+        # validate the model if self.validate is true
+        # FIXME, set the obj in the cache
+        if not obj:
+            return None
+        try:
+            return self._get_collection().insert_one(obj)
+        except DBError:
+            logger.exception("insert_one failed on %r", self.collection)
+            raise
+
+    def bulk_write(self, writes):
+        """Run the operations in ``writes`` as one unordered bulk write.
+
+        Returns the ``bulk_write`` result, or None when ``writes`` is empty or
+        the write failed. Failures are logged, not raised (best effort).
+        """
+        # validate the model if self.validate is true,
+        # checking inside the list could be a bad idea, so better to skip
+        if not writes:
+            return None
+        try:
+            return self._get_collection().bulk_write(writes, ordered=False)
+        except Exception:
+            # Deliberately non-fatal, but no longer silent.
+            logger.warning("bulk_write failed on %r", self.collection, exc_info=True)
+            return None
+
+    def create_index(self, fields, unique=False):
+        """Create an index over ``fields`` on the collection."""
+        self._get_collection().create_index(fields, unique=unique)
+
+    @staticmethod
+    def factory(db, entity):
+        """Return the Entity subclass instance for ``entity``.
+
+        Raises EntityError when ``entity`` is not a known entity name.
+        """
+        mapping = {
+            "content": Content,
+            "directory": Directory,
+            "origin": Origin,
+            "revision": Revision,
+        }
+        if entity in mapping:
+            return mapping[entity](db)
+        raise EntityError(f"invalid entity type {entity}")
+
+
+class Content(Entity):
+    collection = "content"
+    validate = False
+
+    @property
+    def revision(self):
+        return self.data.get("revision", {})
+
+    @property
+    def directory(self):
+        return self.data.get("directory", {})
+
+
+class Directory(Entity):
+    collection = "directory"
+    validate = False
+
+    @property
+    def revision(self):
+        return self.data.get("revision", {})
+
+
+class Origin(Entity):
+    collection = "origin"
+    validate = False
+
+
+class Revision(Entity):
+    collection = "revision"
+    validate = False
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class DBError(Exception):
+    # FIXME, add mongo specific logging
+    pass
+
+
+class EntityError(Exception):
+    pass
diff --git a/swh/provenance/tests/mongo/test_backend.py b/swh/provenance/tests/mongo/test_backend.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/mongo/test_backend.py @@ -0,0 +1,726 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" +Unit tests for the mongo backend +""" + +from datetime import datetime + +import pymongo.database +import pytest + +# from swh.provenance.mongo.backend import ProvenanceStorageMongoDb +from swh.provenance import get_provenance_storage +from swh.provenance.interface import RevisionData + + +class TestMongoDBInit: + """ + Make sure mongo indexes are set + Validate the datamodel + mongomock is used to simulate the mongo server. + Version assumed is mongo 5.0 engine:wiredTiger + """ + + def test_contnet_sha1_unique_index(self, mongodb): + pass + + def test_contnet_sha1_ts_combination_index(self, mongodb): + pass + + def test_directory_sha1_unique_index(self, mongodb): + pass + + def test_directory_sha1_ts_combination_index(self, mongodb): + pass + + def test_origin_sha1_unique_index(self, mongodb): + pass + + +class TestMongoBackend: + """ + Test mongo backend + This class tests each method in mongodb backend in isolation + methods are verified by directly interacting with the mongoDB + mongomock is used to simulate the mongo server. 
+ """ + + # FIXME, many, if not all, of the following methods can be refactored + # to make this test class backend agnostic + + # FIMXE, use fixtures to supply test data (along with the performance tests) + # FIXME, consider splitting to different classes or modules if this gets too long + + @pytest.fixture + def backend(self, mongodb: pymongo.database.Database): + mongodb_params = { + "dbname": mongodb.name, + } + + from pymongo import MongoClient + + mongo_client = MongoClient("mongodb://localhost:27017") + mongo_client.drop_database(mongodb.name) + + with get_provenance_storage( + cls="mongodb", db=mongodb_params, engine="mongomock" + ) as storage: + self._set_indexes(storage) + yield storage + + def _set_indexes(self, backend): + backend.create_indexes("content", [("sha1", 1)], unique=True) + backend.create_indexes("content", [("sha1", 1), ("ts", 1)]) + + backend.create_indexes("directory", [("sha1", 1)], unique=True) + backend.create_indexes("directory", [("sha1", 1), ("ts", 1)]) + + backend.create_indexes("origin", [("sha1", 1)], unique=True) + + backend.create_indexes("revision", [("sha1", 1)], unique=True) + + # add content tests + + def test_add_content_empty(self, backend): + assert backend.content_add({}) is True + results = backend.entity_get("content") + assert len(results) == 0 + + assert backend.content_add([]) is True + results = backend.entity_get("content") + assert len(results) == 0 + + def test_add_content_with_insert(self, backend): + # add data using add_contnet + # get data from mongo and compare + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data = { + sha1_1: None, + sha1_2: datetime(2020, 10, 10), + } # two new rcds will be inserted + assert backend.content_add(data) is True + + results = backend.entity_get("content") + assert len(results) == 2 + results = backend.entity_get("content", {"sha1": sha1_1}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] is None + 
assert cnt["revision"] == {} + assert cnt["directory"] == {} + + results = backend.entity_get("content", {"sha1": sha1_2}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] == datetime(2020, 10, 10).timestamp() + assert cnt["revision"] == {} + assert cnt["directory"] == {} + + def test_add_content_with_update_later_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "content", + [ + { + "sha1": sha1, + "ts": datetime(2020, 10, 10).timestamp(), + "revision": revision, + "directory": {}, + } + ], + ) + + new_date = datetime(2010, 10, 10) + data = {sha1: new_date} + # data has a date earlier than the one in the db, so the date will be updated + assert backend.content_add(data) is True + results = backend.entity_get("content") + assert len(results) == 1 + + results = backend.entity_get("content", {"sha1": sha1}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] == new_date.timestamp() + assert cnt["revision"] == revision + assert cnt["directory"] == {} + + def test_add_content_with_update_none_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "content", + [{"sha1": sha1, "ts": None, "revision": revision, "directory": {}}], + ) + + new_date = datetime(2010, 10, 10) + data = {sha1: new_date} + # db has None date, will be updated with the newly supplied date + assert backend.content_add(data) is True + results = backend.entity_get("content", {"sha1": sha1}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] == new_date.timestamp() + + assert cnt["revision"] == revision + assert cnt["directory"] == {} + + def test_add_content_do_not_update_older_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "content", + [ + { + "sha1": sha1, + "ts": datetime(2010, 10, 10).timestamp(), + "revision": revision, + 
"directory": {}, + } + ], + ) + + new_date = datetime(2020, 10, 10) + data = {sha1: new_date} + # data has a date later than the one in the db, no update will happen + assert backend.content_add(data) is True + results = backend.entity_get("content", {"sha1": sha1}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] == datetime(2010, 10, 10).timestamp() + + def test_add_content_multiple(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + sha1_4 = "3ebfa301dc59196f18593c45e519287a23297589" + revision = {"test": "test"} + backend.entity_add( + "content", + [ + { + "sha1": sha1_1, + "ts": 1286661600, + "revision": revision, + "directory": {}, + }, + {"sha1": sha1_2, "ts": None, "revision": revision, "directory": {}}, + { + "sha1": sha1_3, + "ts": 1631889655, + "revision": revision, + "directory": {}, + }, + ], + ) + + data = { + sha1_1: datetime(2020, 10, 10), # given date is in the future, no update + sha1_2: datetime(2020, 10, 10), # will update None date + sha1_3: datetime(2010, 10, 10), # date in the past, will update + sha1_4: datetime(2020, 10, 10), # new rcd, will insert + } + + assert backend.content_add(data) is True + results = backend.entity_get("content") + assert len(results) == 4 + + cnt = backend.entity_get("content", {"sha1": sha1_1})[0] + assert cnt["ts"] == 1286661600 + assert cnt["revision"] == revision + assert cnt["directory"] == {} + + cnt = backend.entity_get("content", {"sha1": sha1_2})[0] + assert cnt["ts"] == datetime(2020, 10, 10).timestamp() + assert cnt["revision"] == revision + assert cnt["directory"] == {} + + cnt = backend.entity_get("content", {"sha1": sha1_3})[0] + assert cnt["ts"] == datetime(2010, 10, 10).timestamp() + assert cnt["revision"] == revision + assert cnt["directory"] == {} + + cnt = backend.entity_get("content", {"sha1": sha1_4})[0] + assert cnt["ts"] == datetime(2020, 10, 
10).timestamp() + assert cnt["revision"] == {} + assert cnt["directory"] == {} + + # # # add directory tests + + def test_add_directory_empty(self, backend): + assert backend.directory_add({}) is True + results = backend.entity_get("directory") + assert len(results) == 0 + + assert backend.directory_add([]) is True + results = backend.entity_get("directory") + assert len(results) == 0 + + def test_add_directory_with_insert(self, backend): + # add data using add_directory + # get data from mongo and compare + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data = { + sha1_1: None, + sha1_2: datetime(2020, 10, 10), + } # two new rcds will be inserted + assert backend.directory_add(data) is True + + results = backend.entity_get("directory") + assert len(results) == 2 + + cnt = backend.entity_get("directory", {"sha1": sha1_1})[0] + assert cnt["ts"] is None + assert cnt["revision"] == {} + + cnt = backend.entity_get("directory", {"sha1": sha1_2})[0] + assert cnt["ts"] == datetime(2020, 10, 10).timestamp() + assert cnt["revision"] == {} + + def test_add_directory_with_update_later_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "directory", [{"sha1": sha1, "ts": 1631881748, "revision": revision}] + ) + + new_date = datetime(2010, 10, 10) + data = {sha1: new_date} + # data has a date earlier than the one in the db, will update the date + assert backend.directory_add(data) is True + + diy = backend.entity_get("directory", {"sha1": sha1})[0] + assert diy["ts"] == new_date.timestamp() + assert diy["revision"] == revision + + def test_add_directory_with_update_none_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "directory", [{"sha1": sha1, "ts": None, "revision": revision}] + ) + + new_date = datetime(2010, 10, 10) + data = {sha1: new_date} + # db has None 
date, will be updated with the given date + assert backend.directory_add(data) is True + diy = backend.entity_get("directory", {"sha1": sha1})[0] + assert diy["ts"] == new_date.timestamp() + assert diy["revision"] == revision + + def test_add_directory_do_not_update_older_date(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + revision = {"test": "test"} + backend.entity_add( + "directory", + [ + { + "sha1": sha1, + "ts": datetime(2010, 10, 10).timestamp(), + "revision": revision, + } + ], + ) + + new_date = datetime(2020, 10, 10) + data = {sha1: new_date} + # data has a date later than the one in the db, no update will happen + assert backend.directory_add(data) is True + results = backend.entity_get("directory", {"sha1": sha1}) + assert len(results) == 1 + cnt = results[0] + assert cnt["ts"] == datetime(2010, 10, 10).timestamp() + + def test_add_directory_multiple(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + sha1_4 = "3ebfa301dc59196f18593c45e519287a23297589" + revision = {"test": "test"} + backend.entity_add( + "directory", + [ + {"sha1": sha1_1, "ts": 1286661600, "revision": revision}, + {"sha1": sha1_2, "ts": None, "revision": revision}, + {"sha1": sha1_3, "ts": 1631889655, "revision": revision}, + ], + ) + + data = { + sha1_1: datetime(2020, 10, 10), # given date is in the future, no update + sha1_2: datetime(2020, 10, 10), # will update None date + sha1_3: datetime(2010, 10, 10), # date in the past, will update + sha1_4: datetime(2010, 10, 10), # new rcd, will insert + } + + assert backend.directory_add(data) is True + results = backend.entity_get("directory") + assert len(results) == 4 + + dry = backend.entity_get("directory", {"sha1": sha1_1})[0] + assert dry["ts"] == 1286661600 + assert dry["revision"] == revision + + dry = backend.entity_get("directory", {"sha1": sha1_2})[0] + assert dry["ts"] == 
datetime(2020, 10, 10).timestamp() + assert dry["revision"] == revision + + dry = backend.entity_get("directory", {"sha1": sha1_3})[0] + assert dry["ts"] == datetime(2010, 10, 10).timestamp() + assert dry["revision"] == revision + + dry = backend.entity_get("directory", {"sha1": sha1_4})[0] + assert dry["ts"] == datetime(2010, 10, 10).timestamp() + assert dry["revision"] == {} + + # # # content_get tests + + def test_content_get_empty_ids(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data1 = {"sha1": sha1_1, "ts": datetime(2010, 10, 8).timestamp()} + data2 = {"sha1": sha1_2, "ts": datetime(2020, 8, 20).timestamp()} + backend.entity_add("content", [data1, data2]) + + results = backend.content_get([]) + assert results == {} + + def test_content_get(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + data1 = {"sha1": sha1_1, "ts": datetime(2010, 10, 8).timestamp()} + data2 = {"sha1": sha1_2, "ts": datetime(2020, 8, 20).timestamp()} + + # This has None for ts, will not be returend from content_get + data3 = {"sha1": sha1_2, "ts": None} + backend.entity_add("content", [data1, data2, data3]) + + results = backend.content_get([sha1_1, sha1_2, sha1_3]) + assert len(results) == 2 + results[sha1_1] = datetime(2010, 10, 8).timestamp() + results[sha1_2] = datetime(2020, 8, 20).timestamp() + + # # # directory_get tests + + def test_directory_get_empty_ids(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data1 = {"sha1": sha1_1, "ts": datetime(2010, 10, 8).timestamp()} + data2 = {"sha1": sha1_2, "ts": datetime(2020, 8, 20).timestamp()} + backend.entity_add("directory", [data1, data2]) + + results = backend.directory_get([]) + assert results == {} + + def test_directory_get(self, backend): + sha1_1 = 
"cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + sha1_3 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + data1 = {"sha1": sha1_1, "ts": datetime(2010, 10, 8).timestamp()} + data2 = {"sha1": sha1_2, "ts": datetime(2020, 8, 20).timestamp()} + + # This has None for ts, will not be returend from directory_get + data3 = {"sha1": sha1_2, "ts": None} + backend.entity_add("directory", [data1, data2, data3]) + results = backend.directory_get([sha1_1, sha1_2, sha1_3]) + assert len(results) == 2 + results[sha1_1] = datetime(2010, 10, 8).timestamp() + results[sha1_2] = datetime(2020, 8, 20).timestamp() + + # # # location_add tests + + def test_location_add(self, backend): + # FIXME, this will change with model change + assert backend.location_add([]) is True + + # # origin_add tests + + def test_origin_add_empty(self, backend): + assert backend.origin_add({}) is True + results = backend.entity_get("origin") + assert len(results) == 0 + + def test_origin_add_new(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data = {sha1_1: "1.example.com", sha1_2: "2.example.com"} + assert backend.origin_add(data) is True + results = backend.entity_get("origin") + assert len(results) == 2 + + def test_origin_add_skip_existing(self, backend): + # sending an existing origin hash will not add or update any record + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + backend.entity_add("origin", [{"sha1": sha1, "url": "example.com"}]) + + sha1_1 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data = {sha1: "1.example.com", sha1_1: "2.example.com"} + assert backend.origin_add(data) is True + results = backend.entity_get("origin") + assert len(results) == 2 + origin = backend.entity_get("origin", {"sha1": sha1})[0] + assert origin["url"] == "example.com" # not 1.example.com + + # # # origin_get tests + + def test_origin_get_empty(self, backend): + sha1_1 = 
"cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + backend.entity_add( + "origin", + [ + {"sha1": sha1_1, "url": "1.example.com"}, + {"sha1": sha1_2, "url": "2.example.com"}, + ], + ) + assert backend.origin_get({}) == {} + + def test_origin_get(self, backend): + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + backend.entity_add( + "origin", + [ + {"sha1": sha1_1, "url": "1.example.com"}, + {"sha1": sha1_2, "url": "2.example.com"}, + ], + ) + assert backend.origin_get([sha1_1]) == {sha1_1: "1.example.com"} + + # # # revision_add tests + + def test_revision_add_empty(self, backend): + assert backend.revision_add({}) is True + + assert backend.revision_add([]) is True + + def test_revision_add_create_revision_with_empty_date(self, backend): + """ + Input to revision_add is a list of sha1s + RevisionData will be considerd as None + """ + + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + data = [sha1_1, sha1_2] + assert backend.revision_add(data) is True + results = backend.entity_get("revision") + assert len(results) == 2 + + revision_1 = backend.entity_get("revision", {"sha1": sha1_1})[0] + assert revision_1["preferred"] is None + assert revision_1["origin"] == [] + assert revision_1["revision"] == [] + assert revision_1["ts"] is None + + def test_revision_add_create_new_revision_with_date(self, backend): + """ + RevisionData is a dataclass with date and sha1 of the preferred origin + """ + + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" + origin_sha1 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + + # DOUBT, this function is not expecting preferred origin to be already added + # what if an unknown origin comes as preferred, an integrity error? 
+ rev_data_1 = RevisionData(datetime(2020, 8, 20), origin_sha1) + rev_data_2 = RevisionData(datetime(2010, 8, 20), None) + + data = {sha1_1: rev_data_1, sha1_2: rev_data_2} + assert backend.revision_add(data) is True + results = backend.entity_get("revision") + assert len(results) == 2 + + revision_1 = backend.entity_get("revision", {"sha1": sha1_1})[0] + assert revision_1["preferred"] == rev_data_1.origin + assert ( + revision_1["origin"] == [] + ) # DOUBT, should this contain the new preferred origin? + assert ( + revision_1["revision"] == [] + ) # DOUBT, this is the history, should the new revaion should be added here? + assert revision_1["ts"] == rev_data_1.date.timestamp() + + revision_2 = backend.entity_get("revision", {"sha1": sha1_2})[0] + assert revision_2["preferred"] is None + assert revision_2["origin"] == [] + assert revision_2["revision"] == [] + assert revision_2["ts"] == datetime(2010, 8, 20).timestamp() + + def test_revision_add_update_none_date_and_origin(self, backend): + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + # add a revision with sha1 + backend.entity_add( + "revision", + [ + { + "sha1": sha1, + "ts": None, + "preferred": None, + "origin": ["test-origin"], # FIXME, validate sha1 at the datamodel + "revision": [ + "test-revision" + ], # FIXME, validate sha1 at the datamodel + } + ], + ) + + origin_sha1 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + rev_data = RevisionData(datetime(2010, 8, 20), origin_sha1) + data = {sha1: rev_data} + # both date and origin will be updated since db has None for both + assert backend.revision_add(data) is True + + # making sure an update happend and no duplicate record is added + assert len(backend.entity_get("revision", {"sha1": sha1})) == 1 + revision = backend.entity_get("revision", {"sha1": sha1})[0] + assert revision["ts"] == datetime(2010, 8, 20).timestamp() + assert revision["preferred"] == origin_sha1 + assert revision["origin"] == ["test-origin"] + assert revision["revision"] == 
["test-revision"] + + def test_revision_add_existing_sha1_with_earlier_date(self, backend): + # date will be updated with the new (earlier) date + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + # add a revision with sha1 + backend.entity_add( + "revision", + [ + { + "sha1": sha1, + "ts": datetime(2020, 8, 20).timestamp(), + "preferred": None, + "origin": ["test-origin"], # FIXME, validate sha1 at the datamodel + "revision": [ + "test-revision" + ], # FIXME, validate sha1 at the datamodel + } + ], + ) + # date in data is earlier then the one in db + rev_data = RevisionData(datetime(2010, 8, 20), None) + data = {sha1: rev_data} + assert backend.revision_add(data) is True + + # making sure an update happend and no duplicate record is added + results = backend.entity_get("revision") + assert len(results) == 1 + revision = backend.entity_get("revision", {"sha1": sha1})[0] + assert revision["ts"] == datetime(2010, 8, 20).timestamp() + assert revision["preferred"] is None + + def test_revision_add_existing_sha1_with_later_date(self, backend): + # date will not be updated as it is later than the one in the db + # preferred revision will be updated + sha1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + # add a revision with sha1 + backend.entity_add( + "revision", + [ + { + "sha1": sha1, + "ts": datetime(2010, 8, 20).timestamp(), + "preferred": None, + "origin": ["test-origin"], # FIXME, validate sha1 at the datamodel + "revision": [ + "test-revision" + ], # FIXME, validate sha1 at the datamodel + } + ], + ) + # date in data is earlier then the one in db + origin_sha1 = "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + rev_data = RevisionData(datetime(2020, 8, 20), origin_sha1) + data = {sha1: rev_data} + assert backend.revision_add(data) is True + + # making sure an update happend and no duplicate record is added + assert len(backend.entity_get("revision", {"sha1": sha1})) == 1 + revision = backend.entity_get("revision", {"sha1": sha1})[0] + assert revision["ts"] == 
datetime(2010, 8, 20).timestamp() + assert revision["preferred"] == origin_sha1 + + def test_revision_add_multiple(self, backend): + # Both ts and origin will be updeated + sha1_1 = "cf23df2207d99a74fbe169e3eba035e633b65d94" + sha1_2 = "" + sha1_3 = "" + backend.entity_add( + "revision", + [ + { + "sha1": sha1_1, + "ts": None, + "preferred": None, + "origin": ["test-origin"], + "revision": ["test-revision"], + }, + { # only origin will be updeated + "sha1": sha1_2, + "ts": datetime(2010, 8, 20).timestamp(), + "preferred": None, + "origin": ["test-origin"], + "revision": ["test-revision"], + }, + { # only date will be updated + "sha1": sha1_3, + "ts": None, + "preferred": "109f4b3c50d7b0df729d299bc6f8e9ef9066971f", + "origin": ["test-origin"], + "revision": ["test-revision"], + }, + ], + ) + + # both date and origin will be updated + # rev_data_1 = RevisionData( + # datetime(2020, 8, 20), "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + # ) + # # only origin will be updated + # rev_data_2 = RevisionData(None, "109f4b3c50d7b0df729d299bc6f8e9ef9066971f") + # # only date will be updated + # rev_data_3 = RevisionData( + # datetime(2020, 8, 20), "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + # ) + # # this record will be inserted + # rev_data_4 = RevisionData( + # datetime(2020, 8, 20), "109f4b3c50d7b0df729d299bc6f8e9ef9066971f" + # ) + # data = {sha1_1: rev_data_1, sha1_2: rev_data_2, sha1_3: rev_data_3, + # sha1_4: rev_data_4} + + # FiXME, compare + + # # # revision_get tests + + def test_revision_get(self, backend): + # create data with preferred as None, ts None + # preferred as Not None, ts not None + pass + + def test_revision_get_preferred_none(self, backend): + pass + + def test_revision_get_ts_none(self, backend): + pass + + # relation_add tests + + # relation_get tests + + # with_path tests + + def test_with_path(self, backend): + assert backend.with_path() is True + + # content_find_first tests + + # content_find_all tests