Page Menu
Software Heritage
Configure Global Search
Log In
No One
View File
Edit File
Delete File
View Transforms
Mute Notifications
Award Token
Flag For Later
26 KB
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -26,9 +26,6 @@
ignore_missing_imports = True
-ignore_missing_imports = True
ignore_missing_imports = True
diff --git a/pytest.ini b/pytest.ini
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,8 +1,4 @@
norecursedirs = docs .*
-mongodb_fixture_dir = swh/provenance/tests/data/mongo
-mongodb_engine = mongomock
-mongodb_dbname = test
postgresql_postgres_options = -N 500
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,4 @@
swh.loader.git >= 0.8
swh.journal >= 0.8
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,9 +5,7 @@
diff --git a/swh/provenance/ b/swh/provenance/
--- a/swh/provenance/
+++ b/swh/provenance/
@@ -108,12 +108,6 @@
raise_on_commit=raise_on_commit, **kwargs["db"]
- elif cls == "mongodb":
- from .mongo.backend import ProvenanceStorageMongoDb
- engine = kwargs.get("engine", "pymongo")
- return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"])
elif cls == "rabbitmq":
from .api.client import ProvenanceStorageRabbitMQClient
diff --git a/swh/provenance/ b/swh/provenance/
--- a/swh/provenance/
+++ b/swh/provenance/
@@ -52,11 +52,6 @@
# "password": "postgres",
# "dbname": "provenance",
# },
- # Local MongoDB Storage
- # "cls": "mongodb",
- # "db": {
- # "dbname": "provenance",
- # },
# Remote RabbitMQ/PostgreSQL Storage
"cls": "rabbitmq",
"url": "amqp://localhost:5672/%2f",
diff --git a/swh/provenance/mongo/ b/swh/provenance/mongo/
deleted file mode 100644
--- a/swh/provenance/mongo/
+++ /dev/null
@@ -1,44 +0,0 @@
-mongo backend
-Provenance storage implementation using MongoDB
-initial data-model
- id: sha1
- ts: int // optional
- revision: {<ref revision str>: [<ref path>]}
- directory: {<ref directory str>: [<ref path>]}
- id: sha1
- ts: int // optional
- revision: {<ref revision str>: [<ref path>]}
- id: sha1
- ts: int // optional
- preferred <ref origin> // optional
- origin [<ref origin>]
- revision [<ref revisions>]
- id: sha1
- url: str
- path: str
diff --git a/swh/provenance/mongo/ b/swh/provenance/mongo/
deleted file mode 100644
diff --git a/swh/provenance/mongo/ b/swh/provenance/mongo/
deleted file mode 100644
--- a/swh/provenance/mongo/
+++ /dev/null
@@ -1,529 +0,0 @@
-# Copyright (C) 2021-2022 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-from __future__ import annotations
-from datetime import datetime, timezone
-import os
-from types import TracebackType
-from typing import (
- Any,
- Dict,
- Generator,
- Iterable,
- List,
- Optional,
- Set,
- Type,
- Union,
-from bson import ObjectId
-from swh.core.statsd import statsd
-from swh.model.model import Sha1Git
-from ..interface import (
- DirectoryData,
- EntityType,
- ProvenanceResult,
- ProvenanceStorageInterface,
- RelationData,
- RelationType,
- RevisionData,
-STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds"
- from pymongo.database import Database
-class ProvenanceStorageMongoDb:
- def __init__(self, engine: str, **kwargs):
- self.engine = engine
- self.dbname = kwargs.pop("dbname")
- self.conn_args = kwargs
- def __enter__(self) -> ProvenanceStorageInterface:
- return self
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- self.close()
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
- def close(self) -> None:
- self.db.client.close()
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
- def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
- existing = {
- x["sha1"]: x
- for x in self.db.content.find(
- {"sha1": {"$in": list(cnts)}}, {"sha1": 1, "ts": 1, "_id": 1}
- )
- }
- for sha1, date in cnts.items():
- ts = datetime.timestamp(date)
- if sha1 in existing:
- cnt = existing[sha1]
- if ts < cnt["ts"]:
- self.db.content.update_one(
- {"_id": cnt["_id"]}, {"$set": {"ts": ts}}
- )
- else:
- self.db.content.insert_one(
- {
- "sha1": sha1,
- "ts": ts,
- "revision": {},
- "directory": {},
- }
- )
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
- def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- # get all the revisions
- # iterate and find the earliest
- content = self.db.content.find_one({"sha1": id})
- if not content:
- return None
- occurs = []
- for revision in self.db.revision.find(
- {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}}
- ):
- if revision["preferred"] is not None:
- origin = self.db.origin.find_one({"sha1": revision["preferred"]})
- else:
- origin = {"url": None}
- assert origin is not None
- for path in content["revision"][str(revision["_id"])]:
- occurs.append(
- ProvenanceResult(
- content=id,
- revision=revision["sha1"],
- date=datetime.fromtimestamp(revision["ts"], timezone.utc),
- origin=origin["url"],
- path=path,
- )
- )
- return sorted(occurs, key=lambda x: (, x.revision, x.origin, x.path))[0]
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"})
- def content_find_all(
- self, id: Sha1Git, limit: Optional[int] = None
- ) -> Generator[ProvenanceResult, None, None]:
- content = self.db.content.find_one({"sha1": id})
- if not content:
- return None
- occurs = []
- for revision in self.db.revision.find(
- {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}}
- ):
- if revision["preferred"] is not None:
- origin = self.db.origin.find_one({"sha1": revision["preferred"]})
- else:
- origin = {"url": None}
- assert origin is not None
- for path in content["revision"][str(revision["_id"])]:
- occurs.append(
- ProvenanceResult(
- content=id,
- revision=revision["sha1"],
- date=datetime.fromtimestamp(revision["ts"], timezone.utc),
- origin=origin["url"],
- path=path,
- )
- )
- for directory in
- {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["directory"]]}}
- ):
- for revision in self.db.revision.find(
- {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}}
- ):
- if revision["preferred"] is not None:
- origin = self.db.origin.find_one({"sha1": revision["preferred"]})
- else:
- origin = {"url": None}
- assert origin is not None
- for suffix in content["directory"][str(directory["_id"])]:
- for prefix in directory["revision"][str(revision["_id"])]:
- path = (
- os.path.join(prefix, suffix)
- if prefix not in [b".", b""]
- else suffix
- )
- occurs.append(
- ProvenanceResult(
- content=id,
- revision=revision["sha1"],
- date=datetime.fromtimestamp(
- revision["ts"], timezone.utc
- ),
- origin=origin["url"],
- path=path,
- )
- )
- yield from sorted(occurs, key=lambda x: (, x.revision, x.origin, x.path))
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"})
- def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
- return {
- x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
- for x in self.db.content.find(
- {"sha1": {"$in": list(ids)}}, {"sha1": 1, "ts": 1, "_id": 0}
- )
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
- def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
- existing = {
- x["sha1"]: x
- for x in
- {"sha1": {"$in": list(dirs)}}, {"sha1": 1, "ts": 1, "flat": 1, "_id": 1}
- )
- }
- for sha1, info in dirs.items():
- ts = datetime.timestamp(
- if sha1 in existing:
- dir = existing[sha1]
- if ts >= dir["ts"]:
- ts = dir["ts"]
- flat = info.flat or dir["flat"]
- if ts != dir["ts"] or flat != dir["flat"]:
- {"_id": dir["_id"]}, {"$set": {"ts": ts, "flat": flat}}
- )
- else:
- {"sha1": sha1, "ts": ts, "revision": {}, "flat": info.flat}
- )
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
- def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
- return {
- x["sha1"]: DirectoryData(
- date=datetime.fromtimestamp(x["ts"], timezone.utc), flat=x["flat"]
- )
- for x in
- {"sha1": {"$in": list(ids)}}, {"sha1": 1, "ts": 1, "flat": 1, "_id": 0}
- )
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"})
- def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
- return {
- x["sha1"]
- for x in self.db.get_collection(entity.value).find(
- {}, {"sha1": 1, "_id": 0}
- )
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
- def location_add(self, paths: Iterable[bytes]) -> bool:
- # TODO: implement this methods if path are to be stored in a separate collection
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
- def location_get_all(self) -> Set[bytes]:
- contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
- paths: List[Iterable[bytes]] = []
- for content in contents:
- paths.extend(value for _, value in content["revision"].items())
- paths.extend(value for _, value in content["directory"].items())
- dirs ={}, {"revision": 1, "_id": 0})
- for each_dir in dirs:
- paths.extend(value for _, value in each_dir["revision"].items())
- return set(sum(paths, []))
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
- def open(self) -> None:
- if self.engine == "mongomock":
- from mongomock import MongoClient as MongoClient
- else: # assume real MongoDB server by default
- from pymongo import MongoClient
- self.db: Database = MongoClient(**self.conn_args).get_database(self.dbname)
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
- def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
- existing = {
- x["sha1"]: x
- for x in self.db.origin.find(
- {"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1}
- )
- }
- for sha1, url in orgs.items():
- if sha1 not in existing:
- # add new origin
- self.db.origin.insert_one({"sha1": sha1, "url": url})
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"})
- def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
- return {
- x["sha1"]: x["url"]
- for x in self.db.origin.find(
- {"sha1": {"$in": list(ids)}}, {"sha1": 1, "url": 1, "_id": 0}
- )
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
- def revision_add(
- self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
- ) -> bool:
- data = (
- revs
- if isinstance(revs, dict)
- else dict.fromkeys(revs, RevisionData(date=None, origin=None))
- )
- existing = {
- x["sha1"]: x
- for x in self.db.revision.find(
- {"sha1": {"$in": list(data)}},
- {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1},
- )
- }
- for sha1, info in data.items():
- ts = datetime.timestamp( if is not None else None
- preferred = info.origin
- if sha1 in existing:
- rev = existing[sha1]
- if ts is None or (rev["ts"] is not None and ts >= rev["ts"]):
- ts = rev["ts"]
- if preferred is None:
- preferred = rev["preferred"]
- if ts != rev["ts"] or preferred != rev["preferred"]:
- self.db.revision.update_one(
- {"_id": rev["_id"]},
- {"$set": {"ts": ts, "preferred": preferred}},
- )
- else:
- self.db.revision.insert_one(
- {
- "sha1": sha1,
- "preferred": preferred,
- "origin": [],
- "revision": [],
- "ts": ts,
- }
- )
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
- def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
- return {
- x["sha1"]: RevisionData(
- date=datetime.fromtimestamp(x["ts"], timezone.utc) if x["ts"] else None,
- origin=x["preferred"],
- )
- for x in self.db.revision.find(
- {
- "sha1": {"$in": list(ids)},
- "$or": [{"preferred": {"$ne": None}}, {"ts": {"$ne": None}}],
- },
- {"sha1": 1, "preferred": 1, "ts": 1, "_id": 0},
- )
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
- def relation_add(
- self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
- ) -> bool:
- src_relation, *_, dst_relation = relation.value.split("_")
- dst_objs = {
- x["sha1"]: x["_id"]
- for x in self.db.get_collection(dst_relation).find(
- {
- "sha1": {
- "$in": list({rel.dst for rels in data.values() for rel in rels})
- }
- },
- {"_id": 1, "sha1": 1},
- )
- }
- denorm: Dict[Sha1Git, Any] = {}
- for src, rels in data.items():
- for rel in rels:
- if src_relation != "revision":
- denorm.setdefault(src, {}).setdefault(
- str(dst_objs[rel.dst]), []
- ).append(rel.path)
- else:
- denorm.setdefault(src, []).append(dst_objs[rel.dst])
- src_objs = {
- x["sha1"]: x
- for x in self.db.get_collection(src_relation).find(
- {"sha1": {"$in": list(denorm.keys())}}
- )
- }
- for sha1, dsts in denorm.items():
- # update
- if src_relation != "revision":
- k = {
- obj_id: list(set(paths + dsts.get(obj_id, [])))
- for obj_id, paths in src_objs[sha1][dst_relation].items()
- }
- self.db.get_collection(src_relation).update_one(
- {"_id": src_objs[sha1]["_id"]},
- {"$set": {dst_relation: dict(dsts, **k)}},
- )
- else:
- self.db.get_collection(src_relation).update_one(
- {"_id": src_objs[sha1]["_id"]},
- {
- "$set": {
- dst_relation: list(set(src_objs[sha1][dst_relation] + dsts))
- }
- },
- )
- return True
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
- def relation_get(
- self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
- ) -> Dict[Sha1Git, Set[RelationData]]:
- src, *_, dst = relation.value.split("_")
- sha1s = set(ids)
- if not reverse:
- empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
- src_objs = {
- x["sha1"]: x[dst]
- for x in self.db.get_collection(src).find(
- {"sha1": {"$in": list(sha1s)}, dst: {"$ne": empty}},
- {"_id": 0, "sha1": 1, dst: 1},
- )
- }
- dst_ids = list(
- {ObjectId(obj_id) for _, value in src_objs.items() for obj_id in value}
- )
- dst_objs = {
- x["sha1"]: x["_id"]
- for x in self.db.get_collection(dst).find(
- {"_id": {"$in": dst_ids}}, {"_id": 1, "sha1": 1}
- )
- }
- if src != "revision":
- return {
- src_sha1: {
- RelationData(dst=dst_sha1, path=path)
- for dst_sha1, dst_obj_id in dst_objs.items()
- for dst_obj_str, paths in denorm.items()
- for path in paths
- if dst_obj_id == ObjectId(dst_obj_str)
- }
- for src_sha1, denorm in src_objs.items()
- }
- else:
- return {
- src_sha1: {
- RelationData(dst=dst_sha1, path=None)
- for dst_sha1, dst_obj_id in dst_objs.items()
- for dst_obj_ref in denorm
- if dst_obj_id == dst_obj_ref
- }
- for src_sha1, denorm in src_objs.items()
- }
- else:
- dst_objs = {
- x["sha1"]: x["_id"]
- for x in self.db.get_collection(dst).find(
- {"sha1": {"$in": list(sha1s)}}, {"_id": 1, "sha1": 1}
- )
- }
- src_objs = {
- x["sha1"]: x[dst]
- for x in self.db.get_collection(src).find(
- {}, {"_id": 0, "sha1": 1, dst: 1}
- )
- }
- result: Dict[Sha1Git, Set[RelationData]] = {}
- if src != "revision":
- for dst_sha1, dst_obj_id in dst_objs.items():
- for src_sha1, denorm in src_objs.items():
- for dst_obj_str, paths in denorm.items():
- if dst_obj_id == ObjectId(dst_obj_str):
- result.setdefault(src_sha1, set()).update(
- RelationData(dst=dst_sha1, path=path)
- for path in paths
- )
- else:
- for dst_sha1, dst_obj_id in dst_objs.items():
- for src_sha1, denorm in src_objs.items():
- if dst_obj_id in {
- ObjectId(dst_obj_str) for dst_obj_str in denorm
- }:
- result.setdefault(src_sha1, set()).add(
- RelationData(dst=dst_sha1, path=None)
- )
- return result
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"})
- def relation_get_all(
- self, relation: RelationType
- ) -> Dict[Sha1Git, Set[RelationData]]:
- src, *_, dst = relation.value.split("_")
- empty: Union[Dict[str, bytes], List[str]] = {} if src != "revision" else []
- src_objs = {
- x["sha1"]: x[dst]
- for x in self.db.get_collection(src).find(
- {dst: {"$ne": empty}}, {"_id": 0, "sha1": 1, dst: 1}
- )
- }
- dst_ids = list(
- {ObjectId(obj_id) for _, value in src_objs.items() for obj_id in value}
- )
- dst_objs = {
- x["_id"]: x["sha1"]
- for x in self.db.get_collection(dst).find(
- {"_id": {"$in": dst_ids}}, {"_id": 1, "sha1": 1}
- )
- }
- if src != "revision":
- return {
- src_sha1: {
- RelationData(dst=dst_sha1, path=path)
- for dst_obj_id, dst_sha1 in dst_objs.items()
- for dst_obj_str, paths in denorm.items()
- for path in paths
- if dst_obj_id == ObjectId(dst_obj_str)
- }
- for src_sha1, denorm in src_objs.items()
- }
- else:
- return {
- src_sha1: {
- RelationData(dst=dst_sha1, path=None)
- for dst_obj_id, dst_sha1 in dst_objs.items()
- for dst_obj_ref in denorm
- if dst_obj_id == dst_obj_ref
- }
- for src_sha1, denorm in src_objs.items()
- }
- @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"})
- def with_path(self) -> bool:
- return True
diff --git a/swh/provenance/tests/ b/swh/provenance/tests/
--- a/swh/provenance/tests/
+++ b/swh/provenance/tests/
@@ -8,7 +8,6 @@
from typing import Any, Dict, Generator, List
from _pytest.fixtures import SubRequest
-import mongomock.database
import msgpack
import psycopg2.extensions
import pytest
@@ -49,26 +48,14 @@
return postgresql.get_dsn_parameters()
-@pytest.fixture(params=["mongodb", "postgresql", "rabbitmq"])
+@pytest.fixture(params=["postgresql", "rabbitmq"])
def provenance_storage(
request: SubRequest,
provenance_postgresqldb: Dict[str, str],
- mongodb: mongomock.database.Database,
) -> Generator[ProvenanceStorageInterface, None, None]:
"""Return a working and initialized ProvenanceStorageInterface object"""
- if request.param == "mongodb":
- mongodb_params = {
- "host": mongodb.client.address[0],
- "port": mongodb.client.address[1],
- "dbname":,
- }
- with get_provenance_storage(
- cls=request.param, db=mongodb_params, engine="mongomock"
- ) as storage:
- yield storage
- elif request.param == "rabbitmq":
+ if request.param == "rabbitmq":
from swh.provenance.api.server import ProvenanceStorageRabbitMQServer
rabbitmq = request.getfixturevalue("rabbitmq")
@@ -77,7 +64,7 @@
rabbitmq_params: Dict[str, Any] = {
"url": f"amqp://guest:guest@{host}:{port}/%2f",
"storage_config": {
- "cls": "postgresql", # TODO: also test with underlying mongodb storage
+ "cls": "postgresql",
"db": provenance_postgresqldb,
"raise_on_commit": True,
diff --git a/swh/provenance/tests/ b/swh/provenance/tests/
--- a/swh/provenance/tests/
+++ b/swh/provenance/tests/
@@ -22,7 +22,6 @@
from swh.provenance.model import OriginEntry, RevisionEntry
-from swh.provenance.mongo.backend import ProvenanceStorageMongoDb
from swh.provenance.origin import origin_add
from swh.provenance.provenance import Provenance
from swh.provenance.revision import revision_add
@@ -97,10 +96,6 @@
paths = {entry["name"] for dir in data["directory"] for entry in dir["entries"]}
assert provenance_storage.location_add(paths)
- if isinstance(provenance_storage, ProvenanceStorageMongoDb):
- # TODO: remove this when `location_add` is properly implemented for MongoDb.
- return
if provenance_storage.with_path():
assert provenance_storage.location_get_all() == paths
File Metadata
Mime Type
Dec 21 2024, 12:18 PM (11 w, 4 d ago)
Storage Engine
Storage Format
Raw Data
Storage Handle
Attached To
D8099: Remove unused mongo backend
Event Timeline
Log In to Comment