diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,16 +1,47 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Iterable, Optional, Set
+from typing import Dict, Iterable, List, Optional, Set, Tuple
 
 import psycopg2
 import psycopg2.extras
+from typing_extensions import TypedDict
 
 from ..model import DirectoryEntry, FileEntry
 from ..origin import OriginEntry
 from ..revision import RevisionEntry
 
 
+class Cache(TypedDict):
+    data: Dict[bytes, datetime]
+    added: Set[bytes]
+    removed: Set[bytes]
+
+
+class ProvenanceCache(TypedDict):
+    content: Cache
+    directory: Cache
+    revision: Cache
+    content_early_in_rev: Set[Tuple[bytes, bytes, bytes]]
+    content_in_dir: Set[Tuple[bytes, bytes, bytes]]
+    directory_in_rev: Set[Tuple[bytes, bytes, bytes]]
+    revision_before_rev: List[Tuple[bytes, bytes]]
+    revision_in_org: List[Tuple[bytes, bytes]]
+
+
+def new_cache() -> ProvenanceCache:
+    return ProvenanceCache(
+        content=Cache(data={}, added=set(), removed=set()),
+        directory=Cache(data={}, added=set(), removed=set()),
+        revision=Cache(data={}, added=set(), removed=set()),
+        content_early_in_rev=set(),
+        content_in_dir=set(),
+        directory_in_rev=set(),
+        revision_before_rev=[],
+        revision_in_org=[],
+    )
+
+
 class ProvenanceDBBase:
     raise_on_commit: bool = False
 
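Annotation (not part of the patch): under the definitions above, a minimal sketch of how the three former string-keyed dicts (`insert_cache`, `remove_cache`, `select_cache`) map onto the single typed cache; the sha1 and path values are placeholders.

```python
from datetime import datetime, timezone

cache = new_cache()
sha1 = b"\x00" * 20  # placeholder id

# insert_cache["content"][sha1] = date becomes a write to "data"
# plus a dirty flag in "added" (consumed later by insert_all):
cache["content"]["data"][sha1] = datetime.now(timezone.utc)
cache["content"]["added"].add(sha1)

# remove_cache["directory"].add(sha1) becomes:
cache["directory"]["removed"].add(sha1)

# select_cache reads become plain lookups in "data":
date = cache["content"]["data"].get(sha1)

# Relation tuples keep their shape, now with an explicit location field:
cache["content_in_dir"].add((sha1, sha1, b"some/path"))
```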
@@ -22,24 +53,11 @@
         self.cursor = self.conn.cursor()
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
-        self.insert_cache: Dict[str, Any] = {}
-        self.remove_cache: Dict[str, Set[bytes]] = {}
-        self.select_cache: Dict[str, Any] = {}
+        self.cache: ProvenanceCache = new_cache()
         self.clear_caches()
 
     def clear_caches(self):
-        self.insert_cache = {
-            "content": dict(),
-            "content_early_in_rev": set(),
-            "content_in_dir": set(),
-            "directory": dict(),
-            "directory_in_rev": set(),
-            "revision": dict(),
-            "revision_before_rev": list(),
-            "revision_in_org": list(),
-        }
-        self.remove_cache = {"directory": set()}
-        self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}
+        self.cache = new_cache()
 
     def commit(self):
         try:
@@ -56,113 +74,72 @@
             return False
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
-        # First check if the date is being modified by current transection.
-        date = self.insert_cache["content"].get(blob.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache["content"].get(blob.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM content WHERE sha1=%s""", (blob.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["content"][blob.id] = date
-        return date
+        dates = self.content_get_early_dates([blob])
+        if dates:
+            return dates[blob.id]
+        return None
 
     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
-        dates = {}
-        pending = []
-        for blob in blobs:
-            # First check if the date is being modified by current transection.
-            date = self.insert_cache["content"].get(blob.id, None)
-            if date is not None:
-                dates[blob.id] = date
-            else:
-                # If not, check whether it's been query before
-                date = self.select_cache["content"].get(blob.id, None)
-                if date is not None:
-                    dates[blob.id] = date
-                else:
-                    pending.append(blob.id)
-        if pending:
-            # Otherwise, query the database and cache the values
-            values = ", ".join(itertools.repeat("%s", len(pending)))
+        cache: Dict[bytes, datetime] = self.cache["content"]["data"]
+        all_ids = set(blob.id for blob in blobs)
+        missing_ids = set(id for id in all_ids if id not in cache)
+        if missing_ids:
+            values = ", ".join(itertools.repeat("%s", len(missing_ids)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""",
-                tuple(pending),
+                tuple(missing_ids),
             )
             for sha1, date in self.cursor.fetchall():
-                dates[sha1] = date
-                self.select_cache["content"][sha1] = date
-        return dates
+                cache[sha1] = date
+        return {id: cache[id] for id in all_ids if id in cache}
 
     def content_set_early_date(self, blob: FileEntry, date: datetime):
-        self.insert_cache["content"][blob.id] = date
+        self.cache["content"]["data"][blob.id] = date
+        self.cache["content"]["added"].add(blob.id)
 
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
-        # First check if the date is being modified by current transection.
-        date = self.insert_cache["directory"].get(directory.id, None)
-        if date is None and directory.id not in self.remove_cache["directory"]:
-            # If not, check whether it's been query before
-            date = self.select_cache["directory"].get(directory.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM directory WHERE sha1=%s""", (directory.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["directory"][directory.id] = date
-        return date
+        dates = self.directory_get_dates_in_isochrone_frontier([directory])
+        if dates:
+            return dates[directory.id]
+        return None
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
-        dates = {}
-        pending = []
-        for directory in dirs:
-            # First check if the date is being modified by current transection.
-            date = self.insert_cache["directory"].get(directory.id, None)
-            if date is not None:
-                dates[directory.id] = date
-            elif directory.id not in self.remove_cache["directory"]:
-                # If not, check whether it's been query before
-                date = self.select_cache["directory"].get(directory.id, None)
-                if date is not None:
-                    dates[directory.id] = date
-                else:
-                    pending.append(directory.id)
-        if pending:
-            # Otherwise, query the database and cache the values
-            values = ", ".join(itertools.repeat("%s", len(pending)))
+        cache = self.cache["directory"]["data"]
+        removed = self.cache["directory"]["removed"]
+
+        all_ids = set(dir.id for dir in dirs if dir.id not in removed)
+        missing_ids = set(id for id in all_ids if id not in cache)
+        if missing_ids:
+            values = ", ".join(itertools.repeat("%s", len(missing_ids)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""",
-                tuple(pending),
+                tuple(missing_ids),
            )
             for sha1, date in self.cursor.fetchall():
-                dates[sha1] = date
-                self.select_cache["directory"][sha1] = date
-        return dates
+                cache[sha1] = date
+        return {id: cache[id] for id in all_ids if id in cache}
 
     def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
-        self.remove_cache["directory"].add(directory.id)
-        self.insert_cache["directory"].pop(directory.id, None)
+        self.cache["directory"]["removed"].add(directory.id)
+        self.cache["directory"]["added"].discard(directory.id)
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ):
-        self.insert_cache["directory"][directory.id] = date
-        self.remove_cache["directory"].discard(directory.id)
+        self.cache["directory"]["data"][directory.id] = date
+        self.cache["directory"]["added"].add(directory.id)
+        self.cache["directory"]["removed"].discard(directory.id)
 
     def insert_all(self):
         # Perform insertions with cached information
-        if self.insert_cache["content"]:
+        if self.cache["content"]["added"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """
@@ -171,11 +148,14 @@
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,content.date)
                 """,
-                self.insert_cache["content"].items(),
+                [
+                    (x, self.cache["content"]["data"][x])
+                    for x in self.cache["content"]["added"]
+                ],
             )
-            self.insert_cache["content"].clear()
+            self.cache["content"]["added"].clear()
 
-        if self.insert_cache["directory"]:
+        if self.cache["directory"]["added"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """
@@ -184,11 +164,14 @@
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,directory.date)
                 """,
-                self.insert_cache["directory"].items(),
+                [
+                    (x, self.cache["directory"]["data"][x])
+                    for x in self.cache["directory"]["added"]
+                ],
             )
-            self.insert_cache["directory"].clear()
+            self.cache["directory"]["added"].clear()
 
-        if self.insert_cache["revision"]:
+        if self.cache["revision"]["added"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """
@@ -197,18 +180,21 @@
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,revision.date)
                 """,
-                self.insert_cache["revision"].items(),
+                [
+                    (x, self.cache["revision"]["data"][x])
+                    for x in self.cache["revision"]["added"]
+                ],
             )
-            self.insert_cache["revision"].clear()
+            self.cache["revision"]["added"].clear()
 
         # Relations should come after the ids of their elements have been resolved
-        if self.insert_cache["content_early_in_rev"]:
+        if self.cache["content_early_in_rev"]:
             self.insert_location("content", "revision", "content_early_in_rev")
-        if self.insert_cache["content_in_dir"]:
+        if self.cache["content_in_dir"]:
             self.insert_location("content", "directory", "content_in_dir")
-        if self.insert_cache["directory_in_rev"]:
+        if self.cache["directory_in_rev"]:
             self.insert_location("directory", "revision", "directory_in_rev")
 
         # if self.insert_cache["revision_before_rev"]:
@@ -253,30 +239,28 @@
     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
-        self.insert_cache["revision"][revision.id] = revision.date
+        assert revision.date is not None  # for mypy
+        self.cache["revision"]["data"][revision.id] = revision.date
+        self.cache["revision"]["added"].add(revision.id)
 
     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
-        self.insert_cache["revision_before_rev"].append((revision.id, relative.id))
+        self.cache["revision_before_rev"].append((revision.id, relative.id))
 
     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.insert_cache["revision_in_org"].append((revision.id, origin.id))
+        self.cache["revision_in_org"].append((revision.id, origin.id))
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
-        date = self.insert_cache["revision"].get(revision.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache["revision"].get(revision.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["revision"][revision.id] = date
-        return date
+        cache = self.cache["revision"]["data"]
+        if revision.id not in cache:
+            self.cursor.execute(
+                """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
+            )
+            row = self.cursor.fetchone()
+            date = row[0] if row is not None else None
+            cache[revision.id] = date
+        return cache.get(revision.id)
 
     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
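Annotation (not part of the patch): both bulk getters above now follow the same read-through shape. A condensed, hypothetical helper illustrating that shape; `fetch_dates` and its parameter names are invented for this sketch, and `table` is interpolated into the SQL exactly as in the patch, so it only suits trusted identifiers.

```python
from datetime import datetime
from typing import Dict, Iterable, Optional, Set


def fetch_dates(
    cursor,
    table: str,  # "content" or "directory" in the patch
    data: Dict[bytes, datetime],
    ids: Iterable[bytes],
    removed: Optional[Set[bytes]] = None,
) -> Dict[bytes, datetime]:
    # Ignore ids invalidated in the current transaction (directory case).
    wanted = {i for i in ids if removed is None or i not in removed}
    missing = {i for i in wanted if i not in data}
    if missing:
        values = ", ".join(["%s"] * len(missing))
        cursor.execute(
            f"SELECT sha1, date FROM {table} WHERE sha1 IN ({values})",
            tuple(missing),
        )
        for sha1, date in cursor.fetchall():
            data[sha1] = date  # warm the cache for later lookups
    return {i: data[i] for i in wanted if i in data}
```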
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -18,17 +18,22 @@
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_in_dir"].add(
+        self.cache["content_in_dir"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_early_in_rev"].add(
+        self.cache["content_early_in_rev"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )
 
+    def directory_add_to_revision(
+        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
+    ):
+        self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path)))
+
     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
@@ -87,13 +92,6 @@
         # TODO: use PostgreSQL EXPLAIN to look for query optimizations.
         yield from self.cursor.fetchall()
 
-    def directory_add_to_revision(
-        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
-    ):
-        self.insert_cache["directory_in_rev"].add(
-            (directory.id, revision.id, normalize(path))
-        )
-
     def insert_location(self, src0_table, src1_table, dst_table):
-        """Insert location entries in `dst_table` from the insert_cache
+        """Insert location entries in `dst_table` from the cache
 
@@ -104,7 +102,7 @@
         # one to insert entries in the dst_table)
 
         # Resolve src0 ids
-        src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.insert_cache[dst_table]))
+        src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.cache[dst_table]))
         fmt = ",".join(["%s"] * len(src0_sha1s))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""",
@@ -113,7 +111,7 @@
         src0_values = dict(self.cursor.fetchall())
 
         # Resolve src1 ids
-        src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.insert_cache[dst_table]))
+        src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.cache[dst_table]))
         fmt = ",".join(["%s"] * len(src1_sha1s))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""",
@@ -122,7 +120,7 @@
         src1_values = dict(self.cursor.fetchall())
 
         # insert missing locations
-        locations = tuple(set((loc,) for (_, _, loc) in self.insert_cache[dst_table]))
+        locations = tuple(set((loc,) for (_, _, loc) in self.cache[dst_table]))
         psycopg2.extras.execute_values(
             self.cursor,
             """
@@ -143,7 +141,7 @@
         # Insert values in dst_table
         rows = [
             (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc])
-            for (sha1_src, sha1_dst, loc) in self.insert_cache[dst_table]
+            for (sha1_src, sha1_dst, loc) in self.cache[dst_table]
         ]
         psycopg2.extras.execute_values(
             self.cursor,
             """
@@ -154,4 +152,4 @@
             """,
             rows,
         )
-        self.insert_cache[dst_table].clear()
+        self.cache[dst_table].clear()
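Annotation (not part of the patch): `insert_location` issues the same sha1-to-id resolution query once per side of the relation. A hypothetical condensed form of that step; `resolve_ids` is an invented name, and the f-string interpolation of `table` mirrors the patch, so it assumes trusted identifiers.

```python
from typing import Dict, Set


def resolve_ids(cursor, table: str, sha1s: Set[bytes]) -> Dict[bytes, int]:
    # Map each sha1 to its numeric id in the given entity table.
    fmt = ",".join(["%s"] * len(sha1s))
    cursor.execute(
        f"SELECT sha1, id FROM {table} WHERE sha1 IN ({fmt})", tuple(sha1s)
    )
    return dict(cursor.fetchall())
```

With such a helper, the method reduces to resolving both sides of each (src0_sha1, src1_sha1, location) triple, upserting the locations, and inserting the resolved rows.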
"content_in_dir") - if self.insert_cache["directory_in_rev"]: + if self.cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: @@ -253,30 +239,28 @@ def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB - self.insert_cache["revision"][revision.id] = revision.date + assert revision.date is not None # for mypy + self.cache["revision"]["data"][revision.id] = revision.date + self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): - self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) + self.cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.insert_cache["revision_in_org"].append((revision.id, origin.id)) + self.cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - date = self.insert_cache["revision"].get(revision.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["revision"].get(revision.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["revision"][revision.id] = date - return date + cache = self.cache["revision"]["data"] + if revision.id not in cache: + self.cursor.execute( + """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) + ) + row = self.cursor.fetchone() + date = row[0] if row is not None else None + cache[revision.id] = date + return cache.get(revision.id) def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -18,17 +18,22 @@ def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): - self.insert_cache["content_in_dir"].add( + self.cache["content_in_dir"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): - self.insert_cache["content_early_in_rev"].add( + self.cache["content_early_in_rev"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path))) + def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: @@ -87,13 +92,6 @@ # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. 
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -496,8 +496,7 @@
         return (
             node.known
             and node.maxdate <= revision.date  # all content is earlier than revision
-            and node.depth
-            >= mindepth  # current node is deeper than the min allowed depth
+            and node.depth >= mindepth  # current node is deeper than the min depth
             and (has_blobs(node) if lower else True)  # there is at least one blob in it
         )
     else:
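Annotation (not part of the patch): a hypothetical driver tying the renamed API together. The loop and names are invented for this sketch, and commit() is elided from this diff, so its exact interplay with insert_all() is not shown here.

```python
def process_revision(provenance, revision, blobs):
    # Stage the revision date: it goes into "data" and is flagged in "added".
    provenance.revision_add(revision)

    # Batched, read-through lookup against the content cache.
    dates = provenance.content_get_early_dates(blobs)
    for blob in blobs:
        known = dates.get(blob.id)
        if known is None or revision.date < known:
            provenance.content_set_early_date(blob, revision.date)

    # Flush dirty ids and relation tuples, then commit the transaction.
    provenance.insert_all()
    provenance.commit()
```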