diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index abf1ba4..fb20774 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,325 +1,318 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Iterable, Optional, Set +from typing import Any, Dict, Iterable, Optional import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from ..revision import RevisionEntry class ProvenanceDBBase: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self.insert_cache: Dict[str, Any] = {} - self.remove_cache: Dict[str, Set[bytes]] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": set(), "content_in_dir": set(), "directory": dict(), "directory_in_rev": set(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } - self.remove_cache = {"directory": set()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): try: self.insert_all() self.clear_caches() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE sha1=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for sha1, date in self.cursor.fetchall(): dates[sha1] = date self.select_cache["content"][sha1] = date return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) - if date is None and directory.id not in self.remove_cache["directory"]: + if date is None: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date - elif directory.id not in self.remove_cache["directory"]: + else: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for sha1, date in self.cursor.fetchall(): dates[sha1] = date self.select_cache["directory"][sha1] = date return dates - def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): - self.remove_cache["directory"].add(directory.id) - self.insert_cache["directory"].pop(directory.id, None) - def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date - self.remove_cache["directory"].discard(directory.id) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """, self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date) """, self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date) """, self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after ids for elements were resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_before_rev; # INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING # """, # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # if self.insert_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_in_org; # INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING # """, # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id """, (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 17662a0..b8169ce 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,583 +1,603 @@ from collections import Counter from datetime import datetime, timezone import logging import os import time from typing import Dict, Generator, Iterable, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... - def directory_invalidate_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> None: - ... - def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. stack.append((d_child, os.path.join(prefix, d_child.name))) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], ) -> None: start = time.time() for origin in origins: origin.retrieve_revisions(archive) for revision in origin.revisions: origin_add_revision(provenance, archive, origin, revision) done = time.time() provenance.commit() stop = time.time() logging.debug( "Origins " ";".join( [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] ) + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def origin_add_revision( provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, + commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {hash_to_hex(revision.id)}" f" (known date {date} / revision date {revision.date})..." ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() - # TODO: improve this! Maybe using a max attempt counter? - # Ideally Provenance class should guarantee that a commit never fails. - while not provenance.commit(): - logging.warning( - "Could not commit revisions " - + ";".join([hash_to_hex(revision.id) for revision in revisions]) - + ". Retrying..." - ) + if commit: + # TODO: improve this! Maybe using a max attempt counter? + # Ideally Provenance class should guarantee that a commit never fails. + while not provenance.commit(): + logging.warning( + "Could not commit revisions " + + ";".join([hash_to_hex(revision.id) for revision in revisions]) + + ". Retrying..." + ) stop = time.time() logging.debug( f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) # logging.critical( # ";".join([hash_to_hex(revision.id) for revision in revisions]) # + f",{stop - start},{stop - done}" # ) class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ): self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) - self.known: bool = self.dbdate is not None + self.known = self.dbdate is not None + self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: List[IsochroneNode] = [] @property def dbdate(self): # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self): self._dbdate = None self.maxdate = None self.known = False + self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": # we should not be processing this node (ie add subdirectories or # files) if it's actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.append(node) return node def __str__(self): return ( - f"<{self.entry}: " - f"known={self.known}, maxdate={self.maxdate}, " - f"dbdate={self.dbdate}, path={self.path}, " + f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " + f"known={self.known}, invalid={self.invalid}, path={self.path}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other): return ( isinstance(other, IsochroneNode) and ( self.entry, self.depth, self._dbdate, self.maxdate, self.known, + self.invalid, self.path, ) == ( other.entry, other.depth, other._dbdate, other.maxdate, other.known, + other.invalid, other.path, ) and Counter(self.children) == Counter(other.children) ) def __hash__(self): return hash( - (self.entry, self.depth, self._dbdate, self.maxdate, self.known, self.path) + ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) ) def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." ) fdates: Dict[bytes, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {hash_to_hex(current.entry.id)}" f" (date {current.dbdate})" f" when processing revision {hash_to_hex(revision.id)}" f" (date {revision.date})" ) - provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None - # If all content is already known, update current directory info. + # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) - current.known = ( - # true if all subdirectories are known - all(child.known for child in current.children) - # true if all files are in fdates, i.e. if all files were known - # *before building this isochrone graph node* - # Note: the 'all()' is lazy: will stop iterating as soon as possible - and all((file.id in fdates) for file in current.entry.files) - ) + if current.maxdate <= revision.date: + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files are in fdates, i.e. if all files were known + # *before building this isochrone graph node* + # Note: the 'all()' is lazy: will stop iterating as soon as + # possible + and all((file.id in fdates) for file in current.entry.files) + ) + else: + # at least one content is being processed out-of-order, then current + # node should be treated as unknown + current.maxdate = revision.date + current.known = False logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" ) return root def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: + assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): - assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: + # If current node is an invalidated frontier, update its date for future + # revisions to get the proper value. + if current.invalid: + provenance.directory_set_date_in_isochrone_frontier( + current.entry, current.maxdate + ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml index 182a3e0..147e560 100644 --- a/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml +++ b/swh/provenance/tests/data/graphs_out-of-order_lower_1.yaml @@ -1,188 +1,185 @@ # Isochrone graph for R00 - rev: "c0d8929936631ecbcf9147be6b8aa13b13b014e4" graph: entry: id: "a4cb5e6b2831f7e8eef0e6e08e43d642c97303a1" name: "" maxdate: 1000000000.0 path: "" children: - entry: id: "1c8d9fd9afa7e5a2cf52a3db6f05dc5c3a1ca86b" name: "A" maxdate: 1000000000.0 path: "A" children: - entry: id: "36876d475197b5ad86ad592e8e28818171455f16" name: "B" maxdate: 1000000000.0 path: "A/B" children: - entry: id: "98f7a4a23d8df1fb1a5055facae2aff9b2d0a8b3" name: "C" maxdate: 1000000000.0 path: "A/B/C" # Isochrone graph for R01 - rev: "1444db96cbd8cd791abe83527becee73d3c64e86" graph: entry: id: "b3cf11b22c9f93c3c494cf90ab072f394155072d" name: "" maxdate: 1000000010.0 path: "" children: - entry: id: "baca735bf8b8720131b4bfdb47c51631a9260348" name: "A" maxdate: 1000000010.0 path: "A" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "B" maxdate: 1000000010.0 path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" maxdate: 1000000010.0 path: "A/B/C" # Isochrone graph for R02 - rev: "1c533587277731236616cac0d44f3b46c1da0f8a" graph: entry: id: "2afae58027276dad2bdced5a505e8d781a7add5b" name: "" maxdate: 1000000010.0 known: True path: "" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "A" maxdate: 1000000010.0 known: True path: "A" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" maxdate: 1000000010.0 known: True path: "A/C" # Isochrone graph for R03 - rev: "20f4da0f48609d9f7f908ebbcac3b3741a0f25cb" graph: entry: id: "b3cf11b22c9f93c3c494cf90ab072f394155072d" name: "" maxdate: 1000000010.0 known: True path: "" children: - entry: id: "baca735bf8b8720131b4bfdb47c51631a9260348" name: "A" maxdate: 1000000010.0 known: True path: "A" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "B" maxdate: 1000000010.0 known: True path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" dbdate: 1000000010.0 maxdate: 1000000010.0 known: True path: "A/B/C" # Isochrone graph for R04 - rev: "0d66eadcc15e0d7f6cfd4289329a7749a1309982" graph: entry: id: "2afae58027276dad2bdced5a505e8d781a7add5b" name: "" maxdate: 1000000010.0 known: True path: "" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "A" maxdate: 1000000010.0 known: True path: "A" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" dbdate: 1000000010.0 maxdate: 1000000010.0 known: True path: "A/C" # Isochrone graph for R05 - rev: "1dfac0491892096948d6a02bf12a2fed4bf75743" graph: entry: id: "b3cf11b22c9f93c3c494cf90ab072f394155072d" name: "" - maxdate: 1000000010.0 - known: True # TODO: analyse this, as it might be a source of issues! + maxdate: 1000000005.0 path: "" children: - entry: id: "baca735bf8b8720131b4bfdb47c51631a9260348" name: "A" - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 path: "A" children: - entry: id: "4b28979d88ed209a09c272bcc80f69d9b18339c2" name: "B" - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" - maxdate: 1000000010.0 - known: True + maxdate: 1000000005.0 + invalid: True path: "A/B/C" # Isochrone graph for R06 - rev: "53519b5a5e8cf12a4f81f82e489f95c1d04d5314" graph: entry: id: "195601c98c28f04e0d19c218434738006990db72" name: "" maxdate: 1000000050.0 path: "" children: - entry: id: "d591b308488541aabffd854eae85a9bf83a9d9f5" name: "A" maxdate: 1000000050.0 path: "A" children: - entry: id: "0e540a8ebea2f5de3e62b92e2139902cf6f46e92" name: "B" maxdate: 1000000050.0 path: "A/B" children: - entry: id: "c9cabe7f49012e3fdef6ac6b929efb5654f583cf" name: "C" - dbdate: 1000000010.0 - maxdate: 1000000010.0 + dbdate: 1000000005.0 + maxdate: 1000000005.0 known: True path: "A/B/C" diff --git a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt index 87bab85..b0d3a57 100644 --- a/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt +++ b/swh/provenance/tests/data/synthetic_out-of-order_lower_1.txt @@ -1,42 +1,38 @@ 1000000000 c0d8929936631ecbcf9147be6b8aa13b13b014e4 R00 R00 | | | R c0d8929936631ecbcf9147be6b8aa13b13b014e4 | 1000000000 | R---C | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | 0 1000000010 1444db96cbd8cd791abe83527becee73d3c64e86 R01 R01 | | | R 1444db96cbd8cd791abe83527becee73d3c64e86 | 1000000010 | R---C | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -10 | R---C | A/B/C/b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | 0 1000000020 1c533587277731236616cac0d44f3b46c1da0f8a R02 R02 | | | R 1c533587277731236616cac0d44f3b46c1da0f8a | 1000000020 | R-D | A/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -10 | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -20 | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -10 1000000030 20f4da0f48609d9f7f908ebbcac3b3741a0f25cb R03 R03 | | | R 20f4da0f48609d9f7f908ebbcac3b3741a0f25cb | 1000000030 | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -20 | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -30 | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -20 1000000040 0d66eadcc15e0d7f6cfd4289329a7749a1309982 R04 R04 | | | R 0d66eadcc15e0d7f6cfd4289329a7749a1309982 | 1000000040 | R-D | A/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -30 | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -40 | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -30 1000000005 1dfac0491892096948d6a02bf12a2fed4bf75743 R05 R05 | | | R 1dfac0491892096948d6a02bf12a2fed4bf75743 | 1000000005 | R---C | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -5 | R---C | A/B/C/b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | 0 1000000050 53519b5a5e8cf12a4f81f82e489f95c1d04d5314 R06 R06 | | | R 53519b5a5e8cf12a4f81f82e489f95c1d04d5314 | 1000000050 | R---C | A/B/c | C fa08654474ae2ddc4f61ee3a43d017ba65a439c3 | 0 -# Note the ts below (-40) is NOT the same as the maxdate of its content (-45)! -# This is because the ts of the existing frontier (the R-D below) has not been updated by the -# "new" version of the b file (aka older ts) of R05. -# /!\ This is true only when ingesting revisions one at a time! - | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -40 + | R-D | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | -45 | D-C | + a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -50 | D-C | + b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | -45 diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 53e3400..963982c 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,88 +1,100 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.model import DirectoryEntry, RevisionEntry from swh.provenance.provenance import IsochroneNode, build_isochrone_graph, revision_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt def isochrone_graph_from_dict(d, depth=0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.known = d.get("known", False) + node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = [ isochrone_graph_from_dict(child, depth=depth + 1) for child in children ] return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) -def test_isochrone_graph(provenance, swh_storage, archive, repo, lower, mindepth): +@pytest.mark.parametrize("batch", (True, False)) +def test_isochrone_graph( + provenance, swh_storage, archive, repo, lower, mindepth, batch +): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): + print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) - print("Expected", expected_graph) + print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. computed_graph = build_isochrone_graph( archive, provenance, entry, DirectoryEntry(entry.root), ) - print("Computed", computed_graph) + print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. - revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) + revision_add( + provenance, + archive, + [entry], + lower=lower, + mindepth=mindepth, + commit=not batch, + ) diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index d38c15d..10ab31b 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,318 +1,321 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest from swh.provenance.model import RevisionEntry from swh.provenance.provenance import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ relation = { ("content", "revision"): "content_early_in_rev", ("content", "directory"): "content_in_dir", ("directory", "revision"): "directory_in_rev", }[(src, dst)] srccol = {"content": "blob", "directory": "dir"}[src] dstcol = {"directory": "dir", "revision": "rev"}[dst] cur.execute( f"SELECT encode(src.sha1::bytea, 'hex')," f" encode(dst.sha1::bytea, 'hex')," f" encode(location.path::bytea, 'escape') " f"FROM {relation} as rel, " f" {src} as src, {dst} as dst, location " f"WHERE rel.{srccol}=src.id AND rel.{dstcol}=dst.id AND rel.loc=location.id" ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ if isinstance(sha1, str): sha1 = bytes.fromhex(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_dir": set(), "content_early_in_rev": set(), "directory": set(), "directory_in_rev": set(), "location": set(), "revision": set(), } for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) assert rows["revision"] == sha1s(provenance.cursor, "revision"), synth_rev[ "msg" ] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp( provenance.cursor, "revision", synth_rev["sha1"].hex() ) == [rev_ts], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) assert rows["content"] == sha1s(provenance.cursor, "content"), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_early_in_rev"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"] ) assert rows["content_early_in_rev"] == relations( provenance.cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert get_timestamp(provenance.cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) assert rows["directory"] == sha1s(provenance.cursor, "directory"), synth_rev[ "msg" ] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_rev"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"] ) assert rows["directory_in_rev"] == relations( provenance.cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert get_timestamp(provenance.cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries rows["content_in_dir"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"] ) assert rows["content_in_dir"] == relations( provenance.cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert get_timestamp(provenance.cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == locations(provenance.cursor), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) +@pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( - provenance, swh_storage, archive, repo, lower, mindepth + provenance, swh_storage, archive, repo, lower, mindepth, batch ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] - # XXX adding all revisions at once should be working just fine, but it does not... - # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) - # ...so add revisions one at a time for now - for revision in revisions: - revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) + if batch: + revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) + else: + for revision in revisions: + revision_add( + provenance, archive, [revision], lower=lower, mindepth=mindepth + ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, rc["path"]) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, dc["prefix"] + "/" + dc["path"]) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ (blob.hex(), rev.hex(), date.timestamp(), path.decode()) for blob, rev, date, path in provenance.content_find_all( bytes.fromhex(content_id) ) ] assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, str, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, rc["path"]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( bytes.fromhex(content_id) ) assert r_sha1.hex() == content_id assert r_rev_id.hex() == rev_id assert r_ts.timestamp() == ts assert r_path.decode() in paths