diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index fb20774..96c0f61 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,318 +1,257 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Dict, Iterable, List, Optional

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..origin import OriginEntry
 from ..revision import RevisionEntry


 class ProvenanceDBBase:
     raise_on_commit: bool = False

     def __init__(self, conn: psycopg2.extensions.connection):
-        # TODO: consider adding a mutex for thread safety
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         conn.set_session(autocommit=True)
         self.conn = conn
         self.cursor = self.conn.cursor()
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
-        self.insert_cache: Dict[str, Any] = {}
-        self.select_cache: Dict[str, Any] = {}
+        self.write_cache: Dict[str, Any] = {}
+        self.read_cache: Dict[str, Any] = {}
         self.clear_caches()

     def clear_caches(self):
-        self.insert_cache = {
+        self.write_cache = {
             "content": dict(),
             "content_early_in_rev": set(),
             "content_in_dir": set(),
             "directory": dict(),
             "directory_in_rev": set(),
             "revision": dict(),
             "revision_before_rev": list(),
             "revision_in_org": list(),
         }
-        self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}
+        self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()}

     def commit(self):
         try:
             self.insert_all()
             self.clear_caches()
             return True
-
         except:  # noqa: E722
             # Unexpected error occurred, rollback all changes and log message
             logging.exception("Unexpected error")
             if self.raise_on_commit:
                 raise
-
         return False

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
-        # First check if the date is being modified by current transection.
-        date = self.insert_cache["content"].get(blob.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache["content"].get(blob.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM content WHERE sha1=%s""", (blob.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["content"][blob.id] = date
-        return date
+        return self.get_dates("content", [blob.id]).get(blob.id, None)

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
-        dates = {}
-        pending = []
-        for blob in blobs:
-            # First check if the date is being modified by current transection.
-            date = self.insert_cache["content"].get(blob.id, None)
-            if date is not None:
-                dates[blob.id] = date
-            else:
-                # If not, check whether it's been query before
-                date = self.select_cache["content"].get(blob.id, None)
-                if date is not None:
-                    dates[blob.id] = date
-                else:
-                    pending.append(blob.id)
-        if pending:
-            # Otherwise, query the database and cache the values
-            values = ", ".join(itertools.repeat("%s", len(pending)))
-            self.cursor.execute(
-                f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""",
-                tuple(pending),
-            )
-            for sha1, date in self.cursor.fetchall():
-                dates[sha1] = date
-                self.select_cache["content"][sha1] = date
-        return dates
+        return self.get_dates("content", [blob.id for blob in blobs])

     def content_set_early_date(self, blob: FileEntry, date: datetime):
-        self.insert_cache["content"][blob.id] = date
+        self.write_cache["content"][blob.id] = date
+        # update read cache as well
+        self.read_cache["content"][blob.id] = date

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
-        # First check if the date is being modified by current transection.
-        date = self.insert_cache["directory"].get(directory.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache["directory"].get(directory.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM directory WHERE sha1=%s""", (directory.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["directory"][directory.id] = date
-        return date
+        return self.get_dates("directory", [directory.id]).get(directory.id, None)

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
+        return self.get_dates("directory", [directory.id for directory in dirs])
+
+    def directory_set_date_in_isochrone_frontier(
+        self, directory: DirectoryEntry, date: datetime
+    ):
+        self.write_cache["directory"][directory.id] = date
+        # update read cache as well
+        self.read_cache["directory"][directory.id] = date
+
+    def get_dates(self, table: str, ids: List[bytes]) -> Dict[bytes, datetime]:
         dates = {}
         pending = []
-        for directory in dirs:
-            # First check if the date is being modified by current transection.
- date = self.insert_cache["directory"].get(directory.id, None) + for sha1 in ids: + # Check whether the date has been queried before + date = self.read_cache[table].get(sha1, None) if date is not None: - dates[directory.id] = date + dates[sha1] = date else: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - pending.append(directory.id) + pending.append(sha1) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( - f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", + f"""SELECT sha1, date FROM {table} WHERE sha1 IN ({values})""", tuple(pending), ) for sha1, date in self.cursor.fetchall(): dates[sha1] = date - self.select_cache["directory"][sha1] = date + self.read_cache[table][sha1] = date return dates - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.insert_cache["directory"][directory.id] = date - def insert_all(self): - # Performe insertions with cached information - if self.insert_cache["content"]: + # Perform insertions with cached information + if self.write_cache["content"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """, - self.insert_cache["content"].items(), + self.write_cache["content"].items(), ) - self.insert_cache["content"].clear() + self.write_cache["content"].clear() - if self.insert_cache["directory"]: + if self.write_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date) """, - self.insert_cache["directory"].items(), + self.write_cache["directory"].items(), ) - self.insert_cache["directory"].clear() + self.write_cache["directory"].clear() - if self.insert_cache["revision"]: + if self.write_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date) """, - self.insert_cache["revision"].items(), + self.write_cache["revision"].items(), ) - self.insert_cache["revision"].clear() + self.write_cache["revision"].clear() # Relations should come after ids for elements were resolved - if self.insert_cache["content_early_in_rev"]: + if self.write_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") - if self.insert_cache["content_in_dir"]: + if self.write_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") - if self.insert_cache["directory_in_rev"]: + if self.write_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") - # if self.insert_cache["revision_before_rev"]: + # if self.write_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_before_rev; # INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING # """, - # self.insert_cache["revision_before_rev"], + # self.write_cache["revision_before_rev"], # ) - # self.insert_cache["revision_before_rev"].clear() + # self.write_cache["revision_before_rev"].clear() - # if self.insert_cache["revision_in_org"]: + # if 
self.write_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_in_org; # INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING # """, - # self.insert_cache["revision_in_org"], + # self.write_cache["revision_in_org"], # ) - # self.insert_cache["revision_in_org"].clear() + # self.write_cache["revision_in_org"].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id """, (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB - self.insert_cache["revision"][revision.id] = revision.date + self.write_cache["revision"][revision.id] = revision.date + # update read cache as well + self.read_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): - self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) + self.write_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.insert_cache["revision_in_org"].append((revision.id, origin.id)) + self.write_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - date = self.insert_cache["revision"].get(revision.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["revision"].get(revision.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["revision"][revision.id] = date - return date + return self.get_dates("revision", [revision.id]).get(revision.id, None) def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index e61e05a..ed8bbeb 100644 --- 
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,157 +1,157 @@
 from datetime import datetime
 import os
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase


 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path


 class ProvenanceWithPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_in_dir"].add(
+        self.write_cache["content_in_dir"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_early_in_rev"].add(
+        self.write_cache["content_early_in_rev"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """
             SELECT C.sha1 AS blob,
                    R.sha1 AS rev,
                    R.date AS date,
                    L.path AS path
             FROM content AS C
             INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id)
             INNER JOIN location as L ON (CR.loc = L.id)
             INNER JOIN revision as R ON (CR.rev = R.id)
             WHERE C.sha1=%s
             ORDER BY date, rev, path ASC LIMIT 1
             """,
             (blobid,),
         )
         return self.cursor.fetchone()

     def content_find_all(
         self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
                     L.path AS path
              FROM content AS C
              INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id)
              INNER JOIN location AS L ON (CR.loc = L.id)
              INNER JOIN revision AS R ON (CR.rev = R.id)
              WHERE C.sha1=%s)
             UNION
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
                     CASE DL.path
                         WHEN '' THEN CL.path
                         WHEN '.' THEN CL.path
                         ELSE (DL.path || '/' || CL.path)::unix_path
                     END AS path
              FROM content AS C
              INNER JOIN content_in_dir AS CD ON (C.id = CD.blob)
              INNER JOIN directory_in_rev AS DR ON (CD.dir = DR.dir)
              INNER JOIN revision AS R ON (DR.rev = R.id)
              INNER JOIN location AS CL ON (CD.loc = CL.id)
              INNER JOIN location AS DL ON (DR.loc = DL.id)
              WHERE C.sha1=%s)
             ORDER BY date, rev, path {early_cut}
             """,
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         yield from self.cursor.fetchall()

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
-        self.insert_cache["directory_in_rev"].add(
+        self.write_cache["directory_in_rev"].add(
             (directory.id, revision.id, normalize(path))
         )

     def insert_location(self, src0_table, src1_table, dst_table):
-        """Insert location entries in `dst_table` from the insert_cache
+        """Insert location entries in `dst_table` from the write_cache

         Also insert missing location entries in the 'location' table.
         """
         # TODO: find a better way of doing this; might be doable in a coupls of
         # SQL queries (one to insert missing entries in the location' table,
         # one to insert entries in the dst_table)

         # Resolve src0 ids
-        src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.insert_cache[dst_table]))
+        src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[dst_table]))
         fmt = ",".join(["%s"] * len(src0_sha1s))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""",
             src0_sha1s,
         )
         src0_values = dict(self.cursor.fetchall())

         # Resolve src1 ids
-        src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.insert_cache[dst_table]))
+        src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[dst_table]))
         fmt = ",".join(["%s"] * len(src1_sha1s))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""",
             src1_sha1s,
         )
         src1_values = dict(self.cursor.fetchall())

         # insert missing locations
-        locations = tuple(set((loc,) for (_, _, loc) in self.insert_cache[dst_table]))
+        locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[dst_table]))
         psycopg2.extras.execute_values(
             self.cursor,
             """
             LOCK TABLE ONLY location;
             INSERT INTO location(path) VALUES %s
             ON CONFLICT (path) DO NOTHING
             """,
             locations,
         )
         # fetch location ids
         fmt = ",".join(["%s"] * len(locations))
         self.cursor.execute(
             f"SELECT path, id FROM location WHERE path IN ({fmt})",
             locations,
         )
         loc_ids = dict(self.cursor.fetchall())

         # Insert values in dst_table
         rows = [
             (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc])
-            for (sha1_src, sha1_dst, loc) in self.insert_cache[dst_table]
+            for (sha1_src, sha1_dst, loc) in self.write_cache[dst_table]
         ]
         psycopg2.extras.execute_values(
             self.cursor,
             f"""
             LOCK TABLE ONLY {dst_table};
             INSERT INTO {dst_table} VALUES %s
             ON CONFLICT DO NOTHING
             """,
             rows,
         )
-        self.insert_cache[dst_table].clear()
+        self.write_cache[dst_table].clear()

diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
index a7ab205..90bc38e 100644
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,140 +1,140 @@
 from datetime import datetime
 import itertools
 import operator
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase

 ########################################################################################
 ########################################################################################
 ########################################################################################


 class ProvenanceWithoutPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_in_dir"].add((blob.id, directory.id))
+        self.write_cache["content_in_dir"].add((blob.id, directory.id))

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
-        self.insert_cache["content_early_in_rev"].add((blob.id, revision.id))
+        self.write_cache["content_early_in_rev"].add((blob.id, revision.id))

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """
             SELECT revision.sha1 AS rev,
                    revision.date AS date
             FROM (SELECT content_early_in_rev.rev
                   FROM content_early_in_rev
                   JOIN content
                   ON content.id=content_early_in_rev.blob
                  WHERE content.sha1=%s
                  ) AS content_in_rev
             JOIN revision
             ON revision.id=content_in_rev.rev
             ORDER BY date, rev ASC LIMIT 1
             """,
             (blobid,),
         )
         row = self.cursor.fetchone()
         if row is not None:
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             return blobid, row[0], row[1], b""
         return None

     def content_find_all(
         self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT revision.sha1 AS rev,
                     revision.date AS date
              FROM (SELECT content_early_in_rev.rev
                    FROM content_early_in_rev
                    JOIN content
                    ON content.id=content_early_in_rev.blob
                    WHERE content.sha1=%s
                    ) AS content_in_rev
              JOIN revision
              ON revision.id=content_in_rev.rev
             )
             UNION
             (SELECT revision.sha1 AS rev,
                     revision.date AS date
              FROM (SELECT directory_in_rev.rev
                    FROM (SELECT content_in_dir.dir
                          FROM content_in_dir
                          JOIN content
                          ON content_in_dir.blob=content.id
                          WHERE content.sha1=%s
                          ) AS content_dir
                    JOIN directory_in_rev
                    ON directory_in_rev.dir=content_dir.dir
                    ) AS content_in_rev
              JOIN revision
              ON revision.id=content_in_rev.rev
             )
             ORDER BY date, rev {early_cut}
             """,
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         for row in self.cursor.fetchall():
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             yield blobid, row[0], row[1], b""

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
-        self.insert_cache["directory_in_rev"].add((directory.id, revision.id))
+        self.write_cache["directory_in_rev"].add((directory.id, revision.id))

     def insert_location(self, src0_table, src1_table, dst_table):
         # Resolve src0 ids
         src0_values = dict().fromkeys(
-            map(operator.itemgetter(0), self.insert_cache[dst_table])
+            map(operator.itemgetter(0), self.write_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src0_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
             tuple(src0_values),
         )
         src0_values = dict(self.cursor.fetchall())

         # Resolve src1 ids
         src1_values = dict().fromkeys(
-            map(operator.itemgetter(1), self.insert_cache[dst_table])
+            map(operator.itemgetter(1), self.write_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src1_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
             tuple(src1_values),
         )
         src1_values = dict(self.cursor.fetchall())

         # Insert values in dst_table
         rows = map(
             lambda row: (src0_values[row[0]], src1_values[row[1]]),
-            self.insert_cache[dst_table],
+            self.write_cache[dst_table],
         )
         psycopg2.extras.execute_values(
             self.cursor,
             f"""
             LOCK TABLE ONLY {dst_table};
             INSERT INTO {dst_table} VALUES %s
             ON CONFLICT DO NOTHING
             """,
             rows,
         )
-        self.insert_cache[dst_table].clear()
+        self.write_cache[dst_table].clear()