diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index bf7a475..8705725 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,28 +1,28 @@ from .archive import ArchiveInterface from .postgresql.archive import ArchivePostgreSQL from .postgresql.db_utils import connect -from .postgresql.provenance import ProvenancePostgreSQL -from .postgresql_nopath.provenance import ProvenancePostgreSQLNoPath -from .provenance import ProvenanceInterface from .storage.archive import ArchiveStorage +from .provenance import ProvenanceInterface def get_archive(cls: str, **kwargs) -> ArchiveInterface: if cls == "api": return ArchiveStorage(**kwargs["storage"]) elif cls == "direct": conn = connect(kwargs["db"]) return ArchivePostgreSQL(conn) else: raise NotImplementedError def get_provenance(cls: str, **kwargs) -> ProvenanceInterface: if cls == "local": conn = connect(kwargs["db"]) if kwargs.get("with_path", True): - return ProvenancePostgreSQL(conn) + from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + return ProvenanceWithPathDB(conn) else: - return ProvenancePostgreSQLNoPath(conn) + from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + return ProvenanceWithoutPathDB(conn) else: raise NotImplementedError diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index bc0fd39..a22cb1d 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,200 +1,200 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } - # "cls": "ps", + # "cls": "local", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, - "provenance": {"cls": "ps", "db": {"host": "localhost", "dbname": "provenance"}}, + "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if the default path cannot be loaded, a set of default values is used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance tools. 
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): """Deprecated, please use: swh db create provenance and swh db init provenance instead. """ @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_revisions(ctx, filename, limit): # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .provenance import revision_add from .revision import FileRevisionIterator archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions = FileRevisionIterator(filename, archive, limit=limit) while True: revision = revisions.next() if revision is None: break revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py deleted file mode 100644 index bdaa580..0000000 --- a/swh/provenance/postgresql/provenance.py +++ /dev/null @@ -1,484 +0,0 @@ -import itertools -import logging -import operator -import os -from datetime import datetime -from typing import Any, Dict, Generator, List, Optional, Tuple - -import psycopg2 -import psycopg2.extras - -from ..model import DirectoryEntry, FileEntry -from ..origin import OriginEntry -from ..provenance import ProvenanceInterface -from ..revision import RevisionEntry -from .db_utils import connect, execute_sql - - -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path - -######################################################################################## -######################################################################################## -######################################################################################## - - -class ProvenancePostgreSQL(ProvenanceInterface): - def __init__(self, conn: psycopg2.extensions.connection): - # TODO: consider adding a mutex for thread safety - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - self.conn = conn - self.cursor = self.conn.cursor() - self.insert_cache: Dict[str, Any] = {} - self.remove_cache: Dict[str, Any] = {} - self.select_cache: Dict[str, Any] = {} - self.clear_caches() - - def clear_caches(self): - self.insert_cache = { - "content": dict(), - "content_early_in_rev": list(), - "content_in_dir": list(), - "directory": dict(), - "directory_in_rev": list(), - "revision": dict(), - "revision_before_rev": list(), - "revision_in_org": list(), - } - self.remove_cache = {"directory": dict()} - self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} - - def commit(self): - result = False - try: - self.insert_all() - self.clear_caches() - result = True - - except Exception as error: - # Unexpected error occurred, rollback all changes and log message - logging.error(f"Unexpected error: {error}") - - return result - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].append( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].append( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - - def 
content_find_first( - self, blobid: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: - self.cursor.execute( - """SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ORDER BY date, rev, path ASC LIMIT 1""", - (blobid,), - ) - return self.cursor.fetchone() - - def content_find_all( - self, blobid: bytes - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: - self.cursor.execute( - """(SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ) - UNION - (SELECT content_prefix.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_prefix.path AS path - FROM (SELECT content_in_rev.sha1, - content_in_rev.rev, - CASE location.path - WHEN '' THEN content_in_rev.suffix - WHEN '.' THEN content_in_rev.suffix - ELSE (location.path || '/' || - content_in_rev.suffix)::unix_path - END AS path - FROM (SELECT content_suffix.sha1, - directory_in_rev.rev, - directory_in_rev.loc, - content_suffix.path AS suffix - FROM (SELECT content_hex.sha1, - content_hex.dir, - location.path - FROM (SELECT content.sha1, - content_in_dir.dir, - content_in_dir.loc - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_suffix - JOIN directory_in_rev - ON directory_in_rev.dir=content_suffix.dir - ) AS content_in_rev - JOIN location - ON location.id=content_in_rev.loc - ) AS content_prefix - JOIN revision - ON revision.id=content_prefix.rev - ) - ORDER BY date, rev, path""", - (blobid, blobid), - ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. - yield from self.cursor.fetchall() - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM content WHERE sha1=%s""", (blob.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["content"][blob.id] = date - return date - - def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for blob in blobs: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - pending.append(blob.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", - tuple(pending), - ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["content"][row[0]] = row[1] - return dates - - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.insert_cache["content"][blob.id] = date - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): - self.insert_cache["directory_in_rev"].append( - (directory.id, revision.id, normalize(path)) - ) - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["directory"].get(directory.id, None) - if date is None and directory.id not in self.remove_cache["directory"]: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["directory"][directory.id] = date - return date - - def directory_get_dates_in_isochrone_frontier( - self, dirs: List[DirectoryEntry] - ) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for directory in dirs: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - elif directory.id not in self.remove_cache["directory"]: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - pending.append(directory.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", - tuple(pending), - ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["directory"][row[0]] = row[1] - return dates - - def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): - self.remove_cache["directory"][directory.id] = None - self.insert_cache["directory"].pop(directory.id, None) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.insert_cache["directory"][directory.id] = date - self.remove_cache["directory"].pop(directory.id, None) - - def insert_all(self): - # Performe insertions with cached information - if self.insert_cache["content"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", - self.insert_cache["content"].items(), - ) - self.insert_cache["content"].clear() - - if self.insert_cache["directory"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", - self.insert_cache["directory"].items(), - ) - self.insert_cache["directory"].clear() - - if self.insert_cache["revision"]: - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", - self.insert_cache["revision"].items(), - ) - self.insert_cache["revision"].clear() - - # Relations should come after ids for elements were resolved - if self.insert_cache["content_early_in_rev"]: - self.insert_location("content", "revision", "content_early_in_rev") - - if self.insert_cache["content_in_dir"]: - self.insert_location("content", "directory", "content_in_dir") - - if self.insert_cache["directory_in_rev"]: - self.insert_location("directory", "revision", "directory_in_rev") - - # if self.insert_cache["revision_before_rev"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT DO NOTHING""", - # self.insert_cache["revision_before_rev"], - # ) - # self.insert_cache["revision_before_rev"].clear() - - # if self.insert_cache["revision_in_org"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """INSERT INTO revision_in_org VALUES %s - # ON CONFLICT DO NOTHING""", - # self.insert_cache["revision_in_org"], - # ) - # self.insert_cache["revision_in_org"].clear() - - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", - tuple(src0_values), - ) - 
src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) - - # Resolve location ids - location = dict().fromkeys( - map(operator.itemgetter(2), self.insert_cache[dst_table]) - ) - location = dict( - psycopg2.extras.execute_values( - self.cursor, - """LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO - UPDATE SET path=EXCLUDED.path - RETURNING path, id""", - map(lambda path: (path,), location.keys()), - fetch=True, - ) - ) - - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]), - self.insert_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", - rows, - ) - self.insert_cache[dst_table].clear() - - def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - # Insert origin in the DB and return the assigned id - self.cursor.execute( - """INSERT INTO origin (url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id""", - (origin.url,), - ) - return self.cursor.fetchone()[0] - else: - return origin.id - - def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - self.insert_cache["revision"][revision.id] = revision.date - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ): - self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) - - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.insert_cache["revision_in_org"].append((revision.id, origin.id)) - - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - date = self.insert_cache["revision"].get(revision.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["revision"].get(revision.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["revision"][revision.id] = date - return date - - def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - # None means revision is not in database; - # 0 means revision has no preferred origin - return row[0] if row is not None and row[0] != 0 else None - - def revision_in_history(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT 1 - FROM revision_before_rev - JOIN revision - ON revision.id=revision_before_rev.prev - WHERE revision.sha1=%s""", - (revision.id,), - ) - return self.cursor.fetchone() is not None - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ): - # TODO: adapt this method to consider cached values - self.cursor.execute( - """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) - ) - - def 
revision_visited(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values - self.cursor.execute( - """SELECT 1 - FROM revision_in_org - JOIN revision - ON revision.id=revision_in_org.rev - WHERE revision.sha1=%s""", - (revision.id,), - ) - return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql_nopath/provenance.py b/swh/provenance/postgresql/provenancedb_base.py similarity index 70% rename from swh/provenance/postgresql_nopath/provenance.py rename to swh/provenance/postgresql/provenancedb_base.py index ccf3f6b..994f8ed 100644 --- a/swh/provenance/postgresql_nopath/provenance.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,422 +1,299 @@ import itertools import logging -import operator -import os -from datetime import datetime -from typing import Any, Dict, Generator, List, Optional, Tuple - import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry -from ..postgresql.db_utils import connect, execute_sql from ..provenance import ProvenanceInterface from ..revision import RevisionEntry - -######################################################################################## -######################################################################################## -######################################################################################## +from datetime import datetime +from typing import Any, Dict, List, Optional -class ProvenancePostgreSQLNoPath(ProvenanceInterface): +class ProvenanceDBBase(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.conn = conn self.cursor = self.conn.cursor() self.insert_cache: Dict[str, Any] = {} self.remove_cache: Dict[str, Any] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": set(), "content_in_dir": set(), "directory": dict(), "directory_in_rev": set(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.remove_cache = {"directory": dict()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() self.clear_caches() result = True except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.error(f"Unexpected error: {error}") return result - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].add((blob.id, directory.id)) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) - - def content_find_first( - self, blobid: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: - self.cursor.execute( - """SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ORDER BY date, rev ASC LIMIT 1""", - (blobid,), - ) - row = self.cursor.fetchone() - if row is not None: - # TODO: query revision from the archive and look for blobid into a - # recursive directory_ls of the revision's root. 
- return blobid, row[0], row[1], b"" - return None - - def content_find_all( - self, blobid: bytes - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: - self.cursor.execute( - """(SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - UNION - (SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT directory_in_rev.rev - FROM (SELECT content_in_dir.dir - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_dir - JOIN directory_in_rev - ON directory_in_rev.dir=content_dir.dir - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - ORDER BY date, rev""", - (blobid, blobid), - ) - # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. - for row in self.cursor.fetchall(): - # TODO: query revision from the archive and look for blobid into a - # recursive directory_ls of the revision's root. - yield blobid, row[0], row[1], b"" def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transaction. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been queried before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE sha1=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transaction. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been queried before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["content"][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): - self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) - def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transaction. 
date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been queried before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transaction. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been queried before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Perform insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after the ids of their elements have been resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # if self.insert_cache["revision_in_org"]: # 
psycopg2.extras.execute_values( # self.cursor, # """INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING""", # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", - tuple(src0_values), - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) - - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]]), - self.insert_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", - rows, - ) - self.insert_cache[dst_table].clear() - def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """INSERT INTO origin (url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id""", (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been queried before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def 
revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s""", (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py new file mode 100644 index 0000000..faa7cd8 --- /dev/null +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -0,0 +1,192 @@ +from datetime import datetime +import itertools +import operator +import os +from typing import Generator, Optional, Tuple + +import psycopg2 +import psycopg2.extras + +from ..model import DirectoryEntry, FileEntry +from ..revision import RevisionEntry +from .provenancedb_base import ProvenanceDBBase + + +def normalize(path: bytes) -> bytes: + return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path + + +class ProvenanceWithPathDB(ProvenanceDBBase): + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_in_dir"].add( + (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_early_in_rev"].add( + (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_find_first( + self, blobid: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self.cursor.execute( + """SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ORDER BY date, rev, path ASC LIMIT 1""", + (blobid,), + ) + return self.cursor.fetchone() + + def content_find_all( + self, blobid: bytes + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self.cursor.execute( + """(SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ) + UNION + (SELECT content_prefix.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_prefix.path AS path + FROM (SELECT content_in_rev.sha1, + content_in_rev.rev, + CASE location.path + WHEN '' THEN content_in_rev.suffix + WHEN '.' 
THEN content_in_rev.suffix + ELSE (location.path || '/' || + content_in_rev.suffix)::unix_path + END AS path + FROM (SELECT content_suffix.sha1, + directory_in_rev.rev, + directory_in_rev.loc, + content_suffix.path AS suffix + FROM (SELECT content_hex.sha1, + content_hex.dir, + location.path + FROM (SELECT content.sha1, + content_in_dir.dir, + content_in_dir.loc + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_suffix + JOIN directory_in_rev + ON directory_in_rev.dir=content_suffix.dir + ) AS content_in_rev + JOIN location + ON location.id=content_in_rev.loc + ) AS content_prefix + JOIN revision + ON revision.id=content_prefix.rev + ) + ORDER BY date, rev, path""", + (blobid, blobid), + ) + # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. + yield from self.cursor.fetchall() + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.insert_cache["directory_in_rev"].add( + (directory.id, revision.id, normalize(path)) + ) + + def insert_location(self, src0_table, src1_table, dst_table): + # Resolve src0 ids + src0_values = dict().fromkeys( + map(operator.itemgetter(0), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src0_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", + tuple(src0_values), + ) + src0_values = dict(self.cursor.fetchall()) + + # Resolve src1 ids + src1_values = dict().fromkeys( + map(operator.itemgetter(1), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src1_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", + tuple(src1_values), + ) + src1_values = dict(self.cursor.fetchall()) + + # Resolve location ids + location = dict().fromkeys( + map(operator.itemgetter(2), self.insert_cache[dst_table]) + ) + location = dict( + psycopg2.extras.execute_values( + self.cursor, + """LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO + UPDATE SET path=EXCLUDED.path + RETURNING path, id""", + map(lambda path: (path,), location.keys()), + fetch=True, + ) + ) + + # Insert values in dst_table + rows = map( + lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]), + self.insert_cache[dst_table], + ) + psycopg2.extras.execute_values( + self.cursor, + f"""INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING""", + rows, + ) + self.insert_cache[dst_table].clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py new file mode 100644 index 0000000..1ec6962 --- /dev/null +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -0,0 +1,132 @@ +from datetime import datetime +import itertools +import operator +from typing import Generator, Optional, Tuple + +import psycopg2 +import psycopg2.extras + +from ..model import DirectoryEntry, FileEntry +from ..revision import RevisionEntry +from .provenancedb_base import ProvenanceDBBase + +######################################################################################## +######################################################################################## +######################################################################################## + + +class ProvenanceWithoutPathDB(ProvenanceDBBase): + def content_add_to_directory( + self, 
directory: DirectoryEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_in_dir"].add((blob.id, directory.id)) + + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) + + def content_find_first( + self, blobid: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self.cursor.execute( + """SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ORDER BY date, rev ASC LIMIT 1""", + (blobid,), + ) + row = self.cursor.fetchone() + if row is not None: + # TODO: query revision from the archive and look for blobid in a + # recursive directory_ls of the revision's root. + return blobid, row[0], row[1], b"" + return None + + def content_find_all( + self, blobid: bytes + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self.cursor.execute( + """(SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + UNION + (SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT directory_in_rev.rev + FROM (SELECT content_in_dir.dir + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s + ) AS content_dir + JOIN directory_in_rev + ON directory_in_rev.dir=content_dir.dir + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + ORDER BY date, rev""", + (blobid, blobid), + ) + # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. + for row in self.cursor.fetchall(): + # TODO: query revision from the archive and look for blobid in a + # recursive directory_ls of the revision's root. 
+ yield blobid, row[0], row[1], b"" + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) + + def insert_location(self, src0_table, src1_table, dst_table): + # Resolve src0 ids + src0_values = dict().fromkeys( + map(operator.itemgetter(0), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src0_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", + tuple(src0_values), + ) + src0_values = dict(self.cursor.fetchall()) + + # Resolve src1 ids + src1_values = dict().fromkeys( + map(operator.itemgetter(1), self.insert_cache[dst_table]) + ) + values = ", ".join(itertools.repeat("%s", len(src1_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", + tuple(src1_values), + ) + src1_values = dict(self.cursor.fetchall()) + + # Insert values in dst_table + rows = map( + lambda row: (src0_values[row[0]], src1_values[row[1]]), + self.insert_cache[dst_table], + ) + psycopg2.extras.execute_values( + self.cursor, + f"""INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING""", + rows, + ) + self.insert_cache[dst_table].clear() diff --git a/swh/provenance/postgresql_nopath/__init__.py b/swh/provenance/postgresql_nopath/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index 1df70ee..7210fe0 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,28 +1,34 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob from os import path import pytest from swh.core.db.pytest_plugin import postgresql_fact from swh.core.utils import numfile_sortkey as sortkey import swh.provenance - SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql") -SQL_FILES = [sqlfile for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) - if '-without-path-' not in sqlfile] +SQL_FILES = [ + sqlfile + for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) + if "-without-path-" not in sqlfile +] provenance_db = postgresql_fact( - "postgresql_proc", db_name="provenance", dump_files=SQL_FILES) + "postgresql_proc", db_name="provenance", dump_files=SQL_FILES +) @pytest.fixture def provenance(provenance_db): """return a working and initialized provenance db""" - from swh.provenance.postgresql.provenance import ProvenancePostgreSQL as ProvenanceDB + from swh.provenance.postgresql.provenancedb_with_path import ( + ProvenanceWithPathDB as ProvenanceDB, + ) + return ProvenanceDB(provenance_db)
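
Note (not part of the diff): for reference, a minimal usage sketch of the refactored `get_provenance` factory, assuming a local PostgreSQL database named `provenance` initialized with the with-path schema; the connection parameters and the all-zeros sha1 below are placeholders, not values taken from this changeset.

```python
# Hypothetical usage sketch of the refactored factory (placeholder values).
from swh.model.hashutil import hash_to_bytes, hash_to_hex

from swh.provenance import get_provenance

# with_path=True lazily imports and returns a ProvenanceWithPathDB;
# with_path=False would select ProvenanceWithoutPathDB instead, whose
# results carry an empty path (b"") in place of a stored location.
provenance = get_provenance(
    cls="local",
    db={"host": "localhost", "dbname": "provenance"},  # placeholder connection info
    with_path=True,
)

# content_find_first returns a (blob, rev, date, path) tuple, or None if
# the blob is unknown; the sha1 here is a placeholder.
row = provenance.content_find_first(hash_to_bytes("00" * 20))
if row is not None:
    blob, rev, date, path = row
    print(f"{hash_to_hex(blob)}, {hash_to_hex(rev)}, {date}, {path!r}")
```

Keeping the backend imports inside `get_provenance` also matches the WARNING at the top of cli.py about keeping CLI startup time under control: neither backend module is imported until one is actually requested.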