Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
from datetime import datetime, timezone | from datetime import datetime, timezone | ||||
import os | import os | ||||
from typing import Dict, Generator, List, Optional, Tuple | from typing import Dict, Generator, List, Optional, Tuple | ||||
from typing_extensions import Protocol, runtime_checkable | from typing_extensions import Protocol, runtime_checkable | ||||
from .archive import ArchiveInterface | from .archive import ArchiveInterface | ||||
from .model import DirectoryEntry, FileEntry, TreeEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
from .origin import OriginEntry | |||||
from .revision import RevisionEntry | |||||
UTCMIN = datetime.min.replace(tzinfo=timezone.utc) | UTCMIN = datetime.min.replace(tzinfo=timezone.utc) | ||||
@runtime_checkable | @runtime_checkable | ||||
class ProvenanceInterface(Protocol): | class ProvenanceInterface(Protocol): | ||||
def commit(self): | def commit(self): | ||||
"""Commit currently ongoing transactions in the backend DB""" | """Commit currently ongoing transactions in the backend DB""" | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | class ProvenanceInterface(Protocol): | ||||
) -> None: | ) -> None: | ||||
... | ... | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
... | ... | ||||
def directory_process_content( | def directory_process_content( | ||||
provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry | archive: ArchiveInterface, | ||||
provenance: ProvenanceInterface, | |||||
directory: DirectoryEntry, | |||||
relative: DirectoryEntry, | |||||
) -> None: | ) -> None: | ||||
stack = [(directory, b"")] | stack = [(directory, b"")] | ||||
while stack: | while stack: | ||||
current, prefix = stack.pop() | current, prefix = stack.pop() | ||||
for child in iter(current): | for child in current.ls(archive): | ||||
if isinstance(child, FileEntry): | if isinstance(child, FileEntry): | ||||
# Add content to the relative directory with the computed prefix. | # Add content to the relative directory with the computed prefix. | ||||
provenance.content_add_to_directory(relative, child, prefix) | provenance.content_add_to_directory(relative, child, prefix) | ||||
else: | elif isinstance(child, DirectoryEntry): | ||||
# Recursively walk the child directory. | # Recursively walk the child directory. | ||||
stack.append((child, os.path.join(prefix, child.name))) | stack.append((child, os.path.join(prefix, child.name))) | ||||
def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None: | def origin_add( | ||||
archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry | |||||
) -> None: | |||||
# TODO: refactor to iterate over origin visit statuses and commit only once | # TODO: refactor to iterate over origin visit statuses and commit only once | ||||
# per status. | # per status. | ||||
origin.id = provenance.origin_get_id(origin) | origin.id = provenance.origin_get_id(origin) | ||||
for revision in origin.revisions: | for revision in origin.revisions: | ||||
origin_add_revision(provenance, origin, revision) | origin_add_revision(archive, provenance, origin, revision) | ||||
# Commit after each revision | # Commit after each revision | ||||
provenance.commit() # TODO: verify this! | provenance.commit() # TODO: verify this! | ||||
def origin_add_revision( | def origin_add_revision( | ||||
provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry | archive: ArchiveInterface, | ||||
provenance: ProvenanceInterface, | |||||
origin: OriginEntry, | |||||
revision: RevisionEntry, | |||||
) -> None: | ) -> None: | ||||
stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] | stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] | ||||
while stack: | while stack: | ||||
relative, current = stack.pop() | relative, current = stack.pop() | ||||
# Check if current revision has no preferred origin and update if necessary. | # Check if current revision has no preferred origin and update if necessary. | ||||
preferred = provenance.revision_get_preferred_origin(current) | preferred = provenance.revision_get_preferred_origin(current) | ||||
if preferred is None: | if preferred is None: | ||||
provenance.revision_set_preferred_origin(origin, current) | provenance.revision_set_preferred_origin(origin, current) | ||||
######################################################################## | ######################################################################## | ||||
if relative is None: | if relative is None: | ||||
# This revision is pointed directly by the origin. | # This revision is pointed directly by the origin. | ||||
visited = provenance.revision_visited(current) | visited = provenance.revision_visited(current) | ||||
provenance.revision_add_to_origin(origin, current) | provenance.revision_add_to_origin(origin, current) | ||||
if not visited: | if not visited: | ||||
stack.append((current, current)) | stack.append((current, current)) | ||||
else: | else: | ||||
# This revision is a parent of another one in the history of the | # This revision is a parent of another one in the history of the | ||||
# relative revision. | # relative revision. | ||||
for parent in iter(current): | for parent in current.parents(archive): | ||||
visited = provenance.revision_visited(parent) | visited = provenance.revision_visited(parent) | ||||
if not visited: | if not visited: | ||||
# The parent revision has never been seen before pointing | # The parent revision has never been seen before pointing | ||||
# directly to an origin. | # directly to an origin. | ||||
known = provenance.revision_in_history(parent) | known = provenance.revision_in_history(parent) | ||||
if known: | if known: | ||||
Show All 24 Lines | ) -> None: | ||||
assert revision.date is not None | assert revision.date is not None | ||||
assert revision.root is not None | assert revision.root is not None | ||||
# Processed content starting from the revision's root directory. | # Processed content starting from the revision's root directory. | ||||
date = provenance.revision_get_early_date(revision) | date = provenance.revision_get_early_date(revision) | ||||
if date is None or revision.date < date: | if date is None or revision.date < date: | ||||
provenance.revision_add(revision) | provenance.revision_add(revision) | ||||
# TODO: add file size filtering | # TODO: add file size filtering | ||||
revision_process_content( | revision_process_content( | ||||
archive, | |||||
provenance, | provenance, | ||||
revision, | revision, | ||||
DirectoryEntry(archive, revision.root, b""), | DirectoryEntry(revision.root, b""), | ||||
lower=lower, | lower=lower, | ||||
mindepth=mindepth, | mindepth=mindepth, | ||||
) | ) | ||||
# TODO: improve this! Maybe using a max attempt counter? | # TODO: improve this! Maybe using a max attempt counter? | ||||
# Ideally Provenance class should guarantee that a commit never fails. | # Ideally Provenance class should guarantee that a commit never fails. | ||||
while not provenance.commit(): | while not provenance.commit(): | ||||
continue | continue | ||||
class IsochroneNode: | class IsochroneNode: | ||||
def __init__( | def __init__( | ||||
self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 | self, entry: DirectoryEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 | ||||
): | ): | ||||
self.entry = entry | self.entry = entry | ||||
self.depth = depth | self.depth = depth | ||||
self.date = dates.get(self.entry.id, None) | self.date = dates.get(self.entry.id, None) | ||||
self.children: List[IsochroneNode] = [] | self.children: List[IsochroneNode] = [] | ||||
self.maxdate: Optional[datetime] = None | self.maxdate: Optional[datetime] = None | ||||
def add_child( | def add_child( | ||||
self, child: TreeEntry, dates: Dict[bytes, datetime] = {} | self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {} | ||||
) -> "IsochroneNode": | ) -> "IsochroneNode": | ||||
assert isinstance(self.entry, DirectoryEntry) and self.date is None | assert isinstance(self.entry, DirectoryEntry) and self.date is None | ||||
node = IsochroneNode(child, dates=dates, depth=self.depth + 1) | node = IsochroneNode(child, dates=dates, depth=self.depth + 1) | ||||
self.children.append(node) | self.children.append(node) | ||||
return node | return node | ||||
def build_isochrone_graph( | def build_isochrone_graph( | ||||
provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry | archive: ArchiveInterface, | ||||
provenance: ProvenanceInterface, | |||||
revision: RevisionEntry, | |||||
directory: DirectoryEntry, | |||||
) -> IsochroneNode: | ) -> IsochroneNode: | ||||
assert revision.date is not None | assert revision.date is not None | ||||
assert revision.root == directory.id | |||||
# Build the nodes structure | # Build the nodes structure | ||||
root = IsochroneNode(directory) | root = IsochroneNode(directory) | ||||
root.date = provenance.directory_get_date_in_isochrone_frontier(directory) | root.date = provenance.directory_get_date_in_isochrone_frontier(directory) | ||||
stack = [root] | stack = [root] | ||||
while stack: | while stack: | ||||
current = stack.pop() | current = stack.pop() | ||||
assert isinstance(current.entry, DirectoryEntry) | assert isinstance(current.entry, DirectoryEntry) | ||||
if current.date is None or current.date >= revision.date: | if current.date is None or current.date >= revision.date: | ||||
# If current directory has an associated date in the isochrone frontier that | # If current directory has an associated date in the isochrone frontier that | ||||
# is greater or equal to the current revision's one, it should be ignored as | # is greater or equal to the current revision's one, it should be ignored as | ||||
# the revision is being processed out of order. | # the revision is being processed out of order. | ||||
if current.date is not None and current.date >= revision.date: | if current.date is not None and current.date >= revision.date: | ||||
provenance.directory_invalidate_in_isochrone_frontier(current.entry) | provenance.directory_invalidate_in_isochrone_frontier(current.entry) | ||||
current.date = None | current.date = None | ||||
# Pre-query all known dates for content/directories in the current directory | # Pre-query all known dates for content/directories in the current directory | ||||
# for the provenance object to have them cached and (potentially) improve | # for the provenance object to have them cached and (potentially) improve | ||||
# performance. | # performance. | ||||
ddates = provenance.directory_get_dates_in_isochrone_frontier( | ddates = provenance.directory_get_dates_in_isochrone_frontier( | ||||
[child for child in current.entry if isinstance(child, DirectoryEntry)] | [ | ||||
child | |||||
for child in current.entry.ls(archive) | |||||
if isinstance(child, DirectoryEntry) | |||||
] | |||||
) | ) | ||||
fdates = provenance.content_get_early_dates( | fdates = provenance.content_get_early_dates( | ||||
[child for child in current.entry if isinstance(child, FileEntry)] | [ | ||||
child | |||||
for child in current.entry.ls(archive) | |||||
if isinstance(child, FileEntry) | |||||
] | |||||
) | ) | ||||
for child in current.entry: | for child in current.entry.ls(archive): | ||||
# Recursively analyse directory nodes. | # Recursively analyse directory nodes. | ||||
if isinstance(child, DirectoryEntry): | if isinstance(child, DirectoryEntry): | ||||
node = current.add_child(child, dates=ddates) | node = current.add_child(child, dates=ddates) | ||||
stack.append(node) | stack.append(node) | ||||
else: | else: | ||||
current.add_child(child, dates=fdates) | current.add_child(child, dates=fdates) | ||||
# Precalculate max known date for each node in the graph. | # Precalculate max known date for each node in the graph. | ||||
stack = [root] | stack = [root] | ||||
Show All 25 Lines | while stack: | ||||
current.maxdate = max(maxdates) if maxdates else UTCMIN | current.maxdate = max(maxdates) if maxdates else UTCMIN | ||||
else: | else: | ||||
# Directory node in the frontier, just use its known date. | # Directory node in the frontier, just use its known date. | ||||
current.maxdate = current.date | current.maxdate = current.date | ||||
return root | return root | ||||
def revision_process_content( | def revision_process_content( | ||||
archive: ArchiveInterface, | |||||
provenance: ProvenanceInterface, | provenance: ProvenanceInterface, | ||||
revision: RevisionEntry, | revision: RevisionEntry, | ||||
root: DirectoryEntry, | root: DirectoryEntry, | ||||
lower: bool = True, | lower: bool = True, | ||||
mindepth: int = 1, | mindepth: int = 1, | ||||
): | ): | ||||
assert revision.date is not None | assert revision.date is not None | ||||
stack = [(build_isochrone_graph(provenance, revision, root), root.name)] | stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)] | ||||
while stack: | while stack: | ||||
current, path = stack.pop() | current, path = stack.pop() | ||||
assert isinstance(current.entry, DirectoryEntry) | assert isinstance(current.entry, DirectoryEntry) | ||||
if current.date is not None: | if current.date is not None: | ||||
assert current.date < revision.date | assert current.date < revision.date | ||||
# Current directory is an outer isochrone frontier for a previously | # Current directory is an outer isochrone frontier for a previously | ||||
# processed revision. It should be reused as is. | # processed revision. It should be reused as is. | ||||
provenance.directory_add_to_revision(revision, current.entry, path) | provenance.directory_add_to_revision(revision, current.entry, path) | ||||
else: | else: | ||||
# Current directory is not an outer isochrone frontier for any previous | # Current directory is not an outer isochrone frontier for any previous | ||||
# revision. It might be eligible for this one. | # revision. It might be eligible for this one. | ||||
if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): | if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): | ||||
assert current.maxdate is not None | assert current.maxdate is not None | ||||
# Outer frontier should be moved to current position in the isochrone | # Outer frontier should be moved to current position in the isochrone | ||||
# graph. This is the first time this directory is found in the isochrone | # graph. This is the first time this directory is found in the isochrone | ||||
# frontier. | # frontier. | ||||
provenance.directory_set_date_in_isochrone_frontier( | provenance.directory_set_date_in_isochrone_frontier( | ||||
current.entry, current.maxdate | current.entry, current.maxdate | ||||
) | ) | ||||
provenance.directory_add_to_revision(revision, current.entry, path) | provenance.directory_add_to_revision(revision, current.entry, path) | ||||
directory_process_content( | directory_process_content( | ||||
provenance, directory=current.entry, relative=current.entry, | archive, | ||||
provenance, | |||||
directory=current.entry, | |||||
relative=current.entry, | |||||
) | ) | ||||
else: | else: | ||||
# No point moving the frontier here. Either there are no files or they | # No point moving the frontier here. Either there are no files or they | ||||
# are being seen for the first time here. Add all blobs to current | # are being seen for the first time here. Add all blobs to current | ||||
# revision updating date if necessary, and recursively analyse | # revision updating date if necessary, and recursively analyse | ||||
# subdirectories as candidates to the outer frontier. | # subdirectories as candidates to the outer frontier. | ||||
for child in current.children: | for child in current.children: | ||||
if isinstance(child.entry, FileEntry): | if isinstance(child.entry, FileEntry): | ||||
▲ Show 20 Lines • Show All 54 Lines • Show Last 20 Lines |