diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py
index d232d48..19244b2 100644
--- a/swh/provenance/graph.py
+++ b/swh/provenance/graph.py
@@ -1,280 +1,282 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from __future__ import annotations

 from datetime import datetime, timezone
 import logging
 import os
 from typing import Any, Dict, Optional, Set

 from swh.model.hashutil import hash_to_hex
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .interface import ProvenanceInterface
 from .model import DirectoryEntry, RevisionEntry

 UTCMIN = datetime.min.replace(tzinfo=timezone.utc)


 class HistoryNode:
     def __init__(
         self, entry: RevisionEntry, is_head: bool = False, in_history: bool = False
     ) -> None:
         self.entry = entry
         # A revision is `is_head` if it is directly pointed by an origin (ie. a head
         # revision for some snapshot)
         self.is_head = is_head
         # A revision is `in_history` if it appears in the history graph of an already
         # processed revision in the provenance database
         self.in_history = in_history
         # XXX: the current simplified version of the origin-revision layer algorithm
         # does not use this previous two flags at all. They are kept for now but might
         # be removed in the future (hence, RevisionEntry might be used instead of
         # HistoryNode).

     def __str__(self) -> str:
         return f"<{self.entry}: is_head={self.is_head}, in_history={self.in_history}>"

     def as_dict(self) -> Dict[str, Any]:
         return {
             "rev": hash_to_hex(self.entry.id),
             "is_head": self.is_head,
             "in_history": self.in_history,
         }


 class HistoryGraph:
     def __init__(
         self,
         archive: ArchiveInterface,
         provenance: ProvenanceInterface,
         revision: RevisionEntry,
     ) -> None:
         self._head = HistoryNode(
             revision,
             is_head=provenance.revision_visited(revision),
             in_history=provenance.revision_in_history(revision),
         )
         self._graph: Dict[HistoryNode, Set[HistoryNode]] = {}

         stack = [self._head]
         while stack:
             current = stack.pop()

             if current not in self._graph:
                 self._graph[current] = set()

             current.entry.retrieve_parents(archive)
             for parent in current.entry.parents:
                 node = HistoryNode(
                     parent,
                     is_head=provenance.revision_visited(parent),
                     in_history=provenance.revision_in_history(parent),
                 )
                 self._graph[current].add(node)
                 stack.append(node)

     @property
     def head(self) -> HistoryNode:
         return self._head

     @property
     def parents(self) -> Dict[HistoryNode, Set[HistoryNode]]:
         return self._graph

     def __str__(self) -> str:
         return f"<HistoryGraph: head={self.head}, graph={self.parents}>"

     def as_dict(self) -> Dict[str, Any]:
         return {
             "head": self.head.as_dict(),
             "graph": {
                 hash_to_hex(node.entry.id): sorted(
                     [parent.as_dict() for parent in parents],
                     key=lambda d: d["rev"],
                 )
                 for node, parents in self._graph.items()
             },
         }


 class IsochroneNode:
     def __init__(
         self,
         entry: DirectoryEntry,
         dbdate: Optional[datetime] = None,
         depth: int = 0,
         prefix: bytes = b"",
     ) -> None:
         self.entry = entry
         self.depth = depth

         # dbdate is the maxdate for this node that comes from the DB
         self._dbdate: Optional[datetime] = dbdate

         # maxdate is set by the maxdate computation algorithm
         self.maxdate: Optional[datetime] = None

         # known is True if this node is already known in the db; either because
         # the current directory actually exists in the database, or because all
         # the content of the current directory is known (subdirectories and files)
         self.known = self.dbdate is not None
         self.invalid = False
         self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
         self.children: Set[IsochroneNode] = set()

     @property
     def dbdate(self) -> Optional[datetime]:
         # use a property to make this attribute (mostly) read-only
         return self._dbdate

     def invalidate(self) -> None:
         self._dbdate = None
         self.maxdate = None
         self.known = False
         self.invalid = True

     def add_directory(
         self, child: DirectoryEntry, date: Optional[datetime] = None
     ) -> IsochroneNode:
         # we should not be processing this node (ie add subdirectories or files) if it's
         # actually known by the provenance DB
         assert self.dbdate is None
         node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path)
         self.children.add(node)
         return node

     def __str__(self) -> str:
         return (
             f"<{self.entry}: depth={self.depth}, "
             f"dbdate={self.dbdate}, maxdate={self.maxdate}, "
             f"known={self.known}, invalid={self.invalid}, path={self.path!r}, "
             f"children=[{', '.join(str(child) for child in self.children)}]>"
         )

     def __eq__(self, other: Any) -> bool:
         return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__

     def __hash__(self) -> int:
         # only immutable attributes are considered to compute hash
         return hash((self.entry, self.depth, self.path))


 def build_isochrone_graph(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     directory: DirectoryEntry,
 ) -> IsochroneNode:
     assert revision.date is not None
     assert revision.root == directory.id

     # this function process a revision in 2 steps:
     #
     # 1. build the tree structure of IsochroneNode objects (one INode per
     #    directory under the root directory of the revision but not following
     #    known subdirectories), and gather the dates from the DB for already
     #    known objects; for files, just keep all the dates in a global 'fdates'
     #    dict; note that in this step, we will only recurse the directories
     #    that are not already known.
     #
     # 2. compute the maxdate for each node of the tree that was not found in the DB.

     # Build the nodes structure
     root_date = provenance.directory_get_date_in_isochrone_frontier(directory)
     root = IsochroneNode(directory, dbdate=root_date)
     stack = [root]
     logging.debug(
-        f"Recursively creating isochrone graph for revision {revision.id.hex()}..."
+        "Recursively creating isochrone graph for revision %s...", revision.id.hex()
     )
     fdates: Dict[Sha1Git, datetime] = {}  # map {file_id: date}
     while stack:
         current = stack.pop()
         if current.dbdate is None or current.dbdate > revision.date:
             # If current directory has an associated date in the isochrone frontier that
             # is greater or equal to the current revision's one, it should be ignored as
             # the revision is being processed out of order.
             if current.dbdate is not None and current.dbdate > revision.date:
                 logging.debug(
-                    f"Invalidating frontier on {current.entry.id.hex()}"
-                    f" (date {current.dbdate})"
-                    f" when processing revision {revision.id.hex()}"
-                    f" (date {revision.date})"
+                    "Invalidating frontier on %s (date %s) "
+                    "when processing revision %s (date %s)",
+                    current.entry.id.hex(),
+                    current.dbdate,
+                    revision.id.hex(),
+                    revision.date,
                 )
                 current.invalidate()

             # Pre-query all known dates for directories in the current directory
             # for the provenance object to have them cached and (potentially) improve
             # performance.
             current.entry.retrieve_children(archive)
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 current.entry.dirs
             )
             for dir in current.entry.dirs:
                 # Recursively analyse subdirectory nodes
                 node = current.add_directory(dir, date=ddates.get(dir.id, None))
                 stack.append(node)

             fdates.update(provenance.content_get_early_dates(current.entry.files))

     logging.debug(
-        f"Isochrone graph for revision {revision.id.hex()} successfully created!"
+        "Isochrone graph for revision %s successfully created!", revision.id.hex()
     )
     # Precalculate max known date for each node in the graph (only directory nodes are
     # pushed to the stack).
-    logging.debug(f"Computing maxdates for revision {revision.id.hex()}...")
+    logging.debug("Computing maxdates for revision %s...", revision.id.hex())
     stack = [root]
     while stack:
         current = stack.pop()
         # Current directory node is known if it already has an assigned date (ie. it was
         # already seen as an isochrone frontier).
         if current.known:
             assert current.maxdate is None
             current.maxdate = current.dbdate
         else:
             if any(x.maxdate is None for x in current.children):
                 # at least one child of current has no maxdate yet
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if child.maxdate is None:
                         # if child.maxdate is None, it must be processed
                         stack.append(child)
             else:
                 # all the files and directories under current have a maxdate,
                 # we can infer the maxdate for current directory
                 assert current.maxdate is None
                 # if all content is already known, update current directory info.
                 current.maxdate = max(
                     [UTCMIN]
                     + [
                         child.maxdate
                         for child in current.children
                         if child.maxdate is not None  # unnecessary, but needed for mypy
                     ]
                     + [
                         fdates.get(file.id, revision.date)
                         for file in current.entry.files
                     ]
                 )
                 if current.maxdate <= revision.date:
                     current.known = (
                         # true if all subdirectories are known
                         all(child.known for child in current.children)
                         # true if all files are in fdates, i.e. if all files were known
                         # *before building this isochrone graph node*
                         # Note: the 'all()' is lazy: will stop iterating as soon as
                         # possible
                         and all((file.id in fdates) for file in current.entry.files)
                     )
                 else:
                     # at least one content is being processed out-of-order, then current
                     # node should be treated as unknown
                     current.maxdate = revision.date
                     current.known = False
-    logging.debug(f"Maxdates for revision {revision.id.hex()} successfully computed!")
+    logging.debug("Maxdates for revision %s successfully computed!", revision.id.hex())
     return root
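
The logging changes in this file replace eager f-string interpolation with printf-style arguments, so the message is only rendered when a DEBUG record is actually emitted. A minimal, self-contained sketch of the difference; the ExpensiveRepr class and the INFO level choice are illustrative only, not part of this module:

import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out


class ExpensiveRepr:
    """Stand-in for a large cache whose repr() is costly to compute."""

    def __repr__(self) -> str:
        return ",".join(str(i) for i in range(100_000))


data = ExpensiveRepr()

# Eager: the f-string builds the huge repr even though the record is dropped.
logging.debug(f"flushing {data}")

# Lazy: formatting is deferred to the handler; with DEBUG disabled,
# repr(data) is never computed.
logging.debug("flushing %s", data)
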
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
index dcb8e48..ba4acb3 100644
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,107 +1,108 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from itertools import islice
 import logging
 import time
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple

 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .graph import HistoryGraph
 from .interface import ProvenanceInterface
 from .model import OriginEntry, RevisionEntry


 class CSVOriginIterator:
     """Iterator over origin visit statuses typically present in the given CSV file.

     The input is an iterator that produces 2 elements per row: (url, snap)

     where:
     - url: is the origin url of the visit
     - snap: sha1_git of the snapshot pointed by the visit status
     """

     def __init__(
         self,
         statuses: Iterable[Tuple[str, Sha1Git]],
         limit: Optional[int] = None,
     ) -> None:
         self.statuses: Iterator[Tuple[str, Sha1Git]]
         if limit is not None:
             self.statuses = islice(statuses, limit)
         else:
             self.statuses = iter(statuses)

     def __iter__(self) -> Generator[OriginEntry, None, None]:
         return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)


 def origin_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     origins: List[OriginEntry],
 ) -> None:
     start = time.time()
     for origin in origins:
         provenance.origin_add(origin)
         origin.retrieve_revisions(archive)
         for revision in origin.revisions:
             graph = HistoryGraph(archive, provenance, revision)
             origin_add_revision(provenance, origin, graph)
     done = time.time()
     provenance.flush()
     stop = time.time()
     logging.debug(
-        "Origins "
-        ";".join([origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins])
-        + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
+        "Origins %s were processed in %s secs (commit took %s secs)!",
+        ";".join(origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins),
+        stop - start,
+        stop - done,
     )


 def origin_add_revision(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     graph: HistoryGraph,
 ) -> None:
     # XXX: simplified version of the origin-revision algorithm. This is generating flat
     # models for the history of all head revisions. No previous result is reused now!
     # The previous implementation was missing some paths from origins to certain
     # revisions due to a wrong reuse logic.

     # head is treated separately since it should always be added to the given origin
     check_preferred_origin(provenance, origin, graph.head.entry)
     provenance.revision_add_to_origin(origin, graph.head.entry)

     visited = {graph.head}
     # head's history should be recursively iterated starting from its parents
     stack = list(graph.parents[graph.head])
     while stack:
         current = stack.pop()
         check_preferred_origin(provenance, origin, current.entry)

         # create a link between it and the head, and recursively walk its history
         provenance.revision_add_before_revision(graph.head.entry, current.entry)
         visited.add(current)
         for parent in graph.parents[current]:
             if parent not in visited:
                 stack.append(parent)


 def check_preferred_origin(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     revision: RevisionEntry,
 ) -> None:
     # if the revision has no preferred origin just set the given origin as the
     # preferred one. TODO: this should be improved in the future!
     preferred = provenance.revision_get_preferred_origin(revision)
     if preferred is None:
         provenance.revision_set_preferred_origin(origin, revision)
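
CSVOriginIterator above consumes (url, snapshot) pairs and yields OriginEntry objects for origin_add. A hypothetical driver sketch; the file name origins.csv and its rows are made up, while CSVOriginIterator, the OriginEntry fields and hash_to_bytes come from the code above and from swh.model:

import csv

from swh.model.hashutil import hash_to_bytes
from swh.provenance.origin import CSVOriginIterator

# origins.csv is assumed to contain rows of the form: url,snapshot_sha1_git_hex
with open("origins.csv", newline="") as csvfile:
    rows = ((url, hash_to_bytes(snap)) for url, snap in csv.reader(csvfile))
    for origin_entry in CSVOriginIterator(rows, limit=10):
        print(origin_entry.url, origin_entry.snapshot.hex())
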
" - f"Data: {self.cache['directory_in_revision']}. Retrying..." - ) + if self.cache["content_in_revision"]: + while not self.storage.relation_add( + RelationType.CNT_EARLY_IN_REV, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["content_in_revision"] + ), + ): + logging.warning( + "Unable to write %s rows to the storage. Data: %s. Retrying...", + RelationType.CNT_EARLY_IN_REV, + self.cache["content_in_revision"], + ) + + if self.cache["content_in_directory"]: + while not self.storage.relation_add( + RelationType.CNT_IN_DIR, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["content_in_directory"] + ), + ): + logging.warning( + "Unable to write %s rows to the storage. Data: %s. Retrying...", + RelationType.CNT_IN_DIR, + self.cache["content_in_directory"], + ) + + if self.cache["directory_in_revision"]: + while not self.storage.relation_add( + RelationType.DIR_IN_REV, + ( + RelationData(src=src, dst=dst, path=path) + for src, dst, path in self.cache["directory_in_revision"] + ), + ): + logging.warning( + "Unable to write %s rows to the storage. Data: %s. Retrying...", + RelationType.DIR_IN_REV, + self.cache["directory_in_revision"], + ) # After relations, dates for the entities can be safely set, acknowledging that # these entities won't need to be reprocessed in case of failure. dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } - while not self.storage.content_set_date(dates): - logging.warning( - f"Unable to write content dates to the storage. " - f"Data: {dates}. Retrying..." - ) + if dates: + while not self.storage.content_set_date(dates): + logging.warning( + "Unable to write content dates to the storage. " + "Data: %s. Retrying...", + dates, + ) dates = { sha1: date for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } - while not self.storage.directory_set_date(dates): - logging.warning( - f"Unable to write directory dates to the storage. " - f"Data: {dates}. Retrying..." - ) + if dates: + while not self.storage.directory_set_date(dates): + logging.warning( + "Unable to write directory dates to the storage. " + "Data: %s. Retrying...", + dates, + ) dates = { sha1: date for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } - while not self.storage.revision_set_date(dates): - logging.warning( - f"Unable to write revision dates to the storage. " - f"Data: {dates}. Retrying..." - ) + if dates: + while not self.storage.revision_set_date(dates): + logging.warning( + "Unable to write revision dates to the storage. " + "Data: %s. Retrying...", + dates, + ) # Origin-revision layer insertions ############################################# # Origins urls should be inserted first so that internal ids' resolution works # properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } - while not self.storage.origin_set_url(urls): - logging.warning( - f"Unable to write origins urls to the storage. " - f"Data: {urls}. Retrying..." - ) + if urls: + while not self.storage.origin_set_url(urls): + logging.warning( + "Unable to write origins urls to the storage. " + "Data: %s. Retrying...", + urls, + ) # Second, flat models for revisions' histories (ie. revision-before-revision). 
data: Iterable[RelationData] = sum( [ [ RelationData(src=prev, dst=next, path=None) for next in self.cache["revision_before_revision"][prev] ] for prev in self.cache["revision_before_revision"] ], [], ) - while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): - logging.warning( - f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " - f"Data: {data}. Retrying..." - ) + if data: + while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): + logging.warning( + "Unable to write %s rows to the storage. Data: %s. Retrying...", + RelationType.REV_BEFORE_REV, + data, + ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories were already added. This is to guarantee consistent results if # something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. data = ( RelationData(src=rev, dst=org, path=None) for rev, org in self.cache["revision_in_origin"] ) - while not self.storage.relation_add(RelationType.REV_IN_ORG, data): - logging.warning( - f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " - f"Data: {data}. Retrying..." - ) + if data: + while not self.storage.relation_add(RelationType.REV_IN_ORG, data): + logging.warning( + "Unable to write %s rows to the storage. Data: %s. Retrying...", + RelationType.REV_IN_ORG, + data, + ) # Finally, preferred origins for the visited revisions are set (this step can be # reordered if required). origins = { sha1: self.cache["revision_origin"]["data"][sha1] for sha1 in self.cache["revision_origin"]["added"] } - while not self.storage.revision_set_origin(origins): - logging.warning( - f"Unable to write preferred origins to the storage. " - f"Data: {origins}. Retrying..." - ) + if origins: + while not self.storage.revision_set_origin(origins): + logging.warning( + "Unable to write preferred origins to the storage. " + "Data: %s. 
Retrying...", + origins, + ) # clear local cache ############################################################ self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "revision": updated = { id: rev.date for id, rev in self.storage.revision_get(missing_ids).items() } else: updated = getattr(self.storage, f"{entity}_get")(missing_ids) cache["data"].update(updated) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].setdefault(sha1, None) if date is not None: dates[sha1] = date return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( head.id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = 
self.cache["revision_origin"]["data"] if revision.id not in cache: ret = self.storage.revision_get([revision.id]) if revision.id in ret: origin = ret[revision.id].origin if origin is not None: cache[revision.id] = origin return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache["revision_before_revision"] or bool( self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict(self.cache["revision_in_origin"]) or bool( self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) ) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 1a87573..fb4b918 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,247 +1,251 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging import os import time from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: logging.debug( - f"Processing revisions {revision.id.hex()}" - f" (known date {date} / revision date {revision.date})..." 
+ "Processing revisions %s (known date %s / revision date %s)...", + revision.id.hex(), + date, + revision.date, ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: provenance.flush() stop = time.time() logging.debug( - f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " - f" were processed in {stop - start} secs (commit took {stop - done} secs)!" + "Revisions %s were processed in %s secs (commit took %s secs)!", + ";".join(revision.id.hex() for revision in revisions), + stop - start, + stop - done, ) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. 
stack.append((d_child, os.path.join(prefix, d_child.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # )
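
CSVRevisionIterator above expects (id, date, root) tuples and attaches UTC to naive author dates before building RevisionEntry objects. A hypothetical usage sketch; the hash values and the date are made up, while CSVRevisionIterator, RevisionEntry and hash_to_bytes come from the code above and from swh.model:

from datetime import datetime

from swh.model.hashutil import hash_to_bytes
from swh.provenance.revision import CSVRevisionIterator

rows = [
    (
        hash_to_bytes("aafb16d69fd30ff58afdd69036a26047f3aebdc6"),  # revision id
        datetime(2021, 6, 1, 12, 0, 0),  # naive author date, assumed to be UTC
        hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),  # root directory
    ),
]

for revision_entry in CSVRevisionIterator(rows, limit=1):
    # The yielded RevisionEntry carries a timezone-aware date.
    print(revision_entry.id.hex(), revision_entry.date)
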