diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,10 +1,8 @@ from .archive import ArchiveInterface from .postgresql.archive import ArchivePostgreSQL from .postgresql.db_utils import connect -from .postgresql.provenance import ProvenancePostgreSQL -from .postgresql_nopath.provenance import ProvenancePostgreSQLNoPath -from .provenance import ProvenanceInterface from .storage.archive import ArchiveStorage +from .provenance import ProvenanceInterface def get_archive(cls: str, **kwargs) -> ArchiveInterface: @@ -21,8 +19,10 @@ if cls == "local": conn = connect(kwargs["db"]) if kwargs.get("with_path", True): - return ProvenancePostgreSQL(conn) + from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + return ProvenanceWithPathDB(conn) else: - return ProvenancePostgreSQLNoPath(conn) + from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + return ProvenanceWithoutPathDB(conn) else: raise NotImplementedError diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -28,14 +28,14 @@ "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } - # "cls": "ps", + # "cls": "local", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, - "provenance": {"cls": "ps", "db": {"host": "localhost", "dbname": "provenance"}}, + "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py deleted file mode 100644 --- a/swh/provenance/postgresql/provenance.py +++ /dev/null @@ -1,484 +0,0 @@ -import itertools -import logging -import operator -import os -from datetime import datetime -from typing import Any, Dict, Generator, List, Optional, Tuple - -import psycopg2 -import psycopg2.extras - -from ..model import 
DirectoryEntry, FileEntry -from ..origin import OriginEntry -from ..provenance import ProvenanceInterface -from ..revision import RevisionEntry -from .db_utils import connect, execute_sql - - -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path - -######################################################################################## -######################################################################################## -######################################################################################## - - -class ProvenancePostgreSQL(ProvenanceInterface): - def __init__(self, conn: psycopg2.extensions.connection): - # TODO: consider adding a mutex for thread safety - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - self.conn = conn - self.cursor = self.conn.cursor() - self.insert_cache: Dict[str, Any] = {} - self.remove_cache: Dict[str, Any] = {} - self.select_cache: Dict[str, Any] = {} - self.clear_caches() - - def clear_caches(self): - self.insert_cache = { - "content": dict(), - "content_early_in_rev": list(), - "content_in_dir": list(), - "directory": dict(), - "directory_in_rev": list(), - "revision": dict(), - "revision_before_rev": list(), - "revision_in_org": list(), - } - self.remove_cache = {"directory": dict()} - self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} - - def commit(self): - result = False - try: - self.insert_all() - self.clear_caches() - result = True - - except Exception as error: - # Unexpected error occurred, rollback all changes and log message - logging.error(f"Unexpected error: {error}") - - return result - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].append( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: 
FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].append( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_find_first( - self, blobid: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: - self.cursor.execute( - """SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ORDER BY date, rev, path ASC LIMIT 1""", - (blobid,), - ) - return self.cursor.fetchone() - - def content_find_all( - self, blobid: bytes - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: - self.cursor.execute( - """(SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ) - UNION - (SELECT content_prefix.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_prefix.path AS path - FROM (SELECT content_in_rev.sha1, - content_in_rev.rev, - CASE location.path - WHEN '' THEN content_in_rev.suffix - WHEN '.' 
THEN content_in_rev.suffix - ELSE (location.path || '/' || - content_in_rev.suffix)::unix_path - END AS path - FROM (SELECT content_suffix.sha1, - directory_in_rev.rev, - directory_in_rev.loc, - content_suffix.path AS suffix - FROM (SELECT content_hex.sha1, - content_hex.dir, - location.path - FROM (SELECT content.sha1, - content_in_dir.dir, - content_in_dir.loc - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_suffix - JOIN directory_in_rev - ON directory_in_rev.dir=content_suffix.dir - ) AS content_in_rev - JOIN location - ON location.id=content_in_rev.loc - ) AS content_prefix - JOIN revision - ON revision.id=content_prefix.rev - ) - ORDER BY date, rev, path""", - (blobid, blobid), - ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. - yield from self.cursor.fetchall() - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM content WHERE sha1=%s""", (blob.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["content"][blob.id] = date - return date - - def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for blob in blobs: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - pending.append(blob.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", - tuple(pending), - ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["content"][row[0]] = row[1] - return dates - - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.insert_cache["content"][blob.id] = date - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): - self.insert_cache["directory_in_rev"].append( - (directory.id, revision.id, normalize(path)) - ) - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["directory"].get(directory.id, None) - if date is None and directory.id not in self.remove_cache["directory"]: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["directory"][directory.id] = date - return date - - def directory_get_dates_in_isochrone_frontier( - self, dirs: List[DirectoryEntry] - ) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for directory in dirs: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - elif directory.id not in self.remove_cache["directory"]: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - pending.append(directory.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", - tuple(pending), - ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["directory"][row[0]] = row[1] - return dates - - def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): - self.remove_cache["directory"][directory.id] = None - self.insert_cache["directory"].pop(directory.id, None) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.insert_cache["directory"][directory.id] = date - self.remove_cache["directory"].pop(directory.id, None) - - def insert_all(self): - # Performe insertions with cached information - if self.insert_cache["content"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", - self.insert_cache["content"].items(), - ) - self.insert_cache["content"].clear() - - if self.insert_cache["directory"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", - self.insert_cache["directory"].items(), - ) - self.insert_cache["directory"].clear() - - if self.insert_cache["revision"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY 
revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", - self.insert_cache["revision"].items(), - ) - self.insert_cache["revision"].clear() - - # Relations should come after ids for elements were resolved - if self.insert_cache["content_early_in_rev"]: - self.insert_location("content", "revision", "content_early_in_rev") - - if self.insert_cache["content_in_dir"]: - self.insert_location("content", "directory", "content_in_dir") - - if self.insert_cache["directory_in_rev"]: - self.insert_location("directory", "revision", "directory_in_rev") - - # if self.insert_cache["revision_before_rev"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT DO NOTHING""", - # self.insert_cache["revision_before_rev"], - # ) - # self.insert_cache["revision_before_rev"].clear() - - # if self.insert_cache["revision_in_org"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """INSERT INTO revision_in_org VALUES %s - # ON CONFLICT DO NOTHING""", - # self.insert_cache["revision_in_org"], - # ) - # self.insert_cache["revision_in_org"].clear() - - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", - tuple(src0_values), - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) - - # Resolve location ids - location = 
dict().fromkeys( - map(operator.itemgetter(2), self.insert_cache[dst_table]) - ) - location = dict( - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO - UPDATE SET path=EXCLUDED.path - RETURNING path, id""", - map(lambda path: (path,), location.keys()), - fetch=True, - ) - ) - - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]), - self.insert_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", - rows, - ) - self.insert_cache[dst_table].clear() - - def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - # Insert origin in the DB and return the assigned id - self.cursor.execute( - """INSERT INTO origin (url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id""", - (origin.url,), - ) - return self.cursor.fetchone()[0] - else: - return origin.id - - def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - self.insert_cache["revision"][revision.id] = revision.date - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ): - self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) - - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.insert_cache["revision_in_org"].append((revision.id, origin.id)) - - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - date = self.insert_cache["revision"].get(revision.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["revision"].get(revision.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - date = 
row[0] if row is not None else None - self.select_cache["revision"][revision.id] = date - return date - - def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - # None means revision is not in database; - # 0 means revision has no preferred origin - return row[0] if row is not None and row[0] != 0 else None - - def revision_in_history(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT 1 - FROM revision_before_rev - JOIN revision - ON revision.id=revision_before_rev.prev - WHERE revision.sha1=%s""", - (revision.id,), - ) - return self.cursor.fetchone() is not None - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ): - # TODO: adapt this method to consider cached values - self.cursor.execute( - """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) - ) - - def revision_visited(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT 1 - FROM revision_in_org - JOIN revision - ON revision.id=revision_in_org.rev - WHERE revision.sha1=%s""", - (revision.id,), - ) - return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql_nopath/provenance.py b/swh/provenance/postgresql/provenancedb_base.py rename from swh/provenance/postgresql_nopath/provenance.py rename to swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql_nopath/provenance.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,26 +1,18 @@ import itertools import logging -import operator -import os -from datetime import datetime -from typing import Any, Dict, Generator, List, Optional, Tuple - import psycopg2 import psycopg2.extras from ..model import 
DirectoryEntry, FileEntry from ..origin import OriginEntry -from ..postgresql.db_utils import connect, execute_sql from ..provenance import ProvenanceInterface from ..revision import RevisionEntry - -######################################################################################## -######################################################################################## -######################################################################################## +from datetime import datetime +from typing import Any, Dict, List, Optional -class ProvenancePostgreSQLNoPath(ProvenanceInterface): +class ProvenanceDBBase(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) @@ -58,80 +50,6 @@ return result - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].add((blob.id, directory.id)) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) - - def content_find_first( - self, blobid: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: - self.cursor.execute( - """SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ORDER BY date, rev ASC LIMIT 1""", - (blobid,), - ) - row = self.cursor.fetchone() - if row is not None: - # TODO: query revision from the archive and look for blobid into a - # recursive directory_ls of the revision's root. 
- return blobid, row[0], row[1], b"" - return None - - def content_find_all( - self, blobid: bytes - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: - self.cursor.execute( - """(SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - UNION - (SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT directory_in_rev.rev - FROM (SELECT content_in_dir.dir - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_dir - JOIN directory_in_rev - ON directory_in_rev.dir=content_dir.dir - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - ORDER BY date, rev""", - (blobid, blobid), - ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. - for row in self.cursor.fetchall(): - # TODO: query revision from the archive and look for blobid into a - # recursive directory_ls of the revision's root. - yield blobid, row[0], row[1], b"" - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. 
date = self.insert_cache["content"].get(blob.id, None) @@ -178,11 +96,6 @@ def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): - self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) - def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: @@ -303,42 +216,6 @@ # ) # self.insert_cache["revision_in_org"].clear() - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", - tuple(src0_values), - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) - - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]]), - self.insert_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", - rows, - ) - self.insert_cache[dst_table].clear() - def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py new file mode 100644 --- /dev/null +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -0,0 +1,192 @@ +from datetime import 
datetime +import itertools +import operator +import os +from typing import Generator, Optional, Tuple + +import psycopg2 +import psycopg2.extras + +from ..model import DirectoryEntry, FileEntry +from ..revision import RevisionEntry +from .provenancedb_base import ProvenanceDBBase + + +def normalize(path: bytes) -> bytes: + return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path + + +class ProvenanceWithPathDB(ProvenanceDBBase): + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_in_dir"].add( + (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_early_in_rev"].add( + (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_find_first( + self, blobid: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self.cursor.execute( + """SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ORDER BY date, rev, path ASC LIMIT 1""", + (blobid,), + ) + return self.cursor.fetchone() + + def content_find_all( + self, blobid: bytes + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self.cursor.execute( + """(SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT 
content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ) + UNION + (SELECT content_prefix.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_prefix.path AS path + FROM (SELECT content_in_rev.sha1, + content_in_rev.rev, + CASE location.path + WHEN '' THEN content_in_rev.suffix + WHEN '.' THEN content_in_rev.suffix + ELSE (location.path || '/' || + content_in_rev.suffix)::unix_path + END AS path + FROM (SELECT content_suffix.sha1, + directory_in_rev.rev, + directory_in_rev.loc, + content_suffix.path AS suffix + FROM (SELECT content_hex.sha1, + content_hex.dir, + location.path + FROM (SELECT content.sha1, + content_in_dir.dir, + content_in_dir.loc + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_suffix + JOIN directory_in_rev + ON directory_in_rev.dir=content_suffix.dir + ) AS content_in_rev + JOIN location + ON location.id=content_in_rev.loc + ) AS content_prefix + JOIN revision + ON revision.id=content_prefix.rev + ) + ORDER BY date, rev, path""", + (blobid, blobid), + ) + # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. 
+ yield from self.cursor.fetchall() + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.insert_cache["directory_in_rev"].add( + (directory.id, revision.id, normalize(path)) + ) + + def insert_location(self, src0_table, src1_table, dst_table): + # Resolve src0 ids + src0_values = dict().fromkeys( + map(operator.itemgetter(0), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src0_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", + tuple(src0_values), + ) + src0_values = dict(self.cursor.fetchall()) + + # Resolve src1 ids + src1_values = dict().fromkeys( + map(operator.itemgetter(1), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src1_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", + tuple(src1_values), + ) + src1_values = dict(self.cursor.fetchall()) + + # Resolve location ids + location = dict().fromkeys( + map(operator.itemgetter(2), self.insert_cache[dst_table]) + ) + location = dict( + psycopg2.extras.execute_values( + self.cursor, + """LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO + UPDATE SET path=EXCLUDED.path + RETURNING path, id""", + map(lambda path: (path,), location.keys()), + fetch=True, + ) + ) + + # Insert values in dst_table + rows = map( + lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]), + self.insert_cache[dst_table], + ) + psycopg2.extras.execute_values( + self.cursor, + f"""INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING""", + rows, + ) + self.insert_cache[dst_table].clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py new file mode 100644 --- /dev/null +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -0,0 +1,132 @@ +from datetime import 
datetime +import itertools +import operator +from typing import Generator, Optional, Tuple + +import psycopg2 +import psycopg2.extras + +from ..model import DirectoryEntry, FileEntry +from ..revision import RevisionEntry +from .provenancedb_base import ProvenanceDBBase + +######################################################################################## +######################################################################################## +######################################################################################## + + +class ProvenanceWithoutPathDB(ProvenanceDBBase): + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_in_dir"].add((blob.id, directory.id)) + + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) + + def content_find_first( + self, blobid: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self.cursor.execute( + """SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ORDER BY date, rev ASC LIMIT 1""", + (blobid,), + ) + row = self.cursor.fetchone() + if row is not None: + # TODO: query revision from the archive and look for blobid into a + # recursive directory_ls of the revision's root. 
+ return blobid, row[0], row[1], b"" + return None + + def content_find_all( + self, blobid: bytes + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self.cursor.execute( + """(SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + UNION + (SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT directory_in_rev.rev + FROM (SELECT content_in_dir.dir + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s + ) AS content_dir + JOIN directory_in_rev + ON directory_in_rev.dir=content_dir.dir + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + ORDER BY date, rev""", + (blobid, blobid), + ) + # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. + for row in self.cursor.fetchall(): + # TODO: query revision from the archive and look for blobid into a + # recursive directory_ls of the revision's root. 
+ yield blobid, row[0], row[1], b"" + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) + + def insert_location(self, src0_table, src1_table, dst_table): + # Resolve src0 ids + src0_values = dict().fromkeys( + map(operator.itemgetter(0), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src0_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", + tuple(src0_values), + ) + src0_values = dict(self.cursor.fetchall()) + + # Resolve src1 ids + src1_values = dict().fromkeys( + map(operator.itemgetter(1), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src1_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", + tuple(src1_values), + ) + src1_values = dict(self.cursor.fetchall()) + + # Insert values in dst_table + rows = map( + lambda row: (src0_values[row[0]], src1_values[row[1]]), + self.insert_cache[dst_table], + ) + psycopg2.extras.execute_values( + self.cursor, + f"""INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING""", + rows, + ) + self.insert_cache[dst_table].clear() diff --git a/swh/provenance/postgresql_nopath/__init__.py b/swh/provenance/postgresql_nopath/__init__.py deleted file mode 100644 diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -12,17 +12,23 @@ from swh.core.utils import numfile_sortkey as sortkey import swh.provenance - SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql") -SQL_FILES = [sqlfile for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) - if '-without-path-' not in sqlfile] +SQL_FILES = [ + sqlfile + for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) + if "-without-path-" 
not in sqlfile +] provenance_db = postgresql_fact( - "postgresql_proc", db_name="provenance", dump_files=SQL_FILES) + "postgresql_proc", db_name="provenance", dump_files=SQL_FILES +) @pytest.fixture def provenance(provenance_db): """return a working and initialized provenance db""" - from swh.provenance.postgresql.provenance import ProvenancePostgreSQL as ProvenanceDB + from swh.provenance.postgresql.provenancedb_with_path import ( + ProvenanceWithPathDB as ProvenanceDB, + ) + return ProvenanceDB(provenance_db)