diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py
index 4e77eb6..e80cdca 100644
--- a/swh/provenance/graph.py
+++ b/swh/provenance/graph.py
@@ -1,257 +1,254 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from __future__ import annotations

 from datetime import datetime, timezone
 import os
 from typing import Any, Dict, Optional, Set

 from swh.core.statsd import statsd
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .interface import ProvenanceInterface
 from .model import DirectoryEntry, RevisionEntry

 GRAPH_DURATION_METRIC = "swh_provenance_graph_duration_seconds"
 GRAPH_OPERATIONS_METRIC = "swh_provenance_graph_operations_total"

 UTCMIN = datetime.min.replace(tzinfo=timezone.utc)


 class HistoryNode:
     def __init__(
         self, entry: RevisionEntry, is_head: bool = False, in_history: bool = False
     ) -> None:
         self.entry = entry
         # A revision is `is_head` if it is directly pointed to by an origin (ie. a
         # head revision for some snapshot)
         self.is_head = is_head
         # A revision is `in_history` if it appears in the history graph of an already
         # processed revision in the provenance database
         self.in_history = in_history
         # XXX: the current simplified version of the origin-revision layer algorithm
         # does not use these two flags at all. They are kept for now but might be
         # removed in the future (hence, RevisionEntry might be used instead of
         # HistoryNode).

     def __str__(self) -> str:
         return f"<{self.entry}: is_head={self.is_head}, in_history={self.in_history}>"

     def as_dict(self) -> Dict[str, Any]:
         return {
             "rev": hash_to_hex(self.entry.id),
             "is_head": self.is_head,
             "in_history": self.in_history,
         }


 class HistoryGraph:
     @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_history_graph"})
     def __init__(
         self,
         provenance: ProvenanceInterface,
         archive: ArchiveInterface,
         revision: RevisionEntry,
     ) -> None:
         self._head = HistoryNode(
             revision,
             is_head=provenance.revision_visited(revision),
             in_history=provenance.revision_in_history(revision),
         )
         self._graph: Dict[HistoryNode, Set[HistoryNode]] = {}

         stack = [self._head]
         while stack:
             current = stack.pop()

             if current not in self._graph:
                 self._graph[current] = set()
                 current.entry.retrieve_parents(archive)
                 for parent in current.entry.parents:
                     node = HistoryNode(
                         parent,
                         is_head=provenance.revision_visited(parent),
                         in_history=provenance.revision_in_history(parent),
                     )
                     self._graph[current].add(node)
                     stack.append(node)

     @property
     def head(self) -> HistoryNode:
         return self._head

     @property
     def parents(self) -> Dict[HistoryNode, Set[HistoryNode]]:
         return self._graph

     def __str__(self) -> str:
         return f"<{self.head}: {self.parents}>"

     def as_dict(self) -> Dict[str, Any]:
         return {
             "head": self.head.as_dict(),
             "graph": {
                 hash_to_hex(node.entry.id): sorted(
                     [parent.as_dict() for parent in parents],
                     key=lambda d: d["rev"],
                 )
                 for node, parents in self._graph.items()
             },
         }


 class IsochroneNode:
     def __init__(
         self,
         entry: DirectoryEntry,
         dbdate: Optional[datetime] = None,
         depth: int = 0,
         prefix: bytes = b"",
     ) -> None:
         self.entry = entry
         self.depth = depth

         # dbdate is the maxdate for this node that comes from the DB
         self._dbdate: Optional[datetime] = dbdate

         # maxdate is set by the maxdate computation algorithm
         self.maxdate: Optional[datetime] = None

         self.invalid = False
         self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
         self.children: Set[IsochroneNode] = set()

     @property
     def dbdate(self) -> Optional[datetime]:
         # use a property to make this attribute (mostly) read-only
         return self._dbdate

     def invalidate(self) -> None:
         statsd.increment(
             metric=GRAPH_OPERATIONS_METRIC, tags={"method": "invalidate_frontier"}
         )
         self._dbdate = None
         self.maxdate = None
         self.invalid = True

     def add_directory(
         self, child: DirectoryEntry, date: Optional[datetime] = None
     ) -> IsochroneNode:
         # we should not be processing this node (ie. adding subdirectories or files)
         # if it is actually known by the provenance DB
         assert self.dbdate is None
         node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path)
         self.children.add(node)
         return node

     def __str__(self) -> str:
         return (
             f"<{self.entry}: depth={self.depth}, dbdate={self.dbdate}, "
             f"maxdate={self.maxdate}, invalid={self.invalid}, path={self.path!r}, "
             f"children=[{', '.join(str(child) for child in self.children)}]>"
         )

     def __eq__(self, other: Any) -> bool:
         return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__

     def __hash__(self) -> int:
         # only immutable attributes are considered to compute the hash
         return hash((self.entry, self.depth, self.path))


 @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_isochrone_graph"})
 def build_isochrone_graph(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     minsize: int = 0,
 ) -> IsochroneNode:
     assert revision.date is not None
     assert revision.root == directory.id

     # this function processes a revision in 2 steps:
     #
     # 1. build the tree structure of IsochroneNode objects (one INode per
     #    directory under the root directory of the revision, without following
     #    known subdirectories), and gather the dates from the DB for already
     #    known objects; for files, just keep all the dates in a global 'fdates'
     #    dict; note that in this step, we will only recurse into directories
     #    that are not already known.
     #
     # 2. compute the maxdate for each node of the tree that was not found in the DB.

     # Build the nodes structure
     root_date = provenance.directory_get_date_in_isochrone_frontier(directory)
     root = IsochroneNode(directory, dbdate=root_date)
     stack = [root]
     fdates: Dict[Sha1Git, datetime] = {}  # map {file_id: date}
     while stack:
         current = stack.pop()
         if current.dbdate is None or current.dbdate >= revision.date:
             # If the current directory has an associated date in the isochrone
             # frontier that is greater than or equal to the current revision's, it
             # should be ignored, as the revision is being processed out of order.
             if current.dbdate is not None and current.dbdate >= revision.date:
                 current.invalidate()

             # Pre-query all known dates for directories in the current directory
             # for the provenance object to have them cached and (potentially) improve
             # performance.
             current.entry.retrieve_children(archive, minsize=minsize)
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 current.entry.dirs
             )
             for dir in current.entry.dirs:
                 # Recursively analyse subdirectory nodes
                 node = current.add_directory(dir, date=ddates.get(dir.id, None))
                 stack.append(node)

             fdates.update(provenance.content_get_early_dates(current.entry.files))

     # Precalculate the max known date for each node in the graph (only directory
     # nodes are pushed to the stack).
     stack = [root]
     while stack:
         current = stack.pop()
         # The current directory node is known if it already has an assigned date
         # (ie. it was already seen as an isochrone frontier).
         if current.dbdate is not None:
             assert current.maxdate is None
             current.maxdate = current.dbdate
         else:
             if any(x.maxdate is None for x in current.children):
                 # at least one child of current has no maxdate yet
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if child.maxdate is None:
                         # if child.maxdate is None, it must be processed
                         stack.append(child)
             else:
                 # all the files and directories under current have a maxdate,
                 # so we can infer the maxdate for the current directory
                 assert current.maxdate is None
                 # if all content is already known, update current directory info.
-                current.maxdate = min(
-                    max(
-                        [UTCMIN]
-                        + [
-                            child.maxdate
-                            for child in current.children
-                            if child.maxdate is not None  # for mypy
-                        ]
-                        + [
-                            fdates.get(file.id, revision.date)
-                            for file in current.entry.files
-                        ]
-                    ),
-                    revision.date,
+                current.maxdate = max(
+                    [UTCMIN]
+                    + [
+                        child.maxdate
+                        for child in current.children
+                        if child.maxdate is not None  # for mypy
+                    ]
+                    + [
+                        fdates.get(file.id, revision.date)
+                        for file in current.entry.files
+                    ]
                 )
     return root
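The crux of the graph.py hunk above is that a node's maxdate is no longer clamped to the date of the revision being processed (the old min(max(...), revision.date)); it is now the plain max over children and files, so it can exceed revision.date when the revision arrives out of order. A minimal sketch of the new aggregation rule, using plain datetimes in place of IsochroneNode objects (node_maxdate is a hypothetical helper, not part of the patch):

    from datetime import datetime, timezone

    UTCMIN = datetime.min.replace(tzinfo=timezone.utc)

    def node_maxdate(child_maxdates, file_dates):
        # New rule: take the newest date seen among all children
        # (subdirectories and files); UTCMIN seeds max() so that empty
        # directories still get a value.
        return max([UTCMIN] + child_maxdates + file_dates)

    t5 = datetime.fromtimestamp(1000000005, tz=timezone.utc)
    t10 = datetime.fromtimestamp(1000000010, tz=timezone.utc)

    # Out-of-order case: a subdirectory is already dated t=10 by a later
    # revision, while the revision being processed is dated t=5.
    assert node_maxdate([t10], [t5]) == t10
    # The old code would have clamped this to t=5, masking the fact that part
    # of the content is only known from a later revision.
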
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index c7c36af..3fbfc57 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,199 +1,199 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple

 from swh.core.statsd import statsd
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .directory import directory_flatten
 from .graph import IsochroneNode, build_isochrone_graph
 from .interface import ProvenanceInterface
 from .model import DirectoryEntry, RevisionEntry

 REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"


 class CSVRevisionIterator:
     """Iterator over revisions typically present in the given CSV file.

     The input is an iterator that produces 3 elements per row: (id, date, root),
     where:
     - id: the id (sha1_git) of the revision
     - date: the author date
     - root: the sha1 of the root directory
     """

     def __init__(
         self,
         revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
         limit: Optional[int] = None,
     ) -> None:
         self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
         if limit is not None:
             from itertools import islice

             self.revisions = islice(revisions, limit)
         else:
             self.revisions = iter(revisions)

     def __iter__(self) -> Generator[RevisionEntry, None, None]:
         for id, date, root in self.revisions:
             if date.tzinfo is None:
                 date = date.replace(tzinfo=timezone.utc)
             yield RevisionEntry(id, date=date, root=root)


 @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
 def revision_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revisions: List[RevisionEntry],
     trackall: bool = True,
     flatten: bool = True,
     lower: bool = True,
     mindepth: int = 1,
     minsize: int = 0,
     commit: bool = True,
 ) -> None:
     for revision in revisions:
         assert revision.date is not None
         assert revision.root is not None
         # Process content starting from the revision's root directory.
         date = provenance.revision_get_date(revision)
         if date is None or revision.date < date:
             graph = build_isochrone_graph(
                 provenance,
                 archive,
                 revision,
                 DirectoryEntry(revision.root),
                 minsize=minsize,
             )
             revision_process_content(
                 provenance,
                 archive,
                 revision,
                 graph,
                 trackall=trackall,
                 flatten=flatten,
                 lower=lower,
                 mindepth=mindepth,
                 minsize=minsize,
             )
     if commit:
         provenance.flush()


 @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"})
 def revision_process_content(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revision: RevisionEntry,
     graph: IsochroneNode,
     trackall: bool = True,
     flatten: bool = True,
     lower: bool = True,
     mindepth: int = 1,
     minsize: int = 0,
 ) -> None:
     assert revision.date is not None
     provenance.revision_add(revision)

     stack = [graph]
     while stack:
         current = stack.pop()
         if current.dbdate is not None:
-            assert current.dbdate <= revision.date
+            assert current.dbdate < revision.date
             if trackall:
                 # Current directory is an outer isochrone frontier for a previously
                 # processed revision. It should be reused as is.
                 provenance.directory_add_to_revision(
                     revision, current.entry, current.path
                 )
         else:
             assert current.maxdate is not None
             # Current directory is not an outer isochrone frontier for any previous
             # revision. It might be eligible for this one.
             if is_new_frontier(
                 current,
                 revision=revision,
                 lower=lower,
                 mindepth=mindepth,
             ):
                 # The outer frontier should be moved to the current position in the
                 # isochrone graph. This is the first time this directory is found in
                 # the isochrone frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 if trackall:
                     provenance.directory_add_to_revision(
                         revision, current.entry, current.path
                     )
                 if flatten:
                     directory_flatten(
                         provenance, archive, current.entry, minsize=minsize
                     )
             else:
                 # If the current node is an invalidated frontier, update its date so
                 # that future revisions get the proper value.
                 if current.invalid:
                     provenance.directory_set_date_in_isochrone_frontier(
-                        current.entry, current.maxdate
+                        current.entry, revision.date
                     )
                 # No point in moving the frontier here. Either there are no files, or
                 # they are being seen for the first time here. Add all blobs to the
                 # current revision, updating their date if necessary, and recursively
                 # analyse subdirectories as candidates for the outer frontier.
                 for blob in current.entry.files:
                     date = provenance.content_get_early_date(blob)
                     if date is None or revision.date < date:
                         provenance.content_set_early_date(blob, revision.date)
                     provenance.content_add_to_revision(revision, blob, current.path)
                 for child in current.children:
                     stack.append(child)


 def is_new_frontier(
     node: IsochroneNode,
     revision: RevisionEntry,
     lower: bool = True,
     mindepth: int = 1,
 ) -> bool:
     assert node.maxdate is not None  # for mypy
     assert revision.date is not None  # idem

     # We want to ensure that all first occurrences end up in the content_early_in_rev
     # relation. Thus, we force every blob outside a frontier to have a strictly
     # earlier date.
     return (
         node.maxdate < revision.date  # all content is earlier than revision
         and node.depth >= mindepth  # deeper than the min allowed depth
         and (has_blobs(node) if lower else True)  # there is at least one blob
     )


 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to define a
     # frontier or not:
     # 1. Only files in the current node:
     return any(node.entry.files)
     # 2. Files anywhere in the isochrone graph:
     #    stack = [node]
     #    while stack:
     #        current = stack.pop()
     #        if any(current.entry.files):
     #            return True
     #        else:
     #            # All children are directory entries.
     #            stack.extend(current.children)
     #    return False
     # 3. Files in the intermediate directories between the current node and any
     #    previously defined frontier:
     #    TODO: complete this case!
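To see why the invalidated-frontier branch above now writes revision.date instead of current.maxdate: with the new max() rule in graph.py, an invalidated node's recomputed maxdate can be the later date that triggered the invalidation, while the value future revisions should see is the date of the earlier revision currently being processed. A toy walk-through of the R05 case from the fixtures below (timestamps and variable names are illustrative only):

    from datetime import datetime, timezone

    def ts(seconds: int) -> datetime:
        return datetime.fromtimestamp(seconds, tz=timezone.utc)

    frontier_date = ts(1000000010)  # dbdate stored for directory C by R03/R04
    revision_date = ts(1000000005)  # R05 arrives out of order

    # build_isochrone_graph() invalidates C (dbdate >= revision.date) and,
    # with the new max() rule, recomputes its maxdate as t=10.
    maxdate = ts(1000000010)

    # In revision_process_content(), C is not a new frontier for R05, since
    # maxdate < revision.date does not hold; the invalidated entry is re-dated:
    new_frontier_date = revision_date  # this patch; previously maxdate (t=10)
    assert new_frontier_date < frontier_date
    # Future revisions now see t=5 for C, which is why the R06 fixture below
    # expects dbdate: 1000000005.0.
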
"c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" - maxdate: 1000000005.0 + maxdate: 1000000010.0 invalid: True path: "A/B/C" # Isochrone graph for R06 - rev: "53519b5a5e8cf12a4f81f82e489f95c1d04d5314" graph: entry: id: "195601c98c28f04e0d19c218434738006990db72" name: "" maxdate: 1000000050.0 path: "" children: - entry: id: "d591b308488541aabffd854eae85a9bf83a9d9f5" name: "A" maxdate: 1000000050.0 path: "A" children: - entry: id: "0e540a8ebea2f5de3e62b92e2139902cf6f46e92" name: "B" maxdate: 1000000050.0 path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" dbdate: 1000000005.0 maxdate: 1000000005.0 path: "A/B/C"