diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py --- a/swh/provenance/interface.py +++ b/swh/provenance/interface.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from datetime import datetime import enum -from typing import Dict, Generator, Iterable, Optional, Set +from typing import Dict, Generator, Iterable, Optional, Set, Union from typing_extensions import Protocol, runtime_checkable @@ -66,6 +66,16 @@ @runtime_checkable class ProvenanceStorageInterface(Protocol): + @remote_api_endpoint("content_add") + def content_add( + self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + """Add blobs identified by sha1 ids, with an optional associated date (as paired + in `cnts`) to the provenance storage. Return a boolean stating whether the + information was successfully stored. + """ + ... + @remote_api_endpoint("content_find_first") def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" @@ -78,13 +88,6 @@ """Retrieve all the occurrences of the blob identified by `id`.""" ... - @remote_api_endpoint("content_set_date") - def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return - a boolean stating whether the information was successfully stored. - """ - ... - @remote_api_endpoint("content_get") def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each blob sha1 in `ids`. If some blob has @@ -92,11 +95,13 @@ """ ... - @remote_api_endpoint("directory_set_date") - def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to directories identified by sha1 ids, as paired in - `dates`. Return a boolean stating whether the information was successfully - stored. + @remote_api_endpoint("directory_add") + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + """Add directories identified by sha1 ids, with an optional associated date (as + paired in `dirs`) to the provenance storage. Return a boolean stating if the + information was successfully stored. """ ... @@ -114,37 +119,36 @@ """ ... - @remote_api_endpoint("location_get") - def location_get(self) -> Set[bytes]: + @remote_api_endpoint("location_add") + def location_add(self, paths: Iterable[bytes]) -> bool: + """Register the given `paths` in the storage.""" + ... + + @remote_api_endpoint("location_get_all") + def location_get_all(self) -> Set[bytes]: """Retrieve all paths present in the provenance model.""" ... - @remote_api_endpoint("origin_set_url") - def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: - """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return - a boolean stating whether the information was successfully stored. + @remote_api_endpoint("origin_add") + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: + """Add origins identified by sha1 ids, with their corresponding url (as paired + in `orgs`) to the provenance storage. Return a boolean stating if the + information was successfully stored. """ ... @remote_api_endpoint("origin_get") def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: - """Retrieve the associated url for each origin sha1 in `ids`. If some origin has - no associated date, it is not present in the resulting dictionary. - """ - ... - - @remote_api_endpoint("revision_set_date") - def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to revisions identified by sha1 ids, as paired in `dates`. - Return a boolean stating whether the information was successfully stored. - """ + """Retrieve the associated url for each origin sha1 in `ids`.""" ... - @remote_api_endpoint("revision_set_origin") - def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: - """Associate origins to revisions identified by sha1 ids, as paired in - `origins` (revision ids are keys and origin ids, values). Return a boolean - stating whether the information was successfully stored. + @remote_api_endpoint("revision_add") + def revision_add( + self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] + ) -> bool: + """Add revisions identified by sha1 ids, with optional associated date or origin + (as paired in `revs`) to the provenance storage. Return a boolean stating if the + information was successfully stored. """ ... @@ -160,7 +164,10 @@ def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: - """Add entries in the selected `relation`.""" + """Add entries in the selected `relation`. This method assumes all entities + being related are already registered in the storage. See `content_add`, + `directory_add`, `origin_add`, and `revision_add`. + """ ... @remote_api_endpoint("relation_get") diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py --- a/swh/provenance/mongo/backend.py +++ b/swh/provenance/mongo/backend.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone import os -from typing import Any, Dict, Generator, Iterable, List, Optional, Set +from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Union from bson import ObjectId import pymongo.database @@ -25,6 +25,35 @@ def __init__(self, db: pymongo.database.Database): self.db = db + def content_add( + self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + data = cnts if isinstance(cnts, dict) else dict.fromkeys(cnts) + existing = { + x["sha1"]: x + for x in self.db.content.find( + {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} + ) + } + for sha1, date in data.items(): + ts = datetime.timestamp(date) if date is not None else None + if sha1 in existing: + cnt = existing[sha1] + if ts is not None and (cnt["ts"] is None or ts < cnt["ts"]): + self.db.content.update_one( + {"_id": cnt["_id"]}, {"$set": {"ts": ts}} + ) + else: + self.db.content.insert_one( + { + "sha1": sha1, + "ts": ts, + "revision": {}, + "directory": {}, + } + ) + return True + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: # get all the revisions # iterate and find the earliest @@ -36,8 +65,10 @@ for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}} ): - origin = self.db.origin.find_one({"sha1": revision["preferred"]}) - assert origin is not None + if revision["preferred"] is not None: + origin = self.db.origin.find_one({"sha1": revision["preferred"]}) + else: + origin = {"url": None} for path in content["revision"][str(revision["_id"])]: occurs.append( @@ -62,8 +93,10 @@ for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in content["revision"]]}} ): - origin = self.db.origin.find_one({"sha1": revision["preferred"]}) - assert origin is not None + if revision["preferred"] is not None: + origin = self.db.origin.find_one({"sha1": revision["preferred"]}) + else: + origin = {"url": None} for path in content["revision"][str(revision["_id"])]: occurs.append( @@ -81,8 +114,10 @@ for revision in self.db.revision.find( {"_id": {"$in": [ObjectId(obj_id) for obj_id in directory["revision"]]}} ): - origin = self.db.origin.find_one({"sha1": revision["preferred"]}) - assert origin is not None + if revision["preferred"] is not None: + origin = self.db.origin.find_one({"sha1": revision["preferred"]}) + else: + origin = {"url": None} for suffix in content["directory"][str(directory["_id"])]: for prefix in directory["revision"][str(revision["_id"])]: @@ -113,52 +148,25 @@ ) } - def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - # get all the docuemtns with the id, add date, add missing records - cnts = { - x["sha1"]: x - for x in self.db.content.find( - {"sha1": {"$in": list(dates)}}, {"sha1": 1, "ts": 1, "_id": 1} - ) - } - - for sha1, date in dates.items(): - ts = datetime.timestamp(date) - if sha1 in cnts: - # update - if cnts[sha1]["ts"] is None or ts < cnts[sha1]["ts"]: - self.db.content.update_one( - {"_id": cnts[sha1]["_id"]}, {"$set": {"ts": ts}} - ) - else: - # add new content - self.db.content.insert_one( - { - "sha1": sha1, - "ts": ts, - "revision": {}, - "directory": {}, - } - ) - return True - - def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - dirs = { + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + data = dirs if isinstance(dirs, dict) else dict.fromkeys(dirs) + existing = { x["sha1"]: x for x in self.db.directory.find( - {"sha1": {"$in": list(dates)}}, {"sha1": 1, "ts": 1, "_id": 1} + {"sha1": {"$in": list(data)}}, {"sha1": 1, "ts": 1, "_id": 1} ) } - for sha1, date in dates.items(): - ts = datetime.timestamp(date) - if sha1 in dirs: - # update - if dirs[sha1]["ts"] is None or ts < dirs[sha1]["ts"]: + for sha1, date in data.items(): + ts = datetime.timestamp(date) if date is not None else None + if sha1 in existing: + dir = existing[sha1] + if ts is not None and (dir["ts"] is None or ts < dir["ts"]): self.db.directory.update_one( - {"_id": dirs[sha1]["_id"]}, {"$set": {"ts": ts}} + {"_id": dir["_id"]}, {"$set": {"ts": ts}} ) else: - # add new dir self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) return True @@ -179,7 +187,11 @@ ) } - def location_get(self) -> Set[bytes]: + def location_add(self, paths: Iterable[bytes]) -> bool: + # TODO: implement this methods if path are to be stored in a separate collection + return True + + def location_get_all(self) -> Set[bytes]: contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1}) paths: List[Iterable[bytes]] = [] for content in contents: @@ -191,15 +203,15 @@ paths.extend(value for _, value in each_dir["revision"].items()) return set(sum(paths, [])) - def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: - origins = { + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: + existing = { x["sha1"]: x for x in self.db.origin.find( - {"sha1": {"$in": list(urls)}}, {"sha1": 1, "url": 1, "_id": 1} + {"sha1": {"$in": list(orgs)}}, {"sha1": 1, "url": 1, "_id": 1} ) } - for sha1, url in urls.items(): - if sha1 not in origins: + for sha1, url in orgs.items(): + if sha1 not in existing: # add new origin self.db.origin.insert_one({"sha1": sha1, "url": url}) return True @@ -212,27 +224,40 @@ ) } - def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - revs = { + def revision_add( + self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] + ) -> bool: + data = ( + revs + if isinstance(revs, dict) + else dict.fromkeys(revs, RevisionData(date=None, origin=None)) + ) + existing = { x["sha1"]: x for x in self.db.revision.find( - {"sha1": {"$in": list(dates)}}, {"sha1": 1, "ts": 1, "_id": 1} + {"sha1": {"$in": list(data)}}, + {"sha1": 1, "ts": 1, "preferred": 1, "_id": 1}, ) } - for sha1, date in dates.items(): - ts = datetime.timestamp(date) - if sha1 in revs: - # update - if revs[sha1]["ts"] is None or ts < revs[sha1]["ts"]: + for sha1, info in data.items(): + ts = datetime.timestamp(info.date) if info.date is not None else None + preferred = info.origin + if sha1 in existing: + rev = existing[sha1] + if ts is None or (rev["ts"] is not None and ts >= rev["ts"]): + ts = rev["ts"] + if preferred is None: + preferred = rev["preferred"] + if ts != rev["ts"] or preferred != rev["preferred"]: self.db.revision.update_one( - {"_id": revs[sha1]["_id"]}, {"$set": {"ts": ts}} + {"_id": rev["_id"]}, + {"$set": {"ts": ts, "preferred": preferred}}, ) else: - # add new rev self.db.revision.insert_one( { "sha1": sha1, - "preferred": None, + "preferred": preferred, "origin": [], "revision": [], "ts": ts, @@ -240,31 +265,6 @@ ) return True - def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: - revs = { - x["sha1"]: x - for x in self.db.revision.find( - {"sha1": {"$in": list(origins)}}, {"sha1": 1, "preferred": 1, "_id": 1} - ) - } - for sha1, origin in origins.items(): - if sha1 in revs: - self.db.revision.update_one( - {"_id": revs[sha1]["_id"]}, {"$set": {"preferred": origin}} - ) - else: - # add new rev - self.db.revision.insert_one( - { - "sha1": sha1, - "preferred": origin, - "origin": [], - "revision": [], - "ts": None, - } - ) - return True - def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: return { x["sha1"]: RevisionData( @@ -272,7 +272,10 @@ origin=x["preferred"], ) for x in self.db.revision.find( - {"sha1": {"$in": list(ids)}}, + { + "sha1": {"$in": list(ids)}, + "$or": [{"preferred": {"$ne": None}}, {"ts": {"$ne": None}}], + }, {"sha1": 1, "preferred": 1, "ts": 1, "_id": 0}, ) } @@ -283,40 +286,10 @@ src_relation, *_, dst_relation = relation.value.split("_") set_data = set(data) - dst_sha1s = {x.dst for x in set_data} - if dst_relation in ["content", "directory", "revision"]: - dst_obj: Dict[str, Any] = {"ts": None} - if dst_relation == "content": - dst_obj["revision"] = {} - dst_obj["directory"] = {} - if dst_relation == "directory": - dst_obj["revision"] = {} - if dst_relation == "revision": - dst_obj["preferred"] = None - dst_obj["origin"] = [] - dst_obj["revision"] = [] - - existing = { - x["sha1"] - for x in self.db.get_collection(dst_relation).find( - {"sha1": {"$in": list(dst_sha1s)}}, {"_id": 0, "sha1": 1} - ) - } - - for sha1 in dst_sha1s: - if sha1 not in existing: - self.db.get_collection(dst_relation).insert_one( - dict(dst_obj, **{"sha1": sha1}) - ) - elif dst_relation == "origin": - # TODO, check origins are already in the DB - # if not, algo has something wrong (algo inserts it initially) - pass - dst_objs = { x["sha1"]: x["_id"] for x in self.db.get_collection(dst_relation).find( - {"sha1": {"$in": list(dst_sha1s)}}, {"_id": 1, "sha1": 1} + {"sha1": {"$in": [x.dst for x in set_data]}}, {"_id": 1, "sha1": 1} ) } @@ -337,42 +310,24 @@ } for sha1, dsts in denorm.items(): - if sha1 in src_objs: - # update - if src_relation != "revision": - k = { - obj_id: list(set(paths + dsts.get(obj_id, []))) - for obj_id, paths in src_objs[sha1][dst_relation].items() - } - self.db.get_collection(src_relation).update_one( - {"_id": src_objs[sha1]["_id"]}, - {"$set": {dst_relation: dict(dsts, **k)}}, - ) - else: - self.db.get_collection(src_relation).update_one( - {"_id": src_objs[sha1]["_id"]}, - { - "$set": { - dst_relation: list( - set(src_objs[sha1][dst_relation] + dsts) - ) - } - }, - ) + # update + if src_relation != "revision": + k = { + obj_id: list(set(paths + dsts.get(obj_id, []))) + for obj_id, paths in src_objs[sha1][dst_relation].items() + } + self.db.get_collection(src_relation).update_one( + {"_id": src_objs[sha1]["_id"]}, + {"$set": {dst_relation: dict(dsts, **k)}}, + ) else: - # add new rev - src_obj: Dict[str, Any] = {"ts": None} - if src_relation == "content": - src_obj["revision"] = {} - src_obj["directory"] = {} - if src_relation == "directory": - src_obj["revision"] = {} - if src_relation == "revision": - src_obj["preferred"] = None - src_obj["origin"] = [] - src_obj["revision"] = [] - self.db.get_collection(src_relation).insert_one( - dict(src_obj, **{"sha1": sha1, dst_relation: dsts}) + self.db.get_collection(src_relation).update_one( + {"_id": src_objs[sha1]["_id"]}, + { + "$set": { + dst_relation: list(set(src_objs[sha1][dst_relation] + dsts)) + } + }, ) return True diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -7,7 +7,7 @@ from datetime import datetime import itertools import logging -from typing import Dict, Generator, Iterable, List, Optional, Set +from typing import Dict, Generator, Iterable, List, Optional, Set, Union import psycopg2.extensions import psycopg2.extras @@ -60,6 +60,11 @@ def denormalized(self) -> bool: return "denormalized" in self.flavor + def content_add( + self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + return self._entity_set_date("content", cnts) + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" with self.transaction(readonly=True) as cursor: @@ -75,14 +80,13 @@ cursor.execute(query=sql, vars=(id, limit)) yield from (ProvenanceResult(**row) for row in cursor) - def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - return self._entity_set_date("content", dates) - def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) - def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - return self._entity_set_date("directory", dates) + def directory_add( + self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] + ) -> bool: + return self._entity_set_date("directory", dirs) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) @@ -92,21 +96,41 @@ cursor.execute(f"SELECT sha1 FROM {entity.value}") return {row["sha1"] for row in cursor} - def location_get(self) -> Set[bytes]: + def location_add(self, paths: Iterable[bytes]) -> bool: + if not self.with_path(): + return True + try: + values = [(path,) for path in paths] + if values: + sql = """ + INSERT INTO location(path) VALUES %s + ON CONFLICT DO NOTHING + """ + with self.transaction() as cursor: + psycopg2.extras.execute_values(cursor, sql, argslist=values) + return True + except: # noqa: E722 + # Unexpected error occurred, rollback all changes and log message + LOGGER.exception("Unexpected error") + if self.raise_on_commit: + raise + return False + + def location_get_all(self) -> Set[bytes]: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT location.path AS path FROM location") return {row["path"] for row in cursor} - def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: try: - if urls: + if orgs: sql = """ INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ with self.transaction() as cursor: psycopg2.extras.execute_values( - cur=cursor, sql=sql, argslist=urls.items() + cur=cursor, sql=sql, argslist=orgs.items() ) return True except: # noqa: E722 @@ -132,24 +156,27 @@ urls.update((row["sha1"], row["url"]) for row in cursor) return urls - def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - return self._entity_set_date("revision", dates) - - def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: + def revision_add( + self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] + ) -> bool: + if isinstance(revs, dict): + data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()] + else: + data = [(sha1, None, None) for sha1 in revs] try: - if origins: + if data: sql = """ - INSERT INTO revision(sha1, origin) - (SELECT V.rev AS sha1, O.id AS origin - FROM (VALUES %s) AS V(rev, org) - JOIN origin AS O ON (O.sha1=V.org)) + INSERT INTO revision(sha1, date, origin) + (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin + FROM (VALUES %s) AS V(rev, date, org) + LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git)) ON CONFLICT (sha1) DO - UPDATE SET origin=EXCLUDED.origin + UPDATE SET + date=LEAST(EXCLUDED.date, revision.date), + origin=COALESCE(EXCLUDED.origin, revision.origin) """ with self.transaction() as cursor: - psycopg2.extras.execute_values( - cur=cursor, sql=sql, argslist=origins.items() - ) + psycopg2.extras.execute_values(cur=cursor, sql=sql, argslist=data) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message @@ -169,6 +196,7 @@ FROM revision AS R LEFT JOIN origin AS O ON (O.id=R.origin) WHERE R.sha1 IN ({values}) + AND (R.date is not NULL OR O.sha1 is not NULL) """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) @@ -181,38 +209,12 @@ def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: + rows = [(rel.src, rel.dst, rel.path) for rel in data] try: - rows = [(rel.src, rel.dst, rel.path) for rel in data] if rows: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") - if src_table != "origin": - # Origin entries should be inserted previously as they require extra - # non-null information - srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) - sql = f""" - INSERT INTO {src_table}(sha1) VALUES %s - ON CONFLICT DO NOTHING - """ - with self.transaction() as cursor: - psycopg2.extras.execute_values( - cur=cursor, sql=sql, argslist=srcs - ) - - if dst_table != "origin": - # Origin entries should be inserted previously as they require extra - # non-null information - dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) - sql = f""" - INSERT INTO {dst_table}(sha1) VALUES %s - ON CONFLICT DO NOTHING - """ - with self.transaction() as cursor: - psycopg2.extras.execute_values( - cur=cursor, sql=sql, argslist=dsts - ) - # Put the next three queries in a manual single transaction: # they use the same temp table with self.transaction() as cursor: @@ -263,9 +265,10 @@ def _entity_set_date( self, - entity: Literal["content", "directory", "revision"], - data: Dict[Sha1Git, datetime], + entity: Literal["content", "directory"], + dates: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]], ) -> bool: + data = dates if isinstance(dates, dict) else dict.fromkeys(dates) try: if data: sql = f""" diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -17,6 +17,7 @@ ProvenanceStorageInterface, RelationData, RelationType, + RevisionData, ) from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry @@ -79,6 +80,49 @@ def flush(self) -> None: # Revision-content layer insertions ############################################ + # After relations, dates for the entities can be safely set, acknowledging that + # these entities won't need to be reprocessed in case of failure. + cnts = { + src + for src, _, _ in self.cache["content_in_revision"] + | self.cache["content_in_directory"] + } + if cnts: + while not self.storage.content_add(cnts): + LOGGER.warning( + "Unable to write content entities to the storage. Retrying..." + ) + + dirs = {dst for _, dst, _ in self.cache["content_in_directory"]} + if dirs: + while not self.storage.directory_add(dirs): + LOGGER.warning( + "Unable to write directory entities to the storage. Retrying..." + ) + + revs = { + dst + for _, dst, _ in self.cache["content_in_revision"] + | self.cache["directory_in_revision"] + } + if revs: + while not self.storage.revision_add(revs): + LOGGER.warning( + "Unable to write revision entities to the storage. Retrying..." + ) + + paths = { + path + for _, _, path in self.cache["content_in_revision"] + | self.cache["content_in_directory"] + | self.cache["directory_in_revision"] + } + if paths: + while not self.storage.location_add(paths): + LOGGER.warning( + "Unable to write locations entities to the storage. Retrying..." + ) + # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generated an inconsistent database. if self.cache["content_in_revision"]: @@ -122,54 +166,72 @@ # After relations, dates for the entities can be safely set, acknowledging that # these entities won't need to be reprocessed in case of failure. - dates = { + cnt_dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } - if dates: - while not self.storage.content_set_date(dates): + if cnt_dates: + while not self.storage.content_add(cnt_dates): LOGGER.warning( "Unable to write content dates to the storage. Retrying..." ) - dates = { + dir_dates = { sha1: date for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } - if dates: - while not self.storage.directory_set_date(dates): + if dir_dates: + while not self.storage.directory_add(dir_dates): LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) - dates = { - sha1: date + rev_dates = { + sha1: RevisionData(date=date, origin=None) for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } - if dates: - while not self.storage.revision_set_date(dates): + if rev_dates: + while not self.storage.revision_add(rev_dates): LOGGER.warning( "Unable to write revision dates to the storage. Retrying..." ) # Origin-revision layer insertions ############################################# - # Origins urls should be inserted first so that internal ids' resolution works - # properly. + # Origins and revisions should be inserted first so that internal ids' + # resolution works properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } if urls: - while not self.storage.origin_set_url(urls): + while not self.storage.origin_add(urls): LOGGER.warning( "Unable to write origins urls to the storage. Retrying..." ) + rev_orgs = { + # Destinations in this relation should match origins in the next one + **{ + src: RevisionData(date=None, origin=None) + for src in self.cache["revision_before_revision"] + }, + **{ + # This relation comes second so that non-None origins take precedence + src: RevisionData(date=None, origin=org) + for src, org in self.cache["revision_in_origin"] + }, + } + if rev_orgs: + while not self.storage.revision_add(rev_orgs): + LOGGER.warning( + "Unable to write revision entities to the storage. Retrying..." + ) + # Second, flat models for revisions' histories (ie. revision-before-revision). data: Iterable[RelationData] = sum( [ @@ -203,18 +265,6 @@ RelationType.REV_IN_ORG, ) - # Finally, preferred origins for the visited revisions are set (this step can be - # reordered if required). - origins = { - sha1: self.cache["revision_origin"]["data"][sha1] - for sha1 in self.cache["revision_origin"]["added"] - } - if origins: - while not self.storage.revision_set_origin(origins): - LOGGER.warning( - "Unable to write preferred origins to the storage. Retrying..." - ) - # clear local cache ############################################################ self.clear_caches() diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql --- a/swh/provenance/sql/40-funcs.sql +++ b/swh/provenance/sql/40-funcs.sql @@ -99,11 +99,6 @@ join_location text; begin if src_table in ('content'::regclass, 'directory'::regclass) then - insert into location(path) - select V.path - from tmp_relation_add as V - on conflict (path) do nothing; - select_fields := 'D.id, L.id'; join_location := 'inner join location as L on (L.path = V.path)'; else @@ -419,11 +414,6 @@ on_conflict text; begin if src_table in ('content'::regclass, 'directory'::regclass) then - insert into location(path) - select V.path - from tmp_relation_add as V - on conflict (path) do nothing; - select_fields := 'array_agg((D.id, L.id)::rel_dst)'; join_location := 'inner join location as L on (L.path = V.path)'; group_entries := 'group by S.id'; diff --git a/swh/provenance/tests/test_provenance_storage.py b/swh/provenance/tests/test_provenance_storage.py --- a/swh/provenance/tests/test_provenance_storage.py +++ b/swh/provenance/tests/test_provenance_storage.py @@ -3,16 +3,15 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime +from datetime import datetime, timezone import inspect import os from typing import Any, Dict, Iterable, Optional, Set -import pytest - from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import origin_identifier from swh.model.model import Sha1Git +from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ( EntityType, ProvenanceInterface, @@ -20,54 +19,156 @@ ProvenanceStorageInterface, RelationData, RelationType, + RevisionData, ) -from swh.provenance.tests.conftest import load_repo_data, ts2dt +from swh.provenance.model import OriginEntry, RevisionEntry +from swh.provenance.mongo.backend import ProvenanceStorageMongoDb +from swh.provenance.origin import origin_add +from swh.provenance.provenance import Provenance +from swh.provenance.revision import revision_add +from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt -def relation_add_and_compare_result( - relation: RelationType, - data: Set[RelationData], - refstorage: ProvenanceStorageInterface, - storage: ProvenanceStorageInterface, - with_path: bool = True, +def test_provenance_storage_content( + provenance_storage: ProvenanceStorageInterface, ) -> None: - assert data - assert refstorage.relation_add(relation, data) == storage.relation_add( - relation, data - ) + """Tests content methods for every `ProvenanceStorageInterface` implementation.""" - assert relation_compare_result( - refstorage.relation_get(relation, (reldata.src for reldata in data)), - storage.relation_get(relation, (reldata.src for reldata in data)), - with_path, - ) - assert relation_compare_result( - refstorage.relation_get( - relation, - (reldata.dst for reldata in data), - reverse=True, - ), - storage.relation_get( - relation, - (reldata.dst for reldata in data), - reverse=True, - ), - with_path, + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + + # Add all content present in the current repo to the storage, just assigning their + # creation dates. Then check that the returned results when querying are the same. + cnts = {cnt["sha1_git"] for idx, cnt in enumerate(data["content"]) if idx % 2 == 0} + cnt_dates = { + cnt["sha1_git"]: cnt["ctime"] + for idx, cnt in enumerate(data["content"]) + if idx % 2 == 1 + } + assert cnts or cnt_dates + assert provenance_storage.content_add(cnts) + assert provenance_storage.content_add(cnt_dates) + assert provenance_storage.content_get(set(cnt_dates.keys())) == cnt_dates + assert provenance_storage.entity_get_all(EntityType.CONTENT) == cnts | set( + cnt_dates.keys() ) - assert relation_compare_result( - refstorage.relation_get_all(relation), - storage.relation_get_all(relation), - with_path, + + +def test_provenance_storage_directory( + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests directory methods for every `ProvenanceStorageInterface` implementation.""" + + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + + # Of all directories present in the current repo, only assign a date to those + # containing blobs (picking the max date among the available ones). Then check that + # the returned results when querying are the same. + def getmaxdate( + directory: Dict[str, Any], contents: Iterable[Dict[str, Any]] + ) -> Optional[datetime]: + dates = [ + content["ctime"] + for entry in directory["entries"] + for content in contents + if entry["type"] == "file" and entry["target"] == content["sha1_git"] + ] + return max(dates) if dates else None + + dirs = { + dir["id"] + for dir in data["directory"] + if getmaxdate(dir, data["content"]) is None + } + dir_dates = { + dir["id"]: getmaxdate(dir, data["content"]) + for dir in data["directory"] + if getmaxdate(dir, data["content"]) is not None + } + assert dirs + assert provenance_storage.directory_add(dirs) + assert provenance_storage.directory_add(dir_dates) + assert provenance_storage.directory_get(set(dir_dates.keys())) == dir_dates + assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == dirs | set( + dir_dates.keys() ) -def relation_compare_result( - expected: Set[RelationData], computed: Set[RelationData], with_path: bool -) -> bool: - return { - RelationData(reldata.src, reldata.dst, reldata.path if with_path else None) - for reldata in expected - } == computed +def test_provenance_storage_location( + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests location methods for every `ProvenanceStorageInterface` implementation.""" + + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + + # Add all names of entries present in the directories of the current repo as paths + # to the storage. Then check that the returned results when querying are the same. + paths = {entry["name"] for dir in data["directory"] for entry in dir["entries"]} + assert provenance_storage.location_add(paths) + + if isinstance(provenance_storage, ProvenanceStorageMongoDb): + # TODO: remove this when `location_add` is properly implemented for MongoDb. + return + + if provenance_storage.with_path(): + assert provenance_storage.location_get_all() == paths + else: + assert provenance_storage.location_get_all() == set() + + +def test_provenance_storage_origin( + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests origin methods for every `ProvenanceStorageInterface` implementation.""" + + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + + # Test origin methods. + # Add all origins present in the current repo to the storage. Then check that the + # returned results when querying are the same. + orgs = {hash_to_bytes(origin_identifier(org)): org["url"] for org in data["origin"]} + assert orgs + assert provenance_storage.origin_add(orgs) + assert provenance_storage.origin_get(set(orgs.keys())) == orgs + assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set(orgs.keys()) + + +def test_provenance_storage_revision( + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests revision methods for every `ProvenanceStorageInterface` implementation.""" + + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + + # Test revision methods. + # Add all revisions present in the current repo to the storage, assigning their + # dates and an arbitrary origin to each one. Then check that the returned results + # when querying are the same. + origin = next(iter(data["origin"])) + origin_sha1 = hash_to_bytes(origin_identifier(origin)) + # Origin must be inserted in advance. + assert provenance_storage.origin_add({origin_sha1: origin["url"]}) + + revs = {rev["id"] for idx, rev in enumerate(data["revision"]) if idx % 6 == 0} + rev_data = { + rev["id"]: RevisionData( + date=ts2dt(rev["date"]) if idx % 2 != 0 else None, + origin=origin_sha1 if idx % 3 != 0 else None, + ) + for idx, rev in enumerate(data["revision"]) + if idx % 6 != 0 + } + assert revs + assert provenance_storage.revision_add(revs) + assert provenance_storage.revision_add(rev_data) + assert provenance_storage.revision_get(set(rev_data.keys())) == rev_data + assert provenance_storage.entity_get_all(EntityType.REVISION) == revs | set( + rev_data.keys() + ) def dircontent( @@ -94,40 +195,73 @@ return content -@pytest.mark.parametrize( - "repo", - ("cmdbts2", "out-of-order", "with-merges"), -) -def test_provenance_storage( - provenance: ProvenanceInterface, - provenance_storage: ProvenanceStorageInterface, - repo: str, +def entity_add( + storage: ProvenanceStorageInterface, entity: EntityType, ids: Set[Sha1Git] +) -> bool: + if entity == EntityType.CONTENT: + return storage.content_add({sha1: None for sha1 in ids}) + elif entity == EntityType.DIRECTORY: + return storage.directory_add({sha1: None for sha1 in ids}) + else: # entity == EntityType.REVISION: + return storage.revision_add( + {sha1: RevisionData(date=None, origin=None) for sha1 in ids} + ) + + +def relation_add_and_compare_result( + storage: ProvenanceStorageInterface, relation: RelationType, data: Set[RelationData] ) -> None: - """Tests every ProvenanceStorageInterface implementation against the one provided - for provenance.storage.""" - # Read data/README.md for more details on how these datasets are generated. - data = load_repo_data(repo) + # Source, destinations and locations must be added in advance. + src, *_, dst = relation.value.split("_") + if src != "origin": + assert entity_add(storage, EntityType(src), {entry.src for entry in data}) + if dst != "origin": + assert entity_add(storage, EntityType(dst), {entry.dst for entry in data}) + if storage.with_path(): + assert storage.location_add( + {entry.path for entry in data if entry.path is not None} + ) - # Assuming provenance.storage has the 'with-path' flavor. - assert provenance.storage.with_path() + assert data + assert storage.relation_add(relation, data) - # Test origin methods. - # Add all origins present in the current repo to both storages. Then check that the - # inserted data is the same in both cases. - org_urls = { - hash_to_bytes(origin_identifier(org)): org["url"] for org in data["origin"] - } - assert org_urls - assert provenance.storage.origin_set_url( - org_urls - ) == provenance_storage.origin_set_url(org_urls) + for row in data: + assert relation_compare_result( + storage.relation_get(relation, [row.src]), + {entry for entry in data if entry.src == row.src}, + storage.with_path(), + ) + assert relation_compare_result( + storage.relation_get( + relation, + [row.dst], + reverse=True, + ), + {entry for entry in data if entry.dst == row.dst}, + storage.with_path(), + ) - assert provenance.storage.origin_get(org_urls) == provenance_storage.origin_get( - org_urls + assert relation_compare_result( + storage.relation_get_all(relation), data, storage.with_path() ) - assert provenance.storage.entity_get_all( - EntityType.ORIGIN - ) == provenance_storage.entity_get_all(EntityType.ORIGIN) + + +def relation_compare_result( + computed: Set[RelationData], expected: Set[RelationData], with_path: bool +) -> bool: + return { + RelationData(row.src, row.dst, row.path if with_path else None) + for row in expected + } == computed + + +def test_provenance_storage_relation( + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests relation methods for every `ProvenanceStorageInterface` implementation.""" + + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") # Test content-in-revision relation. # Create flat models of every root directory for the revisions in the dataset. @@ -137,13 +271,8 @@ subdir for subdir in data["directory"] if subdir["id"] == rev["directory"] ) cnt_in_rev.update(dircontent(data, rev["id"], root)) - relation_add_and_compare_result( - RelationType.CNT_EARLY_IN_REV, - cnt_in_rev, - provenance.storage, - provenance_storage, - provenance_storage.with_path(), + provenance_storage, RelationType.CNT_EARLY_IN_REV, cnt_in_rev ) # Test content-in-directory relation. @@ -151,13 +280,8 @@ cnt_in_dir: Set[RelationData] = set() for dir in data["directory"]: cnt_in_dir.update(dircontent(data, dir["id"], dir)) - relation_add_and_compare_result( - RelationType.CNT_IN_DIR, - cnt_in_dir, - provenance.storage, - provenance_storage, - provenance_storage.with_path(), + provenance_storage, RelationType.CNT_IN_DIR, cnt_in_dir ) # Test content-in-directory relation. @@ -165,13 +289,8 @@ dir_in_rev = { RelationData(rev["directory"], rev["id"], b".") for rev in data["revision"] } - relation_add_and_compare_result( - RelationType.DIR_IN_REV, - dir_in_rev, - provenance.storage, - provenance_storage, - provenance_storage.with_path(), + provenance_storage, RelationType.DIR_IN_REV, dir_in_rev ) # Test revision-in-origin relation. @@ -190,12 +309,16 @@ for _, branch in snapshot["branches"].items() if branch["target_type"] == "revision" } + # Origins must be inserted in advance (cannot be done by `entity_add` inside + # `relation_add_and_compare_result`). + orgs = { + hash_to_bytes(origin_identifier(origin)): origin["url"] + for origin in data["origin"] + } + assert provenance_storage.origin_add(orgs) relation_add_and_compare_result( - RelationType.REV_IN_ORG, - rev_in_org, - provenance.storage, - provenance_storage, + provenance_storage, RelationType.REV_IN_ORG, rev_in_org ) # Test revision-before-revision relation. @@ -205,89 +328,26 @@ for rev in data["revision"] for parent in rev["parents"] } - relation_add_and_compare_result( - RelationType.REV_BEFORE_REV, - rev_before_rev, - provenance.storage, - provenance_storage, + provenance_storage, RelationType.REV_BEFORE_REV, rev_before_rev ) - # Test content methods. - # Add all content present in the current repo to both storages, just assigning their - # creation dates. Then check that the inserted content is the same in both cases. - cnt_dates = {cnt["sha1_git"]: cnt["ctime"] for cnt in data["content"]} - assert cnt_dates - assert provenance.storage.content_set_date( - cnt_dates - ) == provenance_storage.content_set_date(cnt_dates) - - assert provenance.storage.content_get(cnt_dates) == provenance_storage.content_get( - cnt_dates - ) - assert provenance.storage.entity_get_all( - EntityType.CONTENT - ) == provenance_storage.entity_get_all(EntityType.CONTENT) - # Test directory methods. - # Of all directories present in the current repo, only assign a date to those - # containing blobs (picking the max date among the available ones). Then check that - # the inserted data is the same in both storages. - def getmaxdate( - dir: Dict[str, Any], cnt_dates: Dict[Sha1Git, datetime] - ) -> Optional[datetime]: - dates = [ - cnt_dates[entry["target"]] - for entry in dir["entries"] - if entry["type"] == "file" - ] - return max(dates) if dates else None - - dir_dates = {dir["id"]: getmaxdate(dir, cnt_dates) for dir in data["directory"]} - assert dir_dates - assert provenance.storage.directory_set_date( - {sha1: date for sha1, date in dir_dates.items() if date is not None} - ) == provenance_storage.directory_set_date( - {sha1: date for sha1, date in dir_dates.items() if date is not None} - ) - assert provenance.storage.directory_get( - dir_dates - ) == provenance_storage.directory_get(dir_dates) - assert provenance.storage.entity_get_all( - EntityType.DIRECTORY - ) == provenance_storage.entity_get_all(EntityType.DIRECTORY) +def test_provenance_storage_find( + archive: ArchiveInterface, + provenance: ProvenanceInterface, + provenance_storage: ProvenanceStorageInterface, +) -> None: + """Tests `content_find_first` and `content_find_all` methods for every + `ProvenanceStorageInterface` implementation. + """ - # Test revision methods. - # Add all revisions present in the current repo to both storages, assigning their - # dataes and an arbitrary origin to each one. Then check that the inserted data is - # the same in both cases. - rev_dates = {rev["id"]: ts2dt(rev["date"]) for rev in data["revision"]} - assert rev_dates - assert provenance.storage.revision_set_date( - rev_dates - ) == provenance_storage.revision_set_date(rev_dates) - - rev_origins = { - rev["id"]: next(iter(org_urls)) # any arbitrary origin will do - for rev in data["revision"] - } - assert rev_origins - assert provenance.storage.revision_set_origin( - rev_origins - ) == provenance_storage.revision_set_origin(rev_origins) - - assert provenance.storage.revision_get( - rev_dates - ) == provenance_storage.revision_get(rev_dates) - assert provenance.storage.entity_get_all( - EntityType.REVISION - ) == provenance_storage.entity_get_all(EntityType.REVISION) - - # Test location_get. - if provenance_storage.with_path(): - assert provenance.storage.location_get() == provenance_storage.location_get() + # Read data/README.md for more details on how these datasets are generated. + data = load_repo_data("cmdbts2") + fill_storage(archive.storage, data) - # Test content_find_first and content_find_all. + # Test content_find_first and content_find_all, first only executing the + # revision-content algorithm, then adding the origin-revision layer. def adapt_result( result: Optional[ProvenanceResult], with_path: bool ) -> Optional[ProvenanceResult]: @@ -301,18 +361,69 @@ ) return result - for cnt in cnt_dates: + # Execute the revision-content algorithm on both storages. + revisions = [ + RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"]) + for rev in data["revision"] + ] + revision_add(provenance, archive, revisions) + revision_add(Provenance(provenance_storage), archive, revisions) + + assert adapt_result( + ProvenanceResult( + content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), + revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), + date=datetime.fromtimestamp(1000000000.0, timezone.utc), + origin=None, + path=b"A/B/C/a", + ), + provenance_storage.with_path(), + ) == provenance_storage.content_find_first( + hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") + ) + + for cnt in {cnt["sha1_git"] for cnt in data["content"]}: assert adapt_result( provenance.storage.content_find_first(cnt), provenance_storage.with_path() ) == provenance_storage.content_find_first(cnt) + assert { + adapt_result(occur, provenance_storage.with_path()) + for occur in provenance.storage.content_find_all(cnt) + } == set(provenance_storage.content_find_all(cnt)) + + # Execute the origin-revision algorithm on both storages. + origins = [ + OriginEntry(url=sta["origin"], snapshot=sta["snapshot"]) + for sta in data["origin_visit_status"] + if sta["snapshot"] is not None + ] + origin_add(provenance, archive, origins) + origin_add(Provenance(provenance_storage), archive, origins) + + assert adapt_result( + ProvenanceResult( + content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), + revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), + date=datetime.fromtimestamp(1000000000.0, timezone.utc), + origin="https://cmdbts2", + path=b"A/B/C/a", + ), + provenance_storage.with_path(), + ) == provenance_storage.content_find_first( + hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") + ) + for cnt in {cnt["sha1_git"] for cnt in data["content"]}: + assert adapt_result( + provenance.storage.content_find_first(cnt), provenance_storage.with_path() + ) == provenance_storage.content_find_first(cnt) assert { adapt_result(occur, provenance_storage.with_path()) for occur in provenance.storage.content_find_all(cnt) } == set(provenance_storage.content_find_all(cnt)) -def test_types(provenance: ProvenanceInterface) -> None: +def test_types(provenance_storage: ProvenanceInterface) -> None: """Checks all methods of ProvenanceStorageInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated @@ -328,7 +439,7 @@ continue interface_meth = getattr(interface, meth_name) try: - concrete_meth = getattr(provenance.storage, meth_name) + concrete_meth = getattr(provenance_storage, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint @@ -346,4 +457,4 @@ # But there's no harm in double-checking. # And we could replace the assertions above by this one, but unlike # the assertions above, it doesn't explain what is missing. - assert isinstance(provenance.storage, ProvenanceStorageInterface) + assert isinstance(provenance_storage, ProvenanceStorageInterface) diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -282,7 +282,7 @@ rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"]) - assert rows["location"] == provenance.storage.location_get(), synth_rev[ + assert rows["location"] == provenance.storage.location_get_all(), synth_rev[ "msg" ]