Page MenuHomeSoftware Heritage

D6251.id22761.diff
No OneTemporary

D6251.id22761.diff

diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -9,6 +9,8 @@
from bson import ObjectId
import pymongo.database
+from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
+
from swh.model.model import Sha1Git
@@ -19,41 +21,119 @@
RelationType,
RevisionData,
)
+from .errors import DBError
+from .models import entity_factory
+
class ProvenanceStorageMongoDb:
def __init__(self, db: pymongo.database.Database):
self.db = db
- def content_add(
- self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
- ) -> bool:
- data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts)
+ def _format_data(self, data: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]):
+ return data if isinstance(data, dict) else dict.fromkeys(data)
+
+ def _generate_date_upserts(self, sha1, date):
+ ts = datetime.timestamp(date) if date is not None else None
+ # update only those with date either as None or later than the given one
+ # FIXME, filter with future date or None
+ # FIXME, handle default insert in the schema
+ return UpdateOne({'sha1': sha1, 'ts': {'$gt': ts}}, {'$set': {'ts': ts}}, upsert=True)
+
+ def content_add(self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ writes = [self._generate_date_upserts(sha1, date) for sha1, date in self._format_data(cnts).items()]
+ try:
+ entity_factory('content', self.db).bulk_write(writes)
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, raise if needed
+ pass
+ return True
+
+ def directory_add(self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ writes = [self._generate_date_upserts(sha1, date) for sha1, date in self._format_data(dirs).items()]
+ try:
+ entity_factory('directory', self.db).bulk_write(writes)
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, raise if needed
+ pass
+ return True
+
+ def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool:
+ data = (
+ revs
+ if isinstance(revs, dict)
+ else dict.fromkeys(revs, RevisionData(date=None, origin=None))
+ )
existing = {
x["sha1"]: x
- for x in self.db.content.find(
- {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
+ for x in self.db.revision.find(
+ {"sha1": {"$in": list(data)}},
+ {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
)
}
- for sha1, date in data.items():
- ts = datetime.timestamp(date) if date is not None else None
+ for sha1, info in data.items():
+ ts = datetime.timestamp(info.date) if info.date is not None else None
+ preferred = info.origin
if sha1 in existing:
- cnt = existing[sha1]
- if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]):
- self.db.content.update_one(
- {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
+ rev = existing[sha1]
+ if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
+ ts = rev["ts"]
+ if preferred is None:
+ preferred = rev["preferred"]
+ if ts != rev["ts"] or preferred != rev["preferred"]:
+ self.db.revision.update_one(
+ {"_id": rev["_id"]},
+ {"$set": {"ts": ts, "preferred": preferred}},
)
else:
- self.db.content.insert_one(
+ self.db.revision.insert_one(
{
"sha1": sha1,
+ "preferred": preferred,
+ "origin": [],
+ "revision": [],
"ts": ts,
- "revision": {},
- "directory": {},
}
)
return True
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ # FIXME, add index in contnet sha1 and ts
+ # FIXME, do the timezone operation in mongo
+ return {
+ x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
+ for x in self.db.content.find(
+ {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+ {"sha1": 1, "ts": 1, "_id": 0},
+ )
+ }
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return {
+ x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
+ # FIXME add ts to index in directory
+ for x in self.db.directory.find(
+ {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
+ {"sha1": 1, "ts": 1, "_id": 0},
+ )
+ }
+
+ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+ # only for tests
+
+ return set(self.db.get_collection(entity.value).distinct('sha1'))
+
+ def location_add(self, paths: Iterable[bytes]) -> bool:
+ """No-op placeholder: ``paths`` is ignored and ``True`` is always returned."""
+ # TODO: implement this method if paths are to be stored in a separate collection
+ return True
+
+ #---------------------------------------------#
+
+
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
# get all the revisions
# iterate and find the earliest
@@ -139,61 +219,11 @@
)
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
- def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
- return {
- x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
- for x in self.db.content.find(
- {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
- {"sha1": 1, "ts": 1, "_id": 0},
- )
- }
-
- def directory_add(
- self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]
- ) -> bool:
- data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs)
- existing = {
- x["sha1"]: x
- for x in self.db.directory.find(
- {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1}
- )
- }
- for sha1, date in data.items():
- ts = datetime.timestamp(date) if date is not None else None
- if sha1 in existing:
- dir = existing[sha1]
- if ts is not None and (dir["ts"] is None or ts < dir["ts"]):
- self.db.directory.update_one(
- {"_id": dir["_id"]}, {"$set": {"ts": ts}}
- )
- else:
- self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
- return True
-
- def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
- return {
- x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
- for x in self.db.directory.find(
- {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}},
- {"sha1": 1, "ts": 1, "_id": 0},
- )
- }
-
- def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
- return {
- x["sha1"]
- for x in self.db.get_collection(entity.value).find(
- {}, {"sha1": 1, "_id": 0}
- )
- }
-
- def location_add(self, paths: Iterable[bytes]) -> bool:
- # TODO: implement this methods if path are to be stored in a separate collection
- return True
def location_get_all(self) -> Set[bytes]:
contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
paths: List[Iterable[bytes]] = []
+
for content in contents:
paths.extend(value for _, value in content["revision"].items())
paths.extend(value for _, value in content["directory"].items())
@@ -216,54 +246,16 @@
self.db.origin.insert_one({"sha1": sha1, "url": url})
return True
+
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
return {
x["sha1"]: x["url"]
+ # FIXME try to avoid this loop and return directly in the needed format from mongo
for x in self.db.origin.find(
{"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0}
)
}
- def revision_add(
- self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
- ) -> bool:
- data = (
- revs
- if isinstance(revs, dict)
- else dict.fromkeys(revs, RevisionData(date=None, origin=None))
- )
- existing = {
- x["sha1"]: x
- for x in self.db.revision.find(
- {"sha1": {"$in": list(data)}},
- {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
- )
- }
- for sha1, info in data.items():
- ts = datetime.timestamp(info.date) if info.date is not None else None
- preferred = info.origin
- if sha1 in existing:
- rev = existing[sha1]
- if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
- ts = rev["ts"]
- if preferred is None:
- preferred = rev["preferred"]
- if ts != rev["ts"] or preferred != rev["preferred"]:
- self.db.revision.update_one(
- {"_id": rev["_id"]},
- {"$set": {"ts": ts, "preferred": preferred}},
- )
- else:
- self.db.revision.insert_one(
- {
- "sha1": sha1,
- "preferred": preferred,
- "origin": [],
- "revision": [],
- "ts": ts,
- }
- )
- return True
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
return {
@@ -283,6 +275,7 @@
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
+
src_relation, *_, dst_relation = relation.value.split("_")
dst_objs = {
@@ -319,7 +312,7 @@
if src_relation != "revision":
k = {
obj_id: list(set(paths + dsts.get(obj_id, [])))
- for obj_id, paths in src_objs[sha1][dst_relation].items()
+ for obj_id, paths in src_objs[sha1].get(dst_relation, {}).items()
}
self.db.get_collection(src_relation).update_one(
{"_id": src_objs[sha1]["_id"]},
@@ -388,7 +381,7 @@
)
}
src_objs = {
- x["sha1"]: x[dst]
+ x["sha1"]: x.get(dst, {})
for x in self.db.get_collection(src).find(
{}, {"_id": 0, "sha1": 1, dst: 1}
)
@@ -420,7 +413,7 @@
src, *_, dst = relation.value.split("_")
empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
src_objs = {
- x["sha1"]: x[dst]
+ x["sha1"]: x.get(dst, {})
for x in self.db.get_collection(src).find(
{dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
)
diff --git a/swh/provenance/mongo/bootstrap.py b/swh/provenance/mongo/bootstrap.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/bootstrap.py
@@ -0,0 +1,8 @@
+# FIXME, maybe this should not be part of the code
+
+collections = []
+
+indexes = []
+
+def bootstrap_db():
+ """Placeholder; currently does nothing."""
+ pass
diff --git a/swh/provenance/mongo/errors.py b/swh/provenance/mongo/errors.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/errors.py
@@ -0,0 +1,6 @@
+class DBError(Exception):
+ """Exception type for database-related failures (presumably — nothing in this module raises it)."""
+ # FIXME, add mongo specific logging
+ pass
+
+class EntiryError(Exception):
+ pass
diff --git a/swh/provenance/mongo/models/__init__.py b/swh/provenance/mongo/models/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/__init__.py
@@ -0,0 +1,13 @@
+from .content import Content
+from .directory import Directory
+# from ..error import EntiryError
+
+
+def entity_factory(entity: str, db=None):
+ mapping = {
+ 'content': Content,
+ 'directory': Directory
+ }
+ if entity in mapping:
+ return mapping[entity](db)
+ raise AttributeError(f"invalid entity type {entity}")
diff --git a/swh/provenance/mongo/models/cache.py b/swh/provenance/mongo/models/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/cache.py
@@ -0,0 +1,35 @@
+class Cache:
+ """
+ A object cache layer
+ Simple in memory implementation
+ (Can later be extended if needed)
+ """
+
+ def __init__(self, key):
+ self.key = key
+
+ def add_data(self, data):
+ self.data = self._get_formatted_data(data)
+
+ def _clear(self):
+ self.data = dict()
+
+ def _get_formatted_data(self, data):
+ if dict:
+ return data
+ return {x.get for x in make_obj()}
+
+ def add_obj(self, obj):
+ self.data[getattr(obj, self.key)] = obj
+
+
+ def get(self, key):
+ self.data.get(key, None)
+
+ def set(self, data):
+ if data is dict():
+ pass
+ if data is lst():
+ pass
+ if data is obj:
+ pass
diff --git a/swh/provenance/mongo/models/content.py b/swh/provenance/mongo/models/content.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/content.py
@@ -0,0 +1,14 @@
+from .entity import Entity
+
+
+class Content(Entity):
+ collection = 'content'
+ validate_model = False
+
+
+ def find_first(sef):
+ pass
+
+
+ def find_all(self):
+ pass
diff --git a/swh/provenance/mongo/models/directory.py b/swh/provenance/mongo/models/directory.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/directory.py
@@ -0,0 +1,5 @@
+from .entity import Entity
+
+class Directory(Entity):
+ """Entity stored in the ``directory`` collection."""
+ collection = 'directory'
+ validate_model = False
diff --git a/swh/provenance/mongo/models/entity.py b/swh/provenance/mongo/models/entity.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/entity.py
@@ -0,0 +1,53 @@
+from abc import ABCMeta
+
+
+class Entity: #(ABCMeta):
+ """
+ An object saved in the db
+ """
+
+ collection: str
+ model: dict
+ validate_model: bool
+
+ def __init__(self, db):
+ self.db = db
+ self.db_collection = self.db.get_collection(self.collection)
+
+ def bulk_write(self, writes):
+ self.db_collection.bulk_write(writes)
+
+ # def _load_model(self):
+ # return {}
+
+ # def _set_data(self, data):
+ # pass
+
+ # def _validate(self):
+ # self.db.command() # use json in schema.json
+
+ # def save(self):
+ # if self.validate_model and self._validate():
+ # raise DataError()
+ # save()
+
+ # def get(self, qry):
+ # pass
+
+ # def _is_older_in_time(self):
+ # return self.data.ts < ts
+
+ # def add_if_older(self):
+ # if _is_older():
+ # self.save()
+
+ # def with_excetion_handle(self):
+ # pass
+
+
+class EntityList:
+ """
+ List or array of entities and their operations
+ Operate mostly on object cache
+
+ Placeholder: no behaviour is implemented yet.
+ """
+ pass
diff --git a/swh/provenance/mongo/models/origin.py b/swh/provenance/mongo/models/origin.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/origin.py
@@ -0,0 +1,4 @@
+class Origin(Entity):
+ collection = 'origin'
+ model = {}
+ validate_model = False
diff --git a/swh/provenance/mongo/models/revision.py b/swh/provenance/mongo/models/revision.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/models/revision.py
@@ -0,0 +1,4 @@
+class Revision(Entity):
+ collection = 'revision'
+ model = {}
+ validate_model = False
diff --git a/swh/provenance/mongo/schema.json b/swh/provenance/mongo/schema.json
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/schema.json
@@ -0,0 +1,34 @@
+// The json schema for the first version
+
+{
+  "content": {
+    "bsonType": "object",
+    "required": [ "sha1" ],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": ""
+      }
+    }
+  },
+  "directory": {
+    "bsonType": "object",
+    "required": [ "sha1" ],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": ""
+      }
+    }
+  },
+  "revision": {
+    "bsonType": "object",
+    "required": [ "sha1" ],
+    "properties": {
+      "sha1": {
+        "bsonType": "binData",
+        "description": ""
+      }
+    }
+  }
+}
diff --git a/swh/provenance/mongo/storage.py b/swh/provenance/mongo/storage.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/storage.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timezone
+import os
+from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union
+
+from bson import ObjectId
+import pymongo.database
+
+from swh.model.model import Sha1Git
+
+from ..interface import (
+ EntityType,
+ ProvenanceResult,
+ RelationData,
+ RelationType,
+ RevisionData,
+)
+
+
+class ProvenanceStorageMongoDb:
+ def __init__(self, db: pymongo.database.Database):
+ self.db = db
+
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ pass
+
+ def content_find_all(self, id: Sha1Git, limit: Optional[int] = None) -> Generator[ProvenanceResult, None, None]:
+ pass
+
+ def content_add(self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ for each_cnt in cnts:
+ try:
+ Entity.factory('content')(each_cnt).add_if_older()
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, add logging, raise if needed
+ pass
+ return True
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ try:
+ contents = Entity.factory('content').get_all_in_list({'sha1': ids})
+ except DBError as e:
+ # logging and returning None
+ # FIXME, add logging, raise if needed
+ return None
+ return Entity.factory('content').dump_list_as_date_dict(contents)
+
+ def directory_add(self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]]) -> bool:
+ for each_cnt in cnts:
+ try:
+ Entity.factory('directory')(each_cnt).add_if_older()
+ except DBError as e:
+ # logging and skipping this item
+ # FIXME, add logging, raise if needed
+ pass
+ return True
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ try:
+ directories = Entity.factory('directory').get_all_in_list({'sha1': ids})
+ except DBError as e:
+ # logging and returning None
+ # FIXME, add logging, raise if needed
+ return None
+ return Entity.factory('directory').dump_list_as_date_dict(contents)
+
+ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+ return Entity.factory(entity).get_all_in_list({'sha1': ids})
+
+ def location_add(self, paths: Iterable[bytes]) -> bool:
+ # TODO: implement this methods if path are to be stored in a separate collection
+ return True
+
+ def location_get_all(self) -> Set[bytes]:
+ pass
+
+ def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
+ return True
+
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ pass
+
+ def revision_add(self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]) -> bool:
+ return True
+
+ def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
+ pass
+
+ def relation_add(self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]) -> bool:
+ return True
+
+ def relation_get(self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False) -> Dict[Sha1Git, Set[RelationData]]:
+ return pass
+
+ def relation_get_all(self, relation: RelationType) -> Dict[Sha1Git, Set[RelationData]]:
+ pass
+
+ def with_path(self) -> bool:
+ return True
diff --git a/swh/provenance/mongo/tests/__init__.py b/swh/provenance/mongo/tests/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/mongo/tests/__init__.py
@@ -0,0 +1 @@
+pass

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222960

Event Timeline