diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,7 +1,7 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Iterable, Optional, Set +from typing import Any, Dict, Iterable, Optional import psycopg2 import psycopg2.extras @@ -23,7 +23,6 @@ # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self.insert_cache: Dict[str, Any] = {} - self.remove_cache: Dict[str, Set[bytes]] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() @@ -38,7 +37,6 @@ "revision_before_rev": list(), "revision_in_org": list(), } - self.remove_cache = {"directory": set()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): @@ -108,7 +106,7 @@ ) -> Optional[datetime]: # First check if the date is being modified by current transection. 
date = self.insert_cache["directory"].get(directory.id, None) - if date is None and directory.id not in self.remove_cache["directory"]: + if date is None: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: @@ -131,7 +129,7 @@ date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date - elif directory.id not in self.remove_cache["directory"]: + else: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: @@ -150,15 +148,10 @@ self.select_cache["directory"][sha1] = date return dates - def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): - self.remove_cache["directory"].add(directory.id) - self.insert_cache["directory"].pop(directory.id, None) - def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date - self.remove_cache["directory"].discard(directory.id) def insert_all(self): # Performe insertions with cached information diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -68,11 +68,6 @@ ) -> Dict[bytes, datetime]: ... - def directory_invalidate_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> None: - ... - def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: @@ -215,6 +210,7 @@ trackall: bool = True, lower: bool = True, mindepth: int = 1, + commit: bool = True, ) -> None: start = time.time() for revision in revisions: @@ -244,14 +240,15 @@ mindepth=mindepth, ) done = time.time() - # TODO: improve this! Maybe using a max attempt counter? - # Ideally Provenance class should guarantee that a commit never fails. 
- while not provenance.commit(): - logging.warning( - "Could not commit revisions " - + ";".join([hash_to_hex(revision.id) for revision in revisions]) - + ". Retrying..." - ) + if commit: + # TODO: improve this! Maybe using a max attempt counter? + # Ideally Provenance class should guarantee that a commit never fails. + while not provenance.commit(): + logging.warning( + "Could not commit revisions " + + ";".join([hash_to_hex(revision.id) for revision in revisions]) + + ". Retrying..." + ) stop = time.time() logging.debug( f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " @@ -283,7 +280,8 @@ # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) - self.known: bool = self.dbdate is not None + self.known = self.dbdate is not None + self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: List[IsochroneNode] = [] @@ -296,6 +294,7 @@ self._dbdate = None self.maxdate = None self.known = False + self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None @@ -309,9 +308,8 @@ def __str__(self): return ( - f"<{self.entry}: " - f"known={self.known}, maxdate={self.maxdate}, " - f"dbdate={self.dbdate}, path={self.path}, " + f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " + f"known={self.known}, invalid={self.invalid}, path={self.path}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) @@ -324,8 +322,8 @@ ) or self.maxdate == other.maxdate return ( isinstance(other, IsochroneNode) - and (self.entry, self.depth, self.known, self.path) - == (other.entry, other.depth, other.known, other.path) + and (self.entry, self.depth, self.known, self.invalid, self.path) + == (other.entry, other.depth, other.known, other.invalid, other.path) and sameDbDate and 
sameMaxdate and set(self.children) == set(other.children) @@ -333,7 +331,15 @@ def __hash__(self): return hash( - (self.entry, self.depth, self._dbdate, self.maxdate, self.known, self.path) + ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) ) @@ -378,7 +384,6 @@ f" when processing revision {hash_to_hex(revision.id)}" f" (date {revision.date})" ) - provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.invalidate() # Pre-query all known dates for directories in the current directory @@ -423,7 +428,7 @@ # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None - # If all content is already known, update current directory info. + # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ @@ -436,14 +441,21 @@ for file in current.entry.files ] ) - current.known = ( - # true if all subdirectories are known - all(child.known for child in current.children) - # true if all files are in fdates, i.e. if all files were known - # *before building this isochrone graph node* - # Note: the 'all()' is lazy: will stop iterating as soon as possible - and all((file.id in fdates) for file in current.entry.files) - ) + if current.maxdate <= revision.date: + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files are in fdates, i.e. if all files were known + # *before building this isochrone graph node* + # Note: the 'all()' is lazy: will stop iterating as soon as + # possible + and all((file.id in fdates) for file in current.entry.files) + ) + else: + # at least one content is being processed out-of-order, so the current + # node should be treated as unknown + current.maxdate = revision.date + current.known = False logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" 
) @@ -474,6 +486,7 @@ revision, current.entry, current.path ) else: + assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( @@ -483,7 +496,6 @@ lower=lower, mindepth=mindepth, ): - assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. @@ -496,6 +508,12 @@ ) flatten_directory(archive, provenance, current.entry) else: + # If current node is an invalidated frontier, update its date for future + # revisions to get the proper value. + if current.invalid: + provenance.directory_set_date_in_isochrone_frontier( + current.entry, current.maxdate + ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse diff --git a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml --- a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml +++ b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml @@ -158,32 +158,33 @@ id: "b3cf11b22c9f93c3c494cf90ab072f394155072d" name: "" dbdate: null - maxdate: 1000000010.0 - known: True # TODO: analyse this, as it might be a source of issues! 
+ maxdate: 1000000005.0 + known: False path: "" children: - entry: id: "baca735bf8b8720131b4bfdb47c51631a9260348" name: "A" dbdate: null - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 + known: False path: "A" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "B" dbdate: null - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 + known: False path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" dbdate: null - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 + known: False + invalid: True path: "A/B/C" children: [] # Isochrone graph for R06 @@ -215,8 +216,8 @@ - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" - dbdate: 1000000010.0 - maxdate: 1000000010.0 + dbdate: 1000000005.0 + maxdate: 1000000005.0 known: True path: "A/B/C" children: [] diff --git a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt --- a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt +++ b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt @@ -37,6 +37,6 @@ # This is because the ts of the existing frontier (the R-D below) has not been updated by the # "new" version of the b file (aka older ts) of R05. # /!\ This is true only when ingesting revisions one at a time! 
- | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -40 + | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -45 | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -50 | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -45 diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -34,7 +34,8 @@ depth=depth, ) node.maxdate = d["maxdate"] - node.known = d["known"] + node.known = d["known"] if "known" in d else False + node.invalid = d["invalid"] if "invalid" in d else False node.path = bytes(d["path"], encoding="utf-8") node.children = [ isochrone_graph_from_dict(child, depth=depth + 1) for child in d["children"] @@ -43,16 +44,23 @@ @pytest.mark.parametrize( - "repo, lower, mindepth", + "repo, lower, mindepth, batch", ( - ("cmdbts2", True, 1), - ("cmdbts2", False, 1), - ("cmdbts2", True, 2), - ("cmdbts2", False, 2), - ("out-of-order", True, 1), + ("cmdbts2", True, 1, False), + ("cmdbts2", True, 1, True), + ("cmdbts2", False, 1, False), + ("cmdbts2", False, 1, True), + ("cmdbts2", True, 2, False), + ("cmdbts2", True, 2, True), + ("cmdbts2", False, 2, False), + ("cmdbts2", False, 2, True), + ("out-of-order", True, 1, False), + ("out-of-order", True, 1, True), ), ) -def test_isochrone_graph(provenance, swh_storage, archive, repo, lower, mindepth): +def test_isochrone_graph( + provenance, swh_storage, archive, repo, lower, mindepth, batch +): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) @@ -64,6 +72,7 @@ expected = yaml.full_load(file) for rev, graph_as_dict in expected.items(): + print("# Processing revision", rev) revision = revisions[hash_to_bytes(rev)] entry = RevisionEntry( id=revision["id"], @@ -71,7 +80,7 @@ root=revision["directory"], ) expected_graph = 
isochrone_graph_from_dict(graph_as_dict) - print("Expected", expected_graph) + print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. computed_graph = build_isochrone_graph( @@ -80,9 +89,16 @@ entry, DirectoryEntry(entry.root), ) - print("Computed", computed_graph) + print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. - revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) + revision_add( + provenance, + archive, + [entry], + lower=lower, + mindepth=mindepth, + commit=not batch, + ) diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -212,10 +212,10 @@ ] # XXX adding all revisions at once should be working just fine, but it does not... - # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) + revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now - for revision in revisions: - revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) + # for revision in revisions: + # revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"