diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index 3925cdd..b79b2ce 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,45 +1,45 @@ from typing import Any, Dict, Iterable from typing_extensions import Protocol, runtime_checkable -from swh.model.model import Revision, Sha1 +from swh.model.model import Revision, Sha1Git @runtime_checkable class ArchiveInterface(Protocol): - def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: + def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: """List entries for one directory. Args: id: sha1 id of the directory to list entries from. Yields: directory entries for such directory. """ ... - def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: + def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: """Given a list of sha1, return the revisions' information Args: revisions: list of sha1s for the revisions to be retrieved Yields: revisions matching the identifiers. If a revision does not exist, the provided sha1 is simply ignored. """ ... - def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions pointed by one snapshot. Args: - snapshot: the snapshot's identifier + id: sha1 id of the snapshot. Yields: sha1 ids of found revisions. """ ... diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index 3cc7b40..0eb15a7 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,256 +1,258 @@ from datetime import datetime, timezone import logging import os from typing import Dict, Optional, Set +from swh.model.model import Sha1Git + from .archive import ArchiveInterface from .model import DirectoryEntry, RevisionEntry from .provenance import ProvenanceInterface UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class HistoryNode: def __init__( self, entry: RevisionEntry, visited: bool = False, in_history: bool = False ): self.entry = entry # A revision is `visited` if it is directly pointed by an origin (ie. a head # revision for some snapshot) self.visited = visited # A revision is `in_history` if it appears in the history graph of an already # processed revision in the provenance database self.in_history = in_history self.parents: Set[HistoryNode] = set() def add_parent( self, parent: RevisionEntry, visited: bool = False, in_history: bool = False ) -> "HistoryNode": node = HistoryNode(parent, visited=visited, in_history=in_history) self.parents.add(node) return node def __str__(self): return ( f"<{self.entry}: visited={self.visited}, in_history={self.in_history}, " f"parents=[{', '.join(str(parent) for parent in self.parents)}]>" ) def __eq__(self, other): return isinstance(other, HistoryNode) and self.__dict__ == other.__dict__ def __hash__(self): return hash((self.entry, self.visited, self.in_history)) def build_history_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, ) -> HistoryNode: """Recursively build the history graph from the given revision""" root = HistoryNode( revision, visited=provenance.revision_visited(revision), in_history=provenance.revision_in_history(revision), ) stack = [root] logging.debug( f"Recursively creating history graph for revision {revision.id.hex()}..." 
) while stack: current = stack.pop() if not current.visited: current.entry.retrieve_parents(archive) for rev in current.entry.parents: node = current.add_parent( rev, visited=provenance.revision_visited(rev), in_history=provenance.revision_in_history(rev), ) stack.append(node) logging.debug( f"History graph for revision {revision.id.hex()} successfully created!" ) return root class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ): self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known = self.dbdate is not None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self): # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self): self._dbdate = None self.maxdate = None self.known = False self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node def __str__(self): return ( f"<{self.entry}: depth={self.depth}, " f"dbdate={self.dbdate}, maxdate={self.maxdate}, " f"known={self.known}, invalid={self.invalid}, path={self.path}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other): return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self): # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating isochrone graph for revision {revision.id.hex()}..." 
) - fdates: Dict[bytes, datetime] = {} # map {file_id: date} + fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {current.entry.id.hex()}" f" (date {current.dbdate})" f" when processing revision {revision.id.hex()}" f" (date {revision.date})" ) current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) logging.debug( f"Isochrone graph for revision {revision.id.hex()} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). logging.debug(f"Computing maxdates for revision {revision.id.hex()}...") stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) if current.maxdate <= revision.date: current.known = ( # true if all subdirectories are known all(child.known for child in current.children) # true if all files are in fdates, i.e. 
if all files were known # *before building this isochrone graph node* # Note: the 'all()' is lazy: will stop iterating as soon as # possible and all((file.id in fdates) for file in current.entry.files) ) else: # at least one content is being processed out-of-order, then current # node should be treated as unknown current.maxdate = revision.date current.known = False logging.debug(f"Maxdates for revision {revision.id.hex()} successfully computed!") return root diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 9c96d7e..7001e2b 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,158 +1,158 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import origin_identifier from swh.model.model import Sha1Git from .archive import ArchiveInterface class OriginEntry: def __init__(self, url: str, snapshot: Sha1Git): self.url = url self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url})) self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface): if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] @property def revisions(self) -> Iterator["RevisionEntry"]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) def __str__(self): return f"" class RevisionEntry: def __init__( self, - id: bytes, + id: Sha1Git, date: Optional[datetime] = None, - root: Optional[bytes] = None, - parents: Optional[Iterable[bytes]] = None, + root: Optional[Sha1Git] = None, + parents: Optional[Iterable[Sha1Git]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None def retrieve_parents(self, archive: ArchiveInterface): if self._parents_entries is None: if self._parents_ids is None: revision = list(archive.revision_get([self.id])) if revision: self._parents_ids = revision[0].parents else: self._parents_ids = [] self._parents_entries = [ RevisionEntry( id=rev.id, root=rev.directory, date=rev.date.to_datetime(), parents=rev.parents, ) for rev in archive.revision_get(self._parents_ids) if rev.date is not None ] @property def parents(self) -> Iterator["RevisionEntry"]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." 
) return (x for x in self._parents_entries) def __str__(self): return ( f"" ) class DirectoryEntry: - def __init__(self, id: bytes, name: bytes = b""): + def __init__(self, id: Sha1Git, name: bytes = b""): self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface): if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator["FileEntry"]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator["DirectoryEntry"]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self): return f"" def __eq__(self, other): return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) class FileEntry: - def __init__(self, id: bytes, name: bytes): + def __init__(self, id: Sha1Git, name: bytes): self.id = id self.name = name def __str__(self): return f"" def __eq__(self, other): return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index d655ce4..95f3bec 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,134 +1,134 @@ from typing import Any, Dict, Iterable, List, Set from methodtools import lru_cache import psycopg2 -from swh.model.model import ObjectType, Revision, Sha1, TargetType +from swh.model.model import ObjectType, Revision, Sha1Git, TargetType from swh.storage.postgresql.storage import Storage class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn self.storage = Storage(conn, objstorage={"cls": "memory"}) - def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: + def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: # TODO: only call directory_ls_internal if the id is not being queried by # someone else. Otherwise wait until results get properly cached. 
entries = self.directory_ls_internal(id) yield from entries @lru_cache(maxsize=100000) - def directory_ls_internal(self, id: Sha1) -> List[Dict[str, Any]]: + def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] - def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: + def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: with self.conn.cursor() as cursor: psycopg2.extras.execute_values( cursor, """ SELECT t.id, revision.date, revision.directory, ARRAY( SELECT rh.parent_id::bytea FROM revision_history rh WHERE rh.id = t.id ORDER BY rh.parent_rank ) FROM (VALUES %s) as t(sortkey, id) LEFT JOIN revision ON t.id = revision.id LEFT JOIN person author ON revision.author = author.id LEFT JOIN person committer ON revision.committer = committer.id ORDER BY sortkey """, ((sortkey, id) for sortkey, id in enumerate(ids)), ) for row in cursor.fetchall(): parents = [] for parent in row[3]: if parent: parents.append(parent) yield Revision.from_dict( { "id": row[0], "date": row[1], "directory": row[2], "parents": tuple(parents), } ) - def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: # TODO: this code is duplicated here (same as in swh.provenance.storage.archive) # but it's just temporary. This method should actually perform a direct query to # the SQL db of the archive. 
from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(releases) if release is not None and release.target_type == ObjectType.REVISION ) - revisions: Set[Sha1] = set() + revisions: Set[Sha1Git] = set() for targets in grouper(targets_set, batchsize): revisions.update( revision.id for revision in self.storage.revision_get(targets) if revision is not None ) yield from revisions diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index ebe1beb..ca68b0e 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,234 +1,234 @@ from datetime import datetime import itertools import logging from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: self.cursor.execute("select swh_get_dbflavor()") self._flavor = self.cursor.fetchone()[0] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): self.insert_entity( entity, { sha1: data[entity]["data"][sha1] for sha1 in data[entity]["added"] }, ) data[entity]["data"].clear() data[entity]["added"].clear() # Relations should come after ids for entities were resolved for relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ): self.insert_relation(relation, data[relation]) # Insert origins self.insert_origin( { sha1: data["origin"]["data"][sha1] for sha1 in data["origin"]["added"] }, ) data["origin"]["data"].clear() data["origin"]["added"].clear() # Insert relations from the origin-revision layer self.insert_origin_head(data["revision_in_origin"]) self.insert_revision_history(data["revision_before_revision"]) # Update preferred origins self.update_preferred_origin( { sha1: data["revision_origin"]["data"][sha1] for sha1 in data["revision_origin"]["added"] } ) data["revision_origin"]["data"].clear() data["revision_origin"]["added"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False def content_find_first( - self, blob: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self, id: Sha1Git + ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: ... 
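# --- Illustrative sketch, not part of the patch above: how the renamed
# content_find_first/content_find_all entry points (now taking a Sha1Git `id`
# instead of a bytes `blob`) might be exercised through ProvenanceBackend.
# `conn` (a psycopg2 connection to a provenance DB) and the hex id passed by
# the caller are assumptions made for the example.
from swh.model.hashutil import hash_to_bytes
from swh.provenance.provenance import ProvenanceBackend

def show_first_occurrence(conn, hex_sha1: str) -> None:
    provenance = ProvenanceBackend(conn)
    occurrence = provenance.content_find_first(hash_to_bytes(hex_sha1))
    if occurrence is not None:
        # Tuple layout per the signatures above: (content, revision, date, path)
        content, revision, date, path = occurrence
        print(content.hex(), revision.hex(), date.isoformat(), path.decode())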
def content_find_all( - self, blob: bytes, limit: Optional[int] = None - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: ... - def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: + def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates - def insert_entity(self, entity: str, data: Dict[bytes, datetime]): + def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_origin(self, data: Dict[Sha1Git, str]): if data: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): if data: psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_in_origin; INSERT INTO revision_in_origin SELECT R.id, O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN revision AS R on (R.sha1=V.rev) INNER JOIN origin AS O on (O.sha1=V.org) """, data, ) data.clear() - def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): + def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): ... - def insert_revision_history(self, data: Dict[bytes, bytes]): + def insert_revision_history(self, data: Dict[Sha1Git, Sha1Git]): if data: values = [[(prev, next) for next in data[prev]] for prev in data] psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! 
""" LOCK TABLE ONLY revision_before_revision; INSERT INTO revision_before_revision SELECT P.id, N.id FROM (VALUES %s) AS V(prev, next) INNER JOIN revision AS P on (P.sha1=V.prev) INNER JOIN revision AS N on (N.sha1=V.next) """, tuple(sum(values, [])), ) data.clear() def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: self.cursor.execute( """ SELECT O.sha1 FROM revision AS R JOIN origin as O ON R.origin=O.id WHERE R.sha1=%s""", (revision,), ) row = self.cursor.fetchone() return row[0] if row is not None else None - def revision_in_history(self, revision: bytes) -> bool: + def revision_in_history(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_revision JOIN revision ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None - def revision_visited(self, revision: bytes) -> bool: + def revision_visited(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_origin JOIN revision ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): if data: # XXX: this is assuming the revision already exists in the db! It should # be improved by allowing null dates in the revision table. psycopg2.extras.execute_values( self.cursor, """ UPDATE revision R SET origin=O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN origin AS O on (O.sha1=V.org) WHERE R.sha1=V.rev """, data.items(), ) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 7808e47..62c6f87 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,105 +1,107 @@ from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras +from swh.model.model import Sha1Git + from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): def content_find_first( - self, blob: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self, id: Sha1Git + ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content = C.id) INNER JOIN location as L ON (CR.location = L.id) INNER JOIN revision as R ON (CR.revision = R.id) WHERE C.sha1=%s ORDER BY date, rev, path ASC LIMIT 1 """, - (blob,), + (id,), ) return self.cursor.fetchone() def content_find_all( - self, blob: bytes, limit: Optional[int] = None - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content = C.id) INNER JOIN location AS L ON (CR.location = L.id) INNER JOIN revision AS R ON (CR.revision = R.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, CASE DL.path WHEN '' THEN CL.path WHEN '.' 
THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C INNER JOIN content_in_directory AS CD ON (C.id = CD.content) INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) INNER JOIN revision AS R ON (DR.revision = R.id) INNER JOIN location AS CL ON (CD.location = CL.id) INNER JOIN location AS DL ON (DR.location = DL.id) WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, - (blob, blob), + (id, id), ) yield from self.cursor.fetchall() - def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): + def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. """ if data: assert relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ) src, dst = relation.split("_in_") # insert missing locations locations = tuple(set((loc,) for (_, _, loc) in data)) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """, locations, ) psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id, location.id FROM (VALUES %s) AS V(src, dst, path) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) INNER JOIN location on (location.path=V.path) """, data, ) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index 7218f1e..a55065a 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,85 +1,83 @@ from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from .provenancedb_base import ProvenanceDBBase +from swh.model.model import Sha1Git -######################################################################################## -######################################################################################## -######################################################################################## +from .provenancedb_base import ProvenanceDBBase class ProvenanceWithoutPathDB(ProvenanceDBBase): def content_find_first( - self, blob: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self, id: Sha1Git + ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, '\\x'::bytea as path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content = C.id) INNER JOIN revision as R ON (CR.revision = R.id) WHERE C.sha1=%s ORDER BY date, rev ASC LIMIT 1 """, - (blob,), + (id,), ) return self.cursor.fetchone() def content_find_all( - self, blob: bytes, limit: Optional[int] = None - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, '\\x'::bytea as path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content = C.id) INNER JOIN revision AS R ON (CR.revision = R.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, '\\x'::bytea as path FROM content AS C INNER JOIN 
content_in_directory AS CD ON (C.id = CD.content) INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) INNER JOIN revision AS R ON (DR.revision = R.id) WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, - (blob, blob), + (id, id), ) yield from self.cursor.fetchall() - def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): + def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): if data: assert relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ) src, dst = relation.split("_in_") psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id FROM (VALUES %s) AS V(src, dst) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) """, data, ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 313db75..45fabc4 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,306 +1,306 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry # XXX: this protocol doesn't make much sense now that flavours have been delegated to # another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( - self, blob: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + self, id: Sha1Git + ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: ... def content_find_all( - self, blob: bytes, limit: Optional[int] = None - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] - ) -> Dict[bytes, datetime]: + ) -> Dict[Sha1Git, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] - ) -> Dict[bytes, datetime]: + ) -> Dict[Sha1Git, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_add(self, origin: OriginEntry) -> None: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... 
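# --- Illustrative sketch, not part of the patch above. Sha1Git is a plain
# `bytes` alias in swh.model.model, so the annotation changes in this hunk are
# visible to the type checker only; at runtime the cache keys and method
# arguments remain 20-byte digests. Since ProvenanceInterface is a
# runtime_checkable Protocol, a ProvenanceBackend can still be checked
# structurally; `conn` is an assumed psycopg2 connection.
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.provenance.provenance import ProvenanceBackend, ProvenanceInterface

def check_backend(conn) -> None:
    backend = ProvenanceBackend(conn)
    assert isinstance(backend, ProvenanceInterface)  # checks method presence only
    rev_id: Sha1Git = hash_to_bytes("1234567890" * 4)  # arbitrary 20-byte example id
    assert isinstance(rev_id, bytes) and len(rev_id) == 20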
def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... class DatetimeCache(TypedDict): - data: Dict[bytes, datetime] - added: Set[bytes] + data: Dict[Sha1Git, datetime] + added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only - content_in_revision: Set[Tuple[bytes, bytes, bytes]] - content_in_directory: Set[Tuple[bytes, bytes, bytes]] - directory_in_revision: Set[Tuple[bytes, bytes, bytes]] + content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] + content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] + directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache - revision_before_revision: Dict[bytes, Set[bytes]] + revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache(): return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase flavor = ProvenanceDBBase(conn).flavor if flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() def clear_caches(self): self.cache = new_cache() def commit(self): # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." 
) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( - self, blob: bytes - ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: - return self.storage.content_find_first(blob) + self, id: Sha1Git + ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + return self.storage.content_find_first(id) def content_find_all( - self, blob: bytes, limit: Optional[int] = None - ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: - yield from self.storage.content_find_all(blob, limit=limit) + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id, None) def content_get_early_dates( self, blobs: Iterable[FileEntry] - ) -> Dict[bytes, datetime]: + ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id, None) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] - ) -> Dict[bytes, datetime]: + ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( - self, entity: Literal["content", "revision", "directory"], ids: List[bytes] - ) -> Dict[bytes, datetime]: + self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] + ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB assert revision.date is not None self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.cache["revision_before_revision"].setdefault(revision.id, set()).add( relative.id ) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.cache["revision_in_origin"].add((revision.id, origin.id)) def 
revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"] if revision.id not in cache: origin = self.storage.revision_get_preferred_origin(revision.id) if origin is not None: cache["data"][revision.id] = origin return cache["data"].get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache[ "revision_before_revision" ] or self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict( self.cache["revision_in_origin"] ) or self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 8910ff3..2f3e047 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,255 +1,256 @@ from datetime import datetime, timezone from itertools import islice import logging import os import time from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 from swh.model.hashutil import hash_to_bytes +from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph from .model import DirectoryEntry, RevisionEntry from .provenance import ProvenanceInterface class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, - revisions: Iterable[Tuple[bytes, datetime, bytes]], + revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ): - self.revisions: Iterator[Tuple[bytes, datetime, bytes]] + self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self): return self def __next__(self): id, date, root = next(self.revisions) date = iso8601.parse_date(date) if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) return RevisionEntry( hash_to_bytes(id), date=date, root=hash_to_bytes(root), ) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {revision.id.hex()}" f" (known date {date} / revision date {revision.date})..." 
) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: provenance.commit() stop = time.time() logging.debug( f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) # logging.critical( # ";".join([revision.id.hex() for revision in revisions]) # + f",{stop - start},{stop - done}" # ) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. 
stack.append((d_child, os.path.join(prefix, d_child.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! 
# return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 92d6388..9dff527 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,53 +1,53 @@ from typing import Any, Dict, Iterable, Set -from swh.model.model import ObjectType, Revision, Sha1, TargetType +from swh.model.model import ObjectType, Revision, Sha1Git, TargetType from swh.storage.interface import StorageInterface class ArchiveStorage: def __init__(self, storage: StorageInterface): self.storage = storage - def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: + def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: # TODO: filter unused fields yield from self.storage.directory_ls(id) - def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: + def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: # TODO: filter unused fields yield from ( rev for rev in self.storage.revision_get(list(ids)) if rev is not None ) - def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(list(releases)) if release is not None and release.target_type == ObjectType.REVISION ) - revisions: Set[Sha1] = set() + revisions: Set[Sha1Git] = set() for targets in grouper(targets_set, batchsize): revisions.update( revision.id for revision in self.storage.revision_get(list(targets)) if revision is not None ) yield from revisions diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index 79ae7a4..ddde62f 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,233 +1,235 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os import path import re from typing import Iterable, Iterator, List, Optional import msgpack import pytest from typing_extensions import TypedDict from swh.core.db import BaseDb from swh.journal.serializers import msgpack_ext_hook +from swh.model.hashutil import hash_to_bytes +from swh.model.model import Sha1Git from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.storage.replay import process_replay_objects @pytest.fixture(params=["with-path", "without-path"]) def provenance(request, postgresql): """return a working and initialized provenance db""" from swh.core.cli.db import 
populate_database_for_package flavor = request.param populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) from swh.provenance.provenance import ProvenanceBackend BaseDb.adapt_conn(postgresql) prov = ProvenanceBackend(postgresql) assert prov.storage.flavor == flavor # in test sessions, we DO want to raise any exception occurring at commit time prov.raise_on_commit = True return prov @pytest.fixture def swh_storage_with_objects(swh_storage): """return a Storage object (postgresql-based by default) with a few of each object type in it The inserted content comes from swh.model.tests.swh_model_data. """ for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects): return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects): return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) def archive(request, swh_storage_with_objects): """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. # TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rif of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) def get_datafile(fname): return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo): data = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d, keys): return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage, data): process_replay_objects(data, storage=storage) class SynthRelation(TypedDict): prefix: Optional[str] path: str - src: bytes - dst: bytes + src: Sha1Git + dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): - sha1: bytes + sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. 
Generated SynthRevision (typed dict) with the following elements: - "sha1": (bytes) sha1 of the revision, + "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location - "src": (bytes) sha1 of the source of the relation - "dst": (bytes) sha1 of the destination of the relation + "src": (Sha1Git) sha1 of the source of the relation + "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( - sha1=bytes.fromhex(synth_rev[0]["sha1"]), + sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], - dst=bytes.fromhex(row["sha1"]), + dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], - dst=bytes.fromhex(row["sha1"]), + dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], - dst=bytes.fromhex(row["sha1"]), + dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/data/generate_storage_from_git.py b/swh/provenance/tests/data/generate_storage_from_git.py index 0d8b0cd..a9e5ddf 100644 --- a/swh/provenance/tests/data/generate_storage_from_git.py +++ b/swh/provenance/tests/data/generate_storage_from_git.py @@ -1,114 +1,115 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os from subprocess import check_output import click import yaml from swh.loader.git.from_disk import GitLoaderFromDisk +from swh.model.hashutil import hash_to_bytes from swh.model.model import ( Origin, OriginVisit, 
OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) from swh.storage import get_storage def load_git_repo(url, directory, storage): visit_date = datetime.now(tz=timezone.utc) loader = GitLoaderFromDisk( url=url, directory=directory, visit_date=visit_date, storage=storage, ) return loader.load() def pop_key(d, k): d.pop(k) return d @click.command() @click.option("-o", "--output", default=None, help="output file") @click.option( "-v", "--visits", type=click.File(mode="rb"), default=None, help="additional visits to generate.", ) @click.argument("git-repo", type=click.Path(exists=True, file_okay=False)) def main(output, visits, git_repo): "simple tool to generate the git_repo.msgpack dataset file used in some tests" if output is None: output = f"{git_repo}.msgpack" with open(output, "wb") as outstream: sto = get_storage( cls="memory", journal_writer={"cls": "stream", "output_stream": outstream} ) if git_repo.endswith("/"): git_repo = git_repo[:-1] reponame = os.path.basename(git_repo) load_git_repo(f"https://{reponame}", git_repo, sto) if visits: # retrieve all branches from the actual git repo all_branches = { ref: sha1 for sha1, ref in ( line.strip().split() for line in check_output(["git", "-C", git_repo, "show-ref"]) .decode() .splitlines() ) } for visit in yaml.full_load(visits): # add the origin (if it already exists, this is a noop) sto.origin_add([Origin(url=visit["origin"])]) # add a new visit for this origin visit_id = sto.origin_visit_add( [ OriginVisit( origin=visit["origin"], date=datetime.fromtimestamp(visit["date"], tz=timezone.utc), type="git", ) ] )[0].visit # add a snapshot with branches from the input file branches = { f"refs/heads/{name}".encode(): SnapshotBranch( - target=bytes.fromhex(all_branches[f"refs/heads/{name}"]), + target=hash_to_bytes(all_branches[f"refs/heads/{name}"]), target_type=TargetType.REVISION, ) for name in visit["branches"] } snap = Snapshot(branches=branches) sto.snapshot_add([snap]) # add a "closing" origin visit status update referencing the snapshot status = OriginVisitStatus( origin=visit["origin"], visit=visit_id, date=datetime.fromtimestamp(visit["date"], tz=timezone.utc), status="full", snapshot=snap.id, ) sto.origin_visit_status_add([status]) click.echo(f"Serialized the storage made from {reponame} in {output}") if __name__ == "__main__": main() diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index d8f47c0..90af26b 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,346 +1,347 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest +from swh.model.hashutil import hash_to_bytes from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. 
""" cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ relation = f"{src}_in_{dst}" cur.execute("select swh_get_dbflavor()") with_path = cur.fetchone()[0] == "with-path" # note that the columns have the same name as the relations they refer to, # so we can write things like "rel.{dst}=src.id" in the query below if with_path: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex'), encode(dst.sha1::bytea, 'hex'), encode(location.path::bytea, 'escape') FROM {relation} as relation INNER JOIN {src} AS src ON (relation.{src} = src.id) INNER JOIN {dst} AS dst ON (relation.{dst} = dst.id) INNER JOIN location ON (relation.location = location.id) """ ) else: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex'), encode(dst.sha1::bytea, 'hex'), '' FROM {relation} as relation INNER JOIN {src} AS src ON (src.id = relation.{src}) INNER JOIN {dst} AS dst ON (dst.id = relation.{dst}) """ ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ if isinstance(sha1, str): - sha1 = bytes.fromhex(sha1) + sha1 = hash_to_bytes(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } cursor = provenance.storage.cursor def maybe_path(path: str) -> str: if provenance.storage.with_path: return path return "" for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) assert rows["revision"] == sha1s(cursor, "revision"), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp(cursor, "revision", synth_rev["sha1"].hex()) == [ rev_ts ], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) assert rows["content"] == sha1s(cursor, "content"), synth_rev["msg"] # check for R-C (direct) entries # these are added 
directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == relations( cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert get_timestamp(cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) assert rows["directory"] == sha1s(cursor, "directory"), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == relations( cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert get_timestamp(cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries rows["content_in_directory"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == relations( cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert get_timestamp(cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] if provenance.storage.with_path: # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == locations(cursor), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_all( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path: return path return "" # XXX adding all revisions at once should be working just fine, but it does not... 
# revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ (blob.hex(), rev.hex(), date.timestamp(), path.decode()) for blob, rev, date, path in provenance.content_find_all( - bytes.fromhex(content_id) + hash_to_bytes(content_id) ) ] if provenance.storage.with_path: # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, str, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. 
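The comment above can be made concrete with a minimal, self-contained sketch of the bookkeeping rule: the earliest revision wins, and ties on the timestamp accumulate every acceptable path. In the sketch, record() is a hypothetical helper and the blob, revision and path values are made up; it is not part of the test code.

    # Sketch only: earliest revision wins; ties accumulate acceptable paths.
    first_seen = {}

    def record(blob, rev, ts, path):
        if blob not in first_seen:
            first_seen[blob] = (rev, ts, [path])
        else:
            _, best_ts, paths = first_seen[blob]
            if ts == best_ts:
                paths.append(path)                  # tie: any of these paths may be returned
            elif ts < best_ts:
                first_seen[blob] = (rev, ts, [path])  # strictly earlier revision wins

    record("blob1", "R01", 1000.0, "A/a")
    record("blob1", "R01", 1000.0, "B/a")  # same timestamp: both paths acceptable
    record("blob1", "R02", 2000.0, "C/a")  # later revision: expectation unchanged
    assert first_seen["blob1"] == ("R01", 1000.0, ["A/a", "B/a"])

This is also why the final assertion of the test checks r_path.decode() in paths rather than equality against a single path.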
for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = dc["dst"].hex() assert sha1 in expected_first # nothing to do here, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( - bytes.fromhex(content_id) + hash_to_bytes(content_id) ) assert r_sha1.hex() == content_id assert r_rev_id.hex() == rev_id assert r_ts.timestamp() == ts if provenance.storage.with_path: assert r_path.decode() in paths
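As a closing illustration, the fixtures and helpers above compose into the following skeleton for an additional heuristics test. This is a sketch, not an existing test: test_provenance_smoke is a hypothetical name, and the final assertions assume that the msgpack datasets expose a "sha1_git" key on content rows and that every blob in them is reachable from some revision, so that content_find_first() returns a result.

    # Hypothetical test skeleton mirroring the structure of the tests above.
    import pytest

    from swh.provenance.model import RevisionEntry
    from swh.provenance.revision import revision_add
    from swh.provenance.tests.conftest import fill_storage, load_repo_data
    from swh.provenance.tests.test_provenance_db import ts2dt


    @pytest.mark.parametrize("repo, lower, mindepth", (("cmdbts2", True, 1),))
    def test_provenance_smoke(provenance, swh_storage, archive, repo, lower, mindepth):
        data = load_repo_data(repo)      # msgpack dataset from the data/ directory
        fill_storage(swh_storage, data)  # replay it into the test storage

        for revision in data["revision"]:
            entry = RevisionEntry(
                id=revision["id"],
                date=ts2dt(revision["date"]),
                root=revision["directory"],
            )
            # one revision at a time, as in the tests above
            revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)

        # assumption: every blob of the dataset has been indexed by now
        for content in data["content"]:
            assert provenance.content_find_first(content["sha1_git"]) is not None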