diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -33,7 +33,7 @@ ) with_path = kwargs.get("with_path") - from swh.provenance.provenance import ProvenanceBackend + from swh.provenance.backend import ProvenanceBackend prov = ProvenanceBackend(conn) if with_path is not None: diff --git a/swh/provenance/provenance.py b/swh/provenance/backend.py copy from swh/provenance/provenance.py copy to swh/provenance/backend.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/backend.py @@ -3,109 +3,13 @@ import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple -import psycopg2 -from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable +import psycopg2 # TODO: remove this dependency +from typing_extensions import Literal, TypedDict from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry - - -# XXX: this protocol doesn't make much sense now that flavours have been delegated to -# another class, lower in the callstack. -@runtime_checkable -class ProvenanceInterface(Protocol): - raise_on_commit: bool = False - - def commit(self): - """Commit currently ongoing transactions in the backend DB""" - ... - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ) -> None: - ... - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ) -> None: - ... - - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: - ... - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: - ... - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - ... - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[Sha1Git, datetime]: - ... - - def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: - ... - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ) -> None: - ... - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - ... - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[Sha1Git, datetime]: - ... - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ) -> None: - ... - - def origin_add(self, origin: OriginEntry) -> None: - ... - - def revision_add(self, revision: RevisionEntry) -> None: - ... - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ) -> None: - ... - - def revision_add_to_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - ... - - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - ... - - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: - ... - - def revision_in_history(self, revision: RevisionEntry) -> bool: - ... - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - ... - - def revision_visited(self, revision: RevisionEntry) -> bool: - ... +from .provenance import ProvenanceResult class DatetimeCache(TypedDict): @@ -138,7 +42,7 @@ revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] -def new_cache(): +def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), @@ -173,39 +77,37 @@ self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() - def clear_caches(self): + def clear_caches(self) -> None: self.cache = new_cache() - def commit(self): + def flush(self) -> None: # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( - f"Unable to commit cached information {self.write_cache}. Retrying..." + f"Unable to commit cached information {self.cache}. Retrying..." ) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): + ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): + ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: @@ -216,13 +118,13 @@ ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) - def content_set_early_date(self, blob: FileEntry, date: datetime): + def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): + ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) @@ -239,7 +141,7 @@ def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime - ): + ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) @@ -261,21 +163,23 @@ self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) - def revision_add(self, revision: RevisionEntry): + def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ): + self, head: RevisionEntry, revision: RevisionEntry + ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - relative.id + head.id ) - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): + def revision_add_to_origin( + self, origin: OriginEntry, revision: RevisionEntry + ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: + def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( @@ -295,7 +199,7 @@ def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry - ): + ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -165,15 +165,14 @@ provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field - row = provenance.content_find_first(hash_to_bytes(swhid)) - if row is not None: + occur = provenance.content_find_first(hash_to_bytes(swhid)) + if occur is not None: print( - "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( - cnt=hash_to_hex(row[0]), - rev=hash_to_hex(row[1]), - date=row[2], - path=os.fsdecode(row[3]), - ) + f"swh:1:cnt:{hash_to_hex(occur.content)}, " + f"swh:1:rev:{hash_to_hex(occur.revision)}, " + f"{occur.date}, " + f"{occur.origin}, " + f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @@ -189,12 +188,11 @@ provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field - for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): + for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( - "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( - cnt=hash_to_hex(row[0]), - rev=hash_to_hex(row[1]), - date=row[2], - path=os.fsdecode(row[3]), - ) + f"swh:1:cnt:{hash_to_hex(occur.content)}, " + f"swh:1:rev:{hash_to_hex(occur.revision)}, " + f"{occur.date}, " + f"{occur.origin}, " + f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -52,7 +52,7 @@ graph = build_history_graph(archive, provenance, revision) origin_add_revision(provenance, origin, graph) done = time.time() - provenance.commit() + provenance.flush() stop = time.time() logging.debug( "Origins " diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,20 +1,22 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Generator, List, Optional, Set, Tuple +from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult + class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn - self.cursor = self.conn.cursor() + self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self._flavor: Optional[str] = None @@ -22,8 +24,8 @@ @property def flavor(self) -> str: if self._flavor is None: - self.cursor.execute("select swh_get_dbflavor()") - self._flavor = self.cursor.fetchone()[0] + self.cursor.execute("SELECT swh_get_dbflavor() AS flavor") + self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @@ -31,7 +33,7 @@ def with_path(self) -> bool: return self.flavor == "with-path" - def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: + def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): @@ -87,25 +89,23 @@ return False - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: ... def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: - dates = {} + dates: Dict[Sha1Git, datetime] = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) - dates.update(self.cursor.fetchall()) + dates.update(((row["sha1"], row["date"]) for row in self.cursor.fetchall())) return dates def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): @@ -217,7 +217,7 @@ (revision,), ) row = self.cursor.fetchone() - return row[0] if row is not None else None + return row["sha1"] if row is not None else None def revision_in_history(self, revision: Sha1Git) -> bool: self.cursor.execute( diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,4 +1,3 @@ -from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 @@ -6,66 +5,68 @@ from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: - self.cursor.execute( - """ - SELECT C.sha1 AS blob, - R.sha1 AS rev, + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + sql = """ + SELECT C.sha1 AS content, + R.sha1 AS revision, R.date AS date, + O.url AS origin, L.path AS path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN location as L ON (CR.location = L.id) - INNER JOIN revision as R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN location as L ON (CR.location=L.id) + INNER JOIN revision as R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s - ORDER BY date, rev, path ASC LIMIT 1 - """, - (id,), - ) - return self.cursor.fetchone() + ORDER BY date, revision, origin, path ASC LIMIT 1 + """ + self.cursor.execute(sql, (id,)) + row = self.cursor.fetchone() + return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - self.cursor.execute( - f""" - (SELECT C.sha1 AS blob, - R.sha1 AS rev, + sql = f""" + (SELECT C.sha1 AS content, + R.sha1 AS revision, R.date AS date, + O.url AS origin, L.path AS path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN location AS L ON (CR.location = L.id) - INNER JOIN revision AS R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN location AS L ON (CR.location=L.id) + INNER JOIN revision AS R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, + O.url AS origin, CASE DL.path WHEN '' THEN CL.path WHEN '.' THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C - INNER JOIN content_in_directory AS CD ON (C.id = CD.content) - INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) - INNER JOIN revision AS R ON (DR.revision = R.id) - INNER JOIN location AS CL ON (CD.location = CL.id) - INNER JOIN location AS DL ON (DR.location = DL.id) + INNER JOIN content_in_directory AS CD ON (C.id=CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) + INNER JOIN revision AS R ON (DR.revision=R.id) + INNER JOIN location AS CL ON (CD.location=CL.id) + INNER JOIN location AS DL ON (DR.location=DL.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) - ORDER BY date, rev, path {early_cut} - """, - (id, id), - ) - yield from self.cursor.fetchall() + ORDER BY date, revision, origin, path {early_cut} + """ + self.cursor.execute(sql, (id, id)) + yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): """Insert entries in `relation` from `data` diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,4 +1,3 @@ -from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 @@ -6,58 +5,60 @@ from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult from .provenancedb_base import ProvenanceDBBase class ProvenanceWithoutPathDB(ProvenanceDBBase): - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: - self.cursor.execute( - """ - SELECT C.sha1 AS blob, - R.sha1 AS rev, + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + sql = """ + SELECT C.sha1 AS content, + R.sha1 AS revision, R.date AS date, + O.url AS origin, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN revision as R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN revision as R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s - ORDER BY date, rev ASC LIMIT 1 - """, - (id,), - ) - return self.cursor.fetchone() + ORDER BY date, revision, origin ASC LIMIT 1 + """ + self.cursor.execute(sql, (id,)) + row = self.cursor.fetchone() + return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - self.cursor.execute( - f""" - (SELECT C.sha1 AS blob, - R.sha1 AS rev, + sql = f""" + (SELECT C.sha1 AS content, + R.sha1 AS revision, R.date AS date, + O.url AS origin, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN revision AS R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN revision AS R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, + O.url AS origin, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_directory AS CD ON (C.id = CD.content) - INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) - INNER JOIN revision AS R ON (DR.revision = R.id) + INNER JOIN content_in_directory AS CD ON (C.id=CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) + INNER JOIN revision AS R ON (DR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) - ORDER BY date, rev, path {early_cut} - """, - (id, id), - ) - yield from self.cursor.fetchall() + ORDER BY date, revision, origin {early_cut} + """ + self.cursor.execute(sql, (id, id)) + yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): if data: diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,309 +1,161 @@ from datetime import datetime -import logging -import os -from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple +from typing import Dict, Generator, Iterable, Optional -import psycopg2 -from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable +from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -# XXX: this protocol doesn't make much sense now that flavours have been delegated to -# another class, lower in the callstack. +class ProvenanceResult: + def __init__( + self, + content: Sha1Git, + revision: Sha1Git, + date: datetime, + origin: Optional[str], + path: bytes, + ) -> None: + self.content = content + self.revision = revision + self.date = date + self.origin = origin + self.path = path + + @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False - def commit(self): - """Commit currently ongoing transactions in the backend DB""" + def flush(self) -> None: + """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: + """Associate `blob` with `directory` in the provenance model. `prefix` is the + relative path from `directory` to `blob` (excluding `blob`'s name). + """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: + """Associate `blob` with `revision` in the provenance model. `prefix` is the + absolute path from `revision`'s root directory to `blob` (excluding `blob`'s + name). + """ ... - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: + """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: + """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: + """Retrieve the earliest known date for each blob in `blobs`. If some blob has + no associated date, it is not present in the resulting dictionary. + """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: + """Associate `date` to `blob` as it's earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: + """Associate `directory` with `revision` in the provenance model. `path` is the + absolute path from `revision`'s root directory to `directory` (including + `directory`'s name). + """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: + """Retrieve the earliest known date of `directory` as an isochrone frontier in + the provenance model. + """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: + """Retrieve the earliest known date for each directory in `dirs` as isochrone + frontiers provenance model. If some directory has no associated date, it is not + present in the resulting dictionary. + """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: + """Associate `date` to `directory` as it's earliest known date as an isochrone + frontier in the provenance model. + """ ... def origin_add(self, origin: OriginEntry) -> None: + """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: + """Add `revision` to the provenance model. This implies storing `revision`'s + date in the model, thus `revision.date` must be a valid date. + """ ... def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry + self, head: RevisionEntry, revision: RevisionEntry ) -> None: + """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: + """Associate `revision` to `origin` as a head revision of the latter (ie. the + target of an snapshot for `origin` in the archive).""" ... - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: + def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: + """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: + """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: + """Check if `revision` is known to be an ancestor of some head revision in the + provenance model. + """ ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: + """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: + """Check if `revision` is known to be a head revision for some origin in the + provenance model. + """ ... - - -class DatetimeCache(TypedDict): - data: Dict[Sha1Git, Optional[datetime]] - added: Set[Sha1Git] - - -class OriginCache(TypedDict): - data: Dict[Sha1Git, str] - added: Set[Sha1Git] - - -class RevisionCache(TypedDict): - data: Dict[Sha1Git, Sha1Git] - added: Set[Sha1Git] - - -class ProvenanceCache(TypedDict): - content: DatetimeCache - directory: DatetimeCache - revision: DatetimeCache - # below are insertion caches only - content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] - directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - # these two are for the origin layer - origin: OriginCache - revision_origin: RevisionCache - revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] - revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] - - -def new_cache(): - return ProvenanceCache( - content=DatetimeCache(data={}, added=set()), - directory=DatetimeCache(data={}, added=set()), - revision=DatetimeCache(data={}, added=set()), - content_in_revision=set(), - content_in_directory=set(), - directory_in_revision=set(), - origin=OriginCache(data={}, added=set()), - revision_origin=RevisionCache(data={}, added=set()), - revision_before_revision={}, - revision_in_origin=set(), - ) - - -# TODO: maybe move this to a separate file -class ProvenanceBackend: - raise_on_commit: bool = False - - def __init__(self, conn: psycopg2.extensions.connection): - from .postgresql.provenancedb_base import ProvenanceDBBase - - # TODO: this class should not know what the actual used DB is. - self.storage: ProvenanceDBBase - flavor = ProvenanceDBBase(conn).flavor - if flavor == "with-path": - from .postgresql.provenancedb_with_path import ProvenanceWithPathDB - - self.storage = ProvenanceWithPathDB(conn) - else: - from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB - - self.storage = ProvenanceWithoutPathDB(conn) - self.cache: ProvenanceCache = new_cache() - - def clear_caches(self): - self.cache = new_cache() - - def commit(self): - # TODO: for now we just forward the cache. This should be improved! - while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): - logging.warning( - f"Unable to commit cached information {self.write_cache}. Retrying..." - ) - self.clear_caches() - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.cache["content_in_directory"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.cache["content_in_revision"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: - return self.storage.content_find_first(id) - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: - yield from self.storage.content_find_all(id, limit=limit) - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id) - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("content", [blob.id for blob in blobs]) - - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.cache["content"]["data"][blob.id] = date - self.cache["content"]["added"].add(blob.id) - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): - self.cache["directory_in_revision"].add( - (directory.id, revision.id, normalize(path)) - ) - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id) - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("directory", [directory.id for directory in dirs]) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.cache["directory"]["data"][directory.id] = date - self.cache["directory"]["added"].add(directory.id) - - def get_dates( - self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] - ) -> Dict[Sha1Git, datetime]: - cache = self.cache[entity] - missing_ids = set(id for id in ids if id not in cache) - if missing_ids: - cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) - dates: Dict[Sha1Git, datetime] = {} - for sha1 in ids: - date = cache["data"].get(sha1) - if date is not None: - dates[sha1] = date - return dates - - def origin_add(self, origin: OriginEntry) -> None: - self.cache["origin"]["data"][origin.id] = origin.url - self.cache["origin"]["added"].add(origin.id) - - def revision_add(self, revision: RevisionEntry): - self.cache["revision"]["data"][revision.id] = revision.date - self.cache["revision"]["added"].add(revision.id) - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ): - self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - relative.id - ) - - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.cache["revision_in_origin"].add((revision.id, origin.id)) - - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id) - - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: - cache = self.cache["revision_origin"] - if revision.id not in cache: - origin = self.storage.revision_get_preferred_origin(revision.id) - if origin is not None: - cache["data"][revision.id] = origin - return cache["data"].get(revision.id) - - def revision_in_history(self, revision: RevisionEntry) -> bool: - return revision.id in self.cache[ - "revision_before_revision" - ] or self.storage.revision_in_history(revision.id) - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ): - self.cache["revision_origin"]["data"][revision.id] = origin.id - self.cache["revision_origin"]["added"].add(revision.id) - - def revision_visited(self, revision: RevisionEntry) -> bool: - return revision.id in dict( - self.cache["revision_in_origin"] - ) or self.storage.revision_visited(revision.id) - - -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -69,7 +69,7 @@ assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. - date = provenance.revision_get_early_date(revision) + date = provenance.revision_get_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {revision.id.hex()}" @@ -93,16 +93,12 @@ ) done = time.time() if commit: - provenance.commit() + provenance.flush() stop = time.time() logging.debug( f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) - # logging.critical( - # ";".join([revision.id.hex() for revision in revisions]) - # + f",{stop - start},{stop - done}" - # ) def revision_process_content( diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -29,7 +29,7 @@ flavor = request.param populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) - from swh.provenance.provenance import ProvenanceBackend + from swh.provenance.backend import ProvenanceBackend BaseDb.adapt_conn(postgresql) prov = ProvenanceBackend(postgresql) diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -7,7 +7,7 @@ def test_provenance_fixture(provenance): """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance - provenance.commit() # should be a noop + provenance.flush() # should be a noop def test_storage(swh_storage_with_objects): diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -59,4 +59,4 @@ origin_add_revision(provenance, entry, computed_graph) if not batch: - provenance.commit() + provenance.flush() diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -25,7 +25,7 @@ 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") - return set(sha1.hex() for (sha1,) in cur.fetchall()) + return set(row["sha1"].hex() for row in cur.fetchall()) def locations(cur): @@ -33,8 +33,8 @@ 'cur' is a cursor to the provenance index DB. """ - cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") - return set(x for (x,) in cur.fetchall()) + cur.execute("SELECT encode(location.path::bytea, 'escape') AS path FROM location") + return set(row["path"] for row in cur.fetchall()) def relations(cur, src, dst): @@ -46,17 +46,17 @@ 'cur' is a cursor to the provenance index DB. """ relation = f"{src}_in_{dst}" - cur.execute("select swh_get_dbflavor()") - with_path = cur.fetchone()[0] == "with-path" + cur.execute("SELECT swh_get_dbflavor() AS flavor") + with_path = cur.fetchone()["flavor"] == "with-path" # note that the columns have the same name as the relations they refer to, # so we can write things like "rel.{dst}=src.id" in the query below if with_path: cur.execute( f""" - SELECT encode(src.sha1::bytea, 'hex'), - encode(dst.sha1::bytea, 'hex'), - encode(location.path::bytea, 'escape') + SELECT encode(src.sha1::bytea, 'hex') AS src, + encode(dst.sha1::bytea, 'hex') AS dst, + encode(location.path::bytea, 'escape') AS path FROM {relation} as relation INNER JOIN {src} AS src ON (relation.{src} = src.id) INNER JOIN {dst} AS dst ON (relation.{dst} = dst.id) @@ -66,15 +66,15 @@ else: cur.execute( f""" - SELECT encode(src.sha1::bytea, 'hex'), - encode(dst.sha1::bytea, 'hex'), - '' + SELECT encode(src.sha1::bytea, 'hex') AS src, + encode(dst.sha1::bytea, 'hex') AS dst, + '' AS path FROM {relation} as relation INNER JOIN {src} AS src ON (src.id = relation.{src}) INNER JOIN {dst} AS dst ON (dst.id = relation.{dst}) """ ) - return set(cur.fetchall()) + return set((row["src"], row["dst"], row["path"]) for row in cur.fetchall()) def get_timestamp(cur, table, sha1): @@ -85,7 +85,7 @@ if isinstance(sha1, str): sha1 = hash_to_bytes(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) - return [date.timestamp() for (date,) in cur.fetchall()] + return [row["date"].timestamp() for row in cur.fetchall()] @pytest.mark.parametrize( @@ -253,21 +253,25 @@ for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( - (rev_id, rev_ts, maybe_path(rc["path"])) + (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( - (rev_id, rev_ts, maybe_path(dc["prefix"] + "/" + dc["path"])) + (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ - (blob.hex(), rev.hex(), date.timestamp(), path.decode()) - for blob, rev, date, path in provenance.content_find_all( - hash_to_bytes(content_id) + ( + occur.content.hex(), + occur.revision.hex(), + occur.date.timestamp(), + occur.origin, + occur.path.decode(), ) + for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path: # this is not true if the db stores no path, because a same content @@ -337,11 +341,10 @@ # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): - (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( - hash_to_bytes(content_id) - ) - assert r_sha1.hex() == content_id - assert r_rev_id.hex() == rev_id - assert r_ts.timestamp() == ts + occur = provenance.content_find_first(hash_to_bytes(content_id)) + assert occur.content.hex() == content_id + assert occur.revision.hex() == rev_id + assert occur.date.timestamp() == ts + assert occur.origin is None if provenance.storage.with_path: - assert r_path.decode() in paths + assert occur.path.decode() in paths