Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
import os | import os | ||||
from datetime import datetime | |||||
from typing import Dict, Generator, List, Optional, Tuple | |||||
from .archive import ArchiveInterface | from .archive import ArchiveInterface | ||||
from .model import DirectoryEntry, FileEntry, TreeEntry | from .model import DirectoryEntry, FileEntry, TreeEntry | ||||
from .origin import OriginEntry | from .origin import OriginEntry | ||||
from .revision import RevisionEntry | from .revision import RevisionEntry | ||||
from datetime import datetime | |||||
from typing import Dict, Generator, List, Optional, Tuple | |||||
# TODO: consider moving to path utils file together with normalize. | # TODO: consider moving to path utils file together with normalize. | ||||
def is_child(path: bytes, prefix: bytes) -> bool: | def is_child(path: bytes, prefix: bytes) -> bool: | ||||
return path != prefix and os.path.dirname(path) == prefix | return path != prefix and os.path.dirname(path) == prefix | ||||
class ProvenanceInterface: | class ProvenanceInterface: | ||||
def __init__(self, **kwargs): | def __init__(self, **kwargs): | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | ): | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): | def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: | def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: | def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | def revision_in_history(self, revision: RevisionEntry) -> bool: | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_set_prefered_origin( | def revision_set_preferred_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
): | ): | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
raise NotImplementedError | raise NotImplementedError | ||||
Show All 25 Lines | |||||
def origin_add_revision( | def origin_add_revision( | ||||
provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry | provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry | ||||
): | ): | ||||
stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] | stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] | ||||
while stack: | while stack: | ||||
relative, current = stack.pop() | relative, current = stack.pop() | ||||
# Check if current revision has no prefered origin and update if necessary. | # Check if current revision has no preferred origin and update if necessary. | ||||
prefered = provenance.revision_get_prefered_origin(current) | preferred = provenance.revision_get_preferred_origin(current) | ||||
if prefered is None: | if preferred is None: | ||||
provenance.revision_set_prefered_origin(origin, current) | provenance.revision_set_preferred_origin(origin, current) | ||||
######################################################################## | ######################################################################## | ||||
if relative is None: | if relative is None: | ||||
# This revision is pointed directly by the origin. | # This revision is pointed directly by the origin. | ||||
visited = provenance.revision_visited(current) | visited = provenance.revision_visited(current) | ||||
provenance.revision_add_to_origin(origin, current) | provenance.revision_add_to_origin(origin, current) | ||||
if not visited: | if not visited: | ||||
▲ Show 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | while stack: | ||||
# Outer frontier should be moved to current position in the isochrone | # Outer frontier should be moved to current position in the isochrone | ||||
# graph. This is the first time this directory is found in the isochrone | # graph. This is the first time this directory is found in the isochrone | ||||
# frontier. | # frontier. | ||||
provenance.directory_set_date_in_isochrone_frontier( | provenance.directory_set_date_in_isochrone_frontier( | ||||
current.entry, current.maxdate | current.entry, current.maxdate | ||||
) | ) | ||||
provenance.directory_add_to_revision(revision, current.entry, path) | provenance.directory_add_to_revision(revision, current.entry, path) | ||||
directory_process_content( | directory_process_content( | ||||
provenance, | provenance, directory=current.entry, relative=current.entry, | ||||
directory=current.entry, | |||||
relative=current.entry, | |||||
) | ) | ||||
else: | else: | ||||
# No point moving the frontier here. Either there are no files or they | # No point moving the frontier here. Either there are no files or they | ||||
# are being seen for the first time here. Add all blobs to current | # are being seen for the first time here. Add all blobs to current | ||||
# revision updating date if necessary, and recursively analyse | # revision updating date if necessary, and recursively analyse | ||||
# subdirectories as candidates to the outer frontier. | # subdirectories as candidates to the outer frontier. | ||||
for child in current.children: | for child in current.children: | ||||
if isinstance(child.entry, FileEntry): | if isinstance(child.entry, FileEntry): | ||||
Show All 26 Lines |