# ---------------------------------------------------------------------------
# swh/provenance/postgresql/provenancedb_base.py
# ---------------------------------------------------------------------------
from datetime import datetime
import itertools
import logging
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

import psycopg2
import psycopg2.extras


class ProvenanceDBBase:
    """PostgreSQL storage layer shared by the provenance DB flavors.

    ``content_find_first``, ``content_find_all`` and ``insert_relation`` are
    left as stubs here (their bodies are ``...``).
    """

    def __init__(self, conn: psycopg2.extensions.connection):
        """Put ``conn`` in autocommit mode and open the cursor used by all queries."""
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        conn.set_session(autocommit=True)
        self.conn = conn
        self.cursor = self.conn.cursor()
        # XXX: not sure this is the best place to do it!
        self.cursor.execute("SET timezone TO 'UTC'")
        self._flavor: Optional[str] = None

    @property
    def flavor(self) -> str:
        """Return the DB flavor, queried once via ``swh_get_dbflavor()`` and memoized."""
        if self._flavor is None:
            self.cursor.execute("select swh_get_dbflavor()")
            self._flavor = self.cursor.fetchone()[0]
        assert self._flavor is not None
        return self._flavor

    @property
    def with_path(self) -> bool:
        """Return True when the DB flavor is ``"with-path"``."""
        return self.flavor == "with-path"

    def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool:
        """Write the cached provenance information in ``data`` to the database.

        Entities are inserted first, then the content/directory relations, then
        the origin-revision layer, and finally the preferred origins. Each part
        of ``data`` is cleared once it has been written.

        Args:
            data: cache laid out as ``ProvenanceCache`` in ``provenance.py``.
            raise_on_commit: re-raise the error instead of returning ``False``.

        Returns:
            ``True`` on success, ``False`` if an error occurred and
            ``raise_on_commit`` is false.
        """
        try:
            # First insert entities
            for entity in ("content", "directory", "revision"):
                self.insert_entity(
                    entity,
                    {
                        sha1: data[entity]["data"][sha1]
                        for sha1 in data[entity]["added"]
                    },
                )
                data[entity]["data"].clear()
                data[entity]["added"].clear()

            # Relations should come after ids for entities were resolved
            for relation in (
                "content_in_revision",
                "content_in_directory",
                "directory_in_revision",
            ):
                self.insert_relation(relation, data[relation])

            # Insert relations from the origin-revision layer
            self.insert_origin_head(data["revision_in_origin"])
            self.insert_revision_history(data["revision_before_revision"])

            # Update preferred origins
            self.update_preferred_origin(
                {
                    sha1: data["revision_preferred_origin"]["data"][sha1]
                    for sha1 in data["revision_preferred_origin"]["added"]
                }
            )
            data["revision_preferred_origin"]["data"].clear()
            data["revision_preferred_origin"]["added"].clear()

            return True

        except Exception:
            # Only ``Exception`` is caught: a bare ``except`` would also swallow
            # KeyboardInterrupt/SystemExit and make callers that retry on
            # ``False`` loop forever.
            # NOTE: the connection is set to autocommit in __init__, so no
            # rollback is issued here; the error is only logged.
            logging.exception("Unexpected error")
            if raise_on_commit:
                raise

        return False

    def content_find_first(
        self, blob: bytes
    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
        ...

    def content_find_all(
        self, blob: bytes, limit: Optional[int] = None
    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
        ...

    def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
        """Return ``{sha1: date}`` for the rows of table ``entity`` matching ``ids``.

        ``entity`` is interpolated into the SQL as a table name, so it must be a
        trusted identifier; ``ids`` are passed as query parameters.
        """
        dates = {}
        if ids:
            values = ", ".join(itertools.repeat("%s", len(ids)))
            self.cursor.execute(
                f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""",
                tuple(ids),
            )
            dates.update(self.cursor.fetchall())
        return dates

    def insert_entity(self, entity: str, data: Dict[bytes, datetime]):
        """Upsert ``(sha1, date)`` rows into ``entity``, keeping the earliest date.

        ``data`` is emptied in place after the insertion.
        """
        if data:
            psycopg2.extras.execute_values(
                self.cursor,
                f"""
                LOCK TABLE ONLY {entity};
                INSERT INTO {entity}(sha1, date) VALUES %s
                  ON CONFLICT (sha1) DO
                  UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
                """,
                data.items(),
            )
            # This empties the caller's dict object. ``commit`` passes a freshly
            # built dict, so for that caller the clear has no visible effect.
            data.clear()

    def insert_origin_head(self, data: Set[Tuple[bytes, int]]):
        """Insert ``(revision sha1, origin id)`` pairs into ``revision_in_origin``.

        Revision sha1s are resolved to row ids by joining on ``revision``; pairs
        whose revision is not in that table are dropped by the inner join.
        ``data`` is emptied in place after the insertion.
        """
        if data:
            psycopg2.extras.execute_values(
                self.cursor,
                # ON CONFLICT DO NOTHING mirrors the statement this method
                # replaced, so re-inserting a known pair is not an error.
                """
                LOCK TABLE ONLY revision_in_origin;
                INSERT INTO revision_in_origin
                    SELECT R.id, V.org
                    FROM (VALUES %s) AS V(rev, org)
                    INNER JOIN revision AS R on (R.sha1=V.rev)
                  ON CONFLICT DO NOTHING
                """,
                data,
            )
            data.clear()

    def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]):
        ...

    def insert_revision_history(self, data: Dict[bytes, Set[bytes]]):
        """Insert ``(prev, next)`` revision pairs into ``revision_before_revision``.

        ``data`` maps a revision sha1 to the set of sha1s recorded after it.
        Both sides are resolved to row ids by joining on ``revision``.
        ``data`` is emptied in place after the insertion.
        """
        if data:
            # Flatten {prev: {next, ...}} into a list of (prev, next) pairs.
            pairs = [
                (prev, succ) for prev, successors in data.items() for succ in successors
            ]
            psycopg2.extras.execute_values(
                self.cursor,
                # ON CONFLICT DO NOTHING mirrors the statement this method
                # replaced, so re-inserting a known pair is not an error.
                """
                LOCK TABLE ONLY revision_before_revision;
                INSERT INTO revision_before_revision
                    SELECT P.id, N.id
                    FROM (VALUES %s) AS V(prev, next)
                    INNER JOIN revision AS P on (P.sha1=V.prev)
                    INNER JOIN revision AS N on (N.sha1=V.next)
                  ON CONFLICT DO NOTHING
                """,
                pairs,
            )
            data.clear()

    def origin_get_id(self, url: str) -> int:
        """Return the id of the origin with ``url``, inserting it if needed."""
        self.cursor.execute(
            """
            LOCK TABLE ONLY origin;
            INSERT INTO origin(url) VALUES (%s)
              ON CONFLICT DO NOTHING
              RETURNING id
            """,
            (url,),
        )
        row = self.cursor.fetchone()
        if row is None:
            # RETURNING yields no row when the insert was skipped by
            # ON CONFLICT DO NOTHING: fetch the id of the existing origin.
            self.cursor.execute("""SELECT id FROM origin WHERE url=%s""", (url,))
            row = self.cursor.fetchone()
        return row[0]

    def revision_get_preferred_origin(self, revision: bytes) -> Optional[int]:
        """Return the preferred origin id of ``revision``, or ``None`` if unset/unknown."""
        self.cursor.execute(
            """SELECT COALESCE(origin, 0) FROM revision WHERE sha1=%s""", (revision,)
        )
        row = self.cursor.fetchone()
        # None means revision is not in database;
        # 0 means revision has no preferred origin
        return row[0] if row is not None and row[0] != 0 else None

    def revision_in_history(self, revision: bytes) -> bool:
        """Return True if ``revision`` appears as ``prev`` in ``revision_before_revision``."""
        self.cursor.execute(
            """
            SELECT 1
              FROM revision_before_revision
              JOIN revision
                ON revision.id=revision_before_revision.prev
              WHERE revision.sha1=%s
            """,
            (revision,),
        )
        return self.cursor.fetchone() is not None

    def revision_visited(self, revision: bytes) -> bool:
        """Return True if ``revision`` has at least one row in ``revision_in_origin``."""
        self.cursor.execute(
            """
            SELECT 1
              FROM revision_in_origin
              JOIN revision
                ON revision.id=revision_in_origin.revision
              WHERE revision.sha1=%s
            """,
            (revision,),
        )
        return self.cursor.fetchone() is not None

    def update_preferred_origin(self, data: Dict[bytes, int]):
        """Set ``revision.origin`` for every ``{revision sha1: origin id}`` in ``data``.

        ``data`` is emptied in place after the update.
        """
        if data:
            # XXX: this is assuming the revision already exists in the db! It should
            # be improved by allowing null dates in the revision table.
            psycopg2.extras.execute_values(
                self.cursor,
                """
                UPDATE revision
                    SET origin=V.org
                    FROM (VALUES %s) AS V(rev, org)
                    WHERE sha1=V.rev
                """,
                data.items(),
            )
            data.clear()


# ---------------------------------------------------------------------------
# swh/provenance/provenance.py
# ---------------------------------------------------------------------------
from datetime import datetime
import logging
import os
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple

import psycopg2
from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable

from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry


# XXX: this protocol doesn't make much sense now that flavours have been delegated to
# another class, lower in the callstack.
@runtime_checkable
class ProvenanceInterface(Protocol):
    """Structural interface of a provenance backend."""

    raise_on_commit: bool = False

    def commit(self):
        """Commit currently ongoing transactions in the backend DB"""
        ...

    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ) -> None:
        ...

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ) -> None:
        ...

    def content_find_first(
        self, blob: bytes
    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
        ...

    def content_find_all(
        self, blob: bytes, limit: Optional[int] = None
    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
        ...

    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        ...

    def content_get_early_dates(
        self, blobs: Iterable[FileEntry]
    ) -> Dict[bytes, datetime]:
        ...

    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
        ...

    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ) -> None:
        ...

    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        ...

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: Iterable[DirectoryEntry]
    ) -> Dict[bytes, datetime]:
        ...

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ) -> None:
        ...

    def origin_get_id(self, origin: OriginEntry) -> int:
        ...

    def revision_add(self, revision: RevisionEntry) -> None:
        ...

    def revision_add_before_revision(
        self, relative: RevisionEntry, revision: RevisionEntry
    ) -> None:
        ...

    def revision_add_to_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ) -> None:
        ...

    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
        ...

    def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]:
        ...

    def revision_in_history(self, revision: RevisionEntry) -> bool:
        ...

    def revision_set_preferred_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ) -> None:
        ...

    def revision_visited(self, revision: RevisionEntry) -> bool:
        ...


class DatetimeCache(TypedDict):
    """Cache of dates keyed by sha1, plus the keys that still have to be written."""

    data: Dict[bytes, datetime]
    added: Set[bytes]


class OriginCache(TypedDict):
    """Cache of preferred origin ids keyed by revision sha1."""

    data: Dict[bytes, int]  # TODO: we should switch to use Url instead
    added: Set[bytes]


class ProvenanceCache(TypedDict):
    """All information buffered by ``ProvenanceBackend`` between two commits."""

    content: DatetimeCache
    directory: DatetimeCache
    revision: DatetimeCache
    # below are insertion caches only
    content_in_revision: Set[Tuple[bytes, bytes, bytes]]
    content_in_directory: Set[Tuple[bytes, bytes, bytes]]
    directory_in_revision: Set[Tuple[bytes, bytes, bytes]]
    # these two are for the origin layer
    revision_before_revision: Dict[bytes, Set[bytes]]
    revision_in_origin: Set[Tuple[bytes, int]]
    revision_preferred_origin: OriginCache


def new_cache() -> ProvenanceCache:
    """Return an empty ``ProvenanceCache``."""
    return ProvenanceCache(
        content=DatetimeCache(data={}, added=set()),
        directory=DatetimeCache(data={}, added=set()),
        revision=DatetimeCache(data={}, added=set()),
        content_in_revision=set(),
        content_in_directory=set(),
        directory_in_revision=set(),
        revision_before_revision={},
        revision_in_origin=set(),
        revision_preferred_origin=OriginCache(data={}, added=set()),
    )


# TODO: maybe move this to a separate file
class ProvenanceBackend:
    """Provenance backend buffering writes in a cache until ``commit`` is called."""

    raise_on_commit: bool = False

    def __init__(self, conn: psycopg2.extensions.connection):
        """Select the storage class matching the DB flavor and start with an empty cache."""
        from .postgresql.provenancedb_base import ProvenanceDBBase

        # TODO: this class should not know what the actual used DB is.
        self.storage: ProvenanceDBBase
        flavor = ProvenanceDBBase(conn).flavor
        if flavor == "with-path":
            from .postgresql.provenancedb_with_path import ProvenanceWithPathDB

            self.storage = ProvenanceWithPathDB(conn)
        else:
            from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB

            self.storage = ProvenanceWithoutPathDB(conn)

        self.cache: ProvenanceCache = new_cache()

    def clear_caches(self):
        """Drop everything buffered so far."""
        self.cache = new_cache()

    def commit(self):
        """Flush the cache to storage, retrying until it succeeds, then reset it."""
        # TODO: for now we just forward the cache. This should be improved!
        while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
            logging.warning(
                "Unable to commit cached information %s. Retrying...", self.cache
            )
        self.clear_caches()

    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ):
        """Buffer a (blob, directory, normalized path) relation."""
        self.cache["content_in_directory"].add(
            (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
        )

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ):
        """Buffer a (blob, revision, normalized path) relation."""
        self.cache["content_in_revision"].add(
            (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
        )

    def content_find_first(
        self, blob: bytes
    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
        """Forward to ``storage.content_find_first``."""
        return self.storage.content_find_first(blob)

    def content_find_all(
        self, blob: bytes, limit: Optional[int] = None
    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
        """Yield everything produced by ``storage.content_find_all``."""
        yield from self.storage.content_find_all(blob, limit=limit)

    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        """Return the known date of ``blob``, or ``None``."""
        return self.get_dates("content", [blob.id]).get(blob.id, None)

    def content_get_early_dates(
        self, blobs: Iterable[FileEntry]
    ) -> Dict[bytes, datetime]:
        """Return ``{sha1: date}`` for the blobs that have a known date."""
        return self.get_dates("content", [blob.id for blob in blobs])

    def content_set_early_date(self, blob: FileEntry, date: datetime):
        """Record ``date`` for ``blob`` and mark it for insertion on next commit."""
        self.cache["content"]["data"][blob.id] = date
        self.cache["content"]["added"].add(blob.id)

    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ):
        """Buffer a (directory, revision, normalized path) relation."""
        self.cache["directory_in_revision"].add(
            (directory.id, revision.id, normalize(path))
        )

    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        """Return the known date of ``directory``, or ``None``."""
        return self.get_dates("directory", [directory.id]).get(directory.id, None)

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: Iterable[DirectoryEntry]
    ) -> Dict[bytes, datetime]:
        """Return ``{sha1: date}`` for the directories that have a known date."""
        return self.get_dates("directory", [directory.id for directory in dirs])

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ):
        """Record ``date`` for ``directory`` and mark it for insertion on next commit."""
        self.cache["directory"]["data"][directory.id] = date
        self.cache["directory"]["added"].add(directory.id)

    def get_dates(
        self, entity: Literal["content", "revision", "directory"], ids: List[bytes]
    ) -> Dict[bytes, datetime]:
        """Return ``{sha1: date}`` for ``ids``, reading storage only for uncached ones.

        Dates fetched from storage are added to the cache. Ids without a date in
        either place are absent from the result.
        """
        cache = self.cache[entity]
        # Look the ids up in cache["data"]: the cache itself only has the keys
        # "data" and "added". Cached ids are skipped so that a locally set date
        # is not overwritten by the stored one.
        missing_ids = {sha1 for sha1 in ids if sha1 not in cache["data"]}
        if missing_ids:
            cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
        return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]}

    def origin_get_id(self, origin: OriginEntry) -> int:
        """Return ``origin.id``, asking storage for it when it is ``None``."""
        if origin.id is None:
            return self.storage.origin_get_id(origin.url)
        else:
            return origin.id

    def revision_add(self, revision: RevisionEntry):
        """Record the date of ``revision`` and mark it for insertion on next commit."""
        # Add current revision to the compact DB
        assert revision.date is not None
        self.cache["revision"]["data"][revision.id] = revision.date
        self.cache["revision"]["added"].add(revision.id)

    def revision_add_before_revision(
        self, relative: RevisionEntry, revision: RevisionEntry
    ):
        """Buffer ``relative.id`` in the set keyed by ``revision.id``."""
        self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
            relative.id
        )

    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
        """Buffer a (revision, origin id) pair; ``origin.id`` must be set."""
        assert origin.id is not None
        self.cache["revision_in_origin"].add((revision.id, origin.id))

    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
        """Return the known date of ``revision``, or ``None``."""
        return self.get_dates("revision", [revision.id]).get(revision.id, None)

    def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]:
        """Return the preferred origin id of ``revision``, or ``None``.

        The cache is consulted first; a value found in storage is cached.
        """
        if revision.id not in self.cache["revision_preferred_origin"]["data"]:
            origin = self.storage.revision_get_preferred_origin(revision.id)
            if origin is not None:
                self.cache["revision_preferred_origin"]["data"][revision.id] = origin
        return self.cache["revision_preferred_origin"]["data"].get(revision.id)

    def revision_in_history(self, revision: RevisionEntry) -> bool:
        """Return True if ``revision`` is a key of the buffered history or known to storage."""
        return revision.id in self.cache[
            "revision_before_revision"
        ] or self.storage.revision_in_history(revision.id)

    def revision_set_preferred_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ):
        """Record ``origin.id`` as preferred origin of ``revision`` for the next commit."""
        assert origin.id is not None
        self.cache["revision_preferred_origin"]["data"][revision.id] = origin.id
        self.cache["revision_preferred_origin"]["added"].add(revision.id)

    def revision_visited(self, revision: RevisionEntry) -> bool:
        """Return True if ``revision`` has a buffered origin pair or is known to storage."""
        return any(
            rev == revision.id for rev, _ in self.cache["revision_in_origin"]
        ) or self.storage.revision_visited(revision.id)


def normalize(path: bytes) -> bytes:
    """Strip a leading ``"." + os.path.sep`` from ``path``, if present."""
    return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path