Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345447
D6251.id22761.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D6251.id22761.diff
View Options
diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -9,6 +9,8 @@
from bson import ObjectId
import pymongo.database
+from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
+
from swh.model.model import Sha1Git
@@ -19,41 +21,119 @@
RelationType,
RevisionData,
)
+from .errors import DBError
+from .models import entity_factory
+
class ProvenanceStorageMongoDb:
def __init__(self, db: pymongo.database.Database):
self.db = db
- def content_add(
- self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
- ) -> bool:
- data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
+ def _format_data(self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]):
+ return data if isinstance(data, dict) else dict.fromkeys(data)
+
+ def _generate_date_upserts(self, sha1, date):
+ ts = datetime.timestamp(date) if date is not None else None
+ # update only those with date either as None or later than the given one
+ # FIXME, filter with future date or None
+ # FIXME, handle default insert in the schema
+ return UpdateOne({'sha1': sha1, 'ts': {'$gt': ts}}, {'$set': {'ts': ts}}, upsert=True)
+
+ def content_add(self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ writes = [self._generate_date_upserts(sha1, date) for sha1, date in self._format_data(cnts).items()]
+ try:
+ entity_factory('content', self.db).bulk_write(writes)
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, raise if needed
+ pass
+ return True
+
+ def directory_add(self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ writes = [self._generate_date_upserts(sha1, date) for sha1, date in self._format_data(dirs).items()]
+ try:
+ entity_factory('directory', self.db).bulk_write(writes)
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, raise if needed
+ pass
+ return True
+
+ def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool:
+ data = (
+ revs
+ if isinstance(revs, dict)
+ else dict.fromkeys(revs, RevisionData(date=None, origin=None))
+ )
existing = {
x["sha1"]: x
- for x in self.db.content.find(
- {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
+ for x in self.db.revision.find(
+ {"sha1": {"$in": list(data)}},
+ {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
)
}
- for sha1, date in data.items():
- ts = datetime.timestamp(date) if date is not None else None
+ for sha1, info in data.items():
+ ts = datetime.timestamp(info.date) if info.date is not None else None
+ preferred = info.origin
if sha1 in existing:
- cnt = existing[sha1]
- if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
- self.db.content.update_one(
- {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
+ rev = existing[sha1]
+ if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
+ ts = rev["ts"]
+ if preferred is None:
+ preferred = rev["preferred"]
+ if ts != rev["ts"] or preferred != rev["preferred"]:
+ self.db.revision.update_one(
+ {"_id": rev["_id"]},
+ {"$set": {"ts": ts, "preferred": preferred}},
)
else:
- self.db.content.insert_one(
+ self.db.revision.insert_one(
{
"sha1": sha1,
+ "preferred": preferred,
+ "origin": [],
+ "revision": [],
"ts": ts,
- "revision": {},
- "directory": {},
}
)
return True
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ # FIXME, add index in contnet sha1 and ts
+ # FIXME, do the timezone operation in mongo
+ return {
+ x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
+ for x in self.db.content.find(
+ {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+ {"sha1": 1, "ts": 1, "_id": 0},
+ )
+ }
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return {
+ x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
+ # FIXME add ts to index in directory
+ for x in self.db.directory.find(
+ {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+ {"sha1": 1, "ts": 1, "_id": 0},
+ )
+ }
+
+ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+ # only for tests
+
+ return set(self.db.get_collection(entity.value).distinct('sha1'))
+
+ def location_add(self, paths: Iterable[bytes]) -> bool:
+ # TODO: implement this methods if path are to be stored in a separate collection
+ return True
+
+ #---------------------------------------------#
+
+
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
# get all the revisions
# iterate and find the earliest
@@ -139,61 +219,11 @@
)
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
- def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
- return {
- x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
- for x in self.db.content.find(
- {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
- {"sha1": 1, "ts": 1, "_id": 0},
- )
- }
-
- def directory_add(
- self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
- ) -> bool:
- data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs)
- existing = {
- x["sha1"]: x
- for x in self.db.directory.find(
- {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
- )
- }
- for sha1, date in data.items():
- ts = datetime.timestamp(date) if date is not None else None
- if sha1 in existing:
- dir = existing[sha1]
- if ts is not None and (dir["ts"] is None or ts < dir["ts"]):
- self.db.directory.update_one(
- {"_id": dir["_id"]}, {"$set": {"ts": ts}}
- )
- else:
- self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
- return True
-
- def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
- return {
- x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
- for x in self.db.directory.find(
- {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
- {"sha1": 1, "ts": 1, "_id": 0},
- )
- }
-
- def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
- return {
- x["sha1"]
- for x in self.db.get_collection(entity.value).find(
- {}, {"sha1": 1, "_id": 0}
- )
- }
-
- def location_add(self, paths: Iterable[bytes]) -> bool:
- # TODO: implement this methods if path are to be stored in a separate collection
- return True
def location_get_all(self) -> Set[bytes]:
contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
paths: List[Iterable[bytes]] = []
+
for content in contents:
paths.extend(value for _, value in content["revision"].items())
paths.extend(value for _, value in content["directory"].items())
@@ -216,54 +246,16 @@
self.db.origin.insert_one({"sha1": sha1, "url": url})
return True
+
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
return {
x["sha1"]: x["url"]
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
for x in self.db.origin.find(
{"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0}
)
}
- def revision_add(
- self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
- ) -> bool:
- data = (
- revs
- if isinstance(revs, dict)
- else dict.fromkeys(revs, RevisionData(date=None, origin=None))
- )
- existing = {
- x["sha1"]: x
- for x in self.db.revision.find(
- {"sha1": {"$in": list(data)}},
- {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
- )
- }
- for sha1, info in data.items():
- ts = datetime.timestamp(info.date) if info.date is not None else None
- preferred = info.origin
- if sha1 in existing:
- rev = existing[sha1]
- if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
- ts = rev["ts"]
- if preferred is None:
- preferred = rev["preferred"]
- if ts != rev["ts"] or preferred != rev["preferred"]:
- self.db.revision.update_one(
- {"_id": rev["_id"]},
- {"$set": {"ts": ts, "preferred": preferred}},
- )
- else:
- self.db.revision.insert_one(
- {
- "sha1": sha1,
- "preferred": preferred,
- "origin": [],
- "revision": [],
- "ts": ts,
- }
- )
- return True
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
return {
@@ -283,6 +275,7 @@
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
+
src_relation, *_, dst_relation = relation.value.split("_")
dst_objs = {
@@ -319,7 +312,7 @@
if src_relation != "revision":
k = {
obj_id: list(set(paths + dsts.get(obj_id, [])))
- for obj_id, paths in src_objs[sha1][dst_relation].items()
+ for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items()
}
self.db.get_collection(src_relation).update_one(
{"_id": src_objs[sha1]["_id"]},
@@ -388,7 +381,7 @@
)
}
src_objs = {
- x["sha1"]: x[dst]
+ x["sha1"]: x.get(dst, {})
for x in self.db.get_collection(src).find(
{}, {"_id": 0, "sha1": 1, dst: 1}
)
@@ -420,7 +413,7 @@
src, *_, dst = relation.value.split("_")
empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
src_objs = {
- x["sha1"]: x[dst]
+ x["sha1"]: x.get(dst, {})
for x in self.db.get_collection(src).find(
{dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
)
diff --git a/swh/provenance/mongo/bootstrap.py b/swh/provenance/mongo/bootstrap.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/bootstrap.py
@@ -0,0 +1,8 @@
+# FIXME, maybe this should not be part of the code
+
+collections = []
+
+indexes = []
+
+def bootstrap_db():
+ pass
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,6 @@
+class DBError(Exception):
+ # FIXME, add mongo specific logging
+ pass
+
+class EntiryError(Exception):
+ pass
diff --git a/swh/provenance/mongo/models/__init__.py b/swh/provenance/mongo/models/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/__init__.py
@@ -0,0 +1,13 @@
+from .content import Content
+from .directory import Directory
+# from ..error import EntiryError
+
+
+def entity_factory(entity: str, db=None):
+ mapping = {
+ 'content': Content,
+ 'directory': Directory
+ }
+ if entity in mapping:
+ return mapping[entity](db)
+ raise AttributeError(f"invalid entity type {entity}")
diff --git a/swh/provenance/mongo/models/cache.py b/swh/provenance/mongo/models/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/cache.py
@@ -0,0 +1,35 @@
+class Cache:
+ """
+ A object cache layer
+ Simple in memory implementation
+ (Can later be extended if needed)
+ """
+
+ def __init__(self, key):
+ self.key = key
+
+ def add_data(self, data):
+ self.data = self._get_formatted_data(data)
+
+ def _clear(self):
+ self.data = dict()
+
+ def _get_formatted_data(self, data):
+ if dict:
+ return data
+ return {x.get for x in make_obj()}
+
+ def add_obj(self, obj):
+ self.data[getattr(obj, self.key)] = obj
+
+
+ def get(self, key):
+ self.data.get(key, None)
+
+ def set(self, data):
+ if data is dict():
+ pass
+ if data is lst():
+ pass
+ if data is obj:
+ pass
diff --git a/swh/provenance/mongo/models/content.py b/swh/provenance/mongo/models/content.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/content.py
@@ -0,0 +1,14 @@
+from .entity import Entity
+
+
+class Content(Entity):
+ collection = 'content'
+ validate_model = False
+
+
+ def find_first(sef):
+ pass
+
+
+ def find_all(self):
+ pass
diff --git a/swh/provenance/mongo/models/directory.py b/swh/provenance/mongo/models/directory.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/directory.py
@@ -0,0 +1,5 @@
+from .entity import Entity
+
+class Directory(Entity):
+ collection = 'directory'
+ validate_model = False
diff --git a/swh/provenance/mongo/models/entity.py b/swh/provenance/mongo/models/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/entity.py
@@ -0,0 +1,53 @@
+from abc import ABCMeta
+
+
+class Entity: #(ABCMeta):
+ """
+ An object saved in the db
+ """
+
+ collection: str
+ model: dict
+ validate_model: bool
+
+ def __init__(self, db):
+ self.db = db
+ self.db_collection = self.db.get_collection(self.collection)
+
+ def bulk_write(self, writes):
+ self.db_collection.bulk_write(writes)
+
+ # def _load_model(self):
+ # return {}
+
+ # def _set_data(self, data):
+ # pass
+
+ # def _validate(self):
+ # self.db.command() # use json in schema.json
+
+ # def save(self):
+ # if self.validate_model and self._validate():
+ # raise DataError()
+ # save()
+
+ # def get(self, qry):
+ # pass
+
+ # def _is_older_in_time(self):
+ # return self.data.ts < ts
+
+ # def add_if_older(self):
+ # if _is_older():
+ # self.save()
+
+ # def with_excetion_handle(self):
+ # pass
+
+
+class EntityList:
+ """
+ List or array of entities and their operations
+ Operate mostly on object cache
+ """
+ pass
diff --git a/swh/provenance/mongo/models/origin.py b/swh/provenance/mongo/models/origin.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/origin.py
@@ -0,0 +1,4 @@
+class Origin(Entity):
+ collection = 'origin'
+ model = {}
+ validate_model = False
diff --git a/swh/provenance/mongo/models/revision.py b/swh/provenance/mongo/models/revision.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/revision.py
@@ -0,0 +1,4 @@
+class Revision(Entity):
+ collection = 'revision'
+ model = {}
+ validate_model = False
diff --git a/swh/provenance/mongo/schema.json b/swh/provenance/mongo/schema.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/schema.json
@@ -0,0 +1,34 @@
+// The json schema for the first version
+
+{
+ "content": {
+ "bsonType": "object",
+ "required": [ "sha1" ],
+ "properties": {
+ "sha1": {
+ "bsonType": "Binary",
+ "description": ""
+ },
+ }
+ },
+ 'directory': {
+ "bsonType": "object",
+ "required": [ "sha1" ],
+ "properties": {
+ "sha1": {
+ "bsonType": "Binary",
+ "description": ""
+ },
+ }
+ },
+ 'revision': {
+ "bsonType": "object",
+ "required": [ "sha1" ],
+ "properties": {
+ "sha1": {
+ "bsonType": "Binary",
+ "description": ""
+ },
+ }
+ }
+}
diff --git a/swh/provenance/mongo/storage.py b/swh/provenance/mongo/storage.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/storage.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timezone
+import os
+from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union
+
+from bson import ObjectId
+import pymongo.database
+
+from swh.model.model import Sha1Git
+
+from ..interface import (
+ EntityType,
+ ProvenanceResult,
+ RelationData,
+ RelationType,
+ RevisionData,
+)
+
+
+class ProvenanceStorageMongoDb:
+ def __init__(self, db: pymongo.database.Database):
+ self.db = db
+
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ pass
+
+ def content_find_all(self, id: Sha1Git, limit: Optional[int] = None) -> Generator[ProvenanceResult, None, None]:
+ pass
+
+ def content_add(self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ for each_cnt in cnts:
+ try:
+ Entity.factory('content')(each_cnt).add_if_older()
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, add logging, raise if needed
+ pass
+ return True
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ try:
+ contents = Entity.factory('content').get_all_in_list({'sha1': ids})
+ except DBError as e:
+ # logging and returning None
+ # FIXME, add logging, raise if needed
+ return None
+ return Entity.factory('content').dump_list_as_date_dict(contents)
+
+ def directory_add(self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ for each_cnt in cnts:
+ try:
+ Entity.factory('directory')(each_cnt).add_if_older()
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, add logging, raise if needed
+ pass
+ return True
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ try:
+ directories = Entity.factory('directory').get_all_in_list({'sha1': ids})
+ except DBError as e:
+ # logging and returning None
+ # FIXME, add logging, raise if needed
+ return None
+ return Entity.factory('directory').dump_list_as_date_dict(contents)
+
+ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+ return Entity.factory(entity).get_all_in_list({'sha1': ids})
+
+ def location_add(self, paths: Iterable[bytes]) -> bool:
+ # TODO: implement this methods if path are to be stored in a separate collection
+ return True
+
+ def location_get_all(self) -> Set[bytes]:
+ pass
+
+ def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
+ return True
+
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ pass
+
+ def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool:
+ return True
+
+ def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
+ pass
+
+ def relation_add(self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]) -> bool:
+ return True
+
+ def relation_get(self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False) -> Dict[Sha1Git, Set[RelationData]]:
+ return pass
+
+ def relation_get_all(self, relation: RelationType) -> Dict[Sha1Git, Set[RelationData]]:
+ pass
+
+ def with_path(self) -> bool:
+ return True
diff --git a/swh/provenance/mongo/tests/__init__.py b/swh/provenance/mongo/tests/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/tests/__init__.py
@@ -0,0 +1 @@
+pass
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222960
Attached To
D6251: Work in progress. Not for review.
Event Timeline
Log In to Comment