Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342860
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
View Options
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 64f570d..8406b5c 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,446 +1,418 @@
import logging
import os
from .archive import ArchiveInterface
from .model import DirectoryEntry, FileEntry
from .origin import OriginEntry
from .revision import RevisionEntry
from datetime import datetime
from swh.model.hashutil import hash_to_hex
from typing import Dict, List, Optional, Tuple
# TODO: consider moving to path utils file together with normalize.
def is_child(path: bytes, prefix: bytes) -> bool:
    """Tell whether *path* lies directly under *prefix* (exactly one level deep).

    A path is never considered a child of itself.
    """
    if path == prefix:
        return False
    return os.path.dirname(path) == prefix
-# def is_not_prefix(prefix: bytes, path: bytes) -> bool:
-# return not path.startswith(prefix)
-
-
class ProvenanceInterface:
    """Abstract storage interface for the provenance index.

    Concrete backends persist the content/directory/revision/origin
    relations that the provenance algorithms in this module build.
    Every method here is a stub that raises ``NotImplementedError``.
    """

    def __init__(self, **kwargs):
        raise NotImplementedError

    def commit(self):
        """Flush pending writes to the underlying storage."""
        raise NotImplementedError

    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ):
        """Record that *blob* occurs under *directory* at path *prefix*."""
        raise NotImplementedError

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ):
        """Record that *blob* occurs in *revision* at path *prefix*."""
        raise NotImplementedError

    def content_find_first(self, blobid: bytes):
        raise NotImplementedError

    def content_find_all(self, blobid: bytes):
        raise NotImplementedError

    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        """Return the earliest known date for *blob*, or None if unseen."""
        raise NotImplementedError

    def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
        """Return known earliest dates for *blobs*, keyed by blob id."""
        raise NotImplementedError

    def content_set_early_date(self, blob: FileEntry, date: datetime):
        """Set/overwrite the earliest known date for *blob*."""
        raise NotImplementedError

    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ):
        """Record that frontier *directory* occurs in *revision* at *path*."""
        raise NotImplementedError

    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        """Return *directory*'s frontier date, or None if not in the frontier."""
        raise NotImplementedError

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: List[DirectoryEntry]
    ) -> Dict[bytes, datetime]:
        """Return frontier dates for *dirs*, keyed by directory id."""
        raise NotImplementedError

    def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
        """Drop *directory*'s frontier record (e.g. an out-of-order revision)."""
        raise NotImplementedError

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ):
        """Record *directory* in the outer isochrone frontier with *date*."""
        raise NotImplementedError

    def origin_get_id(self, origin: OriginEntry) -> int:
        """Return the storage id for *origin* (creating it if needed — TODO confirm)."""
        raise NotImplementedError

    def revision_add(self, revision: RevisionEntry):
        raise NotImplementedError

    def revision_add_before_revision(
        self, relative: RevisionEntry, revision: RevisionEntry
    ):
        """Record that *revision* is part of *relative*'s history."""
        raise NotImplementedError

    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
        raise NotImplementedError

    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
        """Return the earliest known date for *revision*, or None if unseen."""
        raise NotImplementedError

    # NOTE(review): callers compare the result against None (see
    # origin_add_revision), so the return type must be Optional.
    def revision_get_prefered_origin(self, revision: RevisionEntry) -> Optional[int]:
        raise NotImplementedError

    def revision_in_history(self, revision: RevisionEntry) -> bool:
        """Tell whether *revision* already appears in some revision's history."""
        raise NotImplementedError

    def revision_set_prefered_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ):
        raise NotImplementedError

    def revision_visited(self, revision: RevisionEntry) -> bool:
        """Tell whether *revision* was already visited (pointed by an origin)."""
        raise NotImplementedError
def directory_process_content(
    provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry
):
    """Register every blob reachable from *directory* as content of *relative*.

    Performs an iterative (stack-based) walk of the subtree rooted at
    *directory*; each blob is added with its path relative to the root of
    the walk.
    """
    pending = [(directory, b"")]
    while pending:
        node, path = pending.pop()
        for entry in iter(node):
            if isinstance(entry, FileEntry):
                # Blob: record it under the accumulated relative path.
                provenance.content_add_to_directory(relative, entry, path)
            else:
                # Subdirectory: descend with the extended path.
                pending.append((entry, os.path.join(path, entry.name)))
def directory_update_content(
    stack: List[Tuple[DirectoryEntry, bytes]],
    provenance: ProvenanceInterface,
    revision: RevisionEntry,
    directory: DirectoryEntry,
    prefix: bytes,
    subdirs: Optional[List[DirectoryEntry]] = None,
    blobs: Optional[List[FileEntry]] = None,
    blobdates: Optional[Dict[bytes, datetime]] = None,
):
    """Propagate *revision*'s date to the blobs of *directory* and queue its
    subdirectories on *stack* for recursive processing.

    The optional arguments let callers that already listed the children (or
    fetched their dates) avoid recomputing them.
    """
    assert revision.date is not None
    # Fill in whatever the caller did not supply.
    if subdirs is None:
        subdirs = [node for node in directory if isinstance(node, DirectoryEntry)]
    if blobs is None:
        blobs = [node for node in directory if isinstance(node, FileEntry)]
    if blobdates is None:
        blobdates = provenance.content_get_early_dates(blobs)
    # A blob takes the revision's date when it is the earliest one seen.
    for blob in blobs:
        known = blobdates.get(blob.id)
        if known is None or revision.date < known:
            provenance.content_set_early_date(blob, revision.date)
    # Queue every subdirectory together with its full path.
    stack.extend((node, os.path.join(prefix, node.name)) for node in subdirs)
def origin_add(provenance: ProvenanceInterface, origin: OriginEntry):
    """Resolve *origin*'s id and process every revision it points to.

    A commit is issued after each revision is processed.
    """
    # TODO: refactor to iterate over origin visit statuses and commit only
    # once per status.
    origin.id = provenance.origin_get_id(origin)
    for rev in origin.revisions:
        origin_add_revision(provenance, origin, rev)
        # Commit after each revision
        provenance.commit()  # TODO: verify this!
def origin_add_revision(
    provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry
):
    """Associate *revision* — and, transitively, its history — with *origin*.

    Iterative DFS over the revision graph.  Each stack entry is a pair
    ``(relative, current)``: ``relative`` is the origin-pointed revision whose
    history ``current`` belongs to, or ``None`` when ``current`` is pointed
    directly by the origin.
    """
    stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
    while stack:
        relative, current = stack.pop()
        # Check if current revision has no preferred origin and update if
        # necessary.
        prefered = provenance.revision_get_prefered_origin(current)
        if prefered is None:
            provenance.revision_set_prefered_origin(origin, current)
        ########################################################################
        if relative is None:
            # This revision is pointed directly by the origin.
            visited = provenance.revision_visited(current)
            provenance.revision_add_to_origin(origin, current)
            if not visited:
                # First time seen: walk its parents using `current` itself as
                # the relative revision.
                stack.append((current, current))
        else:
            # This revision is a parent of another one in the history of the
            # relative revision.
            for parent in iter(current):
                visited = provenance.revision_visited(parent)
                if not visited:
                    # The parent revision has never been seen before pointing
                    # directly to an origin.
                    known = provenance.revision_in_history(parent)
                    if known:
                        # The parent revision is already known in some other
                        # revision's history. We should point it directly to
                        # the origin and (eventually) walk its history.
                        stack.append((None, parent))
                    else:
                        # The parent revision was never seen before. We should
                        # walk its history and associate it with the same
                        # relative revision.
                        provenance.revision_add_before_revision(relative, parent)
                        stack.append((relative, parent))
                else:
                    # The parent revision already points to an origin, so its
                    # history was properly processed before. We just need to
                    # make sure it points to the current origin as well.
                    provenance.revision_add_to_origin(origin, parent)
def revision_add(
    provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry
):
    """Index *revision* if it is new or earlier than the date already known.

    Processes the content starting from the revision's root directory, then
    commits and returns the result of the commit.
    """
    assert revision.date is not None
    assert revision.root is not None
    known = provenance.revision_get_early_date(revision)
    if known is None or revision.date < known:
        provenance.revision_add(revision)
        root = DirectoryEntry(archive, revision.root, b"")
        revision_process_content(provenance, revision, root)
    return provenance.commit()
def revision_process_content(
    provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry
):
    """Compute the isochrone frontier of *revision* over the tree rooted at
    *root* and record the resulting occurrences in *provenance*.

    Directories whose entire content predates the revision form the *outer*
    frontier; directories containing something dated at the revision itself
    are *inner*.  Outer-frontier candidates are only committed once their
    parent turns out to be inner (or when the candidate is the root itself).
    """
    assert revision.date is not None
    # Stack of directories (and their paths) to be processed.
    stack: List[Tuple[DirectoryEntry, bytes]] = [(root, root.name)]
    # This dictionary will hold the computed dates for visited subdirectories
    # inside the isochrone frontier.
    innerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {}
    # This dictionary will hold the computed dates for visited subdirectories
    # outside the isochrone frontier which are candidates to be added to the
    # outer frontier (if their parent is in the inner frontier).
    outerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {}
    while stack:
        # Get next directory to process and query its date right before
        # processing to be sure we get the most recently updated value.
        current, prefix = stack.pop()
        date = provenance.directory_get_date_in_isochrone_frontier(current)
        if date is None:
            # The directory has never been seen on the outer isochrone frontier
            # of previously processed revisions. Its children should be
            # analyzed.
            blobs = [child for child in current if isinstance(child, FileEntry)]
            subdirs = [child for child in current if isinstance(child, DirectoryEntry)]
            # Get the list of ids with no duplicates to ensure we have
            # available dates for all the elements. This prevents taking a
            # wrong decision when a blob occurs more than once in the same
            # directory.
            ids = list(dict.fromkeys([child.id for child in blobs + subdirs]))
            if ids:
                # Known dates for the blobs in the current directory.
                blobdates = provenance.content_get_early_dates(blobs)
                # Known dates for the subdirectories in the current directory
                # that belong to the outer isochrone frontier of some
                # previously processed revision.
                knowndates = provenance.directory_get_dates_in_isochrone_frontier(
                    subdirs
                )
                # Known dates for the subdirectories in the current directory
                # that are inside the isochrone frontier of the revision.
                innerdates = {
                    innerdir.id: innerdate
                    for path, (innerdir, innerdate) in innerdirs.items()
                    if is_child(path, prefix)
                }
                # Known dates for the subdirectories in the current directory
                # that are outside the isochrone frontier of the revision.
                outerdates = {
                    outerdir.id: outerdate
                    for path, (outerdir, outerdate) in outerdirs.items()
                    if is_child(path, prefix)
                }
                # All known dates for child nodes of the current directory.
                assert not (innerdates.keys() & outerdates.keys())
                dates = list(
                    {**blobdates, **knowndates, **innerdates, **outerdates}.values()
                )
                if len(dates) == len(ids):
                    # All child nodes of current directory are already known.
                    maxdate = max(dates)
                    if maxdate < revision.date:
                        # The directory is outside the isochrone frontier of
                        # the revision. It is a candidate to be added to the
                        # outer frontier.
                        outerdirs[prefix] = (current, maxdate)
                        # Its children are removed since they are no longer
                        # candidates.
                        outerdirs = {
                            path: outerdir
                            for path, outerdir in outerdirs.items()
                            if not is_child(path, prefix)
                        }
                    elif maxdate == revision.date:
                        # The current directory is inside the isochrone
                        # frontier.
                        innerdirs[prefix] = (current, revision.date)
                        # Add blobs present in this level to the revision. No
                        # need to update dates as they are at most equal to
                        # current one.
                        for blob in blobs:
                            provenance.content_add_to_revision(revision, blob, prefix)
                        # If any of its children was found outside the frontier
                        # it should be added to the outer frontier now.
                        if outerdates:
                            for path, (outerdir, outerdate) in outerdirs.items():
                                if is_child(path, prefix):
                                    provenance.directory_set_date_in_isochrone_frontier(
                                        outerdir, outerdate
                                    )
                                    provenance.directory_add_to_revision(
                                        revision, outerdir, path
                                    )
                                    directory_process_content(
                                        provenance,
                                        directory=outerdir,
                                        relative=outerdir,
                                    )
                            # Remove processed elements to avoid duplicating
                            # work.
                            outerdirs = {
                                path: outerdir
                                for path, outerdir in outerdirs.items()
                                if not is_child(path, prefix)
                            }
                        # There can still be subdirectories that are known to
                        # be in the outer isochrone frontier of previous
                        # processed revisions. Thus, they are not in the list
                        # of candidates but have to be added to current
                        # revisions as well.
                        for subdir in subdirs:
                            knowndate = knowndates.get(subdir.id, None)
                            if knowndate is not None and knowndate <= revision.date:
                                # Less or equal since the directory could have
                                # been added to the outer isochrone frontier
                                # when processing a different directory's
                                # subtree of this very same revision.
                                provenance.directory_add_to_revision(
                                    revision, subdir, os.path.join(prefix, subdir.name)
                                )
                    else:
                        # The revision is out of order. The current directory
                        # does not belong to the outer isochrone frontier of
                        # any previously processed revision yet all its
                        # children nodes are known. They should be re-analyzed
                        # (and timestamps eventually updated) and current
                        # directory updated after them.
                        stack.append((current, prefix))
                        directory_update_content(
                            stack,
                            provenance,
                            revision,
                            current,
                            prefix,
                            subdirs=subdirs,
                            blobs=blobs,
                            blobdates=blobdates,
                        )
                else:
                    # At least one child node is unknown, ie. the current
                    # directory is inside the isochrone frontier of the current
                    # revision. Its child nodes should be analyzed and current
                    # directory updated after them.
                    stack.append((current, prefix))
                    directory_update_content(
                        stack,
                        provenance,
                        revision,
                        current,
                        prefix,
                        subdirs=subdirs,
                        blobs=blobs,
                        blobdates=blobdates,
                    )
            else:
                # Empty directory. Just consider it to be in the inner frontier
                # of current revision (ie. all its children are already
                # "known").
                innerdirs[prefix] = (current, revision.date)
        elif revision.date < date:
            # The revision is out of order. The current directory belongs to
            # the outer isochrone frontier of some previously processed
            # revision but current revision is earlier. The frontier record
            # should be invalidated, children nodes re-analyzed (and timestamps
            # eventually updated) and current directory updated after them.
            stack.append((current, prefix))
            provenance.directory_invalidate_in_isochrone_frontier(current)
            directory_update_content(stack, provenance, revision, current, prefix)
        else:
            # The directory has already been seen on the outer isochrone
            # frontier of an earlier revision. Just add it to the current
            # revision.
            provenance.directory_add_to_revision(revision, current, prefix)

    if root.name in outerdirs:
        # Only the root directory should be considered at this point.
        outerdir, outerdate = outerdirs[root.name]

        provenance.directory_set_date_in_isochrone_frontier(outerdir, outerdate)
        provenance.directory_add_to_revision(revision, outerdir, root.name)
        directory_process_content(provenance, directory=outerdir, relative=outerdir)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:03 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3452567
Attached To
rDPROV Provenance database
Event Timeline
Log In to Comment