swh/provenance/provenance.py
Show All 63 Lines
class ProvenanceInterface(Protocol):
    ) -> Optional[datetime]:
        ...

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: Iterable[DirectoryEntry]
    ) -> Dict[bytes, datetime]:
        ...

    def directory_invalidate_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> None:
        ...

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ) -> None:
        ...

    def origin_get_id(self, origin: OriginEntry) -> int:
        ...
Show All 127 Lines
def revision_add(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    revisions: List[RevisionEntry],
    trackall: bool = True,
    lower: bool = True,
    mindepth: int = 1,
    commit: bool = True,
) -> None:
    start = time.time()
    for revision in revisions:
        assert revision.date is not None
        assert revision.root is not None
        # Processed content starting from the revision's root directory.
        date = provenance.revision_get_early_date(revision)
        if date is None or revision.date < date:
Show All 13 Lines
                provenance,
                revision,
                graph,
                trackall=trackall,
                lower=lower,
                mindepth=mindepth,
            )
    done = time.time()
    if commit:
        # TODO: improve this! Maybe using a max attempt counter?
        # Ideally Provenance class should guarantee that a commit never fails.
        while not provenance.commit():
            logging.warning(
                "Could not commit revisions "
                + ";".join([hash_to_hex(revision.id) for revision in revisions])
                + ". Retrying..."
            )
douardda: I don't really understand the logic behind this `commit` flag, because now we have 4 situations to test. Note that 1 and 4 (assuming you do a commit after the loop) are not equivalent! So I don't really see the point of this `commit` flag, nor when one would want to set it to False.

aeviso: It's just for testing purposes. When creating isochrone graphs we need to simulate the situation where we processed up to a certain revision and we want to create the isochrone graph of the following one as if they were being processed in the same batch.

douardda: Which is not what we are doing here. If you want to test the isochrone graph generation with revisions processed by batch, then you need a test that does exactly that: a loop that resets the provenance db, makes one revision_add() call with all the revisions up to revision X, and checks the resulting isochrone graph.

aeviso: The problem with that approach is that revision_add() up to revision X will perform the commit; then, when creating the isochrone graph for the following revision (the first non-processed one), the read/write caches will be empty (i.e. the following revision will be processed as if it belonged to a different batch).

douardda: You are right, I forgot that the "commit" does indeed clear the read cache as well. So neither case is really correct (once again, it's not the same execution path to do one revision_add() of a batch of revisions as to call revision_add() with only one revision at a time). This shows something could be improved in this code (not sure how for now). At least, instead of adding a commit flag as an argument of the revision_add method, just remove the commit part from this method entirely; it's the responsibility of the caller to decide when to flush the write cache to the DB (maybe add a commit() method in this class with the while loop).

douardda: Which also raises the question: why is that so? Why not keep the (read) cache for as long as we can?
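To make the suggestion above concrete, here is a minimal sketch of a caller-controlled commit, assuming it lives in this same module (so revision_add, RevisionEntry, ProvenanceInterface, ArchiveInterface and hash_to_hex are already imported). The commit_revisions helper and the process_batches loop are hypothetical names introduced only for illustration; the retry loop and provenance.commit() are the ones already shown in revision_add above.

import logging
from typing import List


def commit_revisions(
    provenance: "ProvenanceInterface", revisions: List["RevisionEntry"]
) -> None:
    # Same retry loop as in revision_add() today, but owned by the caller,
    # which decides when the write cache should be flushed to the DB.
    while not provenance.commit():
        logging.warning(
            "Could not commit revisions "
            + ";".join(hash_to_hex(revision.id) for revision in revisions)
            + ". Retrying..."
        )


def process_batches(
    provenance: "ProvenanceInterface",
    archive: "ArchiveInterface",
    batches: List[List["RevisionEntry"]],
) -> None:
    # Hypothetical caller: several batches share the same read/write caches,
    # and the flush happens only once, after all batches were processed.
    for batch in batches:
        revision_add(provenance, archive, batch, trackall=True, lower=True, mindepth=1)
    commit_revisions(provenance, [revision for batch in batches for revision in batch])

With this split, revision_add() keeps a single execution path, and batching behaviour is controlled entirely by how often the caller flushes, which is what the test scenario discussed above needs to exercise.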
    stop = time.time()
    logging.debug(
        f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} "
        f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
    )
    # logging.critical(
    #     ";".join([hash_to_hex(revision.id) for revision in revisions])
    #     + f",{stop - start},{stop - done}"
Show All 15 Lines
    ):
        self._dbdate: Optional[datetime] = dbdate
        # maxdate is set by the maxdate computation algorithm
        self.maxdate: Optional[datetime] = None
        # known is True if this node is already known in the db; either because
        # the current directory actually exists in the database, or because all
        # the content of the current directory is known (subdirectories and files)
        self.known = self.dbdate is not None
        self.invalid = False
        self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
        self.children: List[IsochroneNode] = []
    @property
    def dbdate(self):
        # use a property to make this attribute (mostly) read-only
        return self._dbdate

    def invalidate(self):
        self._dbdate = None
        self.maxdate = None
        self.known = False
        self.invalid = True
    def add_directory(
        self, child: DirectoryEntry, date: Optional[datetime] = None
    ) -> "IsochroneNode":
        # we should not be processing this node (ie add subdirectories or
        # files) if it's actually known by the provenance DB
        assert self.dbdate is None
        node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path)
        self.children.append(node)
        return node
    def __str__(self):
        return (
            f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, "
            f"known={self.known}, invalid={self.invalid}, path={self.path}, "
            f"children=[{', '.join(str(child) for child in self.children)}]>"
        )
    def __eq__(self, other):
        return (
            isinstance(other, IsochroneNode)
            and (
                self.entry,
                self.depth,
                self._dbdate,
                self.maxdate,
                self.known,
                self.invalid,
                self.path,
            )
            == (
                other.entry,
                other.depth,
                other._dbdate,
                other.maxdate,
                other.known,
                other.invalid,
                other.path,
            )
            and Counter(self.children) == Counter(other.children)
        )
    def __hash__(self):
        return hash(
            (
                self.entry,
                self.depth,
                self._dbdate,
                self.maxdate,
                self.known,
                self.invalid,
                self.path,
            )
        )
def build_isochrone_graph(
    archive: ArchiveInterface,
    provenance: ProvenanceInterface,
    revision: RevisionEntry,
    directory: DirectoryEntry,
Show All 28 Lines
    while stack:
        # the revision is being processed out of order.
        if current.dbdate is not None and current.dbdate > revision.date:
            logging.debug(
                f"Invalidating frontier on {hash_to_hex(current.entry.id)}"
                f" (date {current.dbdate})"
                f" when processing revision {hash_to_hex(revision.id)}"
                f" (date {revision.date})"
            )
            provenance.directory_invalidate_in_isochrone_frontier(current.entry)
            current.invalidate()
        # Pre-query all known dates for directories in the current directory
        # for the provenance object to have them cached and (potentially) improve
        # performance.
        current.entry.retrieve_children(archive)
        ddates = provenance.directory_get_dates_in_isochrone_frontier(
            current.entry.dirs
Show All 28 Lines
    while stack:
                for child in current.children:
                    if child.maxdate is None:
                        # if child.maxdate is None, it must be processed
                        stack.append(child)
            else:
                # all the files and directories under current have a maxdate,
                # we can infer the maxdate for current directory
                assert current.maxdate is None
                # if all content is already known, update current directory info.
                current.maxdate = max(
                    [UTCMIN]
                    + [
                        child.maxdate
                        for child in current.children
                        if child.maxdate is not None  # unnecessary, but needed for mypy
                    ]
                    + [
                        fdates.get(file.id, revision.date)
                        for file in current.entry.files
                    ]
                )
                if current.maxdate <= revision.date:
                    current.known = (
                        # true if all subdirectories are known
                        all(child.known for child in current.children)
                        # true if all files are in fdates, i.e. if all files were known
                        # *before building this isochrone graph node*
                        # Note: the 'all()' is lazy: will stop iterating as soon as
                        # possible
                        and all((file.id in fdates) for file in current.entry.files)
                    )
                else:
                    # at least one content is being processed out-of-order, then current
                    # node should be treated as unknown
                    current.maxdate = revision.date
                    current.known = False
    logging.debug(
        f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
    )
    return root
def revision_process_content(
    archive: ArchiveInterface,
Show All 14 Lines
    while stack:
            assert current.dbdate <= revision.date
            if trackall:
                # Current directory is an outer isochrone frontier for a previously
                # processed revision. It should be reused as is.
                provenance.directory_add_to_revision(
                    revision, current.entry, current.path
                )
        else:
            assert current.maxdate is not None
            # Current directory is not an outer isochrone frontier for any previous
            # revision. It might be eligible for this one.
            if is_new_frontier(
                current,
                revision=revision,
                trackall=trackall,
                lower=lower,
                mindepth=mindepth,
            ):
                assert current.maxdate is not None
                # Outer frontier should be moved to current position in the isochrone
                # graph. This is the first time this directory is found in the isochrone
                # frontier.
                provenance.directory_set_date_in_isochrone_frontier(
                    current.entry, current.maxdate
                )
                if trackall:
                    provenance.directory_add_to_revision(
                        revision, current.entry, current.path
                    )
                    flatten_directory(archive, provenance, current.entry)
            else:
                # If current node is an invalidated frontier, update its date for future
                # revisions to get the proper value.
                if current.invalid:
                    provenance.directory_set_date_in_isochrone_frontier(
                        current.entry, current.maxdate
                    )
                # No point moving the frontier here. Either there are no files or they
                # are being seen for the first time here. Add all blobs to current
                # revision updating date if necessary, and recursively analyse
                # subdirectories as candidates to the outer frontier.
                for blob in current.entry.files:
                    date = provenance.content_get_early_date(blob)
                    if date is None or revision.date < date:
                        provenance.content_set_early_date(blob, revision.date)
Show All 69 Lines