diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,7 +1,7 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Iterable, Optional, Set
+from typing import Any, Dict, Iterable, Optional
 
 import psycopg2
 import psycopg2.extras
@@ -23,7 +23,6 @@
         # XXX: not sure this is the best place to do it!
         self.cursor.execute("SET timezone TO 'UTC'")
         self.insert_cache: Dict[str, Any] = {}
-        self.remove_cache: Dict[str, Set[bytes]] = {}
         self.select_cache: Dict[str, Any] = {}
         self.clear_caches()
 
@@ -38,7 +37,6 @@
             "revision_before_rev": list(),
             "revision_in_org": list(),
         }
-        self.remove_cache = {"directory": set()}
         self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}
 
     def commit(self):
@@ -108,7 +106,7 @@
     ) -> Optional[datetime]:
        # First check if the date is being modified by the current transaction.
        date = self.insert_cache["directory"].get(directory.id, None)
-        if date is None and directory.id not in self.remove_cache["directory"]:
+        if date is None:
            # If not, check whether it's been queried before
            date = self.select_cache["directory"].get(directory.id, None)
            if date is None:
@@ -131,7 +129,7 @@
            date = self.insert_cache["directory"].get(directory.id, None)
            if date is not None:
                dates[directory.id] = date
-            elif directory.id not in self.remove_cache["directory"]:
+            else:
                # If not, check whether it's been queried before
                date = self.select_cache["directory"].get(directory.id, None)
                if date is not None:
@@ -150,15 +148,10 @@
                    self.select_cache["directory"][sha1] = date
        return dates
 
-    def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
-        self.remove_cache["directory"].add(directory.id)
-        self.insert_cache["directory"].pop(directory.id, None)
-
     def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ):
        self.insert_cache["directory"][directory.id] = date
-        self.remove_cache["directory"].discard(directory.id)
 
     def insert_all(self):
        # Perform insertions with cached information
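With the `remove_cache` tombstones gone, the read path of `directory_get_date_in_isochrone_frontier` reduces to a plain three-level lookup: transaction-local writes first, then memoized selects, then the database. Below is a minimal, self-contained sketch of that order; `FAKE_DB` and `directory_get_date` are illustrative stand-ins, not the real psycopg2-backed code.

```python
from datetime import datetime, timezone
from typing import Dict, Optional

# Stand-in for rows already persisted; the real class queries the directory
# table through a psycopg2 cursor instead.
FAKE_DB: Dict[bytes, datetime] = {b"dir-1": datetime(2001, 9, 9, tzinfo=timezone.utc)}

insert_cache: Dict[bytes, datetime] = {}  # dates written by the current transaction
select_cache: Dict[bytes, datetime] = {}  # dates fetched once already


def directory_get_date(directory_id: bytes) -> Optional[datetime]:
    # 1. A pending write from the current transaction wins,
    date = insert_cache.get(directory_id)
    if date is None:
        # 2. then anything already fetched during this run,
        date = select_cache.get(directory_id)
    if date is None:
        # 3. finally the database, memoizing whatever it returns.
        date = FAKE_DB.get(directory_id)
        if date is not None:
            select_cache[directory_id] = date
    return date


assert directory_get_date(b"dir-1") == FAKE_DB[b"dir-1"]
assert directory_get_date(b"missing") is None
```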
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -69,11 +69,6 @@
     ) -> Dict[bytes, datetime]:
         ...
 
-    def directory_invalidate_in_isochrone_frontier(
-        self, directory: DirectoryEntry
-    ) -> None:
-        ...
-
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
@@ -217,6 +212,7 @@
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
+    commit: bool = True,
 ) -> None:
     start = time.time()
     for revision in revisions:
@@ -246,14 +242,15 @@
                 mindepth=mindepth,
             )
         done = time.time()
-    # TODO: improve this! Maybe using a max attempt counter?
-    # Ideally Provenance class should guarantee that a commit never fails.
-    while not provenance.commit():
-        logging.warning(
-            "Could not commit revisions "
-            + ";".join([hash_to_hex(revision.id) for revision in revisions])
-            + ". Retrying..."
-        )
+    if commit:
+        # TODO: improve this! Maybe using a max attempt counter?
+        # Ideally Provenance class should guarantee that a commit never fails.
+        while not provenance.commit():
+            logging.warning(
+                "Could not commit revisions "
+                + ";".join([hash_to_hex(revision.id) for revision in revisions])
+                + ". Retrying..."
+            )
     stop = time.time()
     logging.debug(
         f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} "
@@ -285,7 +282,8 @@
         # known is True if this node is already known in the db; either because
         # the current directory actually exists in the database, or because all
         # the content of the current directory is known (subdirectories and files)
-        self.known: bool = self.dbdate is not None
+        self.known = self.dbdate is not None
+        self.invalid = False
         self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
         self.children: List[IsochroneNode] = []
 
@@ -298,6 +296,7 @@
         self._dbdate = None
         self.maxdate = None
         self.known = False
+        self.invalid = True
 
     def add_directory(
         self, child: DirectoryEntry, date: Optional[datetime] = None
@@ -311,9 +310,8 @@
 
     def __str__(self):
         return (
-            f"<{self.entry}: "
-            f"known={self.known}, maxdate={self.maxdate}, "
-            f"dbdate={self.dbdate}, path={self.path}, "
+            f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, "
+            f"known={self.known}, invalid={self.invalid}, path={self.path}, "
             f"children=[{', '.join(str(child) for child in self.children)}]>"
         )
 
@@ -326,6 +324,7 @@
                 self._dbdate,
                 self.maxdate,
                 self.known,
+                self.invalid,
                 self.path,
             )
             == (
@@ -334,6 +333,7 @@
                 other._dbdate,
                 other.maxdate,
                 other.known,
+                other.invalid,
                 other.path,
             )
             and Counter(self.children) == Counter(other.children)
@@ -341,7 +341,15 @@
 
     def __hash__(self):
         return hash(
-            (self.entry, self.depth, self._dbdate, self.maxdate, self.known, self.path)
+            (
+                self.entry,
+                self.depth,
+                self._dbdate,
+                self.maxdate,
+                self.known,
+                self.invalid,
+                self.path,
+            )
         )
 
 
@@ -386,7 +394,6 @@
                     f" when processing revision {hash_to_hex(revision.id)}"
                     f" (date {revision.date})"
                 )
-                provenance.directory_invalidate_in_isochrone_frontier(current.entry)
                 current.invalidate()
 
        # Pre-query all known dates for directories in the current directory
@@ -431,7 +438,7 @@
            # all the files and directories under current have a maxdate,
            # we can infer the maxdate for current directory
            assert current.maxdate is None
-            # If all content is already known, update current directory info.
+            # if all content is already known, update current directory info.
            current.maxdate = max(
                [UTCMIN]
                + [
@@ -444,14 +451,21 @@
                    for file in current.entry.files
                ]
            )
-            current.known = (
-                # true if all subdirectories are known
-                all(child.known for child in current.children)
-                # true if all files are in fdates, i.e. if all files were known
-                # *before building this isochrone graph node*
-                # Note: the 'all()' is lazy: will stop iterating as soon as possible
-                and all((file.id in fdates) for file in current.entry.files)
-            )
+            if current.maxdate <= revision.date:
+                current.known = (
+                    # true if all subdirectories are known
+                    all(child.known for child in current.children)
+                    # true if all files are in fdates, i.e. if all files were known
+                    # *before building this isochrone graph node*
+                    # Note: the 'all()' is lazy: will stop iterating as soon as
+                    # possible
+                    and all((file.id in fdates) for file in current.entry.files)
+                )
+            else:
+                # at least one content is being processed out of order; the current
+                # node should be treated as unknown
+                current.maxdate = revision.date
+                current.known = False
    logging.debug(
        f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
    )
@@ -482,6 +496,7 @@
                revision, current.entry, current.path
            )
        else:
+            assert current.maxdate is not None
            # Current directory is not an outer isochrone frontier for any previous
            # revision. It might be eligible for this one.
            if is_new_frontier(
@@ -491,7 +506,6 @@
                lower=lower,
                mindepth=mindepth,
            ):
-                assert current.maxdate is not None
                # Outer frontier should be moved to current position in the isochrone
                # graph. This is the first time this directory is found in the isochrone
                # frontier.
@@ -504,6 +518,12 @@
                    )
                    flatten_directory(archive, provenance, current.entry)
                else:
+                    # If current node is an invalidated frontier, update its date for
+                    # future revisions to get the proper value.
+                    if current.invalid:
+                        provenance.directory_set_date_in_isochrone_frontier(
+                            current.entry, current.maxdate
+                        )
                    # No point moving the frontier here. Either there are no files or
                    # they are being seen for the first time here. Add all blobs to
                    # current revision updating date if necessary, and recursively
                    # analyse
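The invalidation bookkeeping that used to live in `remove_cache` now sits on the graph node itself. Here is a hedged sketch of that lifecycle, modeling only the fields this diff touches; `NodeSketch` is a simplified stand-in for `IsochroneNode`, not the real class.

```python
from datetime import datetime, timezone
from typing import Optional


class NodeSketch:
    """Simplified stand-in for IsochroneNode: just the fields this diff touches."""

    def __init__(self, dbdate: Optional[datetime]) -> None:
        self._dbdate = dbdate
        self.maxdate: Optional[datetime] = dbdate
        self.known = dbdate is not None
        self.invalid = False

    def invalidate(self) -> None:
        # Drop the stale date but remember that we dropped it; no pending
        # DELETE is queued at the cache layer anymore.
        self._dbdate = None
        self.maxdate = None
        self.known = False
        self.invalid = True


node = NodeSketch(dbdate=datetime(2001, 9, 9, tzinfo=timezone.utc))
node.invalidate()
# Later, once a fresh maxdate has been computed for this node, the new branch
# above calls directory_set_date_in_isochrone_frontier(entry, maxdate) exactly
# because node.invalid is True, overwriting the stale row in place.
assert node.invalid and not node.known and node.maxdate is None
```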
diff --git a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml
--- a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml
+++ b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml
@@ -134,29 +134,26 @@
     entry:
       id: "b3cf11b22c9f93c3c494cf90ab072f394155072d"
       name: ""
-    maxdate: 1000000010.0
-    known: True  # TODO: analyse this, as it might be a source of issues!
+    maxdate: 1000000005.0
     path: ""
     children:
       - entry:
           id: "baca735bf8b8720131b4bfdb47c51631a9260348"
           name: "A"
-        maxdate: 1000000010.0
-        known: True
+        maxdate: 1000000005.0
         path: "A"
         children:
           - entry:
               id: "4b28979d88ed209a09c272bcc80f69d9b18339c2"
               name: "B"
-            maxdate: 1000000010.0
-            known: True
+            maxdate: 1000000005.0
             path: "A/B"
             children:
               - entry:
                   id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf"
                   name: "C"
-                maxdate: 1000000010.0
-                known: True
+                maxdate: 1000000005.0
+                invalid: True
                 path: "A/B/C"
 # Isochrone graph for R06
 - rev: "53519b5a5e8cf12a4f81f82e489f95c1d04d5314"
@@ -182,7 +179,7 @@
               - entry:
                   id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf"
                   name: "C"
-                dbdate: 1000000010.0
-                maxdate: 1000000010.0
+                dbdate: 1000000005.0
+                maxdate: 1000000005.0
                 known: True
                 path: "A/B/C"
diff --git a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt
--- a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt
+++ b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt
@@ -33,10 +33,6 @@
 1000000050 53519b5a5e8cf12a4f81f82e489f95c1d04d5314 R06
 R06 |  |  | R 53519b5a5e8cf12a4f81f82e489f95c1d04d5314 | 1000000050
     | R---C | A/B/c | C fa08654474ae2ddc4f61ee3a43d017ba65a439c3 | 0
-# Note the ts below (-40) is NOT the same as the maxdate of its content (-45)!
-# This is because the ts of the existing frontier (the R-D below) has not been updated by the
-# "new" version of the b file (aka older ts) of R05.
-# /!\ This is true only when ingesting revisions one at a time!
-    | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -40
+    | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -45
     | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -50
     | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -45
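Sanity check on the fixture change: the synthetic files appear to store entry timestamps as offsets from the revision date on the R line (1000000050 for R06), so the corrected -45 offset now agrees with the 1000000005.0 dbdate/maxdate in the updated graph fixture. A quick check under that assumed convention:

```python
# Assumes the offset convention used by the synthetic files: entry timestamps
# are relative to the revision date on the R line.
revision_ts = 1000000050
assert revision_ts - 45 == 1000000005  # new R-D offset -> new dbdate/maxdate
assert revision_ts - 40 == 1000000010  # old offset -> the stale date removed above
```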
diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py
--- a/swh/provenance/tests/test_isochrone_graph.py
+++ b/swh/provenance/tests/test_isochrone_graph.py
@@ -37,6 +37,7 @@
     )
     node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc)
     node.known = d.get("known", False)
+    node.invalid = d.get("invalid", False)
     node.path = bytes(d["path"], encoding="utf-8")
     node.children = [
         isochrone_graph_from_dict(child, depth=depth + 1) for child in children
@@ -54,7 +55,10 @@
         ("out-of-order", True, 1),
     ),
 )
-def test_isochrone_graph(provenance, swh_storage, archive, repo, lower, mindepth):
+@pytest.mark.parametrize("batch", (True, False))
+def test_isochrone_graph(
+    provenance, swh_storage, archive, repo, lower, mindepth, batch
+):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
@@ -64,6 +68,7 @@
 
     with open(get_datafile(filename)) as file:
         for expected in yaml.full_load(file):
+            print("# Processing revision", expected["rev"])
             revision = revisions[hash_to_bytes(expected["rev"])]
             entry = RevisionEntry(
                 id=revision["id"],
@@ -71,7 +76,7 @@
                 root=revision["directory"],
             )
             expected_graph = isochrone_graph_from_dict(expected["graph"])
-            print("Expected", expected_graph)
+            print("Expected graph:", expected_graph)
 
             # Create graph for current revision and check it has the expected structure.
             computed_graph = build_isochrone_graph(
@@ -80,9 +85,16 @@
                 entry,
                 DirectoryEntry(entry.root),
             )
-            print("Computed", computed_graph)
+            print("Computed graph:", computed_graph)
             assert computed_graph == expected_graph
 
             # Add current revision so that provenance info is kept up to date for the
             # following ones.
-            revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)
+            revision_add(
+                provenance,
+                archive,
+                [entry],
+                lower=lower,
+                mindepth=mindepth,
+                commit=not batch,
+            )
diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py
--- a/swh/provenance/tests/test_provenance_heuristics.py
+++ b/swh/provenance/tests/test_provenance_heuristics.py
@@ -198,8 +198,9 @@
         ("out-of-order", True, 1),
     ),
 )
+@pytest.mark.parametrize("batch", (True, False))
 def test_provenance_heuristics_content_find_all(
-    provenance, swh_storage, archive, repo, lower, mindepth
+    provenance, swh_storage, archive, repo, lower, mindepth, batch
 ):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
@@ -213,11 +214,13 @@
         for revision in data["revision"]
     ]
 
-    # XXX adding all revisions at once should be working just fine, but it does not...
-    # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
-    # ...so add revisions one at a time for now
-    for revision in revisions:
-        revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth)
+    if batch:
+        revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
+    else:
+        for revision in revisions:
+            revision_add(
+                provenance, archive, [revision], lower=lower, mindepth=mindepth
+            )
 
     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
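For reference, the two ways the new knobs combine, mirroring the parametrized tests above. This is a sketch rather than a runnable test: `provenance`, `archive` and `entries` are assumed to come from the surrounding fixtures.

```python
# Batch mode: a single call; revision_add() commits (and retries) internally.
revision_add(provenance, archive, entries, lower=True, mindepth=1)

# One revision at a time with the commit deferred, as test_isochrone_graph
# does when batch=True (it passes commit=not batch):
for entry in entries:
    revision_add(provenance, archive, [entry], commit=False)
# Flush everything at the end, using the same retry loop revision_add uses.
while not provenance.commit():
    pass
```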