diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 96c0f61..6a62540 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,257 +1,233 @@
 from datetime import datetime
 import itertools
 import logging
 from typing import Any, Dict, Iterable, List, Optional

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..origin import OriginEntry
 from ..revision import RevisionEntry


 class ProvenanceDBBase:
     raise_on_commit: bool = False

     def __init__(self, conn: psycopg2.extensions.connection):
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         conn.set_session(autocommit=True)
         self.conn = conn
         self.cursor = self.conn.cursor()
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
         self.write_cache: Dict[str, Any] = {}
         self.read_cache: Dict[str, Any] = {}
         self.clear_caches()

     def clear_caches(self):
         self.write_cache = {
             "content": dict(),
             "content_early_in_rev": set(),
             "content_in_dir": set(),
             "directory": dict(),
             "directory_in_rev": set(),
             "revision": dict(),
             "revision_before_rev": list(),
             "revision_in_org": list(),
         }
         self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()}

     def commit(self):
         try:
             self.insert_all()
             self.clear_caches()
             return True
         except:  # noqa: E722
             # Unexpected error occurred, rollback all changes and log message
             logging.exception("Unexpected error")
             if self.raise_on_commit:
                 raise
         return False

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         return self.get_dates("content", [blob.id]).get(blob.id, None)

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])

     def content_set_early_date(self, blob: FileEntry, date: datetime):
         self.write_cache["content"][blob.id] = date
         # update read cache as well
         self.read_cache["content"][blob.id] = date

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         return self.get_dates("directory", [directory.id]).get(directory.id, None)

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("directory", [directory.id for directory in dirs])

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ):
         self.write_cache["directory"][directory.id] = date
         # update read cache as well
         self.read_cache["directory"][directory.id] = date

     def get_dates(self, table: str, ids: List[bytes]) -> Dict[bytes, datetime]:
         dates = {}
         pending = []
         for sha1 in ids:
             # Check whether the date has been queried before
             date = self.read_cache[table].get(sha1, None)
             if date is not None:
                 dates[sha1] = date
             else:
                 pending.append(sha1)
         if pending:
             # Otherwise, query the database and cache the values
             values = ", ".join(itertools.repeat("%s", len(pending)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM {table} WHERE sha1 IN ({values})""",
                 tuple(pending),
             )
             for sha1, date in self.cursor.fetchall():
                 dates[sha1] = date
                 self.read_cache[table][sha1] = date
         return dates

-    def insert_all(self):
+    def insert_entity(self, entity):
         # Perform insertions with cached information
-        if self.write_cache["content"]:
+        if self.write_cache[entity]:
             psycopg2.extras.execute_values(
                 self.cursor,
-                """
-                LOCK TABLE ONLY content;
-                INSERT INTO content(sha1, date) VALUES %s
-                ON CONFLICT (sha1) DO
-                UPDATE SET date=LEAST(EXCLUDED.date,content.date)
+                f"""
+                LOCK TABLE ONLY {entity};
+                INSERT INTO {entity}(sha1, date) VALUES %s
+                ON CONFLICT (sha1) DO
+                UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
                 """,
-                self.write_cache["content"].items(),
+                self.write_cache[entity].items(),
             )
-            self.write_cache["content"].clear()
+            self.write_cache[entity].clear()

-        if self.write_cache["directory"]:
-            psycopg2.extras.execute_values(
-                self.cursor,
-                """
-                LOCK TABLE ONLY directory;
-                INSERT INTO directory(sha1, date) VALUES %s
-                ON CONFLICT (sha1) DO
-                UPDATE SET date=LEAST(EXCLUDED.date,directory.date)
-                """,
-                self.write_cache["directory"].items(),
-            )
-            self.write_cache["directory"].clear()
-
-        if self.write_cache["revision"]:
-            psycopg2.extras.execute_values(
-                self.cursor,
-                """
-                LOCK TABLE ONLY revision;
-                INSERT INTO revision(sha1, date) VALUES %s
-                ON CONFLICT (sha1) DO
-                UPDATE SET date=LEAST(EXCLUDED.date,revision.date)
-                """,
-                self.write_cache["revision"].items(),
-            )
-            self.write_cache["revision"].clear()
-
-        # Relations should come after ids for elements were resolved
-        if self.write_cache["content_early_in_rev"]:
-            self.insert_location("content", "revision", "content_early_in_rev")
-
-        if self.write_cache["content_in_dir"]:
-            self.insert_location("content", "directory", "content_in_dir")
+    def insert_all(self):
+        # First insert entities
+        self.insert_entity("content")
+        self.insert_entity("directory")
+        self.insert_entity("revision")

-        if self.write_cache["directory_in_rev"]:
-            self.insert_location("directory", "revision", "directory_in_rev")
+        # Relations should come after entity ids have been resolved
+        self.insert_relation("content", "revision", "content_early_in_rev")
+        self.insert_relation("content", "directory", "content_in_dir")
+        self.insert_relation("directory", "revision", "directory_in_rev")

+        # TODO: this should be updated when origin-revision layer gets properly updated.
         # if self.write_cache["revision_before_rev"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """
         #         LOCK TABLE ONLY revision_before_rev;
         #         INSERT INTO revision_before_rev VALUES %s
         #         ON CONFLICT DO NOTHING
         #         """,
         #         self.write_cache["revision_before_rev"],
         #     )
         #     self.write_cache["revision_before_rev"].clear()
-
+        #
         # if self.write_cache["revision_in_org"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """
         #         LOCK TABLE ONLY revision_in_org;
         #         INSERT INTO revision_in_org VALUES %s
         #         ON CONFLICT DO NOTHING
         #         """,
         #         self.write_cache["revision_in_org"],
         #     )
         #     self.write_cache["revision_in_org"].clear()

     def origin_get_id(self, origin: OriginEntry) -> int:
         if origin.id is None:
             # Insert origin in the DB and return the assigned id
             self.cursor.execute(
                 """
                 LOCK TABLE ONLY origin;
                 INSERT INTO origin(url) VALUES (%s)
                 ON CONFLICT DO NOTHING
                 RETURNING id
                 """,
                 (origin.url,),
             )
             return self.cursor.fetchone()[0]
         else:
             return origin.id

     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
         self.write_cache["revision"][revision.id] = revision.date
         # update read cache as well
         self.read_cache["revision"][revision.id] = revision.date

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
         self.write_cache["revision_before_rev"].append((revision.id, relative.id))

     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
         self.write_cache["revision_in_org"].append((revision.id, origin.id))

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id, None)

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,)
         )
         row = self.cursor.fetchone()
         # None means revision is not in database;
         # 0 means revision has no preferred origin
         return row[0] if row is not None and row[0] != 0 else None

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """
             SELECT 1
             FROM revision_before_rev
             JOIN revision
             ON revision.id=revision_before_rev.prev
             WHERE revision.sha1=%s
             """,
             (revision.id,),
         )
         return self.cursor.fetchone() is not None

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id)
         )

     def revision_visited(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """
             SELECT 1
             FROM revision_in_org
             JOIN revision
             ON revision.id=revision_in_org.rev
             WHERE revision.sha1=%s
             """,
             (revision.id,),
         )
         return self.cursor.fetchone() is not None
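Note: the refactoring above folds three near-identical upsert blocks into a single insert_entity(entity) helper. Since the conflict clause takes LEAST(EXCLUDED.date, {entity}.date), re-inserting a sha1 can only move its stored date earlier, never later. A minimal standalone sketch of that upsert pattern follows; the DSN and the pre-created content(sha1, date) table are hypothetical and not part of this patch:

    from datetime import datetime, timezone

    import psycopg2
    import psycopg2.extras

    # Hypothetical database; assumes:
    #   CREATE TABLE content (sha1 BYTEA PRIMARY KEY, date TIMESTAMPTZ)
    conn = psycopg2.connect(dbname="provenance_test")
    conn.set_session(autocommit=True)
    cursor = conn.cursor()

    cache = {
        b"\x01" * 20: datetime(2020, 1, 1, tzinfo=timezone.utc),
        b"\x02" * 20: datetime(2019, 6, 1, tzinfo=timezone.utc),
    }
    psycopg2.extras.execute_values(
        cursor,
        """
        LOCK TABLE ONLY content;
        INSERT INTO content(sha1, date) VALUES %s
        ON CONFLICT (sha1) DO
        UPDATE SET date=LEAST(EXCLUDED.date,content.date)
        """,
        cache.items(),
    )
    # Re-running with an earlier date for a given sha1 lowers the stored date;
    # a later date leaves the row unchanged.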
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
index ed8bbeb..1a7ea05 100644
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,157 +1,158 @@
 from datetime import datetime
 import os
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase


 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path


 class ProvenanceWithPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
         self.write_cache["content_in_dir"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
         self.write_cache["content_early_in_rev"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """
             SELECT C.sha1 AS blob,
                    R.sha1 AS rev,
                    R.date AS date,
                    L.path AS path
             FROM content AS C
             INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id)
             INNER JOIN location as L ON (CR.loc = L.id)
             INNER JOIN revision as R ON (CR.rev = R.id)
             WHERE C.sha1=%s
             ORDER BY date, rev, path ASC LIMIT 1
             """,
             (blobid,),
         )
         return self.cursor.fetchone()

     def content_find_all(
         self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
                     L.path AS path
              FROM content AS C
              INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id)
              INNER JOIN location AS L ON (CR.loc = L.id)
              INNER JOIN revision AS R ON (CR.rev = R.id)
              WHERE C.sha1=%s)
             UNION
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
                     CASE DL.path
                         WHEN '' THEN CL.path
                         WHEN '.' THEN CL.path
                         ELSE (DL.path || '/' || CL.path)::unix_path
                     END AS path
              FROM content AS C
              INNER JOIN content_in_dir AS CD ON (C.id = CD.blob)
              INNER JOIN directory_in_rev AS DR ON (CD.dir = DR.dir)
              INNER JOIN revision AS R ON (DR.rev = R.id)
              INNER JOIN location AS CL ON (CD.loc = CL.id)
              INNER JOIN location AS DL ON (DR.loc = DL.id)
              WHERE C.sha1=%s)
             ORDER BY date, rev, path {early_cut}
             """,
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         yield from self.cursor.fetchall()

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
         self.write_cache["directory_in_rev"].add(
             (directory.id, revision.id, normalize(path))
         )

-    def insert_location(self, src0_table, src1_table, dst_table):
-        """Insert location entries in `dst_table` from the write_cache
+    def insert_relation(self, src, dst, relation):
+        """Insert entries in `relation` from the write_cache

         Also insert missing location entries in the 'location' table.
         """
-        # TODO: find a better way of doing this; might be doable in a coupls of
-        # SQL queries (one to insert missing entries in the location' table,
-        # one to insert entries in the dst_table)
-
-        # Resolve src0 ids
-        src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[dst_table]))
-        fmt = ",".join(["%s"] * len(src0_sha1s))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""",
-            src0_sha1s,
-        )
-        src0_values = dict(self.cursor.fetchall())
-
-        # Resolve src1 ids
-        src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[dst_table]))
-        fmt = ",".join(["%s"] * len(src1_sha1s))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""",
-            src1_sha1s,
-        )
-        src1_values = dict(self.cursor.fetchall())
-
-        # insert missing locations
-        locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[dst_table]))
-        psycopg2.extras.execute_values(
-            self.cursor,
-            """
-            LOCK TABLE ONLY location;
-            INSERT INTO location(path) VALUES %s
-            ON CONFLICT (path) DO NOTHING
-            """,
-            locations,
-        )
-        # fetch location ids
-        fmt = ",".join(["%s"] * len(locations))
-        self.cursor.execute(
-            f"SELECT path, id FROM location WHERE path IN ({fmt})",
-            locations,
-        )
-        loc_ids = dict(self.cursor.fetchall())
-
-        # Insert values in dst_table
-        rows = [
-            (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc])
-            for (sha1_src, sha1_dst, loc) in self.write_cache[dst_table]
-        ]
-        psycopg2.extras.execute_values(
-            self.cursor,
-            f"""
-            LOCK TABLE ONLY {dst_table};
-            INSERT INTO {dst_table} VALUES %s
-            ON CONFLICT DO NOTHING
-            """,
-            rows,
-        )
-        self.write_cache[dst_table].clear()
+        if self.write_cache[relation]:
+            # TODO: find a better way of doing this; might be doable in a couple of
+            # SQL queries (one to insert missing entries in the 'location' table,
+            # one to insert entries in the relation)
+
+            # Resolve src ids
+            src_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[relation]))
+            fmt = ",".join(["%s"] * len(src_sha1s))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({fmt})""",
+                src_sha1s,
+            )
+            src_values = dict(self.cursor.fetchall())
+
+            # Resolve dst ids
+            dst_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[relation]))
+            fmt = ",".join(["%s"] * len(dst_sha1s))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({fmt})""",
+                dst_sha1s,
+            )
+            dst_values = dict(self.cursor.fetchall())
+
+            # insert missing locations
+            locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[relation]))
+            psycopg2.extras.execute_values(
+                self.cursor,
+                """
+                LOCK TABLE ONLY location;
+                INSERT INTO location(path) VALUES %s
+                ON CONFLICT (path) DO NOTHING
+                """,
+                locations,
+            )
+            # fetch location ids
+            fmt = ",".join(["%s"] * len(locations))
+            self.cursor.execute(
+                f"SELECT path, id FROM location WHERE path IN ({fmt})",
+                locations,
+            )
+            loc_ids = dict(self.cursor.fetchall())
+
+            # Insert values in relation
+            rows = [
+                (src_values[sha1_src], dst_values[sha1_dst], loc_ids[loc])
+                for (sha1_src, sha1_dst, loc) in self.write_cache[relation]
+            ]
+            psycopg2.extras.execute_values(
+                self.cursor,
+                f"""
+                LOCK TABLE ONLY {relation};
+                INSERT INTO {relation} VALUES %s
+                ON CONFLICT DO NOTHING
+                """,
+                rows,
+            )
+            self.write_cache[relation].clear()
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
index 90bc38e..7c0fe78 100644
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,140 +1,141 @@
 from datetime import datetime
 import itertools
 import operator
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase


 ########################################################################################
 ########################################################################################
 ########################################################################################


 class ProvenanceWithoutPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
         self.write_cache["content_in_dir"].add((blob.id, directory.id))

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
         self.write_cache["content_early_in_rev"].add((blob.id, revision.id))

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """
             SELECT revision.sha1 AS rev,
                    revision.date AS date
             FROM (SELECT content_early_in_rev.rev
                   FROM content_early_in_rev
                   JOIN content
                   ON content.id=content_early_in_rev.blob
                   WHERE content.sha1=%s
                  ) AS content_in_rev
             JOIN revision
             ON revision.id=content_in_rev.rev
             ORDER BY date, rev ASC LIMIT 1
             """,
             (blobid,),
         )
         row = self.cursor.fetchone()
         if row is not None:
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             return blobid, row[0], row[1], b""
         return None

     def content_find_all(
         self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT revision.sha1 AS rev,
                     revision.date AS date
              FROM (SELECT content_early_in_rev.rev
                    FROM content_early_in_rev
                    JOIN content
                    ON content.id=content_early_in_rev.blob
                    WHERE content.sha1=%s
                   ) AS content_in_rev
              JOIN revision
              ON revision.id=content_in_rev.rev
             )
             UNION
             (SELECT revision.sha1 AS rev,
                     revision.date AS date
              FROM (SELECT directory_in_rev.rev
                    FROM (SELECT content_in_dir.dir
                          FROM content_in_dir
                          JOIN content
                          ON content_in_dir.blob=content.id
                          WHERE content.sha1=%s
                         ) AS content_dir
                    JOIN directory_in_rev
                    ON directory_in_rev.dir=content_dir.dir
                   ) AS content_in_rev
              JOIN revision
              ON revision.id=content_in_rev.rev
             )
             ORDER BY date, rev {early_cut}
             """,
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         for row in self.cursor.fetchall():
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             yield blobid, row[0], row[1], b""

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
         self.write_cache["directory_in_rev"].add((directory.id, revision.id))

-    def insert_location(self, src0_table, src1_table, dst_table):
-        # Resolve src0 ids
-        src0_values = dict().fromkeys(
-            map(operator.itemgetter(0), self.write_cache[dst_table])
-        )
-        values = ", ".join(itertools.repeat("%s", len(src0_values)))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
-            tuple(src0_values),
-        )
-        src0_values = dict(self.cursor.fetchall())
+    def insert_relation(self, src, dst, relation):
+        if self.write_cache[relation]:
+            # Resolve src ids
+            src_values = dict().fromkeys(
+                map(operator.itemgetter(0), self.write_cache[relation])
+            )
+            values = ", ".join(itertools.repeat("%s", len(src_values)))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({values})""",
+                tuple(src_values),
+            )
+            src_values = dict(self.cursor.fetchall())

-        # Resolve src1 ids
-        src1_values = dict().fromkeys(
-            map(operator.itemgetter(1), self.write_cache[dst_table])
-        )
-        values = ", ".join(itertools.repeat("%s", len(src1_values)))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
-            tuple(src1_values),
-        )
-        src1_values = dict(self.cursor.fetchall())
+            # Resolve dst ids
+            dst_values = dict().fromkeys(
+                map(operator.itemgetter(1), self.write_cache[relation])
+            )
+            values = ", ".join(itertools.repeat("%s", len(dst_values)))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({values})""",
+                tuple(dst_values),
+            )
+            dst_values = dict(self.cursor.fetchall())

-        # Insert values in dst_table
-        rows = map(
-            lambda row: (src0_values[row[0]], src1_values[row[1]]),
-            self.write_cache[dst_table],
-        )
-        psycopg2.extras.execute_values(
-            self.cursor,
-            f"""
-            LOCK TABLE ONLY {dst_table};
-            INSERT INTO {dst_table} VALUES %s
-            ON CONFLICT DO NOTHING
-            """,
-            rows,
-        )
-        self.write_cache[dst_table].clear()
+            # Insert values in relation
+            rows = map(
+                lambda row: (src_values[row[0]], dst_values[row[1]]),
+                self.write_cache[relation],
+            )
+            psycopg2.extras.execute_values(
+                self.cursor,
+                f"""
+                LOCK TABLE ONLY {relation};
+                INSERT INTO {relation} VALUES %s
+                ON CONFLICT DO NOTHING
+                """,
+                rows,
+            )
+            self.write_cache[relation].clear()
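Note: the without-path variant gets the same rename and the same `if self.write_cache[relation]:` guard, but its cached tuples are plain (src_sha1, dst_sha1) pairs with no location component. dict().fromkeys(...) serves only to deduplicate each column before the sha1-to-surrogate-id lookup; a small illustration with made-up sha1 values:

    import operator

    # Hypothetical cache entries: (blob sha1, revision sha1) pairs.
    cache = {(b"blob1", b"rev1"), (b"blob1", b"rev2"), (b"blob2", b"rev1")}

    src_values = dict().fromkeys(map(operator.itemgetter(0), cache))
    dst_values = dict().fromkeys(map(operator.itemgetter(1), cache))

    assert set(src_values) == {b"blob1", b"blob2"}
    assert set(dst_values) == {b"rev1", b"rev2"}
    # tuple(src_values) then parameterizes the
    #   SELECT sha1, id FROM {src} WHERE sha1 IN (%s, %s)
    # lookup, and the fetched rows rebind each sha1 to its numeric id.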