diff --git a/swh/provenance/model.py b/swh/provenance/model.py index a1bd4b5..d2957d8 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,75 +1,75 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Iterable, List, Optional, Union from .archive import ArchiveInterface class OriginEntry: def __init__(self, url, revisions: Iterable["RevisionEntry"], id=None): self.id = id self.url = url self.revisions = revisions class RevisionEntry: def __init__( self, id: bytes, date: Optional[datetime] = None, root: Optional[bytes] = None, parents: Optional[Iterable[bytes]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents = parents self._nodes: List[RevisionEntry] = [] def parents(self, archive: ArchiveInterface): if self._parents is None: - # XXX: no check is done to ensure node.id is a known revision in - # the SWH archive - self._parents = archive.revision_get([self.id])[0].parents - if self._parents: - self._nodes = [ - RevisionEntry( - id=rev.id, - root=rev.directory, - date=rev.date, - parents=rev.parents, - ) - for rev in archive.revision_get(self._parents) - if rev - ] + revision = archive.revision_get([self.id]) + if revision: + self._parents = revision[0].parents + if self._parents and not self._nodes: + self._nodes = [ + RevisionEntry( + id=rev.id, + root=rev.directory, + date=rev.date, + parents=rev.parents, + ) + for rev in archive.revision_get(self._parents) + if rev + ] yield from self._nodes class DirectoryEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None def ls(self, archive: ArchiveInterface): if self._children is None: self._children = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._children.append( DirectoryEntry(child["target"], child["name"]) ) elif child["type"] == "file": self._children.append(FileEntry(child["target"], child["name"])) yield from self._children class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 552d1b6..8d4110b 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,77 +1,111 @@ from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2 +import psycopg2.extras +from swh.model.model import Revision + class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: # TODO: only call directory_ls_internal if the id is not being queried by # someone else. Otherwise wait until results get properly cached.
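The TODO in directory_ls above suggests deduplicating concurrent lookups of the same id. A minimal sketch of that idea, assuming a thread-based deployment; QueryDeduplicator and every name in it are hypothetical and not part of this diff:

import threading
from typing import Any, Callable, Dict, List

class QueryDeduplicator:
    """Serialize computations per directory id: the first caller runs the
    query, concurrent callers for the same id block until it is cached."""

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects the _locks dict itself
        self._locks: Dict[bytes, threading.Lock] = {}

    def run(
        self, id: bytes, compute: Callable[[bytes], List[Dict[str, Any]]]
    ) -> List[Dict[str, Any]]:
        with self._guard:
            lock = self._locks.setdefault(id, threading.Lock())
        with lock:
            # By the time a waiting caller acquires the lock, the lru_cache
            # on the compute function answers instantly.
            return compute(id)

directory_ls would then delegate via something like self._dedup.run(id, self.directory_ls_internal).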
entries = self.directory_ls_internal(id) return entries @lru_cache(maxsize=100000) def directory_ls_internal(self, id: bytes) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] def iter_origins(self): raise NotImplementedError def iter_origin_visits(self, origin: str): raise NotImplementedError def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError def release_get(self, ids: Iterable[bytes]): raise NotImplementedError def revision_get(self, ids: Iterable[bytes]): - raise NotImplementedError + with self.conn.cursor() as cursor: + psycopg2.extras.execute_values( + cursor, + """ + SELECT t.id, revision.date, revision.directory, + ARRAY( + SELECT rh.parent_id::bytea + FROM revision_history rh + WHERE rh.id = t.id + ORDER BY rh.parent_rank + ) + FROM (VALUES %s) as t(sortkey, id) + LEFT JOIN revision ON t.id = revision.id + LEFT JOIN person author ON revision.author = author.id + LEFT JOIN person committer ON revision.committer = committer.id + ORDER BY sortkey + """, + ((sortkey, id) for sortkey, id in enumerate(ids)), + ) + for row in cursor.fetchall(): + parents = [] + for parent in row[3]: + if parent: + parents.append(parent) + yield Revision.from_dict( + { + "id": row[0], + "date": row[1], + "directory": row[2], + "parents": tuple(parents), + } + ) def snapshot_get_all_branches(self, snapshot: bytes): raise NotImplementedError diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 94555ea..87e1a14 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,298 +1,318 @@ from datetime import datetime import itertools import logging from typing import Any, Dict, List, Optional import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from ..revision import RevisionEntry class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() self.insert_cache: Dict[str, Any] = {} self.remove_cache: Dict[str, Any] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": 
dict(), "content_early_in_rev": set(), "content_in_dir": set(), "directory": dict(), "directory_in_rev": set(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.remove_cache = {"directory": dict()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() self.clear_caches() result = True except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.error(f"Unexpected error: {error}") return result def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE sha1=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["content"][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. 
date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, - """LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", + """ + LOCK TABLE ONLY content; + INSERT INTO content(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,content.date) + """, self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, - """LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", + """ + LOCK TABLE ONLY directory; + INSERT INTO directory(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,directory.date) + """, self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, - """LOCK TABLE ONLY revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", + """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,revision.date) + """, self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after ids for elements were resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, - # """INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT DO NOTHING""", + # """ + # LOCK TABLE ONLY revision_before_rev; + # INSERT INTO revision_before_rev VALUES %s + # ON CONFLICT DO NOTHING + # """, # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # if self.insert_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, - # """INSERT INTO revision_in_org VALUES %s - # ON CONFLICT DO 
NOTHING""", + # """ + # LOCK TABLE ONLY revision_in_org; + # INSERT INTO revision_in_org VALUES %s + # ON CONFLICT DO NOTHING + # """, # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( - """INSERT INTO origin (url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id""", + """ + LOCK TABLE ONLY origin; + INSERT INTO origin(url) VALUES (%s) + ON CONFLICT DO NOTHING + RETURNING id + """, (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( - """SELECT 1 - FROM revision_before_rev - JOIN revision - ON revision.id=revision_before_rev.prev - WHERE revision.sha1=%s""", + """ + SELECT 1 + FROM revision_before_rev + JOIN revision + ON revision.id=revision_before_rev.prev + WHERE revision.sha1=%s + """, (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( - """SELECT 1 - FROM revision_in_org - JOIN revision - ON revision.id=revision_in_org.rev - WHERE revision.sha1=%s""", + """ + SELECT 1 + FROM revision_in_org + JOIN revision + ON revision.id=revision_in_org.rev + WHERE revision.sha1=%s + """, (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 8fedbee..f997fc2 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,196 +1,204 @@ from datetime import datetime import os from typing import Generator, 
Optional, Tuple import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path class ProvenanceWithPathDB(ProvenanceDBBase): def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_in_dir"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_early_in_rev"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( - """SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ORDER BY date, rev, path ASC LIMIT 1""", + """ + SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ORDER BY date, rev, path ASC LIMIT 1 + """, (blobid,), ) return self.cursor.fetchone() def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( - f"""(SELECT content_location.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_location.path AS path - FROM (SELECT content_hex.sha1, - content_hex.rev, - location.path - FROM (SELECT content.sha1, - content_early_in_rev.rev, - content_early_in_rev.loc - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_hex - JOIN location - ON location.id=content_hex.loc - ) AS content_location - JOIN revision - ON revision.id=content_location.rev - ) - UNION - (SELECT content_prefix.sha1 AS blob, - revision.sha1 AS rev, - revision.date AS date, - content_prefix.path AS path - FROM (SELECT content_in_rev.sha1, - content_in_rev.rev, - CASE location.path - WHEN '' THEN content_in_rev.suffix - WHEN '.' 
THEN content_in_rev.suffix - ELSE (location.path || '/' || - content_in_rev.suffix)::unix_path - END AS path - FROM (SELECT content_suffix.sha1, - directory_in_rev.rev, - directory_in_rev.loc, - content_suffix.path AS suffix - FROM (SELECT content_hex.sha1, - content_hex.dir, - location.path + f""" + (SELECT content_location.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_location.path AS path + FROM (SELECT content_hex.sha1, + content_hex.rev, + location.path + FROM (SELECT content.sha1, + content_early_in_rev.rev, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_hex + JOIN location + ON location.id=content_hex.loc + ) AS content_location + JOIN revision + ON revision.id=content_location.rev + ) + UNION + (SELECT content_prefix.sha1 AS blob, + revision.sha1 AS rev, + revision.date AS date, + content_prefix.path AS path + FROM (SELECT content_in_rev.sha1, + content_in_rev.rev, + CASE location.path + WHEN '' THEN content_in_rev.suffix + WHEN '.' THEN content_in_rev.suffix + ELSE (location.path || '/' || + content_in_rev.suffix)::unix_path + END AS path + FROM (SELECT content_suffix.sha1, + directory_in_rev.rev, + directory_in_rev.loc, + content_suffix.path AS suffix + FROM (SELECT content_hex.sha1, + content_hex.dir, + location.path FROM (SELECT content.sha1, content_in_dir.dir, content_in_dir.loc - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s ) AS content_hex JOIN location ON location.id=content_hex.loc - ) AS content_suffix - JOIN directory_in_rev - ON directory_in_rev.dir=content_suffix.dir - ) AS content_in_rev - JOIN location - ON location.id=content_in_rev.loc - ) AS content_prefix - JOIN revision - ON revision.id=content_prefix.rev - ) - ORDER BY date, rev, path {early_cut}""", + ) AS content_suffix + JOIN directory_in_rev + ON directory_in_rev.dir=content_suffix.dir + ) AS content_in_rev + JOIN location + ON location.id=content_in_rev.loc + ) AS content_prefix + JOIN revision + ON revision.id=content_prefix.rev + ) + ORDER BY date, rev, path {early_cut} + """, (blobid, blobid), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.insert_cache["directory_in_rev"].add( (directory.id, revision.id, normalize(path)) ) def insert_location(self, src0_table, src1_table, dst_table): """Insert location entries in `dst_table` from the insert_cache Also insert missing location entries in the 'location' table. 
""" # TODO: find a better way of doing this; might be doable in a coupls of # SQL queries (one to insert missing entries in the location' table, # one to insert entries in the dst_table) # Resolve src0 ids src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.insert_cache[dst_table])) fmt = ",".join(["%s"] * len(src0_sha1s)) self.cursor.execute( f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""", src0_sha1s, ) src0_values = dict(self.cursor.fetchall()) # Resolve src1 ids src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.insert_cache[dst_table])) fmt = ",".join(["%s"] * len(src1_sha1s)) self.cursor.execute( f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""", src1_sha1s, ) src1_values = dict(self.cursor.fetchall()) # insert missing locations locations = tuple(set((loc,) for (_, _, loc) in self.insert_cache[dst_table])) psycopg2.extras.execute_values( self.cursor, """ + LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING + ON CONFLICT (path) DO NOTHING """, locations, ) # fetch location ids fmt = ",".join(["%s"] * len(locations)) self.cursor.execute( f"SELECT path, id FROM location WHERE path IN ({fmt})", locations, ) loc_ids = dict(self.cursor.fetchall()) # Insert values in dst_table rows = [ (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc]) for (sha1_src, sha1_dst, loc) in self.insert_cache[dst_table] ] psycopg2.extras.execute_values( self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", + f""" + LOCK TABLE ONLY {dst_table}; + INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING + """, rows, ) self.insert_cache[dst_table].clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index 732e8b3..a7ab205 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,133 +1,140 @@ from datetime import datetime import itertools import operator from typing import Generator, Optional, Tuple import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase ######################################################################################## ######################################################################################## ######################################################################################## class ProvenanceWithoutPathDB(ProvenanceDBBase): def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_in_dir"].add((blob.id, directory.id)) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( - """SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ORDER BY date, rev ASC LIMIT 1""", + """ + SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev 
+ JOIN revision + ON revision.id=content_in_rev.rev + ORDER BY date, rev ASC LIMIT 1 + """, (blobid,), ) row = self.cursor.fetchone() if row is not None: # TODO: query revision from the archive and look for blobid in a # recursive directory_ls of the revision's root. return blobid, row[0], row[1], b"" return None def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( - f"""(SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT content_early_in_rev.rev - FROM content_early_in_rev - JOIN content - ON content.id=content_early_in_rev.blob - WHERE content.sha1=%s - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - UNION - (SELECT revision.sha1 AS rev, - revision.date AS date - FROM (SELECT directory_in_rev.rev - FROM (SELECT content_in_dir.dir - FROM content_in_dir - JOIN content - ON content_in_dir.blob=content.id - WHERE content.sha1=%s - ) AS content_dir - JOIN directory_in_rev - ON directory_in_rev.dir=content_dir.dir - ) AS content_in_rev - JOIN revision - ON revision.id=content_in_rev.rev - ) - ORDER BY date, rev {early_cut}""", + f""" + (SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT content_early_in_rev.rev + FROM content_early_in_rev + JOIN content + ON content.id=content_early_in_rev.blob + WHERE content.sha1=%s + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + UNION + (SELECT revision.sha1 AS rev, + revision.date AS date + FROM (SELECT directory_in_rev.rev + FROM (SELECT content_in_dir.dir + FROM content_in_dir + JOIN content + ON content_in_dir.blob=content.id + WHERE content.sha1=%s + ) AS content_dir + JOIN directory_in_rev + ON directory_in_rev.dir=content_dir.dir + ) AS content_in_rev + JOIN revision + ON revision.id=content_in_rev.rev + ) + ORDER BY date, rev {early_cut} + """, (blobid, blobid), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. for row in self.cursor.fetchall(): # TODO: query revision from the archive and look for blobid in a # recursive directory_ls of the revision's root.
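The TODO above (recovering a path by searching the revision's root) could be approximated with a walk over directory_ls, sketched below; find_blob_path is a hypothetical helper, and on large trees this would be expensive:

import os
from typing import Optional

def find_blob_path(archive, root: bytes, blobid: bytes) -> Optional[bytes]:
    # Iterative depth-first walk, mirroring DirectoryEntry.ls in model.py.
    stack = [(root, b"")]
    while stack:
        dir_id, prefix = stack.pop()
        for entry in archive.directory_ls(dir_id):
            path = os.path.join(prefix, entry["name"])
            if entry["type"] == "file" and entry["target"] == blobid:
                return path
            if entry["type"] == "dir":
                stack.append((entry["target"], path))
    return None  # blob not reachable from this root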
yield blobid, row[0], row[1], b"" def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) def insert_location(self, src0_table, src1_table, dst_table): # Resolve src0 ids src0_values = dict().fromkeys( map(operator.itemgetter(0), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src0_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", tuple(src0_values), ) src0_values = dict(self.cursor.fetchall()) # Resolve src1 ids src1_values = dict().fromkeys( map(operator.itemgetter(1), self.insert_cache[dst_table]) ) values = ", ".join(itertools.repeat("%s", len(src1_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", tuple(src1_values), ) src1_values = dict(self.cursor.fetchall()) # Insert values in dst_table rows = map( lambda row: (src0_values[row[0]], src1_values[row[1]]), self.insert_cache[dst_table], ) psycopg2.extras.execute_values( self.cursor, - f"""INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING""", + f""" + LOCK TABLE ONLY {dst_table}; + INSERT INTO {dst_table} VALUES %s + ON CONFLICT DO NOTHING + """, rows, ) self.insert_cache[dst_table].clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index b6ac95f..36e5357 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,418 +1,442 @@ from datetime import datetime, timezone +import logging import os from typing import Dict, Generator, List, Optional, Tuple - from typing_extensions import Protocol, runtime_checkable +from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @runtime_checkable class ProvenanceInterface(Protocol): def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_invalidate_in_isochrone_frontier( self, directory: DirectoryEntry ) -> None: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... 
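Because ProvenanceInterface is declared @runtime_checkable, any backend whose methods match satisfies it structurally, with no inheritance required. A toy illustration of the mechanism; the names below are not part of this diff:

from typing_extensions import Protocol, runtime_checkable

@runtime_checkable
class Committable(Protocol):
    def commit(self) -> bool: ...

class ToyBackend:  # note: no base class
    def commit(self) -> bool:
        return True

assert isinstance(ToyBackend(), Committable)
# Caveat: isinstance() against a runtime_checkable Protocol only verifies
# that the methods exist, not their signatures or return types.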
def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: stack = [(directory, b"")] while stack: current, prefix = stack.pop() for child in current.ls(archive): if isinstance(child, FileEntry): # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, child, prefix) elif isinstance(child, DirectoryEntry): # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) def origin_add( archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry ) -> None: # TODO: refactor to iterate over origin visit statuses and commit only once # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: origin_add_revision(archive, provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed to directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, lower: bool = True, mindepth: int = 1, ) -> None: assert revision.date is not None assert revision.root is not None + logging.debug(f"Processing revision {hash_to_hex(revision.id)}...") # Process content starting from the revision's root directory.
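For readers tracing flatten_directory above: the explicit stack is equivalent to the recursive formulation below (a sketch reusing this module's own imports); the iterative form presumably avoids Python's recursion limit on very deep trees:

def flatten_directory_recursive(
    archive, provenance, directory, current=None, prefix=b""
):
    # `directory` stays fixed: every blob found below it is attached to it.
    if current is None:
        current = directory
    for child in current.ls(archive):
        if isinstance(child, FileEntry):
            provenance.content_add_to_directory(directory, child, prefix)
        elif isinstance(child, DirectoryEntry):
            flatten_directory_recursive(
                archive, provenance, directory, child,
                os.path.join(prefix, child.name),
            )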
date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: provenance.revision_add(revision) # TODO: add file size filtering revision_process_content( archive, provenance, revision, DirectoryEntry(revision.root, b""), lower=lower, mindepth=mindepth, ) # TODO: improve this! Maybe using a max attempt counter? # Ideally Provenance class should guarantee that a commit never fails. + logging.debug(f"Attempt to commit revision {hash_to_hex(revision.id)}...") while not provenance.commit(): - continue + logging.warning( + f"Could not commit revision {hash_to_hex(revision.id)}. Retrying..." + ) + logging.debug(f"Revision {hash_to_hex(revision.id)} successfully committed!") class IsochroneNode: def __init__( self, entry: DirectoryEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 ): self.entry = entry self.depth = depth self.date = dates.get(self.entry.id, None) self.known = self.date is not None self.children: List[IsochroneNode] = [] self.maxdate: Optional[datetime] = None def add_child( self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {} ) -> "IsochroneNode": assert isinstance(self.entry, DirectoryEntry) and self.date is None node = IsochroneNode(child, dates=dates, depth=self.depth + 1) self.children.append(node) return node def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # Build the nodes structure root = IsochroneNode(directory) root.date = provenance.directory_get_date_in_isochrone_frontier(directory) + root.known = root.date is not None stack = [root] + logging.debug( + f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." + ) while stack: current = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is None or current.date > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.date is not None and current.date > revision.date: provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.date = None + current.known = False # Pre-query all known dates for content/directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( [ child for child in current.entry.ls(archive) if isinstance(child, DirectoryEntry) ] ) fdates = provenance.content_get_early_dates( [ child for child in current.entry.ls(archive) if isinstance(child, FileEntry) ] ) for child in current.entry.ls(archive): # Recursively analyse directory nodes. if isinstance(child, DirectoryEntry): node = current.add_child(child, dates=ddates) stack.append(node) else: # WARNING: there is a type checking issue here! current.add_child(child, dates=fdates) + logging.debug( + f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" + ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). stack = [root] + logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). 
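One way to realize the "max attempt counter" TODO in revision_add above; a sketch only, where max_attempts and the give-up behaviour are assumptions, not part of this diff:

import logging
from swh.model.hashutil import hash_to_hex

def commit_with_retries(provenance, revision, max_attempts: int = 3) -> None:
    for attempt in range(1, max_attempts + 1):
        if provenance.commit():
            logging.debug(
                f"Revision {hash_to_hex(revision.id)} committed on attempt {attempt}"
            )
            return
        logging.warning(
            f"Could not commit revision {hash_to_hex(revision.id)} "
            f"(attempt {attempt}/{max_attempts}). Retrying..."
        )
    # Policy decision: surface the failure instead of looping forever.
    raise RuntimeError(f"Could not commit revision {hash_to_hex(revision.id)}")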
if not current.known: if any(map(lambda child: child.maxdate is None, current.children)): # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if isinstance(child.entry, FileEntry): # A file node is known if it already has an assigned date (ie. # it was processed before) if child.known: + assert child.date is not None # Just use its known date. child.maxdate = child.date else: # Use current revision date. child.maxdate = revision.date else: - # Recursively analyse directory nodes. stack.append(child) else: maxdates = [ child.maxdate for child in current.children if child.maxdate is not None # mostly to please mypy ] current.maxdate = max(maxdates) if maxdates else UTCMIN # If all content is already known, update current directory info. current.known = all(map(lambda child: child.known, current.children)) else: # Directory node in the frontier, just use its known date. current.maxdate = current.date + logging.debug( + f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" + ) return root def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None + logging.debug( + f"Building isochrone graph for revision {hash_to_hex(revision.id)}..." + ) stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)] + logging.debug( + f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully built!" + ) while stack: current, path = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is not None: assert current.date <= revision.date # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision(revision, current.entry, path) else: # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) provenance.directory_add_to_revision(revision, current.entry, path) flatten_directory(archive, provenance, current.entry) else: # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for child in current.children: if isinstance(child.entry, FileEntry): blob = child.entry if child.date is None or revision.date < child.date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, path) else: stack.append((child, os.path.join(path, child.entry.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1 ) -> bool: assert node.maxdate is not None and revision.date is not None # The only real condition for a directory to be a frontier is that its content is # already known and its maxdate is less than (or equal to) the current revision's # date. Checking mindepth is meant to skip root directories (or any arbitrary # depth) to improve the result.
The option lower tries to maximize the reuse rate of # previously defined frontiers by keeping them low in the directory tree. return ( node.known # all content in node was already seen before and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(map(lambda child: isinstance(child.entry, FileEntry), node.children)) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # )
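For completeness, strategy 2 from the comment block above (look for files anywhere below the node), written out as runnable code; it follows the commented sketch directly:

def has_blobs_anywhere(node: IsochroneNode) -> bool:
    stack = [node]
    while stack:
        current = stack.pop()
        if any(isinstance(child.entry, FileEntry) for child in current.children):
            return True
        # No files here, so all children are directory entries; keep descending.
        stack.extend(current.children)
    return False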