diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -24,17 +24,9 @@
 def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
     if cls == "local":
         conn = connect(kwargs["db"])
-        if kwargs.get("with_path", True):
-            from swh.provenance.postgresql.provenancedb_with_path import (
-                ProvenanceWithPathDB,
-            )
-
-            return ProvenanceWithPathDB(conn)
-        else:
-            from swh.provenance.postgresql.provenancedb_without_path import (
-                ProvenanceWithoutPathDB,
-            )
-
-            return ProvenanceWithoutPathDB(conn)
+        with_path = kwargs.get("with_path", True)
+        from swh.provenance.provenance import ProvenanceBackend
+
+        return ProvenanceBackend(conn, with_path=with_path)
     else:
         raise NotImplementedError
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,288 +1,141 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Dict, Generator, List, Optional, Set, Tuple
 
 import psycopg2
 import psycopg2.extras
 
-from ..model import DirectoryEntry, FileEntry
-from ..origin import OriginEntry
-from ..revision import RevisionEntry
-
 
 class ProvenanceDBBase:
-    raise_on_commit: bool = False
-
     def __init__(self, conn: psycopg2.extensions.connection):
-        # TODO: consider adding a mutex for thread safety
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         conn.set_session(autocommit=True)
         self.conn = conn
         self.cursor = self.conn.cursor()
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
-        self.insert_cache: Dict[str, Any] = {}
-        self.select_cache: Dict[str, Any] = {}
-        self.clear_caches()
-
-    def clear_caches(self):
-        self.insert_cache = {
-            "content": dict(),
-            "content_early_in_rev": set(),
-            "content_in_dir": set(),
-            "directory": dict(),
-            "directory_in_rev": set(),
-            "revision": dict(),
-            "revision_before_rev": list(),
-            "revision_in_org": list(),
-        }
-        self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}
 
-    def commit(self):
+    def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool:
         try:
-            self.insert_all()
-            self.clear_caches()
-            return True
+            # First insert entities
+            self.insert_entity("content", data["content"])
+            self.insert_entity("directory", data["directory"])
+            self.insert_entity("revision", data["revision"])
+
+            # Relations should come after ids for entities have been resolved
+            self.insert_relation(
+                "content",
+                "revision",
+                "content_early_in_rev",
+                data["content_early_in_rev"],
+            )
+            self.insert_relation(
+                "content", "directory", "content_in_dir", data["content_in_dir"]
+            )
+            self.insert_relation(
+                "directory", "revision", "directory_in_rev", data["directory_in_rev"]
+            )
+
+            # TODO: this should be updated when the origin-revision layer gets
+            # properly updated.
+ # if data["revision_before_rev"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_before_rev; + # INSERT INTO revision_before_rev VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_before_rev"], + # ) + # data["revision_before_rev"].clear() + # + # if data["revision_in_org"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_in_org; + # INSERT INTO revision_in_org VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_in_org"], + # ) + # data["revision_in_org"].clear() + return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") - if self.raise_on_commit: + if raise_on_commit: raise - return False - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM content WHERE sha1=%s""", (blob.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["content"][blob.id] = date - return date - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[bytes, datetime]: + def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} - pending = [] - for blob in blobs: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - pending.append(blob.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) + if ids: + values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( - f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", - tuple(pending), + f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", + tuple(ids), ) - for sha1, date in self.cursor.fetchall(): - dates[sha1] = date - self.select_cache["content"][sha1] = date + dates.update(self.cursor.fetchall()) return dates - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.insert_cache["content"][blob.id] = date - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["directory"].get(directory.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["directory"][directory.id] = date - return date - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for directory in dirs: - # First check if the date is being modified by current transection. - date = self.insert_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - pending.append(directory.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", - tuple(pending), - ) - for sha1, date in self.cursor.fetchall(): - dates[sha1] = date - self.select_cache["directory"][sha1] = date - return dates - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.insert_cache["directory"][directory.id] = date - - def insert_all(self): - # Performe insertions with cached information - if self.insert_cache["content"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date) - """, - self.insert_cache["content"].items(), - ) - self.insert_cache["content"].clear() - - if self.insert_cache["directory"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date) - """, - self.insert_cache["directory"].items(), - ) - self.insert_cache["directory"].clear() - - if self.insert_cache["revision"]: + def insert_entity(self, entity: str, data: Dict[bytes, datetime]): + if data: psycopg2.extras.execute_values( self.cursor, - """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date) + f""" + LOCK TABLE ONLY {entity}; + INSERT INTO {entity}(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, - self.insert_cache["revision"].items(), + data.items(), ) - self.insert_cache["revision"].clear() - - # Relations should come after ids for elements were resolved - if self.insert_cache["content_early_in_rev"]: - self.insert_location("content", "revision", "content_early_in_rev") - - if self.insert_cache["content_in_dir"]: - self.insert_location("content", "directory", "content_in_dir") - - if self.insert_cache["directory_in_rev"]: - self.insert_location("directory", "revision", "directory_in_rev") - - # if self.insert_cache["revision_before_rev"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_before_rev; - # INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT DO 
NOTHING
-        #         """,
-        #         self.insert_cache["revision_before_rev"],
-        #     )
-        #     self.insert_cache["revision_before_rev"].clear()
-
-        # if self.insert_cache["revision_in_org"]:
-        #     psycopg2.extras.execute_values(
-        #         self.cursor,
-        #         """
-        #         LOCK TABLE ONLY revision_in_org;
-        #         INSERT INTO revision_in_org VALUES %s
-        #         ON CONFLICT DO NOTHING
-        #         """,
-        #         self.insert_cache["revision_in_org"],
-        #     )
-        #     self.insert_cache["revision_in_org"].clear()
+            # data is the caller's dict (Python passes references, not
+            # copies), so clearing it here also empties the write cache
+            # still held by the caller.
+            data.clear()
 
-    def origin_get_id(self, origin: OriginEntry) -> int:
-        if origin.id is None:
-            # Insert origin in the DB and return the assigned id
-            self.cursor.execute(
-                """
-                LOCK TABLE ONLY origin;
-                INSERT INTO origin(url) VALUES (%s)
-                ON CONFLICT DO NOTHING
-                RETURNING id
-                """,
-                (origin.url,),
-            )
-            return self.cursor.fetchone()[0]
-        else:
-            return origin.id
-
-    def revision_add(self, revision: RevisionEntry):
-        # Add current revision to the compact DB
-        self.insert_cache["revision"][revision.id] = revision.date
-
-    def revision_add_before_revision(
-        self, relative: RevisionEntry, revision: RevisionEntry
+    def insert_relation(
+        self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]]
     ):
-        self.insert_cache["revision_before_rev"].append((revision.id, relative.id))
+        ...
 
-    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.insert_cache["revision_in_org"].append((revision.id, origin.id))
+    def content_find_first(
+        self, blob: bytes
+    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
+        ...
 
-    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
-        date = self.insert_cache["revision"].get(revision.id, None)
-        if date is None:
-            # If not, check whether it's been query before
-            date = self.select_cache["revision"].get(revision.id, None)
-            if date is None:
-                # Otherwise, query the database and cache the value
-                self.cursor.execute(
-                    """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
-                )
-                row = self.cursor.fetchone()
-                date = row[0] if row is not None else None
-                self.select_cache["revision"][revision.id] = date
-        return date
+    def content_find_all(
+        self, blob: bytes, limit: Optional[int] = None
+    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
+        ...
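The `insert_entity` helper above batches one upsert per entity table; the `ON CONFLICT ... LEAST(...)` clause is what keeps the earliest known date when the same sha1 is seen again. A minimal standalone sketch of that pattern (the DSN and the `content` table definition are illustrative assumptions, not part of this diff):

```python
# Sketch of the "keep the earliest date" upsert used by insert_entity().
# Assumed setup: a reachable database named "provenance" and the table
#   CREATE TABLE content (sha1 bytea PRIMARY KEY, date timestamptz);
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras

conn = psycopg2.connect("dbname=provenance")  # placeholder DSN
conn.set_session(autocommit=True)
cursor = conn.cursor()

data = {
    b"\x01" * 20: datetime(2021, 1, 1, tzinfo=timezone.utc),
    b"\x02" * 20: datetime(2020, 6, 1, tzinfo=timezone.utc),
}

# The multi-statement string is sent as one simple query, so the LOCK and
# the INSERT share the same implicit transaction even under autocommit.
# LEAST() makes re-insertion monotonic: a stored date can only move backwards.
psycopg2.extras.execute_values(
    cursor,
    """
    LOCK TABLE ONLY content;
    INSERT INTO content(sha1, date) VALUES %s
    ON CONFLICT (sha1) DO
      UPDATE SET date=LEAST(EXCLUDED.date, content.date)
    """,
    data.items(),
)
```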
-    def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
-        # TODO: adapt this method to consider cached values
+    def origin_get_id(self, url: str) -> int:
+        # Insert origin in the DB and return the assigned id
         self.cursor.execute(
-            """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,)
+            """
+            LOCK TABLE ONLY origin;
+            INSERT INTO origin(url) VALUES (%s)
+            ON CONFLICT DO NOTHING
+            RETURNING id
+            """,
+            (url,),
+        )
+        return self.cursor.fetchone()[0]
+
+    def revision_get_preferred_origin(self, revision: bytes) -> Optional[int]:
+        self.cursor.execute(
+            """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision,)
         )
         row = self.cursor.fetchone()
         # None means revision is not in database;
         # 0 means revision has no preferred origin
         return row[0] if row is not None and row[0] != 0 else None
 
-    def revision_in_history(self, revision: RevisionEntry) -> bool:
-        # TODO: adapt this method to consider cached values
+    def revision_in_history(self, revision: bytes) -> bool:
         self.cursor.execute(
             """
             SELECT 1
@@ -291,20 +144,16 @@
               ON revision.id=revision_before_rev.prev
             WHERE revision.sha1=%s
             """,
-            (revision.id,),
+            (revision,),
         )
         return self.cursor.fetchone() is not None
 
-    def revision_set_preferred_origin(
-        self, origin: OriginEntry, revision: RevisionEntry
-    ):
-        # TODO: adapt this method to consider cached values
+    def revision_set_preferred_origin(self, origin: int, revision: bytes):
         self.cursor.execute(
-            """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id)
+            """UPDATE revision SET org=%s WHERE sha1=%s""", (origin, revision)
         )
 
-    def revision_visited(self, revision: RevisionEntry) -> bool:
-        # TODO: adapt this method to consider cached values
+    def revision_visited(self, revision: bytes) -> bool:
         self.cursor.execute(
             """
             SELECT 1
@@ -313,6 +162,6 @@
               ON revision.id=revision_in_org.rev
             WHERE revision.sha1=%s
             """,
-            (revision.id,),
+            (revision,),
         )
         return self.cursor.fetchone() is not None
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,36 +1,15 @@
 from datetime import datetime
-import os
-from typing import Generator, Optional, Tuple
+from typing import Generator, Optional, Set, Tuple
 
 import psycopg2
 import psycopg2.extras
 
-from ..model import DirectoryEntry, FileEntry
-from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase
 
 
-def normalize(path: bytes) -> bytes:
-    return path[2:] if path.startswith(bytes("."
+ os.path.sep, "utf-8")) else path - - class ProvenanceWithPathDB(ProvenanceDBBase): - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ @@ -45,12 +24,12 @@ WHERE C.sha1=%s ORDER BY date, rev, path ASC LIMIT 1 """, - (blobid,), + (blob,), ) return self.cursor.fetchone() def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( @@ -82,76 +61,72 @@ WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, - (blobid, blobid), + (blob, blob), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.insert_cache["directory_in_rev"].add( - (directory.id, revision.id, normalize(path)) - ) - - def insert_location(self, src0_table, src1_table, dst_table): - """Insert location entries in `dst_table` from the insert_cache + """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. 
""" - # TODO: find a better way of doing this; might be doable in a coupls of - # SQL queries (one to insert missing entries in the location' table, - # one to insert entries in the dst_table) - - # Resolve src0 ids - src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.insert_cache[dst_table])) - fmt = ",".join(["%s"] * len(src0_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""", - src0_sha1s, - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.insert_cache[dst_table])) - fmt = ",".join(["%s"] * len(src1_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""", - src1_sha1s, - ) - src1_values = dict(self.cursor.fetchall()) - - # insert missing locations - locations = tuple(set((loc,) for (_, _, loc) in self.insert_cache[dst_table])) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """, - locations, - ) - # fetch location ids - fmt = ",".join(["%s"] * len(locations)) - self.cursor.execute( - f"SELECT path, id FROM location WHERE path IN ({fmt})", - locations, - ) - loc_ids = dict(self.cursor.fetchall()) - - # Insert values in dst_table - rows = [ - (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc]) - for (sha1_src, sha1_dst, loc) in self.insert_cache[dst_table] - ] - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {dst_table}; - INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - self.insert_cache[dst_table].clear() + if data: + # TODO: find a better way of doing this; might be doable in a couple of + # SQL queries (one to insert missing entries in the location' table, + # one to insert entries in the relation) + + # Resolve src ids + src_sha1s = tuple(set(sha1 for (sha1, _, _) in data)) + fmt = ",".join(["%s"] * len(src_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({fmt})""", + src_sha1s, + ) + src_values = dict(self.cursor.fetchall()) + + # Resolve dst ids + dst_sha1s = tuple(set(sha1 for (_, sha1, _) in data)) + fmt = ",".join(["%s"] * len(dst_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({fmt})""", + dst_sha1s, + ) + dst_values = dict(self.cursor.fetchall()) + + # insert missing locations + locations = tuple(set((loc,) for (_, _, loc) in data)) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO NOTHING + """, + locations, + ) + # fetch location ids + fmt = ",".join(["%s"] * len(locations)) + self.cursor.execute( + f"SELECT path, id FROM location WHERE path IN ({fmt})", + locations, + ) + loc_ids = dict(self.cursor.fetchall()) + + # Insert values in relation + rows = [ + (src_values[sha1_src], dst_values[sha1_dst], loc_ids[loc]) + for (sha1_src, sha1_dst, loc) in data + ] + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} VALUES %s + ON CONFLICT DO NOTHING + """, + rows, + ) + data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,13 +1,11 @@ from datetime import datetime import itertools import operator -from typing import Generator, 
Optional, Tuple
+from typing import Generator, Optional, Set, Tuple
 
 import psycopg2
 import psycopg2.extras
 
-from ..model import DirectoryEntry, FileEntry
-from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase
 
 ########################################################################################
@@ -16,18 +14,8 @@
 
 
 class ProvenanceWithoutPathDB(ProvenanceDBBase):
-    def content_add_to_directory(
-        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
-    ):
-        self.insert_cache["content_in_dir"].add((blob.id, directory.id))
-
-    def content_add_to_revision(
-        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
-    ):
-        self.insert_cache["content_early_in_rev"].add((blob.id, revision.id))
-
     def content_find_first(
-        self, blobid: bytes
+        self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """
@@ -43,17 +31,17 @@
                 ON revision.id=content_in_rev.rev
             ORDER BY date, rev ASC LIMIT 1
             """,
-            (blobid,),
+            (blob,),
         )
         row = self.cursor.fetchone()
         if row is not None:
-            # TODO: query revision from the archive and look for blobid in a
+            # TODO: query revision from the archive and look for the blob in a
             # recursive directory_ls of the revision's root.
-            return blobid, row[0], row[1], b""
+            return blob, row[0], row[1], b""
         return None
 
     def content_find_all(
-        self, blobid: bytes, limit: Optional[int] = None
+        self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             """
@@ -87,54 +75,48 @@
             ) ORDER BY date, rev {early_cut}
             """,
-            (blobid, blobid),
+            (blob, blob),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         for row in self.cursor.fetchall():
-            # TODO: query revision from the archive and look for blobid in a
+            # TODO: query revision from the archive and look for the blob in a
             # recursive directory_ls of the revision's root.
-            yield blobid, row[0], row[1], b""
+            yield blob, row[0], row[1], b""
 
-    def directory_add_to_revision(
-        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
+    def insert_relation(
+        self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]]
     ):
-        self.insert_cache["directory_in_rev"].add((directory.id, revision.id))
-
-    def insert_location(self, src0_table, src1_table, dst_table):
-        # Resolve src0 ids
-        src0_values = dict().fromkeys(
-            map(operator.itemgetter(0), self.insert_cache[dst_table])
-        )
-        values = ", ".join(itertools.repeat("%s", len(src0_values)))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
-            tuple(src0_values),
-        )
-        src0_values = dict(self.cursor.fetchall())
+        if data:
+            # Resolve src ids
+            src_values = dict.fromkeys(map(operator.itemgetter(0), data))
+            values = ", ".join(itertools.repeat("%s", len(src_values)))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({values})""",
+                tuple(src_values),
+            )
+            src_values = dict(self.cursor.fetchall())
 
-        # Resolve src1 ids
-        src1_values = dict().fromkeys(
-            map(operator.itemgetter(1), self.insert_cache[dst_table])
-        )
-        values = ", ".join(itertools.repeat("%s", len(src1_values)))
-        self.cursor.execute(
-            f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
-            tuple(src1_values),
-        )
-        src1_values = dict(self.cursor.fetchall())
+            # Resolve dst ids
+            dst_values = dict.fromkeys(map(operator.itemgetter(1), data))
+            values = ", ".join(itertools.repeat("%s", len(dst_values)))
+            self.cursor.execute(
+                f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({values})""",
+                tuple(dst_values),
+            )
+            dst_values = dict(self.cursor.fetchall())
 
-        # Insert values in dst_table
-        rows = map(
-            lambda row: (src0_values[row[0]], src1_values[row[1]]),
-            self.insert_cache[dst_table],
-        )
-        psycopg2.extras.execute_values(
-            self.cursor,
-            f"""
-            LOCK TABLE ONLY {dst_table};
-            INSERT INTO {dst_table} VALUES %s
-            ON CONFLICT DO NOTHING
-            """,
-            rows,
-        )
-        self.insert_cache[dst_table].clear()
+            # Insert values in relation
+            rows = map(
+                lambda row: (src_values[row[0]], dst_values[row[1]]),
+                data,
+            )
+            psycopg2.extras.execute_values(
+                self.cursor,
+                f"""
+                LOCK TABLE ONLY {relation};
+                INSERT INTO {relation} VALUES %s
+                ON CONFLICT DO NOTHING
+                """,
+                rows,
+            )
+            data.clear()
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -3,8 +3,9 @@
 import logging
 import os
 import time
-from typing import Dict, Generator, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
 
+import psycopg2
 from typing_extensions import Protocol, runtime_checkable
 
 from swh.model.hashutil import hash_to_hex
@@ -15,6 +16,8 @@
 UTCMIN = datetime.min.replace(tzinfo=timezone.utc)
 
 
+# XXX: this protocol doesn't make much sense now that flavours have been delegated to
+# another class, lower in the call stack.
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False
@@ -34,12 +37,12 @@
         ...
 
     def content_find_first(
-        self, blobid: bytes
+        self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...
 
     def content_find_all(
-        self, blobid: bytes, limit: Optional[int] = None
+        self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...
@@ -108,6 +111,172 @@
         ...
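The `ProvenanceBackend` introduced below takes over the caching that used to live in `ProvenanceDBBase`: a `write_cache` of pending inserts flushed by `commit()`, and a `read_cache` consulted before hitting storage. Its read path reduces to a cache-aside lookup; a condensed sketch (where `storage_get_dates` stands in for `ProvenanceDBBase.get_dates`, an assumption made to keep the sketch self-contained):

```python
# Condensed sketch of the read path implemented by ProvenanceBackend.get_dates()
# below; `storage_get_dates` is a stand-in callable for the storage layer.
from datetime import datetime
from typing import Callable, Dict, List


def get_dates_cached(
    read_cache: Dict[bytes, datetime],
    storage_get_dates: Callable[[List[bytes]], Dict[bytes, datetime]],
    ids: List[bytes],
) -> Dict[bytes, datetime]:
    dates: Dict[bytes, datetime] = {}
    pending: List[bytes] = []
    for sha1 in ids:
        date = read_cache.get(sha1)
        if date is not None:
            dates[sha1] = date  # served from the cache
        else:
            pending.append(sha1)  # batched into a single SELECT
    # Note: as in the diff, values fetched from storage are not written back
    # into the read cache here; only the set_*/add_* methods populate it.
    dates.update(storage_get_dates(pending))
    return dates
```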
+# TODO: maybe move this to a separate file
+class ProvenanceBackend:
+    raise_on_commit: bool = False
+
+    def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True):
+        from .postgresql.provenancedb_base import ProvenanceDBBase
+
+        # TODO: this class should not know which DB implementation is actually used.
+        self.storage: ProvenanceDBBase
+        if with_path:
+            from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+
+            self.storage = ProvenanceWithPathDB(conn)
+        else:
+            from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+
+            self.storage = ProvenanceWithoutPathDB(conn)
+
+        self.write_cache: Dict[str, Any] = {}
+        self.read_cache: Dict[str, Any] = {}
+        self.clear_caches()
+
+    def clear_caches(self):
+        self.write_cache = {
+            "content": dict(),
+            "content_early_in_rev": set(),
+            "content_in_dir": set(),
+            "directory": dict(),
+            "directory_in_rev": set(),
+            "revision": dict(),
+            "revision_before_rev": list(),
+            "revision_in_org": list(),
+        }
+        self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()}
+
+    def commit(self):
+        # TODO: for now we just forward the write_cache. This should be improved!
+        while not self.storage.commit(
+            self.write_cache, raise_on_commit=self.raise_on_commit
+        ):
+            logging.warning(
+                f"Unable to commit cached information {self.write_cache}. Retrying..."
+            )
+        self.clear_caches()
+
+    def content_add_to_directory(
+        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
+    ):
+        self.write_cache["content_in_dir"].add(
+            (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
+        )
+
+    def content_add_to_revision(
+        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
+    ):
+        self.write_cache["content_early_in_rev"].add(
+            (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
+        )
+
+    def content_find_first(
+        self, blob: bytes
+    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
+        return self.storage.content_find_first(blob)
+
+    def content_find_all(
+        self, blob: bytes, limit: Optional[int] = None
+    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
+        yield from self.storage.content_find_all(blob, limit=limit)
+
+    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
+        return self.get_dates("content", [blob.id]).get(blob.id, None)
+
+    def content_get_early_dates(
+        self, blobs: Iterable[FileEntry]
+    ) -> Dict[bytes, datetime]:
+        return self.get_dates("content", [blob.id for blob in blobs])
+
+    def content_set_early_date(self, blob: FileEntry, date: datetime):
+        self.write_cache["content"][blob.id] = date
+        # update read cache as well
+        self.read_cache["content"][blob.id] = date
+
+    def directory_add_to_revision(
+        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
+    ):
+        self.write_cache["directory_in_rev"].add(
+            (directory.id, revision.id, normalize(path))
+        )
+
+    def directory_get_date_in_isochrone_frontier(
+        self, directory: DirectoryEntry
+    ) -> Optional[datetime]:
+        return self.get_dates("directory", [directory.id]).get(directory.id, None)
+
+    def directory_get_dates_in_isochrone_frontier(
+        self, dirs: Iterable[DirectoryEntry]
+    ) -> Dict[bytes, datetime]:
+        return self.get_dates("directory", [directory.id for directory in dirs])
+
+    def directory_set_date_in_isochrone_frontier(
+        self, directory: DirectoryEntry, date: datetime
+    ):
+        self.write_cache["directory"][directory.id] = date
+        # update read cache as well
+        self.read_cache["directory"][directory.id] = date
+
+    def get_dates(self, entity: str, ids: 
List[bytes]) -> Dict[bytes, datetime]:
+        dates = {}
+        pending = []
+        for sha1 in ids:
+            # Check whether the date has been queried before
+            date = self.read_cache[entity].get(sha1, None)
+            if date is not None:
+                dates[sha1] = date
+            else:
+                pending.append(sha1)
+        dates.update(self.storage.get_dates(entity, pending))
+        return dates
+
+    def origin_get_id(self, origin: OriginEntry) -> int:
+        if origin.id is None:
+            return self.storage.origin_get_id(origin.url)
+        else:
+            return origin.id
+
+    def revision_add(self, revision: RevisionEntry):
+        # Add current revision to the compact DB
+        self.write_cache["revision"][revision.id] = revision.date
+        # update read cache as well
+        self.read_cache["revision"][revision.id] = revision.date
+
+    def revision_add_before_revision(
+        self, relative: RevisionEntry, revision: RevisionEntry
+    ):
+        self.write_cache["revision_before_rev"].append((revision.id, relative.id))
+
+    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
+        self.write_cache["revision_in_org"].append((revision.id, origin.id))
+
+    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
+        return self.get_dates("revision", [revision.id]).get(revision.id, None)
+
+    def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_get_preferred_origin(revision.id)
+
+    def revision_in_history(self, revision: RevisionEntry) -> bool:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_in_history(revision.id)
+
+    def revision_set_preferred_origin(
+        self, origin: OriginEntry, revision: RevisionEntry
+    ):
+        assert origin.id is not None
+        # TODO: adapt this method to consider cached values
+        self.storage.revision_set_preferred_origin(origin.id, revision.id)
+
+    def revision_visited(self, revision: RevisionEntry) -> bool:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_visited(revision.id)
+
+
+def normalize(path: bytes) -> bytes:
+    return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
+
+
 def flatten_directory(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
@@ -157,6 +326,7 @@
     revision: RevisionEntry,
 ) -> None:
     stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
+    origin.id = provenance.origin_get_id(origin)
 
     while stack:
         relative, current = stack.pop()
@@ -243,14 +413,7 @@
             )
         done = time.time()
         if commit:
-            # TODO: improve this! Maybe using a max attempt counter?
-            # Ideally Provenance class should guarantee that a commit never fails.
-            while not provenance.commit():
-                logging.warning(
-                    "Could not commit revisions "
-                    + ";".join([hash_to_hex(revision.id) for revision in revisions])
-                    + ". Retrying..."
- ) + provenance.commit() stop = time.time() logging.debug( f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -36,12 +36,10 @@ @pytest.fixture def provenance(provenance_db): """return a working and initialized provenance db""" - from swh.provenance.postgresql.provenancedb_with_path import ( - ProvenanceWithPathDB as ProvenanceDB, - ) + from swh.provenance.provenance import ProvenanceBackend BaseDb.adapt_conn(provenance_db) - prov = ProvenanceDB(provenance_db) + prov = ProvenanceBackend(provenance_db) # in test sessions, we DO want to raise any exception occurring at commit time prov.raise_on_commit = True return prov diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -7,7 +7,7 @@ def test_provenance_fixture(provenance): """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance - provenance.insert_all() # should be a noop + provenance.commit() # should be a noop def test_storage(swh_storage_with_objects): diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -116,19 +116,21 @@ # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) - assert rows["revision"] == sha1s(provenance.cursor, "revision"), synth_rev[ - "msg" - ] + assert rows["revision"] == sha1s( + provenance.storage.cursor, "revision" + ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp( - provenance.cursor, "revision", synth_rev["sha1"].hex() + provenance.storage.cursor, "revision", synth_rev["sha1"].hex() ) == [rev_ts], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) - assert rows["content"] == sha1s(provenance.cursor, "content"), synth_rev["msg"] + assert rows["content"] == sha1s( + provenance.storage.cursor, "content" + ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table @@ -136,11 +138,11 @@ (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"] ) assert rows["content_early_in_rev"] == relations( - provenance.cursor, "content", "revision" + provenance.storage.cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: - assert get_timestamp(provenance.cursor, "content", rc["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] @@ -148,9 +150,9 @@ # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) - assert rows["directory"] == sha1s(provenance.cursor, "directory"), synth_rev[ - "msg" - ] + assert rows["directory"] == sha1s( + provenance.storage.cursor, "directory" + ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... 
# check for R-D entries @@ -158,11 +160,11 @@ (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"] ) assert rows["directory_in_rev"] == relations( - provenance.cursor, "directory", "revision" + provenance.storage.cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: - assert get_timestamp(provenance.cursor, "directory", rd["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] @@ -173,11 +175,11 @@ (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"] ) assert rows["content_in_dir"] == relations( - provenance.cursor, "content", "directory" + provenance.storage.cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: - assert get_timestamp(provenance.cursor, "content", dc["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] @@ -185,7 +187,9 @@ rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) - assert rows["location"] == locations(provenance.cursor), synth_rev["msg"] + assert rows["location"] == locations(provenance.storage.cursor), synth_rev[ + "msg" + ] @pytest.mark.parametrize(
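One caveat worth noting in the refactored `origin_get_id` of `provenancedb_base.py`: `INSERT ... ON CONFLICT DO NOTHING RETURNING id` produces no row when the url already exists, so `fetchone()[0]` raises `TypeError` the second time the same origin is seen. A hypothetical, more defensive variant, sketched under the assumption that `origin(id, url)` carries a unique constraint on `url` (which `ON CONFLICT DO NOTHING` already presupposes):

```python
# Hypothetical hardening of ProvenanceDBBase.origin_get_id(); not part of the
# diff above. Assumes a table origin(id serial primary key, url text unique).
def origin_get_id(cursor, url: str) -> int:
    cursor.execute(
        """
        LOCK TABLE ONLY origin;
        INSERT INTO origin(url) VALUES (%s)
        ON CONFLICT DO NOTHING
        RETURNING id
        """,
        (url,),
    )
    row = cursor.fetchone()
    if row is None:
        # Conflict: the origin already existed, so RETURNING yielded no row;
        # fall back to looking the id up directly.
        cursor.execute("SELECT id FROM origin WHERE url=%s", (url,))
        row = cursor.fetchone()
    return row[0]
```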