Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/mongo/backend.py
Show All 12 Lines | |||||
from bson import ObjectId | from bson import ObjectId | ||||
import mongomock | import mongomock | ||||
import pymongo | import pymongo | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from ..interface import ( | from ..interface import ( | ||||
DirectoryData, | |||||
EntityType, | EntityType, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
ProvenanceStorageInterface, | ProvenanceStorageInterface, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
Show All 18 Lines | class ProvenanceStorageMongoDb: | ||||
) -> None: | ) -> None: | ||||
self.close() | self.close() | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) | ||||
def close(self) -> None: | def close(self) -> None: | ||||
self.db.client.close() | self.db.client.close() | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) | ||||
def content_add( | def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: | ||||
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | |||||
) -> bool: | |||||
data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) | |||||
existing = { | existing = { | ||||
x["sha1"]: x | x["sha1"]: x | ||||
for x in self.db.content.find( | for x in self.db.content.find( | ||||
{"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | {"sha1": {"$in": list(cnts)}}, {"sha1": 1, "ts": 1, "_id": 1} | ||||
) | ) | ||||
} | } | ||||
for sha1, date in data.items(): | for sha1, date in cnts.items(): | ||||
ts = datetime.timestamp(date) if date is not None else None | ts = datetime.timestamp(date) | ||||
if sha1 in existing: | if sha1 in existing: | ||||
cnt = existing[sha1] | cnt = existing[sha1] | ||||
if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]): | if ts < cnt["ts"]: | ||||
self.db.content.update_one( | self.db.content.update_one( | ||||
{"_id": cnt["_id"]}, {"$set": {"ts": ts}} | {"_id": cnt["_id"]}, {"$set": {"ts": ts}} | ||||
) | ) | ||||
else: | else: | ||||
self.db.content.insert_one( | self.db.content.insert_one( | ||||
{ | { | ||||
"sha1": sha1, | "sha1": sha1, | ||||
"ts": ts, | "ts": ts, | ||||
▲ Show 20 Lines • Show All 90 Lines • ▼ Show 20 Lines | ) -> Generator[ProvenanceResult, None, None]: | ||||
) | ) | ||||
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path)) | yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path)) | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) | ||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return { | return { | ||||
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | ||||
for x in self.db.content.find( | for x in self.db.content.find( | ||||
{"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | {"sha1": {"$in": list(ids)}}, {"sha1": 1, "ts": 1, "_id": 0} | ||||
{"sha1": 1, "ts": 1, "_id": 0}, | |||||
) | ) | ||||
} | } | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) | ||||
def directory_add( | def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: | ||||
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | |||||
) -> bool: | |||||
data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) | |||||
existing = { | existing = { | ||||
x["sha1"]: x | x["sha1"]: x | ||||
for x in self.db.directory.find( | for x in self.db.directory.find( | ||||
{"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} | {"sha1": {"$in": list(dirs)}}, {"sha1": 1, "ts": 1, "flat": 1, "_id": 1} | ||||
) | ) | ||||
} | } | ||||
for sha1, date in data.items(): | for sha1, info in dirs.items(): | ||||
ts = datetime.timestamp(date) if date is not None else None | ts = datetime.timestamp(info.date) | ||||
if sha1 in existing: | if sha1 in existing: | ||||
dir = existing[sha1] | dir = existing[sha1] | ||||
if ts is not None and (dir["ts"] is None or ts < dir["ts"]): | if ts >= dir["ts"]: | ||||
ts = dir["ts"] | |||||
flat = info.flat or dir["flat"] | |||||
if ts != dir["ts"] or flat != dir["flat"]: | |||||
self.db.directory.update_one( | self.db.directory.update_one( | ||||
{"_id": dir["_id"]}, {"$set": {"ts": ts}} | {"_id": dir["_id"]}, {"$set": {"ts": ts, "flat": flat}} | ||||
) | ) | ||||
else: | else: | ||||
self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) | self.db.directory.insert_one( | ||||
{"sha1": sha1, "ts": ts, "revision": {}, "flat": info.flat} | |||||
) | |||||
return True | return True | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) | ||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: | ||||
return { | return { | ||||
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) | x["sha1"]: DirectoryData( | ||||
date=datetime.fromtimestamp(x["ts"], timezone.utc), flat=x["flat"] | |||||
) | |||||
for x in self.db.directory.find( | for x in self.db.directory.find( | ||||
{"sha1": {"$in": list(ids)}, "ts": {"$ne": None}}, | {"sha1": {"$in": list(ids)}}, {"sha1": 1, "ts": 1, "flat": 1, "_id": 0} | ||||
{"sha1": 1, "ts": 1, "_id": 0}, | |||||
) | ) | ||||
} | } | ||||
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) | @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) | ||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | ||||
return { | return { | ||||
x["sha1"] | x["sha1"] | ||||
for x in self.db.get_collection(entity.value).find( | for x in self.db.get_collection(entity.value).find( | ||||
▲ Show 20 Lines • Show All 293 Lines • Show Last 20 Lines |