diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py index 788091a..afac165 100644 --- a/swh/provenance/backend.py +++ b/swh/provenance/backend.py @@ -1,324 +1,344 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, Optional, Set, Tuple from typing_extensions import Literal, TypedDict from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType +from .provenance import ( + ProvenanceResult, + ProvenanceStorageInterface, + RelationData, + RelationType, +) class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) # TODO: maybe move this to a separate file class ProvenanceBackend: def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage self.cache = new_cache() def clear_caches(self) -> None: self.cache = new_cache() def flush(self) -> None: # Revision-content layer insertions ############################################ # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generate an inconsistent database. while not self.storage.relation_add( - RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"] + RelationType.CNT_EARLY_IN_REV, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["content_in_revision"] + ), ): logging.warning( f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " f"Data: {self.cache['content_in_revision']}. Retrying..." ) while not self.storage.relation_add( - RelationType.CNT_IN_DIR, self.cache["content_in_directory"] + RelationType.CNT_IN_DIR, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["content_in_directory"] + ), ): logging.warning( f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " f"Data: {self.cache['content_in_directory']}. Retrying..." ) while not self.storage.relation_add( - RelationType.DIR_IN_REV, self.cache["directory_in_revision"] + RelationType.DIR_IN_REV, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["directory_in_revision"] + ), ): logging.warning( f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " f"Data: {self.cache['directory_in_revision']}. Retrying..."
) # After relations, dates for the entities can be safely set, acknowledging that # these entities won't need to be reprocessed in case of failure. dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } while not self.storage.content_set_date(dates): logging.warning( f"Unable to write content dates to the storage. " f"Data: {dates}. Retrying..." ) dates = { sha1: date for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } while not self.storage.directory_set_date(dates): logging.warning( f"Unable to write directory dates to the storage. " f"Data: {dates}. Retrying..." ) dates = { sha1: date for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } while not self.storage.revision_set_date(dates): logging.warning( f"Unable to write revision dates to the storage. " f"Data: {dates}. Retrying..." ) # Origin-revision layer insertions ############################################# # Origin urls should be inserted first so that internal id resolution works # properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } while not self.storage.origin_set_url(urls): logging.warning( f"Unable to write origin urls to the storage. " f"Data: {urls}. Retrying..." ) # Second, flat models for revisions' histories (ie. revision-before-revision). - rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum( + data: Iterable[RelationData] = sum( [ [ - (prev, next, None) + RelationData(src=prev, dst=next, path=None) for next in self.cache["revision_before_revision"][prev] ] for prev in self.cache["revision_before_revision"] ], [], ) - while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data): + while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): logging.warning( f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " - f"Data: {rbr_data}. Retrying..." + f"Data: {data}. Retrying..." ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories have already been added. This is to guarantee consistent results # if something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. - rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]] - while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data): + data = [ + RelationData(src=rev, dst=org, path=None) + for rev, org in self.cache["revision_in_origin"] + ] + while not self.storage.relation_add(RelationType.REV_IN_ORG, data): logging.warning( f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " - f"Data: {rio_data}. Retrying..." + f"Data: {data}. Retrying..." ) # Finally, preferred origins for the visited revisions are set (this step can be # reordered if required). origins = { sha1: self.cache["revision_origin"]["data"][sha1] for sha1 in self.cache["revision_origin"]["added"] } while not self.storage.revision_set_origin(origins): logging.warning( f"Unable to write preferred origins to the storage. " f"Data: {origins}. Retrying..."
) # clear local cache ############################################################ self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "revision": updated = { - id: date - for id, (date, _) in self.storage.revision_get(missing_ids).items() - if date is not None + id: rev.date + for id, rev in self.storage.revision_get(missing_ids).items() + if rev.date is not None } else: updated = getattr(self.storage, f"{entity}_get")(missing_ids) cache["data"].update(updated) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].get(sha1) if date is not None: dates[sha1] = date return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( head.id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def 
revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] if revision.id not in cache: ret = self.storage.revision_get([revision.id]) if revision.id in ret: - origin = ret[revision.id][1] # TODO: make this not a tuple + origin = ret[revision.id].origin if origin is not None: cache[revision.id] = origin return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache["revision_before_revision"] or bool( self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict(self.cache["revision_in_origin"]) or bool( self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) ) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 9d8acdd..b2df264 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,315 +1,314 @@ from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2 import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git -from ..provenance import EntityType, ProvenanceResult, RelationType +from ..provenance import ( + EntityType, + ProvenanceResult, + RelationData, + RelationType, + RevisionData, +) class ProvenanceDBBase: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: ... 
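The boolean-returning write methods this class implements are the other half of the retry contract used by `ProvenanceBackend.flush()` above: every batch either succeeds or reports failure, and because the underlying INSERTs use ON CONFLICT clauses, a failed batch can simply be re-submitted. A minimal sketch of that calling pattern, where the `delay` parameter is an illustrative addition and not part of this patch:

import logging
import time
from typing import Callable


def retry_write(write: Callable[[], bool], what: str, delay: float = 1.0) -> None:
    # `write` must be idempotent: re-running a partially applied batch is
    # safe because the storage inserts with ON CONFLICT DO NOTHING/UPDATE.
    while not write():
        logging.warning("Unable to write %s to the storage. Retrying...", what)
        time.sleep(delay)  # hypothetical pause; flush() retries immediately


# e.g.: retry_write(lambda: storage.content_set_date(dates), "content dates")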
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False - def revision_get( - self, ids: Iterable[Sha1Git] - ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: - result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {} + def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: + result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, origin FROM revision WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( - (row["sha1"], (row["date"], row["origin"])) + (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( - self, - relation: RelationType, - data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], + self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: - if data: + rows = tuple((rel.src, rel.dst, rel.path) for rel in data) + if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information - srcs = tuple(set((sha1,) for (sha1, _, _) in data)) + srcs = tuple(set((sha1,) for 
(sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information - dsts = tuple(set((sha1,) for (_, sha1, _) in data)) + dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] selected = ["S.id", "D.id"] if self._relation_uses_location_table(relation): - locations = tuple(set((path,) for (_, _, path) in data)) + locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") selected.append("L.id") sql = f""" INSERT INTO {table} (SELECT {", ".join(selected)} FROM (VALUES %s) AS V(src, dst, path) {''' '''.join(joins)}) ON CONFLICT DO NOTHING """ - psycopg2.extras.execute_values(self.cursor, sql, data) + psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) - def relation_get_all( - self, relation: RelationType - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: - result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set() + ) -> Set[RelationData]: + result: Set[RelationData] = set() - sha1s: Optional[Tuple[Tuple[bytes, ...]]] + sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] if ids is not None: sha1s = (tuple(ids),) where = f"WHERE {'S.sha1' if not reverse else 'D.sha1'} IN %s" else: sha1s = None where = "" if sha1s is None or sha1s[0]: table = relation.value src, *_, dst = 
table.split("_") # TODO: improve this! if src == "revision" and dst == "revision": src_field = "prev" dst_field = "next" else: src_field = src dst_field = dst joins = [ f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", ] selected = ["S.sha1 AS src", "D.sha1 AS dst"] if self._relation_uses_location_table(relation): joins.append("INNER JOIN location AS L ON (L.id=R.location)") selected.append("L.path AS path") else: selected.append("NULL AS path") sql = f""" SELECT {", ".join(selected)} FROM {table} AS R {" ".join(joins)} {where} """ self.cursor.execute(sql, sha1s) - result.update( - (row["src"], row["dst"], row["path"]) for row in self.cursor.fetchall() - ) + result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: ... diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index b431c2e..db25752 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,290 +1,298 @@ +from dataclasses import dataclass from datetime import datetime import enum -from typing import Dict, Generator, Iterable, Optional, Set, Tuple +from typing import Dict, Generator, Iterable, Optional, Set from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry class EntityType(enum.Enum): CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" ORIGIN = "origin" class RelationType(enum.Enum): CNT_EARLY_IN_REV = "content_in_revision" CNT_IN_DIR = "content_in_directory" DIR_IN_REV = "directory_in_revision" REV_IN_ORG = "revision_in_origin" REV_BEFORE_REV = "revision_before_revision" +@dataclass(eq=True, frozen=True) class ProvenanceResult: - def __init__( - self, - content: Sha1Git, - revision: Sha1Git, - date: datetime, - origin: Optional[str], - path: bytes, - ) -> None: - self.content = content - self.revision = revision - self.date = date - self.origin = origin - self.path = path + content: Sha1Git + revision: Sha1Git + date: datetime + origin: Optional[str] + path: bytes + + +@dataclass(eq=True, frozen=True) +class RevisionData: + """Object representing the data associated to a revision in the provenance model, + where `date` is the optional date of the revision (specifying it acknowledges that + the revision was already processed by the revision-content algorithm); and `origin` + identifies the preferred origin for the revision, if any. + """ + + date: Optional[datetime] + origin: Optional[Sha1Git] + + +@dataclass(eq=True, frozen=True) +class RelationData: + """Object representing a relation entry in the provenance model, where `src` and + `dst` are the sha1 ids of the entities being related, and `path` is optional + depending on the relation being represented. + """ + + src: Sha1Git + dst: Sha1Git + path: Optional[bytes] @runtime_checkable class ProvenanceStorageInterface(Protocol): raise_on_commit: bool = False def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to blobs identified by sha1 ids, as paired in `dates`. 
Return a boolean stating whether the information was successfully stored. """ ... def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each blob sha1 in `ids`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to directories identified by sha1 ids, as paired in `dates`. Return a boolean stating whether the information was successfully stored. """ ... def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each directory sha1 in `ids`. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: """Retrieve all sha1 ids for entities of type `entity` present in the provenance model. """ ... def location_get(self) -> Set[bytes]: """Retrieve all paths present in the provenance model.""" ... def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return a boolean stating whether the information was successfully stored. """ ... def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: """Retrieve the associated url for each origin sha1 in `ids`. If some origin has no associated url, it is not present in the resulting dictionary. """ ... def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to revisions identified by sha1 ids, as paired in `dates`. Return a boolean stating whether the information was successfully stored. """ ... def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: """Associate origins to revisions identified by sha1 ids, as paired in `origins` (revision ids are keys and origin ids, values). Return a boolean stating whether the information was successfully stored. """ ... - def revision_get( - self, ids: Iterable[Sha1Git] - ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: + def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: """Retrieve the associated date and origin for each revision sha1 in `ids`. If some revision has no associated date or origin, it is not present in the resulting dictionary. """ ... def relation_add( - self, - relation: RelationType, - data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], + self, relation: RelationType, data: Iterable[RelationData] ) -> bool: - """Add entries in the selected `relation`. Each tuple in `data` is of the from - (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities - being related, and `path` is optional depending on the selected `relation`. - """ + """Add entries in the selected `relation`.""" ... def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: - """Retrieve all tuples in the selected `relation` whose source entities are + ) -> Set[RelationData]: + """Retrieve all entries in the selected `relation` whose source entities are identified by some sha1 id in `ids`. If `reverse` is set, destination entities are matched instead. """ ...
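As a usage sketch of the reworked relation API: both sha1 ids below are fabricated for illustration, and `storage` stands for any `ProvenanceStorageInterface` implementation.

from swh.provenance.provenance import RelationData, RelationType

blob_sha1 = bytes.fromhex("11" * 20)  # hypothetical content id
rev_sha1 = bytes.fromhex("22" * 20)  # hypothetical revision id

# Passing a list (rather than a one-shot generator) keeps the batch
# re-submittable if the first attempt fails.
rows = [RelationData(src=blob_sha1, dst=rev_sha1, path=b"src/main.c")]
while not storage.relation_add(RelationType.CNT_EARLY_IN_REV, rows):
    pass  # callers such as ProvenanceBackend.flush() retry like this

for rel in storage.relation_get(RelationType.CNT_EARLY_IN_REV, [blob_sha1]):
    print(rel.dst.hex(), rel.path)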
- def relation_get_all( - self, relation: RelationType - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: - """Retrieve all tuples of the form (`src`, `dst`, `path`) present in the - provenance model, where `src` and `dst` are the sha1 ids of the entities being - related, and `path` is optional depending on the selected `relation`. + def relation_get_all(self, relation: RelationType) -> Set[RelationData]: + """Retrieve all entries in the selected `relation` that are present in the + provenance model. """ ... @runtime_checkable class ProvenanceInterface(Protocol): storage: ProvenanceStorageInterface def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `directory` in the provenance model. `prefix` is the relative path from `directory` to `blob` (excluding `blob`'s name). """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `revision` in the provenance model. `prefix` is the absolute path from `revision`'s root directory to `blob` (excluding `blob`'s name). """ ... def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each blob in `blobs`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: """Associate `date` to `blob` as its earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: """Associate `directory` with `revision` in the provenance model. `path` is the absolute path from `revision`'s root directory to `directory` (including `directory`'s name). """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: """Retrieve the earliest known date of `directory` as an isochrone frontier in the provenance model. """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each directory in `dirs` as isochrone frontiers in the provenance model. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: """Associate `date` to `directory` as its earliest known date as an isochrone frontier in the provenance model. """ ... def origin_add(self, origin: OriginEntry) -> None: """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: """Add `revision` to the provenance model. This implies storing `revision`'s date in the model, thus `revision.date` must be a valid date. """ ...
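One detail of the new dataclasses that this protocol relies on: `relation_get` and `relation_get_all` return `Set[RelationData]`, which requires instances to be hashable, and that is exactly what `@dataclass(eq=True, frozen=True)` provides (a frozen dataclass with `eq=True` gets an auto-generated `__hash__`). A quick self-contained check:

from swh.provenance.provenance import RelationData

a = RelationData(src=b"\x01" * 20, dst=b"\x02" * 20, path=None)
b = RelationData(src=b"\x01" * 20, dst=b"\x02" * 20, path=None)
assert a == b  # field-wise equality from eq=True
assert len({a, b}) == 1  # hashable thanks to frozen=True, so sets deduplicate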
def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `origin` as a head revision of the latter (ie. the target of an snapshot for `origin` in the archive).""" ... def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be an ancestor of some head revision in the provenance model. """ ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be a head revision for some origin in the provenance model. """ ... diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index 475d954..95ecdb4 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,324 +1,331 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List, Optional, Set, Tuple import pytest from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.model import RevisionEntry from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.provenance import EntityType, ProvenanceInterface, RelationType from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt from swh.storage.postgresql.storage import Storage @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path.encode("utf-8") return None for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) 
revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] - rev_date, _ = provenance.storage.revision_get([synth_rev["sha1"]])[ + rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] - assert rev_date is not None and rev_ts == rev_date.timestamp(), synth_rev["msg"] + assert ( + rev_data.date is not None and rev_ts == rev_data.date.timestamp() + ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) - assert rows["content_in_revision"] == provenance.storage.relation_get_all( - RelationType.CNT_EARLY_IN_REV - ), synth_rev["msg"] + assert rows["content_in_revision"] == { + (rel.src, rel.dst, rel.path) + for rel in provenance.storage.relation_get_all( + RelationType.CNT_EARLY_IN_REV + ) + }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) - assert rows["directory_in_revision"] == provenance.storage.relation_get_all( - RelationType.DIR_IN_REV - ), synth_rev["msg"] + assert rows["directory_in_revision"] == { + (rel.src, rel.dst, rel.path) + for rel in provenance.storage.relation_get_all(RelationType.DIR_IN_REV) + }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert ( rev_ts + rd["rel_ts"] == provenance.storage.directory_get([rd["dst"]])[rd["dst"]].timestamp() ), synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) - assert rows["content_in_directory"] == provenance.storage.relation_get_all( - RelationType.CNT_IN_DIR - ), synth_rev["msg"] + assert rows["content_in_directory"] == { + (rel.src, rel.dst, rel.path) + for rel in provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) + }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) assert isinstance(provenance.storage, ProvenanceDBBase) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path: # this is not true if the db stores no path, because the same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def 
test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]); the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those paths, but we have no guarantee which one it will return. for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = dc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" assert isinstance(provenance.storage, ProvenanceDBBase) for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path: assert occur.path.decode() in paths
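The relation assertions in these tests all repeat the same projection, turning `RelationData` results back into `(src, dst, path)` tuples so they compare against the expected plain-tuple sets. If more tests grow the pattern, a small helper (hypothetical, not part of this patch) would keep it in one place:

from typing import Optional, Set, Tuple

from swh.model.model import Sha1Git
from swh.provenance.provenance import ProvenanceStorageInterface, RelationType


def relation_rows(
    storage: ProvenanceStorageInterface, relation: RelationType
) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
    # Project RelationData back to plain tuples so results compare directly
    # against the tuple sets built from the synthetic files.
    return {
        (rel.src, rel.dst, rel.path)
        for rel in storage.relation_get_all(relation)
    }


# e.g.: assert rows["content_in_revision"] == relation_rows(
#     provenance.storage, RelationType.CNT_EARLY_IN_REV
# )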