diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 33319c8..17f8392 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,167 +1,172 @@
 from datetime import datetime
 import itertools
 import logging
 from typing import Any, Dict, Generator, List, Optional, Set, Tuple
 
 import psycopg2
 import psycopg2.extras
 
 
 class ProvenanceDBBase:
     def __init__(self, conn: psycopg2.extensions.connection):
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         conn.set_session(autocommit=True)
         self.conn = conn
         self.cursor = self.conn.cursor()
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
 
     def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool:
         try:
             # First insert entities
-            self.insert_entity("content", data["content"])
-            self.insert_entity("directory", data["directory"])
-            self.insert_entity("revision", data["revision"])
+            for entity in ("content", "directory", "revision"):
+                self.insert_entity(
+                    entity,
+                    {
+                        sha1: data[entity]["data"][sha1]
+                        for sha1 in data[entity]["added"]
+                    },
+                )
 
             # Relations should come after ids for entities were resolved
             self.insert_relation(
                 "content",
                 "revision",
                 "content_early_in_rev",
                 data["content_early_in_rev"],
             )
             self.insert_relation(
                 "content", "directory", "content_in_dir", data["content_in_dir"]
             )
             self.insert_relation(
                 "directory", "revision", "directory_in_rev", data["directory_in_rev"]
             )
 
             # TODO: this should be updated when origin-revision layer gets properly
             # updated.
             # if data["revision_before_rev"]:
             #     psycopg2.extras.execute_values(
             #         self.cursor,
             #         """
             #         LOCK TABLE ONLY revision_before_rev;
             #         INSERT INTO revision_before_rev VALUES %s
             #         ON CONFLICT DO NOTHING
             #         """,
             #         data["revision_before_rev"],
             #     )
             #     data["revision_before_rev"].clear()
             #
             # if data["revision_in_org"]:
             #     psycopg2.extras.execute_values(
             #         self.cursor,
             #         """
             #         LOCK TABLE ONLY revision_in_org;
             #         INSERT INTO revision_in_org VALUES %s
             #         ON CONFLICT DO NOTHING
             #         """,
             #         data["revision_in_org"],
             #     )
             #     data["revision_in_org"].clear()
 
             return True
         except:  # noqa: E722
             # Unexpected error occurred, rollback all changes and log message
             logging.exception("Unexpected error")
             if raise_on_commit:
                 raise
         return False
 
     def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
         dates = {}
         if ids:
             values = ", ".join(itertools.repeat("%s", len(ids)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""",
                 tuple(ids),
             )
             dates.update(self.cursor.fetchall())
         return dates
 
     def insert_entity(self, entity: str, data: Dict[bytes, datetime]):
         if data:
             psycopg2.extras.execute_values(
                 self.cursor,
                 f"""
                 LOCK TABLE ONLY {entity};
                 INSERT INTO {entity}(sha1, date) VALUES %s
                 ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
                 """,
                 data.items(),
             )
             # XXX: not sure if Python takes a reference or a copy.
             # This might be useless!
             data.clear()
 
     def insert_relation(
         self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]]
     ):
         ...
 
     def content_find_first(
         self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...
 
     def content_find_all(
         self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...
 
     def origin_get_id(self, url: str) -> int:
         # Insert origin in the DB and return the assigned id
         self.cursor.execute(
             """
             LOCK TABLE ONLY origin;
             INSERT INTO origin(url) VALUES (%s)
             ON CONFLICT DO NOTHING
             RETURNING id
             """,
             (url,),
         )
         return self.cursor.fetchone()[0]
 
     def revision_get_preferred_origin(self, revision: bytes) -> int:
         self.cursor.execute(
             """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision,)
         )
         row = self.cursor.fetchone()
         # None means revision is not in database;
         # 0 means revision has no preferred origin
         return row[0] if row is not None and row[0] != 0 else None
 
     def revision_in_history(self, revision: bytes) -> bool:
         self.cursor.execute(
             """
             SELECT 1 FROM revision_before_rev
             JOIN revision ON revision.id=revision_before_rev.prev
             WHERE revision.sha1=%s
             """,
             (revision,),
         )
         return self.cursor.fetchone() is not None
 
     def revision_set_preferred_origin(self, origin: int, revision: bytes):
         self.cursor.execute(
             """UPDATE revision SET org=%s WHERE sha1=%s""", (origin, revision)
         )
 
     def revision_visited(self, revision: bytes) -> bool:
         self.cursor.execute(
             """
             SELECT 1 FROM revision_in_org
             JOIN revision ON revision.id=revision_in_org.rev
             WHERE revision.sha1=%s
             """,
             (revision,),
         )
         return self.cursor.fetchone() is not None
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 1e6edb7..8c869a9 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,270 +1,280 @@
 from datetime import datetime
 import logging
 import os
-from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
+from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
 
 import psycopg2
-from typing_extensions import Protocol, runtime_checkable
+from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable
 
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
 
 
 # XXX: this protocol doesn't make much sense now that flavours have been delegated to
 # another class, lower in the callstack.
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False
 
     def commit(self):
         """Commit currently ongoing transactions in the backend DB"""
         ...
 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...
 
     def content_find_first(
         self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...
 
     def content_find_all(
         self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         ...
 
     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
         ...
 
     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         ...
 
     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         ...
 
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         ...
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         ...
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         ...
 
     def origin_get_id(self, origin: OriginEntry) -> int:
         ...
 
     def revision_add(self, revision: RevisionEntry) -> None:
         ...
 
     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ) -> None:
         ...
 
     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         ...
 
     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         ...
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         ...
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         ...
 
 
+class Cache(TypedDict):
+    data: Dict[bytes, datetime]
+    added: Set[bytes]
+
+
+class ProvenanceCache(TypedDict):
+    content: Cache
+    directory: Cache
+    revision: Cache
+    # below are insertion caches only
+    content_early_in_rev: Set[Tuple[bytes, bytes, bytes]]
+    content_in_dir: Set[Tuple[bytes, bytes, bytes]]
+    directory_in_rev: Set[Tuple[bytes, bytes, bytes]]
+    # these two are for the origin layer
+    revision_before_rev: List[Tuple[bytes, bytes]]
+    revision_in_org: List[Tuple[bytes, int]]
+
+
+def new_cache() -> ProvenanceCache:
+    return ProvenanceCache(
+        content=Cache(data={}, added=set()),
+        directory=Cache(data={}, added=set()),
+        revision=Cache(data={}, added=set()),
+        content_early_in_rev=set(),
+        content_in_dir=set(),
+        directory_in_rev=set(),
+        revision_before_rev=[],
+        revision_in_org=[],
+    )
+
+
 # TODO: maybe move this to a separate file
 class ProvenanceBackend:
     raise_on_commit: bool = False
 
     def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True):
         from .postgresql.provenancedb_base import ProvenanceDBBase
 
         # TODO: this class should not know what the actual used DB is.
         self.storage: ProvenanceDBBase
         if with_path:
             from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
 
             self.storage = ProvenanceWithPathDB(conn)
         else:
             from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
 
             self.storage = ProvenanceWithoutPathDB(conn)
-        self.write_cache: Dict[str, Any] = {}
-        self.read_cache: Dict[str, Any] = {}
-        self.clear_caches()
+        self.cache: ProvenanceCache = new_cache()
 
     def clear_caches(self):
-        self.write_cache = {
-            "content": dict(),
-            "content_early_in_rev": set(),
-            "content_in_dir": set(),
-            "directory": dict(),
-            "directory_in_rev": set(),
-            "revision": dict(),
-            "revision_before_rev": list(),
-            "revision_in_org": list(),
-        }
-        self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()}
+        self.cache = new_cache()
 
     def commit(self):
-        # TODO: for now we just forward the write_cache. This should be improved!
-        while not self.storage.commit(
-            self.write_cache, raise_on_commit=self.raise_on_commit
-        ):
+        # TODO: for now we just forward the cache. This should be improved!
+        while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
             logging.warning(
-                f"Unable to commit cached information {self.write_cache}. Retrying..."
+                f"Unable to commit cached information {self.cache}. Retrying..."
             )
         self.clear_caches()
 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
-        self.write_cache["content_in_dir"].add(
+        self.cache["content_in_dir"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
-        self.write_cache["content_early_in_rev"].add(
+        self.cache["content_early_in_rev"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )
 
     def content_find_first(
         self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         return self.storage.content_find_first(blob)
 
     def content_find_all(
         self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         yield from self.storage.content_find_all(blob, limit=limit)
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         return self.get_dates("content", [blob.id]).get(blob.id, None)
 
     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])
 
     def content_set_early_date(self, blob: FileEntry, date: datetime):
-        self.write_cache["content"][blob.id] = date
-        # update read cache as well
-        self.read_cache["content"][blob.id] = date
+        self.cache["content"]["data"][blob.id] = date
+        self.cache["content"]["added"].add(blob.id)
 
     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
-        self.write_cache["directory_in_rev"].add(
-            (directory.id, revision.id, normalize(path))
-        )
+        self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path)))
 
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         return self.get_dates("directory", [directory.id]).get(directory.id, None)
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("directory", [directory.id for directory in dirs])
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ):
-        self.write_cache["directory"][directory.id] = date
-        # update read cache as well
-        self.read_cache["directory"][directory.id] = date
-
-    def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
-        dates = {}
-        pending = []
-        for sha1 in ids:
-            # Check whether the date has been queried before
-            date = self.read_cache[entity].get(sha1, None)
-            if date is not None:
-                dates[sha1] = date
-            else:
-                pending.append(sha1)
-        dates.update(self.storage.get_dates(entity, pending))
-        return dates
+        self.cache["directory"]["data"][directory.id] = date
+        self.cache["directory"]["added"].add(directory.id)
+
+    def get_dates(
+        self, entity: Literal["content", "revision", "directory"], ids: List[bytes]
+    ) -> Dict[bytes, datetime]:
+        cache = self.cache[entity]
+        missing_ids = set(id for id in ids if id not in cache["data"])
+        if missing_ids:
+            cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
+        return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]}
 
     def origin_get_id(self, origin: OriginEntry) -> int:
         if origin.id is None:
             return self.storage.origin_get_id(origin.url)
         else:
             return origin.id
 
     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
-        self.write_cache["revision"][revision.id] = revision.date
-        # update read cache as well
-        self.read_cache["revision"][revision.id] = revision.date
+        assert revision.date is not None
+        self.cache["revision"]["data"][revision.id] = revision.date
+        self.cache["revision"]["added"].add(revision.id)
 
     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
-        self.write_cache["revision_before_rev"].append((revision.id, relative.id))
+        self.cache["revision_before_rev"].append((revision.id, relative.id))
 
     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.write_cache["revision_in_org"].append((revision.id, origin.id))
+        assert origin.id is not None
+        self.cache["revision_in_org"].append((revision.id, origin.id))
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id, None)
 
     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_get_preferred_origin(revision.id)
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_in_history(revision.id)
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
         assert origin.id is not None
         # TODO: adapt this method to consider cached values
         self.storage.revision_set_preferred_origin(origin.id, revision.id)
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_visited(revision.id)
 
 
 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
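
The core of the change is replacing the separate read_cache/write_cache dicts with one structure per entity, split into "data" (every date known locally) and "added" (only the sha1s that still have to be flushed on commit). A minimal sketch of those semantics, using a plain dict with the same runtime shape as new_cache() (the TypedDicts only add static checking; the sha1s and date below are made up):

    from datetime import datetime, timezone

    # Same runtime shape as new_cache(), without the TypedDict wrappers.
    cache = {
        "content": {"data": {}, "added": set()},
        "directory": {"data": {}, "added": set()},
        "revision": {"data": {}, "added": set()},
    }

    sha1 = b"\x00" * 20  # hypothetical content id
    date = datetime(2021, 1, 1, tzinfo=timezone.utc)

    # content_set_early_date() records the value once and flags it for insertion.
    cache["content"]["data"][sha1] = date
    cache["content"]["added"].add(sha1)

    # A date merely fetched from the DB lands in "data" only, so it stays readable
    # without being re-written on the next commit.
    cache["content"]["data"][b"\x01" * 20] = date

    # ProvenanceDBBase.commit() now ships just the flagged entries.
    to_insert = {s: cache["content"]["data"][s] for s in cache["content"]["added"]}
    assert to_insert == {sha1: date}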
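
get_dates() then becomes a read-through lookup over that structure: only ids missing from cache["data"] reach the backend, and whatever comes back is memoized. A sketch of that lookup pattern with an invented in-memory stand-in for the storage backend (FakeStorage and its canned dates are not part of the patch):

    from datetime import datetime, timezone
    from typing import Dict, List

    class FakeStorage:
        # Invented stand-in for ProvenanceDBBase.get_dates(); records each batch.
        def __init__(self):
            self.calls: List[List[bytes]] = []
            self.known = {b"\x01" * 20: datetime(2021, 1, 1, tzinfo=timezone.utc)}

        def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
            self.calls.append(ids)
            return {sha1: self.known[sha1] for sha1 in ids if sha1 in self.known}

    storage = FakeStorage()
    data: Dict[bytes, datetime] = {}  # plays the role of cache["data"]

    def get_dates(entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
        # Same logic as the patched ProvenanceBackend.get_dates().
        missing = [sha1 for sha1 in ids if sha1 not in data]
        if missing:
            data.update(storage.get_dates(entity, missing))
        return {sha1: data[sha1] for sha1 in ids if sha1 in data}

    ids = [b"\x01" * 20, b"\x02" * 20]
    get_dates("content", ids)
    get_dates("content", ids)
    # The cached id is served locally; only the still-unknown one is re-queried.
    assert storage.calls == [ids, [b"\x02" * 20]]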
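
On the database side, insert_entity() relies on PostgreSQL's ON CONFLICT ... DO UPDATE with LEAST() so that re-inserting a sha1 can only move its date backwards. A standalone sketch of that upsert, runnable with psycopg2 against any scratch PostgreSQL (the DSN and the toy content table are placeholders, not the real provenance schema):

    from datetime import datetime, timezone

    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect("dbname=provenance_test")  # placeholder DSN
    conn.set_session(autocommit=True)
    cursor = conn.cursor()
    cursor.execute(
        "CREATE TABLE IF NOT EXISTS content(sha1 bytea PRIMARY KEY, date timestamptz)"
    )

    def upsert(rows):
        # Same statement shape as insert_entity(): lock the table, insert, and on
        # conflict keep the smaller (earlier) of the stored and incoming dates.
        psycopg2.extras.execute_values(
            cursor,
            """
            LOCK TABLE ONLY content;
            INSERT INTO content(sha1, date) VALUES %s
            ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date)
            """,
            rows,
        )

    sha1 = b"\x00" * 20
    upsert([(sha1, datetime(2021, 2, 1, tzinfo=timezone.utc))])
    upsert([(sha1, datetime(2021, 1, 1, tzinfo=timezone.utc))])  # earlier: wins
    upsert([(sha1, datetime(2021, 3, 1, tzinfo=timezone.utc))])  # later: ignored

    cursor.execute("SELECT date FROM content WHERE sha1=%s", (sha1,))
    print(cursor.fetchone()[0])  # 2021-01-01 00:00:00+00:00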
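
Finally, the normalize() helper at the bottom of provenance.py only strips a leading "./" from the relation paths cached above. A quick self-check (assuming a POSIX os.path.sep of "/"):

    import os

    def normalize(path: bytes) -> bytes:
        # Copied from provenance.py: drop a leading "./", leave anything else alone.
        return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path

    assert normalize(b"./README.md") == b"README.md"
    assert normalize(b"src/main.py") == b"src/main.py"
    assert normalize(b".hidden") == b".hidden"  # only the "./" prefix is removed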