diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index ab025dc..4499c26 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,83 +1,83 @@
 from __future__ import annotations
 from typing import TYPE_CHECKING

 if TYPE_CHECKING:
     from .archive import ArchiveInterface
-    from .provenance import ProvenanceInterface, ProvenanceStorageInterface
+    from .interface import ProvenanceInterface, ProvenanceStorageInterface


 def get_archive(cls: str, **kwargs) -> ArchiveInterface:
     """Get an archive object of class ``cls`` with arguments ``args``.

     Args:
         cls: archive's class, either 'api' or 'direct'
         args: dictionary of arguments passed to the archive class constructor

     Returns:
         an instance of archive object (either using swh.storage API or direct
         queries to the archive's database)

     Raises:
-        :cls:`ValueError` if passed an unknown archive class.
+        :cls:`ValueError` if passed an unknown archive class.
     """
     if cls == "api":
         from swh.storage import get_storage

         from .storage.archive import ArchiveStorage

         return ArchiveStorage(get_storage(**kwargs["storage"]))
     elif cls == "direct":
         from swh.core.db import BaseDb

         from .postgresql.archive import ArchivePostgreSQL

         return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
     else:
         raise ValueError


 def get_provenance(**kwargs) -> ProvenanceInterface:
     """Get a provenance object with arguments ``args``.

     Args:
         args: dictionary of arguments to retrieve a swh.provenance.storage
             class (see :func:`get_provenance_storage` for details)

     Returns:
         an instance of provenance object
     """
-    from .backend import ProvenanceBackend
+    from .provenance import Provenance

-    return ProvenanceBackend(get_provenance_storage(**kwargs))
+    return Provenance(get_provenance_storage(**kwargs))


 def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface:
     """Get a provenance storage object of class ``cls`` with arguments ``args``.

     Args:
         cls: storage's class, only 'local' is currently supported
         args: dictionary of arguments passed to the storage class constructor

     Returns:
         an instance of storage object

     Raises:
-        :cls:`ValueError` if passed an unknown archive class.
+        :cls:`ValueError` if passed an unknown storage class.
""" if cls == "local": from swh.core.db import BaseDb from .postgresql.provenancedb_base import ProvenanceDBBase conn = BaseDb.connect(**kwargs["db"]).conn if ProvenanceDBBase(conn).flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB return ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB return ProvenanceWithoutPathDB(conn) else: raise ValueError diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py deleted file mode 100644 index afac165..0000000 --- a/swh/provenance/backend.py +++ /dev/null @@ -1,344 +0,0 @@ -from datetime import datetime -import logging -import os -from typing import Dict, Generator, Iterable, Optional, Set, Tuple - -from typing_extensions import Literal, TypedDict - -from swh.model.model import Sha1Git - -from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -from .provenance import ( - ProvenanceResult, - ProvenanceStorageInterface, - RelationData, - RelationType, -) - - -class DatetimeCache(TypedDict): - data: Dict[Sha1Git, Optional[datetime]] - added: Set[Sha1Git] - - -class OriginCache(TypedDict): - data: Dict[Sha1Git, str] - added: Set[Sha1Git] - - -class RevisionCache(TypedDict): - data: Dict[Sha1Git, Sha1Git] - added: Set[Sha1Git] - - -class ProvenanceCache(TypedDict): - content: DatetimeCache - directory: DatetimeCache - revision: DatetimeCache - # below are insertion caches only - content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] - directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - # these two are for the origin layer - origin: OriginCache - revision_origin: RevisionCache - revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] - revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] - - -def new_cache() -> ProvenanceCache: - return ProvenanceCache( - content=DatetimeCache(data={}, added=set()), - directory=DatetimeCache(data={}, added=set()), - revision=DatetimeCache(data={}, added=set()), - content_in_revision=set(), - content_in_directory=set(), - directory_in_revision=set(), - origin=OriginCache(data={}, added=set()), - revision_origin=RevisionCache(data={}, added=set()), - revision_before_revision={}, - revision_in_origin=set(), - ) - - -# TODO: maybe move this to a separate file -class ProvenanceBackend: - def __init__(self, storage: ProvenanceStorageInterface) -> None: - self.storage = storage - self.cache = new_cache() - - def clear_caches(self) -> None: - self.cache = new_cache() - - def flush(self) -> None: - # Revision-content layer insertions ############################################ - - # For this layer, relations need to be inserted first so that, in case of - # failure, reprocessing the input does not generated an inconsistent database. - while not self.storage.relation_add( - RelationType.CNT_EARLY_IN_REV, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["content_in_revision"] - ), - ): - logging.warning( - f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " - f"Data: {self.cache['content_in_revision']}. Retrying..." - ) - - while not self.storage.relation_add( - RelationType.CNT_IN_DIR, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["content_in_directory"] - ), - ): - logging.warning( - f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " - f"Data: {self.cache['content_in_directory']}. Retrying..." 
- ) - - while not self.storage.relation_add( - RelationType.DIR_IN_REV, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["directory_in_revision"] - ), - ): - logging.warning( - f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " - f"Data: {self.cache['directory_in_revision']}. Retrying..." - ) - - # After relations, dates for the entities can be safely set, acknowledging that - # these entities won't need to be reprocessed in case of failure. - dates = { - sha1: date - for sha1, date in self.cache["content"]["data"].items() - if sha1 in self.cache["content"]["added"] and date is not None - } - while not self.storage.content_set_date(dates): - logging.warning( - f"Unable to write content dates to the storage. " - f"Data: {dates}. Retrying..." - ) - - dates = { - sha1: date - for sha1, date in self.cache["directory"]["data"].items() - if sha1 in self.cache["directory"]["added"] and date is not None - } - while not self.storage.directory_set_date(dates): - logging.warning( - f"Unable to write directory dates to the storage. " - f"Data: {dates}. Retrying..." - ) - - dates = { - sha1: date - for sha1, date in self.cache["revision"]["data"].items() - if sha1 in self.cache["revision"]["added"] and date is not None - } - while not self.storage.revision_set_date(dates): - logging.warning( - f"Unable to write revision dates to the storage. " - f"Data: {dates}. Retrying..." - ) - - # Origin-revision layer insertions ############################################# - - # Origins urls should be inserted first so that internal ids' resolution works - # properly. - urls = { - sha1: date - for sha1, date in self.cache["origin"]["data"].items() - if sha1 in self.cache["origin"]["added"] - } - while not self.storage.origin_set_url(urls): - logging.warning( - f"Unable to write origins urls to the storage. " - f"Data: {urls}. Retrying..." - ) - - # Second, flat models for revisions' histories (ie. revision-before-revision). - data: Iterable[RelationData] = sum( - [ - [ - RelationData(src=prev, dst=next, path=None) - for next in self.cache["revision_before_revision"][prev] - ] - for prev in self.cache["revision_before_revision"] - ], - [], - ) - while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): - logging.warning( - f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " - f"Data: {data}. Retrying..." - ) - - # Heads (ie. revision-in-origin entries) should be inserted once flat models for - # their histories were already added. This is to guarantee consistent results if - # something needs to be reprocessed due to a failure: already inserted heads - # won't get reprocessed in such a case. - data = ( - RelationData(src=rev, dst=org, path=None) - for rev, org in self.cache["revision_in_origin"] - ) - while not self.storage.relation_add(RelationType.REV_IN_ORG, data): - logging.warning( - f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " - f"Data: {data}. Retrying..." - ) - - # Finally, preferred origins for the visited revisions are set (this step can be - # reordered if required). - origins = { - sha1: self.cache["revision_origin"]["data"][sha1] - for sha1 in self.cache["revision_origin"]["added"] - } - while not self.storage.revision_set_origin(origins): - logging.warning( - f"Unable to write preferred origins to the storage. " - f"Data: {origins}. Retrying..." 
- ) - - # clear local cache ############################################################ - self.clear_caches() - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ) -> None: - self.cache["content_in_directory"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ) -> None: - self.cache["content_in_revision"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - return self.storage.content_find_first(id) - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - yield from self.storage.content_find_all(id, limit=limit) - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id) - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("content", [blob.id for blob in blobs]) - - def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: - self.cache["content"]["data"][blob.id] = date - self.cache["content"]["added"].add(blob.id) - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ) -> None: - self.cache["directory_in_revision"].add( - (directory.id, revision.id, normalize(path)) - ) - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id) - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("directory", [directory.id for directory in dirs]) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ) -> None: - self.cache["directory"]["data"][directory.id] = date - self.cache["directory"]["added"].add(directory.id) - - def get_dates( - self, - entity: Literal["content", "directory", "revision"], - ids: Iterable[Sha1Git], - ) -> Dict[Sha1Git, datetime]: - cache = self.cache[entity] - missing_ids = set(id for id in ids if id not in cache) - if missing_ids: - if entity == "revision": - updated = { - id: rev.date - for id, rev in self.storage.revision_get(missing_ids).items() - if rev.date is not None - } - else: - updated = getattr(self.storage, f"{entity}_get")(missing_ids) - cache["data"].update(updated) - dates: Dict[Sha1Git, datetime] = {} - for sha1 in ids: - date = cache["data"].get(sha1) - if date is not None: - dates[sha1] = date - return dates - - def origin_add(self, origin: OriginEntry) -> None: - self.cache["origin"]["data"][origin.id] = origin.url - self.cache["origin"]["added"].add(origin.id) - - def revision_add(self, revision: RevisionEntry) -> None: - self.cache["revision"]["data"][revision.id] = revision.date - self.cache["revision"]["added"].add(revision.id) - - def revision_add_before_revision( - self, head: RevisionEntry, revision: RevisionEntry - ) -> None: - self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - head.id - ) - - def revision_add_to_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - self.cache["revision_in_origin"].add((revision.id, origin.id)) - - def revision_get_date(self, revision: RevisionEntry) -> 
Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id) - - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: - cache = self.cache["revision_origin"]["data"] - if revision.id not in cache: - ret = self.storage.revision_get([revision.id]) - if revision.id in ret: - origin = ret[revision.id].origin - if origin is not None: - cache[revision.id] = origin - return cache.get(revision.id) - - def revision_in_history(self, revision: RevisionEntry) -> bool: - return revision.id in self.cache["revision_before_revision"] or bool( - self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) - ) - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - self.cache["revision_origin"]["data"][revision.id] = origin.id - self.cache["revision_origin"]["added"].add(revision.id) - - def revision_visited(self, revision: RevisionEntry) -> bool: - return revision.id in dict(self.cache["revision_in_origin"]) or bool( - self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) - ) - - -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index 1d782cd..52265a5 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,260 +1,260 @@ from __future__ import annotations from datetime import datetime, timezone import logging import os from typing import Any, Dict, Optional, Set from swh.model.model import Sha1Git from .archive import ArchiveInterface +from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry -from .provenance import ProvenanceInterface UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class HistoryNode: def __init__( self, entry: RevisionEntry, visited: bool = False, in_history: bool = False ) -> None: self.entry = entry # A revision is `visited` if it is directly pointed by an origin (ie. a head # revision for some snapshot) self.visited = visited # A revision is `in_history` if it appears in the history graph of an already # processed revision in the provenance database self.in_history = in_history self.parents: Set[HistoryNode] = set() def add_parent( self, parent: RevisionEntry, visited: bool = False, in_history: bool = False ) -> HistoryNode: node = HistoryNode(parent, visited=visited, in_history=in_history) self.parents.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: visited={self.visited}, in_history={self.in_history}, " f"parents=[{', '.join(str(parent) for parent in self.parents)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, HistoryNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: return hash((self.entry, self.visited, self.in_history)) def build_history_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, ) -> HistoryNode: """Recursively build the history graph from the given revision""" root = HistoryNode( revision, visited=provenance.revision_visited(revision), in_history=provenance.revision_in_history(revision), ) stack = [root] logging.debug( f"Recursively creating history graph for revision {revision.id.hex()}..." 
) while stack: current = stack.pop() if not current.visited: current.entry.retrieve_parents(archive) for rev in current.entry.parents: node = current.add_parent( rev, visited=provenance.revision_visited(rev), in_history=provenance.revision_in_history(rev), ) stack.append(node) logging.debug( f"History graph for revision {revision.id.hex()} successfully created!" ) return root class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known = self.dbdate is not None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self) -> None: self._dbdate = None self.maxdate = None self.known = False self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> IsochroneNode: # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, " f"dbdate={self.dbdate}, maxdate={self.maxdate}, " f"known={self.known}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating isochrone graph for revision {revision.id.hex()}..." 
     )
     fdates: Dict[Sha1Git, datetime] = {}  # map {file_id: date}
     while stack:
         current = stack.pop()
         if current.dbdate is None or current.dbdate > revision.date:
             # If current directory has an associated date in the isochrone frontier that
             # is greater or equal to the current revision's one, it should be ignored as
             # the revision is being processed out of order.
             if current.dbdate is not None and current.dbdate > revision.date:
                 logging.debug(
                     f"Invalidating frontier on {current.entry.id.hex()}"
                     f" (date {current.dbdate})"
                     f" when processing revision {revision.id.hex()}"
                     f" (date {revision.date})"
                 )
                 current.invalidate()

             # Pre-query all known dates for directories in the current directory
             # for the provenance object to have them cached and (potentially) improve
             # performance.
             current.entry.retrieve_children(archive)
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 current.entry.dirs
             )
             for dir in current.entry.dirs:
                 # Recursively analyse subdirectory nodes
                 node = current.add_directory(dir, date=ddates.get(dir.id, None))
                 stack.append(node)

             fdates.update(provenance.content_get_early_dates(current.entry.files))

     logging.debug(
         f"Isochrone graph for revision {revision.id.hex()} successfully created!"
     )

     # Precalculate max known date for each node in the graph (only directory nodes are
     # pushed to the stack).
     logging.debug(f"Computing maxdates for revision {revision.id.hex()}...")
     stack = [root]
     while stack:
         current = stack.pop()
         # Current directory node is known if it already has an assigned date (ie. it was
         # already seen as an isochrone frontier).
         if current.known:
             assert current.maxdate is None
             current.maxdate = current.dbdate
         else:
             if any(x.maxdate is None for x in current.children):
                 # at least one child of current has no maxdate yet
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if child.maxdate is None:
                         # if child.maxdate is None, it must be processed
                         stack.append(child)
             else:
                 # all the files and directories under current have a maxdate,
                 # we can infer the maxdate for current directory
                 assert current.maxdate is None
                 # if all content is already known, update current directory info.
                 current.maxdate = max(
                     [UTCMIN]
                     + [
                         child.maxdate
                         for child in current.children
                         if child.maxdate is not None  # unnecessary, but needed for mypy
                     ]
                     + [
                         fdates.get(file.id, revision.date)
                         for file in current.entry.files
                     ]
                 )
                 if current.maxdate <= revision.date:
                     current.known = (
                         # true if all subdirectories are known
                         all(child.known for child in current.children)
                         # true if all files are in fdates, i.e. if all files were known
                         # *before building this isochrone graph node*
                         # Note: the 'all()' is lazy: will stop iterating as soon as
                         # possible
                         and all((file.id in fdates) for file in current.entry.files)
                     )
                 else:
                     # at least one content is being processed out-of-order, then current
                     # node should be treated as unknown
                     current.maxdate = revision.date
                     current.known = False

     logging.debug(f"Maxdates for revision {revision.id.hex()} successfully computed!")
     return root
diff --git a/swh/provenance/provenance.py b/swh/provenance/interface.py
similarity index 100%
copy from swh/provenance/provenance.py
copy to swh/provenance/interface.py
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
index 1f0bdf2..c1f5960 100644
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,101 +1,101 @@
 from itertools import islice
 import logging
 import time
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple

 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .graph import HistoryNode, build_history_graph
+from .interface import ProvenanceInterface
 from .model import OriginEntry, RevisionEntry
-from .provenance import ProvenanceInterface


 class CSVOriginIterator:
     """Iterator over origin visit statuses typically present in the given CSV
     file.

     The input is an iterator that produces 2 elements per row:

       (url, snap)

     where:
     - url: is the origin url of the visit
     - snap: sha1_git of the snapshot pointed by the visit status
     """

     def __init__(
         self,
         statuses: Iterable[Tuple[str, Sha1Git]],
         limit: Optional[int] = None,
     ) -> None:
         self.statuses: Iterator[Tuple[str, Sha1Git]]
         if limit is not None:
             self.statuses = islice(statuses, limit)
         else:
             self.statuses = iter(statuses)

     def __iter__(self) -> Generator[OriginEntry, None, None]:
         return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)


 def origin_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     origins: List[OriginEntry],
 ) -> None:
     start = time.time()
     for origin in origins:
         provenance.origin_add(origin)
         origin.retrieve_revisions(archive)
         for revision in origin.revisions:
             graph = build_history_graph(archive, provenance, revision)
             origin_add_revision(provenance, origin, graph)
     done = time.time()
     provenance.flush()
     stop = time.time()
     logging.debug(
         "Origins "
         + ";".join([origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins])
         + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryNode, ) -> None: # head is treated separately since it should always be added to the given origin head = graph.entry check_preferred_origin(provenance, origin, head) provenance.revision_add_to_origin(origin, head) # head's history should be recursively iterated starting from its parents stack = list(graph.parents) while stack: current = stack.pop() check_preferred_origin(provenance, origin, current.entry) if current.visited: # if current revision was already visited just add it to the current origin # and stop recursion (its history has already been flattened) provenance.revision_add_to_origin(origin, current.entry) else: # if current revision was not visited before create a link between it and # the head, and recursively walk its history provenance.revision_add_before_revision(head, current.entry) for parent in current.parents: stack.append(parent) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! preferred = provenance.revision_get_preferred_origin(revision) if preferred is None: provenance.revision_set_preferred_origin(origin, revision) diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index b2df264..e5aa257 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,314 +1,314 @@ from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2 import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git -from ..provenance import ( +from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) class ProvenanceDBBase: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: ... 
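The `flavor` property above is what `get_provenance_storage` (in `swh/provenance/__init__.py`) relies on to pick a concrete backend: the flavor is recorded in the database itself, so a single open connection is enough to decide. A minimal sketch of that dispatch, assuming an already-open psycopg2 connection `conn`; the helper name `storage_for` is hypothetical:

    from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase

    def storage_for(conn):
        # swh_get_dbflavor() is queried once and then cached by the flavor property.
        if ProvenanceDBBase(conn).flavor == "with-path":
            from swh.provenance.postgresql.provenancedb_with_path import (
                ProvenanceWithPathDB,
            )

            return ProvenanceWithPathDB(conn)
        else:
            from swh.provenance.postgresql.provenancedb_without_path import (
                ProvenanceWithoutPathDB,
            )

            return ProvenanceWithoutPathDB(conn)
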
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, origin FROM revision WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: rows = tuple((rel.src, rel.dst, rel.path) for rel in data) if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT 
DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] selected = ["S.id", "D.id"] if self._relation_uses_location_table(relation): locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") selected.append("L.id") sql = f""" INSERT INTO {table} (SELECT {", ".join(selected)} FROM (VALUES %s) AS V(src, dst, path) {''' '''.join(joins)}) ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] if ids is not None: sha1s = (tuple(ids),) where = f"WHERE {'S.sha1' if not reverse else 'D.sha1'} IN %s" else: sha1s = None where = "" if sha1s is None or sha1s[0]: table = relation.value src, *_, dst = table.split("_") # TODO: improve this! if src == "revision" and dst == "revision": src_field = "prev" dst_field = "next" else: src_field = src dst_field = dst joins = [ f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", ] selected = ["S.sha1 AS src", "D.sha1 AS dst"] if self._relation_uses_location_table(relation): joins.append("INNER JOIN location AS L ON (L.id=R.location)") selected.append("L.path AS path") else: selected.append("NULL AS path") sql = f""" SELECT {", ".join(selected)} FROM {table} AS R {" ".join(joins)} {where} """ self.cursor.execute(sql, sha1s) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: ... 
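A detail worth noting about the write API above: every setter returns a boolean instead of raising (unless `raise_on_commit` is set), and `Provenance.flush` relies on that by looping until the write succeeds. A minimal sketch of the contract, with hypothetical inputs; the rows are materialized as a list on purpose, since a bare generator would be exhausted after the first failed attempt:

    import logging

    from swh.provenance.interface import RelationData, RelationType

    def add_relation_with_retry(storage, rows):
        # rows: iterable of (src, dst, path) triples.
        data = [RelationData(src=src, dst=dst, path=path) for src, dst, path in rows]
        while not storage.relation_add(RelationType.CNT_EARLY_IN_REV, data):
            logging.warning("relation_add failed, retrying with the same rows...")
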
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 8e27387..5744a31 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,70 +1,70 @@ from typing import Generator, Optional from swh.model.model import Sha1Git -from ..provenance import ProvenanceResult, RelationType +from ..interface import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = """ SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content=C.id) INNER JOIN location as L ON (CR.location=L.id) INNER JOIN revision as R ON (CR.revision=R.id) LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s ORDER BY date, revision, origin, path ASC LIMIT 1 """ self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" sql = f""" (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, L.path AS path FROM content AS C INNER JOIN content_in_revision AS CR ON (CR.content=C.id) INNER JOIN location AS L ON (CR.location=L.id) INNER JOIN revision AS R ON (CR.revision=R.id) LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, O.url AS origin, CASE DL.path WHEN '' THEN CL.path WHEN '.' 
THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C INNER JOIN content_in_directory AS CD ON (C.id=CD.content) INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) INNER JOIN revision AS R ON (DR.revision=R.id) INNER JOIN location AS CL ON (CD.location=CL.id) INNER JOIN location AS DL ON (DR.location=DL.id) LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) ORDER BY date, revision, origin, path {early_cut} """ self.cursor.execute(sql, (id, id)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def _relation_uses_location_table(self, relation: RelationType) -> bool: src, *_ = relation.value.split("_") return src in ("content", "directory") diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index db25752..b96586a 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,298 +1,343 @@ -from dataclasses import dataclass from datetime import datetime -import enum -from typing import Dict, Generator, Iterable, Optional, Set +import logging +import os +from typing import Dict, Generator, Iterable, Optional, Set, Tuple -from typing_extensions import Protocol, runtime_checkable +from typing_extensions import Literal, TypedDict from swh.model.model import Sha1Git +from .interface import ( + ProvenanceResult, + ProvenanceStorageInterface, + RelationData, + RelationType, +) from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -class EntityType(enum.Enum): - CONTENT = "content" - DIRECTORY = "directory" - REVISION = "revision" - ORIGIN = "origin" +class DatetimeCache(TypedDict): + data: Dict[Sha1Git, Optional[datetime]] + added: Set[Sha1Git] -class RelationType(enum.Enum): - CNT_EARLY_IN_REV = "content_in_revision" - CNT_IN_DIR = "content_in_directory" - DIR_IN_REV = "directory_in_revision" - REV_IN_ORG = "revision_in_origin" - REV_BEFORE_REV = "revision_before_revision" +class OriginCache(TypedDict): + data: Dict[Sha1Git, str] + added: Set[Sha1Git] -@dataclass(eq=True, frozen=True) -class ProvenanceResult: - content: Sha1Git - revision: Sha1Git - date: datetime - origin: Optional[str] - path: bytes +class RevisionCache(TypedDict): + data: Dict[Sha1Git, Sha1Git] + added: Set[Sha1Git] -@dataclass(eq=True, frozen=True) -class RevisionData: - """Object representing the data associated to a revision in the provenance model, - where `date` is the optional date of the revision (specifying it acknowledges that - the revision was already processed by the revision-content algorithm); and `origin` - identifies the preferred origin for the revision, if any. 
- """ +class ProvenanceCache(TypedDict): + content: DatetimeCache + directory: DatetimeCache + revision: DatetimeCache + # below are insertion caches only + content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] + content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] + directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] + # these two are for the origin layer + origin: OriginCache + revision_origin: RevisionCache + revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] + revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] - date: Optional[datetime] - origin: Optional[Sha1Git] +def new_cache() -> ProvenanceCache: + return ProvenanceCache( + content=DatetimeCache(data={}, added=set()), + directory=DatetimeCache(data={}, added=set()), + revision=DatetimeCache(data={}, added=set()), + content_in_revision=set(), + content_in_directory=set(), + directory_in_revision=set(), + origin=OriginCache(data={}, added=set()), + revision_origin=RevisionCache(data={}, added=set()), + revision_before_revision={}, + revision_in_origin=set(), + ) -@dataclass(eq=True, frozen=True) -class RelationData: - """Object representing a relation entry in the provenance model, where `src` and - `dst` are the sha1 ids of the entities being related, and `path` is optional - depending on the relation being represented. - """ - src: Sha1Git - dst: Sha1Git - path: Optional[bytes] +class Provenance: + def __init__(self, storage: ProvenanceStorageInterface) -> None: + self.storage = storage + self.cache = new_cache() - -@runtime_checkable -class ProvenanceStorageInterface(Protocol): - raise_on_commit: bool = False - - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - """Retrieve the first occurrence of the blob identified by `id`.""" - ... - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - """Retrieve all the occurrences of the blob identified by `id`.""" - ... - - def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return - a boolean stating whether the information was successfully stored. - """ - ... - - def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: - """Retrieve the associated date for each blob sha1 in `ids`. If some blob has - no associated date, it is not present in the resulting dictionary. - """ - ... - - def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to directories identified by sha1 ids, as paired in - `dates`. Return a boolean stating whether the information was successfully - stored. - """ - ... - - def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: - """Retrieve the associated date for each directory sha1 in `ids`. If some - directory has no associated date, it is not present in the resulting dictionary. - """ - ... - - def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: - """Retrieve all sha1 ids for entities of type `entity` present in the provenance - model. - """ - ... - - def location_get(self) -> Set[bytes]: - """Retrieve all paths present in the provenance model.""" - ... - - def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: - """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return - a boolean stating whether the information was successfully stored. - """ - ... 
-
-    def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
-        """Retrieve the associated url for each origin sha1 in `ids`. If some origin has
-        no associated date, it is not present in the resulting dictionary.
-        """
-        ...
-
-    def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
-        """Associate dates to revisions identified by sha1 ids, as paired in `dates`.
-        Return a boolean stating whether the information was successfully stored.
-        """
-        ...
-
-    def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
-        """Associate origins to revisions identified by sha1 ids, as paired in
-        `origins` (revision ids are keys and origin ids, values). Return a boolean
-        stating whether the information was successfully stored.
-        """
-        ...
-
-    def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
-        """Retrieve the associated date and origin for each revision sha1 in `ids`. If
-        some revision has no associated date nor origin, it is not present in the
-        resulting dictionary.
-        """
-        ...
-
-    def relation_add(
-        self, relation: RelationType, data: Iterable[RelationData]
-    ) -> bool:
-        """Add entries in the selected `relation`."""
-        ...
-
-    def relation_get(
-        self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
-    ) -> Set[RelationData]:
-        """Retrieve all entries in the selected `relation` whose source entities are
-        identified by some sha1 id in `ids`. If `reverse` is set, destination entities
-        are matched instead.
-        """
-        ...
-
-    def relation_get_all(self, relation: RelationType) -> Set[RelationData]:
-        """Retrieve all entries in the selected `relation` that are present in the
-        provenance model.
-        """
-        ...
-
-
-@runtime_checkable
-class ProvenanceInterface(Protocol):
-    storage: ProvenanceStorageInterface
+    def clear_caches(self) -> None:
+        self.cache = new_cache()

     def flush(self) -> None:
-        """Flush internal cache to the underlying `storage`."""
-        ...
+        # Revision-content layer insertions ############################################
+
+        # For this layer, relations need to be inserted first so that, in case of
+        # failure, reprocessing the input does not generate an inconsistent database.
+        while not self.storage.relation_add(
+            RelationType.CNT_EARLY_IN_REV,
+            (
+                RelationData(src=src, dst=dst, path=path)
+                for src, dst, path in self.cache["content_in_revision"]
+            ),
+        ):
+            logging.warning(
+                f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. "
+                f"Data: {self.cache['content_in_revision']}. Retrying..."
+            )
+
+        while not self.storage.relation_add(
+            RelationType.CNT_IN_DIR,
+            (
+                RelationData(src=src, dst=dst, path=path)
+                for src, dst, path in self.cache["content_in_directory"]
+            ),
+        ):
+            logging.warning(
+                f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. "
+                f"Data: {self.cache['content_in_directory']}. Retrying..."
+            )
+
+        while not self.storage.relation_add(
+            RelationType.DIR_IN_REV,
+            (
+                RelationData(src=src, dst=dst, path=path)
+                for src, dst, path in self.cache["directory_in_revision"]
+            ),
+        ):
+            logging.warning(
+                f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. "
+                f"Data: {self.cache['directory_in_revision']}. Retrying..."
+            )
+
+        # After relations, dates for the entities can be safely set, acknowledging that
+        # these entities won't need to be reprocessed in case of failure.
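+        # For instance, a crash right here leaves only the relations written; since
+        # the inserts above use ON CONFLICT DO NOTHING semantics, rerunning the
+        # same input is harmless and simply fills in the missing dates below.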
+ dates = { + sha1: date + for sha1, date in self.cache["content"]["data"].items() + if sha1 in self.cache["content"]["added"] and date is not None + } + while not self.storage.content_set_date(dates): + logging.warning( + f"Unable to write content dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + dates = { + sha1: date + for sha1, date in self.cache["directory"]["data"].items() + if sha1 in self.cache["directory"]["added"] and date is not None + } + while not self.storage.directory_set_date(dates): + logging.warning( + f"Unable to write directory dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + dates = { + sha1: date + for sha1, date in self.cache["revision"]["data"].items() + if sha1 in self.cache["revision"]["added"] and date is not None + } + while not self.storage.revision_set_date(dates): + logging.warning( + f"Unable to write revision dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + # Origin-revision layer insertions ############################################# + + # Origins urls should be inserted first so that internal ids' resolution works + # properly. + urls = { + sha1: date + for sha1, date in self.cache["origin"]["data"].items() + if sha1 in self.cache["origin"]["added"] + } + while not self.storage.origin_set_url(urls): + logging.warning( + f"Unable to write origins urls to the storage. " + f"Data: {urls}. Retrying..." + ) + + # Second, flat models for revisions' histories (ie. revision-before-revision). + data: Iterable[RelationData] = sum( + [ + [ + RelationData(src=prev, dst=next, path=None) + for next in self.cache["revision_before_revision"][prev] + ] + for prev in self.cache["revision_before_revision"] + ], + [], + ) + while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): + logging.warning( + f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " + f"Data: {data}. Retrying..." + ) + + # Heads (ie. revision-in-origin entries) should be inserted once flat models for + # their histories were already added. This is to guarantee consistent results if + # something needs to be reprocessed due to a failure: already inserted heads + # won't get reprocessed in such a case. + data = ( + RelationData(src=rev, dst=org, path=None) + for rev, org in self.cache["revision_in_origin"] + ) + while not self.storage.relation_add(RelationType.REV_IN_ORG, data): + logging.warning( + f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " + f"Data: {data}. Retrying..." + ) + + # Finally, preferred origins for the visited revisions are set (this step can be + # reordered if required). + origins = { + sha1: self.cache["revision_origin"]["data"][sha1] + for sha1 in self.cache["revision_origin"]["added"] + } + while not self.storage.revision_set_origin(origins): + logging.warning( + f"Unable to write preferred origins to the storage. " + f"Data: {origins}. Retrying..." + ) + + # clear local cache ############################################################ + self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: - """Associate `blob` with `directory` in the provenance model. `prefix` is the - relative path from `directory` to `blob` (excluding `blob`'s name). - """ - ... 
+ self.cache["content_in_directory"].add( + (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) + ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: - """Associate `blob` with `revision` in the provenance model. `prefix` is the - absolute path from `revision`'s root directory to `blob` (excluding `blob`'s - name). - """ - ... + self.cache["content_in_revision"].add( + (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) + ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - """Retrieve the first occurrence of the blob identified by `id`.""" - ... + return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: - """Retrieve all the occurrences of the blob identified by `id`.""" - ... + yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - """Retrieve the earliest known date of `blob`.""" - ... + return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: - """Retrieve the earliest known date for each blob in `blobs`. If some blob has - no associated date, it is not present in the resulting dictionary. - """ - ... + return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: - """Associate `date` to `blob` as it's earliest known date.""" - ... + self.cache["content"]["data"][blob.id] = date + self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: - """Associate `directory` with `revision` in the provenance model. `path` is the - absolute path from `revision`'s root directory to `directory` (including - `directory`'s name). - """ - ... + self.cache["directory_in_revision"].add( + (directory.id, revision.id, normalize(path)) + ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: - """Retrieve the earliest known date of `directory` as an isochrone frontier in - the provenance model. - """ - ... + return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: - """Retrieve the earliest known date for each directory in `dirs` as isochrone - frontiers provenance model. If some directory has no associated date, it is not - present in the resulting dictionary. - """ - ... + return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: - """Associate `date` to `directory` as it's earliest known date as an isochrone - frontier in the provenance model. - """ - ... 
+ self.cache["directory"]["data"][directory.id] = date + self.cache["directory"]["added"].add(directory.id) + + def get_dates( + self, + entity: Literal["content", "directory", "revision"], + ids: Iterable[Sha1Git], + ) -> Dict[Sha1Git, datetime]: + cache = self.cache[entity] + missing_ids = set(id for id in ids if id not in cache) + if missing_ids: + if entity == "revision": + updated = { + id: rev.date + for id, rev in self.storage.revision_get(missing_ids).items() + if rev.date is not None + } + else: + updated = getattr(self.storage, f"{entity}_get")(missing_ids) + cache["data"].update(updated) + dates: Dict[Sha1Git, datetime] = {} + for sha1 in ids: + date = cache["data"].get(sha1) + if date is not None: + dates[sha1] = date + return dates def origin_add(self, origin: OriginEntry) -> None: - """Add `origin` to the provenance model.""" - ... + self.cache["origin"]["data"][origin.id] = origin.url + self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: - """Add `revision` to the provenance model. This implies storing `revision`'s - date in the model, thus `revision.date` must be a valid date. - """ - ... + self.cache["revision"]["data"][revision.id] = revision.date + self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: - """Associate `revision` to `head` as an ancestor of the latter.""" - ... + self.cache["revision_before_revision"].setdefault(revision.id, set()).add( + head.id + ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: - """Associate `revision` to `origin` as a head revision of the latter (ie. the - target of an snapshot for `origin` in the archive).""" - ... + self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: - """Retrieve the date associated to `revision`.""" - ... + return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: - """Retrieve the preferred origin associated to `revision`.""" - ... + cache = self.cache["revision_origin"]["data"] + if revision.id not in cache: + ret = self.storage.revision_get([revision.id]) + if revision.id in ret: + origin = ret[revision.id].origin + if origin is not None: + cache[revision.id] = origin + return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: - """Check if `revision` is known to be an ancestor of some head revision in the - provenance model. - """ - ... + return revision.id in self.cache["revision_before_revision"] or bool( + self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) + ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: - """Associate `origin` as the preferred origin for `revision`.""" - ... + self.cache["revision_origin"]["data"][revision.id] = origin.id + self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: - """Check if `revision` is known to be a head revision for some origin in the - provenance model. - """ - ... + return revision.id in dict(self.cache["revision_in_origin"]) or bool( + self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) + ) + + +def normalize(path: bytes) -> bytes: + return path[2:] if path.startswith(bytes("." 
+ os.path.sep, "utf-8")) else path diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 30c40bc..7949db9 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,242 +1,242 @@ from datetime import datetime, timezone import logging import os import time from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph +from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry -from .provenance import ProvenanceInterface class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {revision.id.hex()}" f" (known date {date} / revision date {revision.date})..." ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: provenance.flush() stop = time.time() logging.debug( f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. 
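                 # The date recorded for a new frontier is the subtree's maxdate, not
                 # the revision's date: is_new_frontier guarantees maxdate <=
                 # revision.date, so later revisions can safely reuse this frontier.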
def revision_process_content(
    archive: ArchiveInterface,
    provenance: ProvenanceInterface,
    revision: RevisionEntry,
    graph: IsochroneNode,
    trackall: bool = True,
    lower: bool = True,
    mindepth: int = 1,
) -> None:
    assert revision.date is not None
    provenance.revision_add(revision)

    stack = [graph]
    while stack:
        current = stack.pop()
        if current.dbdate is not None:
            assert current.dbdate <= revision.date
            if trackall:
                # Current directory is an outer isochrone frontier for a previously
                # processed revision. It should be reused as is.
                provenance.directory_add_to_revision(
                    revision, current.entry, current.path
                )
        else:
            assert current.maxdate is not None
            # Current directory is not an outer isochrone frontier for any previous
            # revision. It might be eligible for this one.
            if is_new_frontier(
                current,
                revision=revision,
                trackall=trackall,
                lower=lower,
                mindepth=mindepth,
            ):
                # The outer frontier should be moved to the current position in the
                # isochrone graph. This is the first time this directory is found in
                # the isochrone frontier.
                provenance.directory_set_date_in_isochrone_frontier(
                    current.entry, current.maxdate
                )
                if trackall:
                    provenance.directory_add_to_revision(
                        revision, current.entry, current.path
                    )
                    flatten_directory(archive, provenance, current.entry)
            else:
                # If the current node is an invalidated frontier, update its date so
                # that future revisions get the proper value.
                if current.invalid:
                    provenance.directory_set_date_in_isochrone_frontier(
                        current.entry, current.maxdate
                    )
                # No point moving the frontier here. Either there are no files or they
                # are being seen for the first time here. Add all blobs to the current
                # revision, updating dates if necessary, and recursively analyse
                # subdirectories as candidates for the outer frontier.
                for blob in current.entry.files:
                    date = provenance.content_get_early_date(blob)
                    if date is None or revision.date < date:
                        provenance.content_set_early_date(blob, revision.date)
                    provenance.content_add_to_revision(revision, blob, current.path)
                for child in current.children:
                    stack.append(child)


def flatten_directory(
    archive: ArchiveInterface,
    provenance: ProvenanceInterface,
    directory: DirectoryEntry,
) -> None:
    """Recursively retrieve all the files of 'directory' and insert them in the
    'provenance' database in the 'content_in_directory' relation (via
    content_add_to_directory).
    """
    stack = [(directory, b"")]
    while stack:
        current, prefix = stack.pop()
        current.retrieve_children(archive)
        for f_child in current.files:
            # Add content to the directory with the computed prefix.
            provenance.content_add_to_directory(directory, f_child, prefix)
        for d_child in current.dirs:
            # Recursively walk the child directory.
            stack.append((d_child, os.path.join(prefix, d_child.name)))
def is_new_frontier(
    node: IsochroneNode,
    revision: RevisionEntry,
    trackall: bool = True,
    lower: bool = True,
    mindepth: int = 1,
) -> bool:
    assert node.maxdate is not None  # for mypy
    assert revision.date is not None  # idem
    if trackall:
        # The only real condition for a directory to be a frontier is that its
        # content is already known and its maxdate is less than (or equal to) the
        # current revision's date. Checking mindepth is meant to skip root
        # directories (or any arbitrary depth) to improve the result. The
        # option lower tries to maximize the reuse rate of previously defined
        # frontiers by keeping them low in the directory tree.
        return (
            node.known
            and node.maxdate <= revision.date  # all content is earlier than revision
            and node.depth >= mindepth  # current node is deeper than the min allowed depth
            and (has_blobs(node) if lower else True)  # there is at least one blob in it
        )
    else:
        # If we are only tracking first occurrences, we want to ensure that all first
        # occurrences end up in the content_early_in_rev relation. Thus, we force
        # every blob outside a frontier to have a strictly earlier date.
        return (
            node.maxdate < revision.date  # all content is earlier than revision
            and node.depth >= mindepth  # deeper than the min allowed depth
            and (has_blobs(node) if lower else True)  # there is at least one blob
        )


def has_blobs(node: IsochroneNode) -> bool:
    # We may want to look for files in different ways to decide whether to define a
    # frontier or not:
    # 1. Only files in the current node:
    return any(node.entry.files)
    # 2. Files anywhere in the isochrone graph
    # stack = [node]
    # while stack:
    #     current = stack.pop()
    #     if any(
    #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
    #         return True
    #     else:
    #         # All children are directory entries.
    #         stack.extend(current.children)
    # return False
    # 3. Files in the intermediate directories between the current node and any
    #    previously defined frontier:
    # TODO: complete this case!
    # return any(
    #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
    # ) or all(
    #     map(
    #         lambda child: (
    #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
    #         )
    #         or has_blobs(child),
    #         node.children,
    #     )
    # )
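The two branches of is_new_frontier() differ only in the date comparison: <= when tracking all occurrences, strictly < when tracking first occurrences only. A toy illustration of the trackall=True predicate, using a simplified stand-in for IsochroneNode (illustration only, not part of the patch):

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ToyNode:  # toy stand-in for swh.provenance.graph.IsochroneNode
    known: bool
    maxdate: datetime
    depth: int
    has_blobs: bool


def toy_is_new_frontier(node: ToyNode, rev_date: datetime, lower: bool = True, mindepth: int = 1) -> bool:
    # mirrors the trackall=True branch of is_new_frontier() above
    return (
        node.known
        and node.maxdate <= rev_date
        and node.depth >= mindepth
        and (node.has_blobs if lower else True)
    )


rev_date = datetime(2021, 6, 1, tzinfo=timezone.utc)
old = datetime(2021, 1, 1, tzinfo=timezone.utc)
subdir = ToyNode(known=True, maxdate=old, depth=2, has_blobs=True)
root = ToyNode(known=True, maxdate=old, depth=0, has_blobs=True)
assert toy_is_new_frontier(subdir, rev_date)   # fully known, older content, deep enough
assert not toy_is_new_frontier(root, rev_date)  # mindepth filters out the root directory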
""" for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. # TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rid of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo: str) -> Dict[str, Any]: data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. 
""" regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py index 8690698..2fd66ec 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,22 +1,22 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.provenance.provenance import ProvenanceInterface +from swh.provenance.interface import ProvenanceInterface from swh.storage.postgresql.storage import Storage def test_provenance_fixture(provenance: ProvenanceInterface) -> None: """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance provenance.flush() # should be a noop def test_storage(swh_storage_with_objects: Storage) -> None: """Check the 'swh_storage_with_objects' fixture produce a working Storage object with at least some Content, Revision and Directory in it""" assert swh_storage_with_objects assert swh_storage_with_objects.content_get_random() assert swh_storage_with_objects.directory_get_random() assert swh_storage_with_objects.revision_get_random() diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index 1062502..adbbd5a 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -1,74 +1,74 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import HistoryNode, build_history_graph +from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import 
OriginEntry, RevisionEntry from swh.provenance.origin import origin_add_revision -from swh.provenance.provenance import ProvenanceInterface from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.storage.postgresql.storage import Storage def history_graph_from_dict(d: Dict[str, Any]) -> HistoryNode: """Takes a dictionary representing a tree of HistoryNode objects, and recursively builds the corresponding graph.""" node = HistoryNode( entry=RevisionEntry(hash_to_bytes(d["rev"])), visited=d.get("visited", False), in_history=d.get("in_history", False), ) node.parents = set( history_graph_from_dict(parent) for parent in d.get("parents", []) ) return node @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) def test_history_graph( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, visit: str, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for graph_as_dict in expected["graphs"]: expected_graph = history_graph_from_dict(graph_as_dict) print("Expected graph:", expected_graph) computed_graph = build_history_graph( archive, provenance, RevisionEntry(hash_to_bytes(graph_as_dict["rev"])), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph origin_add_revision(provenance, entry, computed_graph) if not batch: provenance.flush() diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index e0adc7b..9b65cd5 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,112 +1,112 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import IsochroneNode, build_isochrone_graph +from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.provenance import ProvenanceInterface from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt from swh.storage.postgresql.storage import Storage def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) 
node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.known = d.get("known", False) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = set( isochrone_graph_from_dict(child, depth=depth + 1) for child in children ) return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. assert entry.root is not None computed_graph = build_isochrone_graph( archive, provenance, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index ab55763..3e84768 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,51 +1,51 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from typing import Type from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB -from swh.provenance.provenance import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage # TODO: remove this function in favour of TimestampWithTimezone.to_datetime # from swh.model.model def ts2dt(ts: dict) -> datetime: timestamp = datetime.fromtimestamp( ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add( provenance: ProvenanceInterface, swh_storage_with_objects: Storage ) -> None: """Test the origin_add 
function""" archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: assert isinstance(status, OriginVisitStatus) if status.snapshot is not None: entry = OriginEntry(url=status.origin, snapshot=status.snapshot) origin_add(provenance, archive, [entry]) # TODO: check some facts here def test_provenance_flavor(provenance: ProvenanceInterface) -> None: assert isinstance(provenance.storage, ProvenanceDBBase) assert provenance.storage.flavor in ("with-path", "without-path") backend_class: Type[ProvenanceStorageInterface] if provenance.storage.flavor == "with-path": backend_class = ProvenanceWithPathDB else: backend_class = ProvenanceWithoutPathDB assert isinstance(provenance.storage, backend_class) diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index 95ecdb4..22c5ada 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,331 +1,331 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List, Optional, Set, Tuple import pytest from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import RevisionEntry from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase -from swh.provenance.provenance import EntityType, ProvenanceInterface, RelationType from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt from swh.storage.postgresql.storage import Storage @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path.encode("utf-8") return None for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = 
provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ) }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.DIR_IN_REV) }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert ( rev_ts + rd["rel_ts"] == provenance.storage.directory_get([rd["dst"]])[rd["dst"]].timestamp() ), synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) assert isinstance(provenance.storage, ProvenanceDBBase) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path: # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: 
bool,
    mindepth: int,
    batch: bool,
) -> None:
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(swh_storage, data)

    revisions = [
        RevisionEntry(
            id=revision["id"],
            date=ts2dt(revision["date"]),
            root=revision["directory"],
        )
        for revision in data["revision"]
    ]

    if batch:
        revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
    else:
        for revision in revisions:
            revision_add(
                provenance, archive, [revision], lower=lower, mindepth=mindepth
            )

    syntheticfile = get_datafile(
        f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
    )
    expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
    # maps blob_id -> (rev_id, timestamp, [path, ...]); the path element is a list
    # because a content can be added at several places in a single revision, in
    # which case the result of content_find_first() is one of those paths, but we
    # have no guarantee which one it will return.
    for synth_rev in synthetic_result(syntheticfile):
        rev_id = synth_rev["sha1"].hex()
        rev_ts = synth_rev["date"]

        for rc in synth_rev["R_C"]:
            sha1 = rc["dst"].hex()
            if sha1 not in expected_first:
                assert rc["rel_ts"] == 0
                expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
            else:
                if rev_ts == expected_first[sha1][1]:
                    expected_first[sha1][2].append(rc["path"])
                elif rev_ts < expected_first[sha1][1]:
                    expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])

        for dc in synth_rev["D_C"]:
            sha1 = dc["dst"].hex()
            assert sha1 in expected_first
            # nothing else to do here: this content cannot be a "first seen file"

    assert isinstance(provenance.storage, ProvenanceDBBase)
    for content_id, (rev_id, ts, paths) in expected_first.items():
        occur = provenance.content_find_first(hash_to_bytes(content_id))
        assert occur is not None
        assert occur.content.hex() == content_id
        assert occur.revision.hex() == rev_id
        assert occur.date.timestamp() == ts
        assert occur.origin is None
        if provenance.storage.with_path:
            assert occur.path.decode() in paths
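For reference, the bookkeeping in test_provenance_heuristics_content_find_first() condenses to: keep the earliest revision per blob, and on a timestamp tie keep every candidate path, since content_find_first() may legitimately return any one of them. A self-contained restatement of that logic (toy values, not part of the patch):

from typing import Dict, List, Tuple


def update_expected_first(
    expected: Dict[str, Tuple[str, float, List[str]]],
    blob: str,
    rev: str,
    ts: float,
    path: str,
) -> None:
    # earlier revision wins outright; a tie on the timestamp collects the extra path
    if blob not in expected or ts < expected[blob][1]:
        expected[blob] = (rev, ts, [path])
    elif ts == expected[blob][1]:
        expected[blob][2].append(path)


expected: Dict[str, Tuple[str, float, List[str]]] = {}
update_expected_first(expected, "b1", "r1", 10.0, "a.txt")
update_expected_first(expected, "b1", "r1", 10.0, "copy/a.txt")  # tie: both paths kept
update_expected_first(expected, "b1", "r2", 20.0, "later.txt")   # later revision: ignored
assert expected["b1"] == ("r1", 10.0, ["a.txt", "copy/a.txt"])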