Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/mongo/backend.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timezone | from datetime import datetime, timezone | ||||
import os | import os | ||||
from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union | from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union | ||||
from bson import ObjectId | from bson import ObjectId | ||||
import pymongo.database | import pymongo.database | ||||
from swh.core.statsd import statsd | |||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from ..interface import ( | from ..interface import ( | ||||
EntityType, | EntityType, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
class ProvenanceStorageMongoDb: | class ProvenanceStorageMongoDb: | ||||
def __init__(self, db: pymongo.database.Database): | def __init__(self, db: pymongo.database.Database): | ||||
self.db = db | self.db = db | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "close"}, | |||||
) | |||||
def close(self) -> None: | def close(self) -> None: | ||||
pass | pass | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "content_add"}, | |||||
) | |||||
def content_add( | def content_add( | ||||
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | ||||
) -> bool: | ) -> bool: | ||||
data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) | data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) | ||||
existing = { | existing = { | ||||
x["sha1"]: x | x["sha1"]: x | ||||
for x in self.db.content.find( | for x in self.db.content.find( | ||||
{"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | ||||
Show All 13 Lines | ) -> bool: | ||||
"sha1": sha1, | "sha1": sha1, | ||||
"ts": ts, | "ts": ts, | ||||
"revision": {}, | "revision": {}, | ||||
"directory": {}, | "directory": {}, | ||||
} | } | ||||
) | ) | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "content_find_first"}, | |||||
) | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
# get all the revisions | # get all the revisions | ||||
# iterate and find the earliest | # iterate and find the earliest | ||||
content = self.db.content.find_one({"sha1": id}) | content = self.db.content.find_one({"sha1": id}) | ||||
if not content: | if not content: | ||||
return None | return None | ||||
occurs = [] | occurs = [] | ||||
Show All 12 Lines | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
revision=revision["sha1"], | revision=revision["sha1"], | ||||
date=datetime.fromtimestamp(revision["ts"], timezone.utc), | date=datetime.fromtimestamp(revision["ts"], timezone.utc), | ||||
origin=origin["url"], | origin=origin["url"], | ||||
path=path, | path=path, | ||||
) | ) | ||||
) | ) | ||||
return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] | return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "content_find_all"}, | |||||
) | |||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[ProvenanceResult, None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
content = self.db.content.find_one({"sha1": id}) | content = self.db.content.find_one({"sha1": id}) | ||||
if not content: | if not content: | ||||
return None | return None | ||||
occurs = [] | occurs = [] | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | ) -> Generator[ProvenanceResult, None, None]: | ||||
revision["ts"], timezone.utc | revision["ts"], timezone.utc | ||||
), | ), | ||||
origin=origin["url"], | origin=origin["url"], | ||||
path=path, | path=path, | ||||
) | ) | ||||
) | ) | ||||
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path)) | yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path)) | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "content_get"}, | |||||
) | |||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return { | return { | ||||
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | ||||
for x in self.db.content.find( | for x in self.db.content.find( | ||||
{"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | ||||
{"sha1": 1, "ts": 1, "_id": 0}, | {"sha1": 1, "ts": 1, "_id": 0}, | ||||
) | ) | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "directory_add"}, | |||||
) | |||||
def directory_add( | def directory_add( | ||||
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | ||||
) -> bool: | ) -> bool: | ||||
data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) | data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) | ||||
existing = { | existing = { | ||||
x["sha1"]: x | x["sha1"]: x | ||||
for x in self.db.directory.find( | for x in self.db.directory.find( | ||||
{"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | ||||
) | ) | ||||
} | } | ||||
for sha1, date in data.items(): | for sha1, date in data.items(): | ||||
ts = datetime.timestamp(date) if date is not None else None | ts = datetime.timestamp(date) if date is not None else None | ||||
if sha1 in existing: | if sha1 in existing: | ||||
dir = existing[sha1] | dir = existing[sha1] | ||||
if ts is not None and (dir["ts"] is None or ts < dir["ts"]): | if ts is not None and (dir["ts"] is None or ts < dir["ts"]): | ||||
self.db.directory.update_one( | self.db.directory.update_one( | ||||
{"_id": dir["_id"]}, {"$set": {"ts": ts}} | {"_id": dir["_id"]}, {"$set": {"ts": ts}} | ||||
) | ) | ||||
else: | else: | ||||
self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) | self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "directory_get"}, | |||||
) | |||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return { | return { | ||||
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | ||||
for x in self.db.directory.find( | for x in self.db.directory.find( | ||||
{"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | {"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | ||||
{"sha1": 1, "ts": 1, "_id": 0}, | {"sha1": 1, "ts": 1, "_id": 0}, | ||||
) | ) | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "entity_get_all"}, | |||||
) | |||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | ||||
return { | return { | ||||
x["sha1"] | x["sha1"] | ||||
for x in self.db.get_collection(entity.value).find( | for x in self.db.get_collection(entity.value).find( | ||||
{}, {"sha1": 1, "_id": 0} | {}, {"sha1": 1, "_id": 0} | ||||
) | ) | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "location_add"}, | |||||
) | |||||
def location_add(self, paths: Iterable[bytes]) -> bool: | def location_add(self, paths: Iterable[bytes]) -> bool: | ||||
# TODO: implement this methods if path are to be stored in a separate collection | # TODO: implement this methods if path are to be stored in a separate collection | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "location_get_all"}, | |||||
) | |||||
def location_get_all(self) -> Set[bytes]: | def location_get_all(self) -> Set[bytes]: | ||||
contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1}) | contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1}) | ||||
paths: List[Iterable[bytes]] = [] | paths: List[Iterable[bytes]] = [] | ||||
for content in contents: | for content in contents: | ||||
paths.extend(value for _, value in content["revision"].items()) | paths.extend(value for _, value in content["revision"].items()) | ||||
paths.extend(value for _, value in content["directory"].items()) | paths.extend(value for _, value in content["directory"].items()) | ||||
dirs = self.db.directory.find({}, {"revision": 1, "_id": 0}) | dirs = self.db.directory.find({}, {"revision": 1, "_id": 0}) | ||||
for each_dir in dirs: | for each_dir in dirs: | ||||
paths.extend(value for _, value in each_dir["revision"].items()) | paths.extend(value for _, value in each_dir["revision"].items()) | ||||
return set(sum(paths, [])) | return set(sum(paths, [])) | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "origin_add"}, | |||||
) | |||||
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: | def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: | ||||
existing = { | existing = { | ||||
x["sha1"]: x | x["sha1"]: x | ||||
for x in self.db.origin.find( | for x in self.db.origin.find( | ||||
{"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1} | {"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1} | ||||
) | ) | ||||
} | } | ||||
for sha1, url in orgs.items(): | for sha1, url in orgs.items(): | ||||
if sha1 not in existing: | if sha1 not in existing: | ||||
# add new origin | # add new origin | ||||
self.db.origin.insert_one({"sha1": sha1, "url": url}) | self.db.origin.insert_one({"sha1": sha1, "url": url}) | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "origin_get"}, | |||||
) | |||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | ||||
return { | return { | ||||
x["sha1"]: x["url"] | x["sha1"]: x["url"] | ||||
for x in self.db.origin.find( | for x in self.db.origin.find( | ||||
{"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0} | {"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0} | ||||
) | ) | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "revision_add"}, | |||||
) | |||||
def revision_add( | def revision_add( | ||||
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] | self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] | ||||
) -> bool: | ) -> bool: | ||||
data = ( | data = ( | ||||
revs | revs | ||||
if isinstance(revs, dict) | if isinstance(revs, dict) | ||||
else dict.fromkeys(revs, RevisionData(date=None, origin=None)) | else dict.fromkeys(revs, RevisionData(date=None, origin=None)) | ||||
) | ) | ||||
Show All 25 Lines | ) -> bool: | ||||
"preferred": preferred, | "preferred": preferred, | ||||
"origin": [], | "origin": [], | ||||
"revision": [], | "revision": [], | ||||
"ts": ts, | "ts": ts, | ||||
} | } | ||||
) | ) | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "revision_get"}, | |||||
) | |||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ||||
return { | return { | ||||
x["sha1"]: RevisionData( | x["sha1"]: RevisionData( | ||||
date=datetime.fromtimestamp(x["ts"], timezone.utc) if x["ts"] else None, | date=datetime.fromtimestamp(x["ts"], timezone.utc) if x["ts"] else None, | ||||
origin=x["preferred"], | origin=x["preferred"], | ||||
) | ) | ||||
for x in self.db.revision.find( | for x in self.db.revision.find( | ||||
{ | { | ||||
"sha1": {"$in": list(ids)}, | "sha1": {"$in": list(ids)}, | ||||
"$or": [{"preferred": {"$ne": None}}, {"ts": {"$ne": None}}], | "$or": [{"preferred": {"$ne": None}}, {"ts": {"$ne": None}}], | ||||
}, | }, | ||||
{"sha1": 1, "preferred": 1, "ts": 1, "_id": 0}, | {"sha1": 1, "preferred": 1, "ts": 1, "_id": 0}, | ||||
) | ) | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "relation_add"}, | |||||
) | |||||
def relation_add( | def relation_add( | ||||
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] | self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] | ||||
) -> bool: | ) -> bool: | ||||
src_relation, *_, dst_relation = relation.value.split("_") | src_relation, *_, dst_relation = relation.value.split("_") | ||||
dst_objs = { | dst_objs = { | ||||
x["sha1"]: x["_id"] | x["sha1"]: x["_id"] | ||||
for x in self.db.get_collection(dst_relation).find( | for x in self.db.get_collection(dst_relation).find( | ||||
Show All 40 Lines | ) -> bool: | ||||
{ | { | ||||
"$set": { | "$set": { | ||||
dst_relation: list(set(src_objs[sha1][dst_relation] + dsts)) | dst_relation: list(set(src_objs[sha1][dst_relation] + dsts)) | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
return True | return True | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "relation_get"}, | |||||
) | |||||
def relation_get( | def relation_get( | ||||
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | ||||
) -> Dict[Sha1Git, Set[RelationData]]: | ) -> Dict[Sha1Git, Set[RelationData]]: | ||||
src, *_, dst = relation.value.split("_") | src, *_, dst = relation.value.split("_") | ||||
sha1s = set(ids) | sha1s = set(ids) | ||||
if not reverse: | if not reverse: | ||||
empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] | empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] | ||||
src_objs = { | src_objs = { | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | ) -> Dict[Sha1Git, Set[RelationData]]: | ||||
if dst_obj_id in { | if dst_obj_id in { | ||||
ObjectId(dst_obj_str) for dst_obj_str in denorm | ObjectId(dst_obj_str) for dst_obj_str in denorm | ||||
}: | }: | ||||
result.setdefault(src_sha1, set()).add( | result.setdefault(src_sha1, set()).add( | ||||
RelationData(dst=dst_sha1, path=None) | RelationData(dst=dst_sha1, path=None) | ||||
) | ) | ||||
return result | return result | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "relation_get_all"}, | |||||
) | |||||
def relation_get_all( | def relation_get_all( | ||||
self, relation: RelationType | self, relation: RelationType | ||||
) -> Dict[Sha1Git, Set[RelationData]]: | ) -> Dict[Sha1Git, Set[RelationData]]: | ||||
src, *_, dst = relation.value.split("_") | src, *_, dst = relation.value.split("_") | ||||
empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] | empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else [] | ||||
src_objs = { | src_objs = { | ||||
x["sha1"]: x[dst] | x["sha1"]: x[dst] | ||||
for x in self.db.get_collection(src).find( | for x in self.db.get_collection(src).find( | ||||
Show All 26 Lines | ) -> Dict[Sha1Git, Set[RelationData]]: | ||||
RelationData(dst=dst_sha1, path=None) | RelationData(dst=dst_sha1, path=None) | ||||
for dst_obj_id, dst_sha1 in dst_objs.items() | for dst_obj_id, dst_sha1 in dst_objs.items() | ||||
for dst_obj_ref in denorm | for dst_obj_ref in denorm | ||||
if dst_obj_id == dst_obj_ref | if dst_obj_id == dst_obj_ref | ||||
} | } | ||||
for src_sha1, denorm in src_objs.items() | for src_sha1, denorm in src_objs.items() | ||||
} | } | ||||
@statsd.timed( | |||||
metric="swh_provenance_storage_mongodb_accesstime_seconds", | |||||
tags={"method": "with_path"}, | |||||
) | |||||
def with_path(self) -> bool: | def with_path(self) -> bool: | ||||
return True | return True |