Page Menu · Home · Software Heritage

No One · Temporary

diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 64f570d..8406b5c 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,446 +1,418 @@
import logging
import os
from .archive import ArchiveInterface
from .model import DirectoryEntry, FileEntry
from .origin import OriginEntry
from .revision import RevisionEntry
from datetime import datetime
from swh.model.hashutil import hash_to_hex
from typing import Dict, List, Optional, Tuple
# TODO: consider moving to path utils file together with normalize.
def is_child(path: bytes, prefix: bytes) -> bool:
    """Return True when ``path`` is a direct child of the directory ``prefix``."""
    if path == prefix:
        # A directory is not its own child.
        return False
    return os.path.dirname(path) == prefix
-# def is_not_prefix(prefix: bytes, path: bytes) -> bool:
-# return not path.startswith(prefix)
-
-
class ProvenanceInterface:
    """Abstract storage interface for the provenance index.

    Every method here raises ``NotImplementedError``; concrete backends must
    override all of them.  The ``*_early_date*`` methods deal with the
    earliest timestamp at which an object is known to occur, while the
    ``isochrone_frontier`` methods track directories sitting on the outer
    frontier of already-processed revisions.
    """

    def __init__(self, **kwargs):
        raise NotImplementedError

    def commit(self):
        """Flush pending changes to the underlying storage."""
        raise NotImplementedError

    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ):
        """Record ``blob`` as occurring in ``directory`` under path ``prefix``."""
        raise NotImplementedError

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ):
        """Record ``blob`` as occurring in ``revision`` under path ``prefix``."""
        raise NotImplementedError

    def content_find_first(self, blobid: bytes):
        """Return the first known occurrence of blob ``blobid``."""
        raise NotImplementedError

    def content_find_all(self, blobid: bytes):
        """Return all known occurrences of blob ``blobid``."""
        raise NotImplementedError

    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        """Return the earliest known date for ``blob``, or None if unseen."""
        raise NotImplementedError

    def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
        """Return a mapping from blob id to earliest known date, for known blobs only."""
        raise NotImplementedError

    def content_set_early_date(self, blob: FileEntry, date: datetime):
        """Record ``date`` as the earliest known date for ``blob``."""
        raise NotImplementedError

    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ):
        """Record ``directory`` as occurring in ``revision`` under ``path``."""
        raise NotImplementedError

    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        """Return ``directory``'s date on the outer isochrone frontier, or None."""
        raise NotImplementedError

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: List[DirectoryEntry]
    ) -> Dict[bytes, datetime]:
        """Return a mapping from directory id to frontier date, for known dirs only."""
        raise NotImplementedError

    def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
        """Remove ``directory``'s record from the outer isochrone frontier."""
        raise NotImplementedError

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ):
        """Record ``date`` for ``directory`` on the outer isochrone frontier."""
        raise NotImplementedError

    def origin_get_id(self, origin: OriginEntry) -> int:
        """Return the integer storage id associated with ``origin``."""
        raise NotImplementedError

    def revision_add(self, revision: RevisionEntry):
        """Register ``revision`` (with its date) in the index."""
        raise NotImplementedError

    def revision_add_before_revision(
        self, relative: RevisionEntry, revision: RevisionEntry
    ):
        """Record that ``revision`` occurs in the history of ``relative``."""
        raise NotImplementedError

    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
        """Record that ``revision`` is pointed to by ``origin``."""
        raise NotImplementedError

    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
        """Return the earliest known date for ``revision``, or None if unseen."""
        raise NotImplementedError

    def revision_get_prefered_origin(self, revision: RevisionEntry) -> int:
        """Return the preferred origin recorded for ``revision`` (None if unset)."""
        raise NotImplementedError

    def revision_in_history(self, revision: RevisionEntry) -> bool:
        """Return True if ``revision`` is already known in some revision's history."""
        raise NotImplementedError

    def revision_set_prefered_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ):
        """Record ``origin`` as the preferred origin for ``revision``."""
        raise NotImplementedError

    def revision_visited(self, revision: RevisionEntry) -> bool:
        """Return True if ``revision`` is already associated with some origin."""
        raise NotImplementedError
def directory_process_content(
    provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry
):
    """Register every blob reachable from ``directory`` under ``relative``.

    The whole subtree is walked iteratively; each file entry is recorded in
    ``relative`` together with its path relative to ``directory``.
    """
    pending = [(directory, b"")]
    while pending:
        node, path = pending.pop()
        for entry in iter(node):
            if isinstance(entry, FileEntry):
                # Record the blob with the path accumulated so far.
                provenance.content_add_to_directory(relative, entry, path)
            else:
                # Defer the subdirectory, extending the current path.
                pending.append((entry, os.path.join(path, entry.name)))
def directory_update_content(
    stack: List[Tuple[DirectoryEntry, bytes]],
    provenance: ProvenanceInterface,
    revision: RevisionEntry,
    directory: DirectoryEntry,
    prefix: bytes,
    subdirs: Optional[List[DirectoryEntry]] = None,
    blobs: Optional[List[FileEntry]] = None,
    blobdates: Optional[Dict[bytes, datetime]] = None,
):
    """Update blob dates in ``directory`` and schedule its subdirectories.

    Any blob whose earliest known date is missing or later than
    ``revision.date`` gets ``revision.date`` recorded instead.  Every
    subdirectory is pushed onto ``stack`` (paired with its full path) so the
    caller keeps walking the tree.  The optional parameters let callers reuse
    already-computed children lists and dates instead of querying them again.
    """
    assert revision.date is not None
    # Compute children and dates only when the caller did not supply them.
    if subdirs is None:
        subdirs = [entry for entry in directory if isinstance(entry, DirectoryEntry)]
    if blobs is None:
        blobs = [entry for entry in directory if isinstance(entry, FileEntry)]
    if blobdates is None:
        blobdates = provenance.content_get_early_dates(blobs)
    # Pull each blob's earliest date back to this revision when needed.
    for blob in blobs:
        known = blobdates.get(blob.id, None)
        if known is None or revision.date < known:
            provenance.content_set_early_date(blob, revision.date)
    # Schedule every subdirectory, with its corresponding path, for analysis.
    for subdir in subdirs:
        stack.append((subdir, os.path.join(prefix, subdir.name)))
def origin_add(provenance: ProvenanceInterface, origin: OriginEntry):
    """Index every revision pointed to by ``origin``.

    TODO: refactor to iterate over origin visit statuses and commit only once
    per status.
    """
    origin.id = provenance.origin_get_id(origin)
    for rev in origin.revisions:
        origin_add_revision(provenance, origin, rev)
        # Commit after each revision.
        provenance.commit()  # TODO: verify this!
def origin_add_revision(
    provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry
):
    """Associate ``revision`` (and its reachable history) with ``origin``.

    Walks the history graph iteratively.  Each pending entry pairs a revision
    with the "anchor" revision through which it was reached; the anchor is
    ``None`` when the revision is pointed to directly by the origin.
    """
    pending: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
    while pending:
        anchor, rev = pending.pop()
        # Make sure the revision has a preferred origin recorded.
        if provenance.revision_get_prefered_origin(rev) is None:
            provenance.revision_set_prefered_origin(origin, rev)
        ########################################################################
        if anchor is not None:
            # ``rev`` appears in the history of its anchor revision: classify
            # each of its parents.
            for parent in iter(rev):
                if provenance.revision_visited(parent):
                    # The parent already points to an origin, so its history
                    # was processed before.  Just link it to this origin too.
                    provenance.revision_add_to_origin(origin, parent)
                elif provenance.revision_in_history(parent):
                    # Known through some other revision's history: point it
                    # directly to the origin and (eventually) walk its history.
                    pending.append((None, parent))
                else:
                    # Never seen before: walk its history under the same
                    # anchor revision.
                    provenance.revision_add_before_revision(anchor, parent)
                    pending.append((anchor, parent))
        else:
            # ``rev`` is pointed to directly by the origin.
            was_visited = provenance.revision_visited(rev)
            provenance.revision_add_to_origin(origin, rev)
            if not was_visited:
                pending.append((rev, rev))
def revision_add(
    provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry
):
    """Index ``revision`` unless an equal-or-earlier date is already recorded.

    When the revision is new (or earlier than previously seen) it is
    registered and its root directory is processed.  Always commits and
    returns the commit's result.
    """
    assert revision.date is not None
    assert revision.root is not None
    known = provenance.revision_get_early_date(revision)
    if known is not None and known <= revision.date:
        # Nothing new: an equal or earlier processing already happened.
        return provenance.commit()
    provenance.revision_add(revision)
    # Process content starting from the revision's root directory.
    revision_process_content(
        provenance, revision, DirectoryEntry(archive, revision.root, b"")
    )
    return provenance.commit()
def revision_process_content(
    provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry
):
    """Walk ``revision``'s root directory computing its isochrone frontier.

    Directories whose entire content is strictly older than ``revision.date``
    are candidates for the outer frontier; directories containing an element
    dated exactly ``revision.date`` belong to the inner frontier.  Blobs and
    frontier directories are attached to ``revision`` along the way.
    """
    assert revision.date is not None
    # Stack of directories (and their paths) to be processed.
    stack: List[Tuple[DirectoryEntry, bytes]] = [(root, root.name)]
    # This dictionary will hold the computed dates for visited subdirectories
    # inside the isochrone frontier.
    innerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {}
    # This dictionary will hold the computed dates for visited subdirectories
    # outside the isochrone frontier which are candidates to be added to the
    # outer frontier (if their parent is in the inner frontier).
    outerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {}
    while stack:
        # Get next directory to process and query its date right before
        # processing to be sure we get the most recently updated value.
        current, prefix = stack.pop()
        date = provenance.directory_get_date_in_isochrone_frontier(current)
        if date is None:
            # The directory has never been seen on the outer isochrone
            # frontier of previously processed revisions. Its children should
            # be analyzed.
            blobs = [child for child in current if isinstance(child, FileEntry)]
            subdirs = [child for child in current if isinstance(child, DirectoryEntry)]
            # Get the list of ids with no duplicates to ensure we have
            # available dates for all the elements. This prevents taking a
            # wrong decision when a blob occurs more than once in the same
            # directory.
            ids = list(dict.fromkeys([child.id for child in blobs + subdirs]))
            if ids:
                # Known dates for the blobs in the current directory.
                blobdates = provenance.content_get_early_dates(blobs)
                # Known dates for the subdirectories in the current directory
                # that belong to the outer isochrone frontier of some
                # previously processed revision.
                knowndates = provenance.directory_get_dates_in_isochrone_frontier(
                    subdirs
                )
                # Known dates for the subdirectories in the current directory
                # that are inside the isochrone frontier of the revision.
                innerdates = {
                    innerdir.id: innerdate
                    for path, (innerdir, innerdate) in innerdirs.items()
                    if is_child(path, prefix)
                }
                # Known dates for the subdirectories in the current directory
                # that are outside the isochrone frontier of the revision.
                outerdates = {
                    outerdir.id: outerdate
                    for path, (outerdir, outerdate) in outerdirs.items()
                    if is_child(path, prefix)
                }
                # All known dates for child nodes of the current directory.
                # A child cannot be both inside and outside the frontier.
                assert not (innerdates.keys() & outerdates.keys())
                dates = list(
                    {**blobdates, **knowndates, **innerdates, **outerdates}.values()
                )
                if len(dates) == len(ids):
                    # All child nodes of current directory are already known.
                    maxdate = max(dates)
                    if maxdate < revision.date:
                        # The directory is outside the isochrone frontier of
                        # the revision. It is a candidate to be added to the
                        # outer frontier.
                        outerdirs[prefix] = (current, maxdate)
                        # Its children are removed since they are no longer
                        # candidates.
                        outerdirs = {
                            path: outerdir
                            for path, outerdir in outerdirs.items()
                            if not is_child(path, prefix)
                        }
                    elif maxdate == revision.date:
                        # The current directory is inside the isochrone
                        # frontier.
                        innerdirs[prefix] = (current, revision.date)
                        # Add blobs present in this level to the revision. No
                        # need to update dates as they are at most equal to
                        # current one.
                        for blob in blobs:
                            provenance.content_add_to_revision(revision, blob, prefix)
                        # If any of its children was found outside the
                        # frontier it should be added to the outer frontier
                        # now.
                        if outerdates:
                            for path, (outerdir, outerdate) in outerdirs.items():
                                if is_child(path, prefix):
                                    provenance.directory_set_date_in_isochrone_frontier(
                                        outerdir, outerdate
                                    )
                                    provenance.directory_add_to_revision(
                                        revision, outerdir, path
                                    )
                                    directory_process_content(
                                        provenance,
                                        directory=outerdir,
                                        relative=outerdir,
                                    )
                            # Remove processed elements to avoid duplicating
                            # work.
                            outerdirs = {
                                path: outerdir
                                for path, outerdir in outerdirs.items()
                                if not is_child(path, prefix)
                            }
                        # There can still be subdirectories that are known to
                        # be in the outer isochrone frontier of previously
                        # processed revisions. Thus, they are not in the list
                        # of candidates but have to be added to current
                        # revisions as well.
                        for subdir in subdirs:
                            knowndate = knowndates.get(subdir.id, None)
                            if knowndate is not None and knowndate <= revision.date:
                                # Less or equal since the directory could have
                                # been added to the outer isochrone frontier
                                # when processing a different directory's
                                # subtree of this very same revision.
                                provenance.directory_add_to_revision(
                                    revision, subdir, os.path.join(prefix, subdir.name)
                                )
                    else:
                        # The revision is out of order. The current directory
                        # does not belong to the outer isochrone frontier of
                        # any previously processed revision yet all its
                        # children nodes are known. They should be re-analyzed
                        # (and timestamps eventually updated) and current
                        # directory updated after them.
                        stack.append((current, prefix))
                        directory_update_content(
                            stack,
                            provenance,
                            revision,
                            current,
                            prefix,
                            subdirs=subdirs,
                            blobs=blobs,
                            blobdates=blobdates,
                        )
                else:
                    # At least one child node is unknown, ie. the current
                    # directory is inside the isochrone frontier of the
                    # current revision. Its child nodes should be analyzed
                    # and current directory updated after them.
                    stack.append((current, prefix))
                    directory_update_content(
                        stack,
                        provenance,
                        revision,
                        current,
                        prefix,
                        subdirs=subdirs,
                        blobs=blobs,
                        blobdates=blobdates,
                    )
            else:
                # Empty directory. Just consider it to be in the inner
                # frontier of current revision (ie. all its children are
                # already "known").
                innerdirs[prefix] = (current, revision.date)
        elif revision.date < date:
            # The revision is out of order. The current directory belongs to
            # the outer isochrone frontier of some previously processed
            # revision but current revision is earlier. The frontier record
            # should be invalidated, children nodes re-analyzed (and
            # timestamps eventually updated) and current directory updated
            # after them.
            stack.append((current, prefix))
            provenance.directory_invalidate_in_isochrone_frontier(current)
            directory_update_content(stack, provenance, revision, current, prefix)
        else:
            # The directory has already been seen on the outer isochrone
            # frontier of an earlier revision. Just add it to the current
            # revision.
            provenance.directory_add_to_revision(revision, current, prefix)

    if root.name in outerdirs:
        # Only the root directory should remain as a candidate at this point
        # (e.g. when all of its content is already known). Add it to the
        # outer frontier and attach its whole content to the revision.
        outerdir, outerdate = outerdirs[root.name]

        provenance.directory_set_date_in_isochrone_frontier(outerdir, outerdate)
        provenance.directory_add_to_revision(revision, outerdir, root.name)
        directory_process_content(provenance, directory=outerdir, relative=outerdir)

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:03 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3452567

Event Timeline