diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index ad7e8d3..a86ef69 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,53 +1,56 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable +from typing import Any, Dict, Iterable, Tuple from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git from swh.storage.interface import StorageInterface @runtime_checkable class ArchiveInterface(Protocol): storage: StorageInterface def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: """List entries for one directory. Args: id: sha1 id of the directory to list entries from. Yields: dictionary of entries in such directory containing only the keys "name", "target" and "type". """ ... - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - """List parents of one revision. + def revision_get_some_outbound_edges( + self, id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + """List some outbound edges from one revision. For each revision listed, + all of its outbound edges must be returned. Args: - revisions: sha1 id of the revision to list parents from. + id: sha1 id of the revision to list parents from. - Yields: - sha1 ids for the parents of such revision. + Returns: + list of edges (revision_id, parent_revision_id) """ ... def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions targeted by one snapshot. Args: id: sha1 id of the snapshot. Yields: sha1 ids of revisions that are a target of such snapshot. """ ... diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index 2293b22..66704f8 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,216 +1,215 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime, timezone import os from typing import Any, Dict, Optional, Set from swh.core.statsd import statsd -from swh.model.hashutil import hash_to_hex from swh.model.model import Sha1Git from .archive import ArchiveInterface from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry GRAPH_DURATION_METRIC = "swh_provenance_graph_duration_seconds" GRAPH_OPERATIONS_METRIC = "swh_provenance_graph_operations_total" UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class HistoryGraph: @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_history_graph"}) def __init__( self, archive: ArchiveInterface, revision: RevisionEntry, ) -> None: - self._head = revision - self._graph: Dict[RevisionEntry, Set[RevisionEntry]] = {} + self.head_id = revision.id + self._nodes: Set[Sha1Git] = set() + # rev -> set(parents) + self._edges: Dict[Sha1Git, Set[Sha1Git]] = {} - stack = [self._head] + stack = {self.head_id} while stack: current = stack.pop() - if current not in self._graph: - self._graph[current] = set() - current.retrieve_parents(archive) - for parent in current.parents: - self._graph[current].add(parent) - stack.append(parent) + if current not in self._nodes: + self._nodes.add(current) + self._edges.setdefault(current, set()) + for rev, parent in archive.revision_get_some_outbound_edges(current): + self._nodes.add(rev) + self._edges.setdefault(rev, set()).add(parent) + stack.add(parent) - @property - def head(self) -> RevisionEntry: - return self._head + # don't process nodes for which we've already retrieved outbound edges + stack -= self._nodes - @property - def parents(self) -> Dict[RevisionEntry, Set[RevisionEntry]]: - return self._graph + def parent_ids(self) -> Set[Sha1Git]: + """Get all the known parent ids in the current graph""" + return self._nodes - {self.head_id} def __str__(self) -> str: - return f" Dict[str, Any]: return { - "head": hash_to_hex(self.head.id), + "head": self.head_id.hex(), "graph": { - hash_to_hex(node.id): sorted( - [hash_to_hex(parent.id) for parent in parents] - ) - for node, parents in self._graph.items() + node.hex(): sorted(parent.hex() for parent in parents) + for node, parents in self._edges.items() }, } class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self) -> None: statsd.increment( metric=GRAPH_OPERATIONS_METRIC, tags={"method": "invalidate_frontier"} ) self._dbdate = None self.maxdate = None self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> IsochroneNode: # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, dbdate={self.dbdate}, " f"maxdate={self.maxdate}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_isochrone_graph"}) def build_isochrone_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, directory: DirectoryEntry, minsize: int = 0, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate >= revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate >= revision.date: current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive, minsize=minsize) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.dbdate is not None: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) return root diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py index 81f62b4..612df4c 100644 --- a/swh/provenance/interface.py +++ b/swh/provenance/interface.py @@ -1,402 +1,400 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass from datetime import datetime import enum from types import TracebackType from typing import Dict, Generator, Iterable, List, Optional, Set, Type, Union from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry class EntityType(enum.Enum): CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" ORIGIN = "origin" class RelationType(enum.Enum): CNT_EARLY_IN_REV = "content_in_revision" CNT_IN_DIR = "content_in_directory" DIR_IN_REV = "directory_in_revision" REV_IN_ORG = "revision_in_origin" REV_BEFORE_REV = "revision_before_revision" @dataclass(eq=True, frozen=True) class ProvenanceResult: content: Sha1Git revision: Sha1Git date: datetime origin: Optional[str] path: bytes @dataclass(eq=True, frozen=True) class DirectoryData: """Object representing the data associated to a directory in the provenance model, where `date` is the date of the directory in the isochrone frontier, and `flat` is a flag acknowledging that a flat model for the elements outside the frontier has already been created. """ date: datetime flat: bool @dataclass(eq=True, frozen=True) class RevisionData: """Object representing the data associated to a revision in the provenance model, where `date` is the optional date of the revision (specifying it acknowledges that the revision was already processed by the revision-content algorithm); and `origin` identifies the preferred origin for the revision, if any. """ date: Optional[datetime] origin: Optional[Sha1Git] @dataclass(eq=True, frozen=True) class RelationData: """Object representing a relation entry in the provenance model, where `src` and `dst` are the sha1 ids of the entities being related, and `path` is optional depending on the relation being represented. """ dst: Sha1Git path: Optional[bytes] @runtime_checkable class ProvenanceStorageInterface(Protocol): def __enter__(self) -> ProvenanceStorageInterface: ... def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: ... @remote_api_endpoint("close") def close(self) -> None: """Close connection to the storage and release resources.""" ... @remote_api_endpoint("content_add") def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: """Add blobs identified by sha1 ids, with an associated date (as paired in `cnts`) to the provenance storage. Return a boolean stating whether the information was successfully stored. """ ... @remote_api_endpoint("content_find_first") def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... @remote_api_endpoint("content_find_all") def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... @remote_api_endpoint("content_get") def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each blob sha1 in `ids`.""" ... @remote_api_endpoint("directory_add") def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: """Add directories identified by sha1 ids, with associated date and (optional) flatten flag (as paired in `dirs`) to the provenance storage. If the flatten flag is set to None, the previous value present in the storage is preserved. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("directory_get") def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: """Retrieve the associated date and (optional) flatten flag for each directory sha1 in `ids`. If some directories has no associated date, it is not present in the resulting dictionary. """ ... @remote_api_endpoint("directory_iter_not_flattenned") def directory_iter_not_flattenned( self, limit: int, start_id: Sha1Git ) -> List[Sha1Git]: """Retrieve the unflattenned directories after ``start_id`` up to ``limit`` entries.""" ... @remote_api_endpoint("entity_get_all") def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: """Retrieve all sha1 ids for entities of type `entity` present in the provenance model. This method is used only in tests. """ ... @remote_api_endpoint("location_add") def location_add(self, paths: Iterable[bytes]) -> bool: """Register the given `paths` in the storage.""" ... @remote_api_endpoint("location_get_all") def location_get_all(self) -> Set[bytes]: """Retrieve all paths present in the provenance model. This method is used only in tests.""" ... @remote_api_endpoint("open") def open(self) -> None: """Open connection to the storage and allocate necessary resources.""" ... @remote_api_endpoint("origin_add") def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: """Add origins identified by sha1 ids, with their corresponding url (as paired in `orgs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("origin_get") def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: """Retrieve the associated url for each origin sha1 in `ids`.""" ... @remote_api_endpoint("revision_add") def revision_add( self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] ) -> bool: """Add revisions identified by sha1 ids, with optional associated date or origin (as paired in `revs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("revision_get") def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: """Retrieve the associated date and origin for each revision sha1 in `ids`. If some revision has no associated date nor origin, it is not present in the resulting dictionary. """ ... @remote_api_endpoint("relation_add") def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: """Add entries in the selected `relation`. This method assumes all entities being related are already registered in the storage. See `content_add`, `directory_add`, `origin_add`, and `revision_add`. """ ... @remote_api_endpoint("relation_get") def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` whose source entities are identified by some sha1 id in `ids`. If `reverse` is set, destination entities are matched instead. """ ... @remote_api_endpoint("relation_get_all") def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` that are present in the provenance model. This method is used only in tests. """ ... @remote_api_endpoint("with_path") def with_path(self) -> bool: ... @runtime_checkable class ProvenanceInterface(Protocol): storage: ProvenanceStorageInterface def __enter__(self) -> ProvenanceInterface: ... def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: ... def close(self) -> None: """Close connection to the underlying `storage` and release resources.""" ... def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" ... def flush_if_necessary(self) -> bool: """Flush internal cache to the underlying `storage`, if the cache reached a threshold (MAX_CACHE_ELEMENTS). Return True if the cache is flushed, false otherwise. """ ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `directory` in the provenance model. `prefix` is the relative path from `directory` to `blob` (excluding `blob`'s name). """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `revision` in the provenance model. `prefix` is the absolute path from `revision`'s root directory to `blob` (excluding `blob`'s name). """ ... def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each blob in `blobs`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: """Associate `date` to `blob` as it's earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: """Associate `directory` with `revision` in the provenance model. `path` is the absolute path from `revision`'s root directory to `directory` (including `directory`'s name). """ ... def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]: """Check if the directory is already flattenned in the provenance model. If the directory is unknown for the model, the methods returns None. """ ... def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None: """Mark the directory as flattenned in the provenance model. If the directory is unknown for the model, this method has no effect. """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: """Retrieve the earliest known date of `directory` as an isochrone frontier in the provenance model. """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each directory in `dirs` as isochrone frontiers provenance model. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: """Associate `date` to `directory` as it's earliest known date as an isochrone frontier in the provenance model. """ ... def open(self) -> None: """Open connection to the underlying `storage` and allocate necessary resources. """ ... def origin_add(self, origin: OriginEntry) -> None: """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: """Add `revision` to the provenance model. This implies storing `revision`'s date in the model, thus `revision.date` must be a valid date. """ ... def revision_add_before_revision( - self, head: RevisionEntry, revision: RevisionEntry + self, head_id: Sha1Git, revision_id: Sha1Git ) -> None: - """Associate `revision` to `head` as an ancestor of the latter.""" + """Associate `revision_id` to `head_id` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `origin` as a head revision of the latter (ie. the target of an snapshot for `origin` in the archive).""" ... def revision_is_head(self, revision: RevisionEntry) -> bool: """Check if `revision` is associated as a head revision for some origin.""" ... def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: """Retrieve the date associated to `revision`.""" ... - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: + def revision_get_preferred_origin(self, revision_id: Sha1Git) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry + self, origin: OriginEntry, revision_id: Sha1Git ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py index dd1f03b..f2de4f7 100644 --- a/swh/provenance/journal_client.py +++ b/swh/provenance/journal_client.py @@ -1,70 +1,69 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime try: from systemd.daemon import notify except ImportError: notify = None import sentry_sdk from swh.model.model import TimestampWithTimezone from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.origin import origin_add from swh.provenance.revision import revision_add from swh.storage.interface import StorageInterface EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) def process_journal_origins( messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"origin_visit_status"}, set(messages) origin_entries = [ OriginEntry(url=visit["origin"], snapshot=visit["snapshot"]) for visit in messages["origin_visit_status"] if visit["snapshot"] is not None ] if origin_entries: origin_add(provenance, archive, origin_entries, **cfg) if notify: notify("WATCHDOG=1") def process_journal_revisions( messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"revision"}, set(messages) revisions = [] for rev in messages["revision"]: if rev["date"] is None: continue try: date = TimestampWithTimezone.from_dict(rev["date"]).to_datetime() except Exception: sentry_sdk.capture_exception() continue if date <= EPOCH: continue revisions.append( RevisionEntry( id=rev["id"], root=rev["directory"], date=date, - parents=rev["parents"], ) ) if revisions: revision_add(provenance, archive, revisions, **cfg) if notify: notify("WATCHDOG=1") diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 144f2df..b12b3fe 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,157 +1,139 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime -from typing import Iterable, Iterator, List, Optional +from typing import Iterator, List, Optional from swh.model.model import Origin, Sha1Git from .archive import ArchiveInterface class OriginEntry: revisions_count: int def __init__(self, url: str, snapshot: Sha1Git) -> None: self.url = url self.id = Origin(url=self.url).id self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface) -> None: if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] self._revisions_count = len(self._revisions) @property def revision_count(self) -> int: if self._revisions_count is None: raise ValueError("retrieve_revisions was not called") return self._revisions_count @property def revisions(self) -> Iterator[RevisionEntry]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) def __str__(self) -> str: return f"" class RevisionEntry: def __init__( self, id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, - parents: Optional[Iterable[Sha1Git]] = None, ) -> None: self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root - self._parents_ids = parents - self._parents_entries: Optional[List[RevisionEntry]] = None - - def retrieve_parents(self, archive: ArchiveInterface) -> None: - if self._parents_entries is None: - if self._parents_ids is None: - self._parents_ids = archive.revision_get_parents(self.id) - self._parents_entries = [RevisionEntry(id) for id in self._parents_ids] - - @property - def parents(self) -> Iterator[RevisionEntry]: - if self._parents_entries is None: - raise RuntimeError( - "Parents of this node has not yet been retrieved. " - "Please call retrieve_parents() before using this property." - ) - return (x for x in self._parents_entries) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, RevisionEntry) and self.id == other.id def __hash__(self) -> int: return hash(self.id) class DirectoryEntry: def __init__(self, id: Sha1Git, name: bytes = b"") -> None: self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface, minsize: int = 0) -> None: if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id, minsize=minsize): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator[FileEntry]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator[DirectoryEntry]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) class FileEntry: def __init__(self, id: Sha1Git, name: bytes) -> None: self.id = id self.name = name def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py index 5bf14ce..2f63076 100644 --- a/swh/provenance/multiplexer/archive.py +++ b/swh/provenance/multiplexer/archive.py @@ -1,118 +1,124 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Dict, Iterable, List, Tuple from swh.core.statsd import statsd from swh.model.model import Directory, Sha1Git from swh.provenance.archive import ArchiveInterface from swh.storage.interface import StorageInterface ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds" ARCHIVE_OPS_METRIC = "swh_provenance_archive_multiplexed_per_backend_count" LOGGER = logging.getLogger(__name__) EMPTY_DIR_ID = Directory(entries=()).id class ArchiveMultiplexed: storage: StorageInterface def __init__(self, archives: List[Tuple[str, ArchiveInterface]]) -> None: self.archives = archives @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: if id == EMPTY_DIR_ID: return [] for backend, archive in self.archives: try: entries = list(archive.directory_ls(id, minsize=minsize)) except NotImplementedError: continue if entries: statsd.increment( ARCHIVE_OPS_METRIC, tags={"method": "directory_ls", "backend": backend}, ) return entries statsd.increment( ARCHIVE_OPS_METRIC, tags={"method": "directory_ls", "backend": "empty_or_not_found"}, ) LOGGER.debug("directory empty (only rev entries) or not found: %s", id.hex()) return [] @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: # TODO: what if the revision doesn't exist in the archive? for backend, archive in self.archives: try: - parents = list(archive.revision_get_parents(id)) - if parents: + edges = list(archive.revision_get_some_outbound_edges(revision_id)) + if edges: statsd.increment( ARCHIVE_OPS_METRIC, - tags={"method": "revision_get_parents", "backend": backend}, + tags={ + "method": "revision_get_some_outbound_edges", + "backend": backend, + }, ) - return parents + return edges LOGGER.debug( - "No parents found for revision %s via %s", - id.hex(), + "No outbound edges found for revision %s via %s", + revision_id.hex(), archive.__class__, ) except Exception as e: LOGGER.warn( - "Error retrieving parents of revision %s via %s: %s", - id.hex(), + "Error retrieving outbound edges of revision %s via %s: %s", + revision_id.hex(), archive.__class__, e, ) statsd.increment( ARCHIVE_OPS_METRIC, tags={ - "method": "revision_get_parents", + "method": "revision_get_some_outbound_edges", "backend": "no_parents_or_not_found", }, ) return [] @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: for backend, archive in self.archives: try: heads = list(archive.snapshot_get_heads(id)) if heads: statsd.increment( ARCHIVE_OPS_METRIC, tags={"method": "snapshot_get_heads", "backend": backend}, ) return heads LOGGER.debug( "No heads found for snapshot %s via %s", str(id), archive.__class__ ) except Exception as e: LOGGER.warn( "Error retrieving heads of snapshots %s via %s: %s", id.hex(), archive.__class__, e, ) statsd.increment( ARCHIVE_OPS_METRIC, tags={"method": "snapshot_get_heads", "backend": "no_heads_or_not_found"}, ) return [] diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index c3da0f8..7a8caf4 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,149 +1,143 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from itertools import islice import logging from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import HistoryGraph from .interface import ProvenanceInterface -from .model import OriginEntry, RevisionEntry +from .model import OriginEntry ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds" LOG_FORMAT = ( "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " "-35s %(lineno) -5d: %(message)s" ) LOGGER = logging.getLogger(__name__) class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. The input is an iterator that produces 2 elements per row: (url, snap) where: - url: is the origin url of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, Sha1Git]], limit: Optional[int] = None, ) -> None: self.statuses: Iterator[Tuple[str, Sha1Git]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self) -> Generator[OriginEntry, None, None]: return (OriginEntry(url, snapshot) for url, snapshot in self.statuses) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "main"}) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], commit: bool = True, ) -> None: for origin in origins: process_origin(provenance, archive, origin) if commit: start = datetime.now() LOGGER.debug("Flushing cache") provenance.flush() LOGGER.info("Cache flushed in %s", (datetime.now() - start)) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_origin"}) def process_origin( provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry ) -> None: LOGGER.info("Processing origin=%s", origin) start = datetime.now() LOGGER.debug("Add origin") provenance.origin_add(origin) LOGGER.debug("Retrieving head revisions") origin.retrieve_revisions(archive) LOGGER.info("%d heads founds", origin.revision_count) for idx, revision in enumerate(origin.revisions): LOGGER.info( "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count ) if not provenance.revision_is_head(revision): LOGGER.debug("revision %s not in heads", revision) graph = HistoryGraph(archive, revision) LOGGER.debug("History graph built") origin_add_revision(provenance, origin, graph) LOGGER.debug("Revision added") # head is treated separately LOGGER.debug("Checking preferred origin") - check_preferred_origin(provenance, origin, revision) + check_preferred_origin(provenance, origin, revision.id) LOGGER.debug("Adding revision to origin") provenance.revision_add_to_origin(origin, revision) cache_flush_start = datetime.now() if provenance.flush_if_necessary(): LOGGER.info( "Intermediate cache flush in %s", (datetime.now() - cache_flush_start) ) end = datetime.now() LOGGER.info("Processed origin %s in %s", origin.url, (end - start)) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_revision"}) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryGraph, ) -> None: - visited = {graph.head} - # head's history should be recursively iterated starting from its parents - stack = list(graph.parents[graph.head]) - while stack: - current = stack.pop() - check_preferred_origin(provenance, origin, current) + for parent_id in graph.parent_ids(): + check_preferred_origin(provenance, origin, parent_id) # create a link between it and the head, and recursively walk its history - provenance.revision_add_before_revision(graph.head, current) - visited.add(current) - for parent in graph.parents[current]: - if parent not in visited: - stack.append(parent) + provenance.revision_add_before_revision( + head_id=graph.head_id, revision_id=parent_id + ) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"}) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, - revision: RevisionEntry, + revision_id: Sha1Git, ) -> None: # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! - preferred = provenance.revision_get_preferred_origin(revision) + preferred = provenance.revision_get_preferred_origin(revision_id) if preferred is None: - provenance.revision_set_preferred_origin(origin, revision) + provenance.revision_set_preferred_origin(origin, revision_id) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 19e50cc..168d073 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,146 +1,151 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Tuple import psycopg2.extensions from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.storage import get_storage ARCHIVE_DURATION_METRIC = "swh_provenance_archive_direct_duration_seconds" class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection) -> None: self.storage = get_storage( "postgresql", db=conn.dsn, objstorage={"cls": "memory"} ) self.conn = conn def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: yield from self._directory_ls(id, minsize=minsize) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def _directory_ls(self, id: Sha1Git, minsize: int = 0) -> List[Dict[str, Any]]: with self.conn.cursor() as cursor: if minsize > 0: cursor.execute( """ WITH dir AS (SELECT dir_entries, file_entries FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) (SELECT 'dir' AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL (WITH known_contents AS (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git WHERE c.length >= %s ) SELECT * FROM known_contents UNION ALL (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.target=e.target ) AND c.length >= %s ) ) """, (id, minsize, minsize), ) else: cursor.execute( """ WITH dir AS (SELECT dir_entries, file_entries FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) (SELECT 'dir' AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id) """, (id,), ) entries = [] for entry_type, target, name in cursor: if target is None or name is None: # LEFT JOIN returned a NULL on the right hand side: # directory_entry_{dir,file} are not up to date. return [] entries.append({"type": entry_type, "target": target, "name": name}) return entries - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - yield from self._revision_get_parents(id) + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + yield from self._revision_get_some_outbound_edges(revision_id) @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def _revision_get_parents(self, id: Sha1Git) -> List[Sha1Git]: + def _revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> List[Tuple[Sha1Git, Sha1Git]]: with self.conn.cursor() as cursor: cursor.execute( """ - SELECT RH.parent_id::bytea - FROM revision_history AS RH - WHERE RH.id=%s - ORDER BY RH.parent_rank + select + id, unnest(parents) as parent_id + from + swh_revision_list(ARRAY[%s::bytea], 1000); """, - (id,), + (revision_id,), ) - return [row[0] for row in cursor] + return cursor.fetchall() @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ WITH snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s), heads AS ((SELECT R.id, R.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN revision AS R ON (B.target=R.id) WHERE B.target_type='revision'::snapshot_target) UNION (SELECT RV.id, RV.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN release AS RL ON (B.target=RL.id) JOIN revision AS RV ON (RL.target=RV.id) WHERE B.target_type='release'::snapshot_target AND RL.target_type='revision'::object_type) ) SELECT id FROM heads """, (id,), ) yield from (row[0] for row in cursor) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 89bd504..25ddace 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,519 +1,517 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import logging import os from types import TracebackType from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type from typing_extensions import Literal, TypedDict from swh.core.statsd import statsd from swh.model.model import Sha1Git from .interface import ( DirectoryData, ProvenanceInterface, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry from .util import path_normalize LOGGER = logging.getLogger(__name__) BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total" class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] # None means unknown added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), directory_flatten={}, revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) class Provenance: MAX_CACHE_ELEMENTS = 40000 def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage self.cache = new_cache() def __enter__(self) -> ProvenanceInterface: self.open() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() def _flush_limit_reached(self) -> bool: return sum(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS def _get_cache_stats(self) -> Dict[str, int]: return { k: len(v["data"]) if (isinstance(v, dict) and v.get("data") is not None) else len(v) # type: ignore for (k, v) in self.cache.items() } def clear_caches(self) -> None: self.cache = new_cache() def close(self) -> None: self.storage.close() @statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) def flush(self) -> None: self.flush_revision_content_layer() self.flush_origin_revision_layer() self.clear_caches() def flush_if_necessary(self) -> bool: """Flush if the number of cached information reached a limit.""" LOGGER.debug("Cache stats: %s", self._get_cache_stats()) if self._flush_limit_reached(): self.flush() return True else: return False @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} ) def flush_origin_revision_layer(self) -> None: # Origins and revisions should be inserted first so that internal ids' # resolution works properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } if urls: while not self.storage.origin_add(urls): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_origin"}, ) LOGGER.warning( "Unable to write origins urls to the storage. Retrying..." ) rev_orgs = { # Destinations in this relation should match origins in the next one **{ src: RevisionData(date=None, origin=None) for src in self.cache["revision_before_revision"] }, **{ # This relation comes second so that non-None origins take precedence src: RevisionData(date=None, origin=org) for src, org in self.cache["revision_in_origin"] }, } if rev_orgs: while not self.storage.revision_add(rev_orgs): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision"}, ) LOGGER.warning( "Unable to write revision entities to the storage. Retrying..." ) # Second, flat models for revisions' histories (ie. revision-before-revision). if self.cache["revision_before_revision"]: rev_before_rev = { src: {RelationData(dst=dst, path=None) for dst in dsts} for src, dsts in self.cache["revision_before_revision"].items() } while not self.storage.relation_add( RelationType.REV_BEFORE_REV, rev_before_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_origin_revision_retry_revision_before_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_BEFORE_REV, ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories were already added. This is to guarantee consistent results if # something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. if self.cache["revision_in_origin"]: rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} for src, dst in self.cache["revision_in_origin"]: rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision_in_origin"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_IN_ORG, ) @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} ) def flush_revision_content_layer(self) -> None: # Register in the storage all entities, to ensure the coming relations can # properly resolve any internal reference if needed. Content and directory # entries may safely be registered with their associated dates. In contrast, # revision entries should be registered without date, as it is used to # acknowledge that the flushing was successful. Also, directories are # registered with their flatten flag not set. cnt_dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } if cnt_dates: while not self.storage.content_add(cnt_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_date"}, ) LOGGER.warning( "Unable to write content dates to the storage. Retrying..." ) dir_dates = { sha1: DirectoryData(date=date, flat=False) for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } if dir_dates: while not self.storage.directory_add(dir_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_date"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) revs = { sha1 for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } if revs: while not self.storage.revision_add(revs): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_revision_none"}, ) LOGGER.warning( "Unable to write revision entities to the storage. Retrying..." ) paths = { path for _, _, path in self.cache["content_in_revision"] | self.cache["content_in_directory"] | self.cache["directory_in_revision"] } if paths: while not self.storage.location_add(paths): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_location"}, ) LOGGER.warning( "Unable to write locations entities to the storage. Retrying..." ) # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generated an inconsistent database. if self.cache["content_in_revision"]: cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_revision"]: cnt_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add( RelationType.CNT_EARLY_IN_REV, cnt_in_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_in_revision"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_EARLY_IN_REV, ) if self.cache["content_in_directory"]: cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_directory"]: cnt_in_dir.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.CNT_IN_DIR, cnt_in_dir): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_content_in_directory" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_IN_DIR, ) if self.cache["directory_in_revision"]: dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["directory_in_revision"]: dir_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.DIR_IN_REV, dir_in_rev): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_directory_in_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.DIR_IN_REV, ) # After relations, flatten flags for directories can be safely set (if # applicable) acknowledging those directories that have already be flattened. # Similarly, dates for the revisions are set to acknowledge that these revisions # won't need to be reprocessed in case of failure. dir_acks = { sha1: DirectoryData( date=date, flat=self.cache["directory_flatten"].get(sha1) or False ) for sha1, date in self.cache["directory"]["data"].items() if self.cache["directory_flatten"].get(sha1) and date is not None } if dir_acks: while not self.storage.directory_add(dir_acks): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_ack"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) rev_dates = { sha1: RevisionData(date=date, origin=None) for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } if rev_dates: while not self.storage.revision_add(rev_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_revision_date"}, ) LOGGER.warning( "Unable to write revision dates to the storage. Retrying..." ) def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, path_normalize(path)) ) def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]: cache = self.cache["directory_flatten"] if directory.id not in cache: cache.setdefault(directory.id, None) ret = self.storage.directory_get([directory.id]) if directory.id in ret: dir = ret[directory.id] cache[directory.id] = dir.flat # date is kept to ensure we have it available when flushing self.cache["directory"]["data"][directory.id] = dir.date return cache.get(directory.id) def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None: self.cache["directory_flatten"][directory.id] = True def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "content": cache["data"].update(self.storage.content_get(missing_ids)) elif entity == "directory": cache["data"].update( { id: dir.date for id, dir in self.storage.directory_get(missing_ids).items() } ) elif entity == "revision": cache["data"].update( { id: rev.date for id, rev in self.storage.revision_get(missing_ids).items() } ) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].setdefault(sha1, None) if date is not None: dates[sha1] = date return dates def open(self) -> None: self.storage.open() def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( - self, head: RevisionEntry, revision: RevisionEntry + self, head_id: Sha1Git, revision_id: Sha1Git ) -> None: - self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - head.id + self.cache["revision_before_revision"].setdefault(revision_id, set()).add( + head_id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_is_head(self, revision: RevisionEntry) -> bool: return bool(self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: + def revision_get_preferred_origin(self, revision_id: Sha1Git) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] - if revision.id not in cache: - ret = self.storage.revision_get([revision.id]) - if revision.id in ret: - origin = ret[revision.id].origin + if revision_id not in cache: + ret = self.storage.revision_get([revision_id]) + if revision_id in ret: + origin = ret[revision_id].origin if origin is not None: - cache[revision.id] = origin - return cache.get(revision.id) + cache[revision_id] = origin + return cache.get(revision_id) def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry + self, origin: OriginEntry, revision_id: Sha1Git ) -> None: - self.cache["revision_origin"]["data"][revision.id] = origin.id - self.cache["revision_origin"]["added"].add(revision.id) + self.cache["revision_origin"]["data"][revision_id] = origin.id + self.cache["revision_origin"]["added"].add(revision_id) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 6ee1339..97b1121 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,73 +1,77 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Any, Dict, Iterable, Set, Tuple from swh.core.statsd import statsd from swh.model.model import ObjectType, Sha1Git, TargetType from swh.storage.interface import StorageInterface ARCHIVE_DURATION_METRIC = "swh_provenance_archive_api_duration_seconds" class ArchiveStorage: def __init__(self, storage: StorageInterface) -> None: self.storage = storage @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: for entry in self.storage.directory_ls(id): if entry["type"] == "dir" or ( entry["type"] == "file" and entry["length"] >= minsize ): yield { "name": entry["name"], "target": entry["target"], "type": entry["type"], } @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - rev = self.storage.revision_get([id])[0] + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + rev = self.storage.revision_get([revision_id])[0] if rev is not None: - yield from rev.parents + for parent_id in rev.parents: + yield (revision_id, parent_id) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(list(releases)) if release is not None and release.target_type == ObjectType.REVISION ) revisions: Set[Tuple[datetime, Sha1Git]] = set() for targets in grouper(targets_set, batchsize): revisions.update( (revision.date.to_datetime(), revision.id) for revision in self.storage.revision_get(list(targets)) if revision is not None and revision.date is not None ) yield from (head for _, head in revisions) diff --git a/swh/provenance/swhgraph/archive.py b/swh/provenance/swhgraph/archive.py index ae1b721..2c85515 100644 --- a/swh/provenance/swhgraph/archive.py +++ b/swh/provenance/swhgraph/archive.py @@ -1,45 +1,50 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable +from typing import Any, Dict, Iterable, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.model.swhids import CoreSWHID, ObjectType from swh.storage.interface import StorageInterface ARCHIVE_DURATION_METRIC = "swh_provenance_archive_graph_duration_seconds" class ArchiveGraph: def __init__(self, graph, storage: StorageInterface) -> None: self.graph = graph self.storage = storage # required by ArchiveInterface @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: raise NotImplementedError @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - src = CoreSWHID(object_type=ObjectType.REVISION, object_id=id) - request = self.graph.neighbors(str(src), edges="rev:rev", return_types="rev") - - yield from ( - CoreSWHID.from_string(swhid).object_id for swhid in request if swhid - ) + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + src = CoreSWHID(object_type=ObjectType.REVISION, object_id=revision_id) + request = self.graph.visit_edges(str(src), edges="rev:rev") + + for rev_swhid, parent_rev_swhid in request: + yield ( + CoreSWHID.from_string(rev_swhid).object_id, + CoreSWHID.from_string(parent_rev_swhid).object_id, + ) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: src = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=id) request = self.graph.visit_nodes( str(src), edges="snp:rev,snp:rel,rel:rev", return_types="rev" ) yield from ( CoreSWHID.from_string(swhid).object_id for swhid in request if swhid ) diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py index dc5da6b..624cee8 100644 --- a/swh/provenance/tests/test_archive_interface.py +++ b/swh/provenance/tests/test_archive_interface.py @@ -1,250 +1,255 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from operator import itemgetter from typing import Any from typing import Counter as TCounter from typing import Dict, Iterable, List, Set, Tuple, Type, Union import pytest from swh.core.db import BaseDb from swh.graph.naive_client import NaiveClient from swh.model.model import ( BaseModel, Content, Directory, DirectoryEntry, Origin, OriginVisit, OriginVisitStatus, Revision, Sha1Git, Snapshot, SnapshotBranch, TargetType, ) from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID from swh.provenance.archive import ArchiveInterface from swh.provenance.multiplexer.archive import ArchiveMultiplexed from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.provenance.swhgraph.archive import ArchiveGraph from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage class ArchiveNoop: storage: StorageInterface def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: return [] - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: return [] def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: return [] def check_directory_ls( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for directory in data["directory"]: entries_ref = sorted( reference.directory_ls(directory["id"]), key=itemgetter("name") ) entries = sorted(archive.directory_ls(directory["id"]), key=itemgetter("name")) assert entries_ref == entries -def check_revision_get_parents( +def check_revision_get_some_outbound_edges( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for revision in data["revision"]: - parents_ref: TCounter[Sha1Git] = Counter( - reference.revision_get_parents(revision["id"]) + parents_ref: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter( + reference.revision_get_some_outbound_edges(revision["id"]) ) - parents: TCounter[Sha1Git] = Counter( - archive.revision_get_parents(revision["id"]) + parents: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter( + archive.revision_get_some_outbound_edges(revision["id"]) ) - assert parents_ref == parents + + # Check that all the reference outbound edges are included in the other + # archives's outbound edges + assert set(parents_ref.items()) <= set(parents.items()) def check_snapshot_get_heads( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for snapshot in data["snapshot"]: heads_ref: TCounter[Sha1Git] = Counter( reference.snapshot_get_heads(snapshot["id"]) ) heads: TCounter[Sha1Git] = Counter(archive.snapshot_get_heads(snapshot["id"])) assert heads_ref == heads def get_object_class(object_type: str) -> Type[BaseModel]: if object_type == "origin": return Origin elif object_type == "origin_visit": return OriginVisit elif object_type == "origin_visit_status": return OriginVisitStatus elif object_type == "content": return Content elif object_type == "directory": return Directory elif object_type == "revision": return Revision elif object_type == "snapshot": return Snapshot raise ValueError def data_to_model(data: Dict[str, List[dict]]) -> Dict[str, List[BaseModel]]: model: Dict[str, List[BaseModel]] = {} for object_type, objects in data.items(): for object in objects: model.setdefault(object_type, []).append( get_object_class(object_type).from_dict(object) ) return model def add_link( edges: Set[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ], src_obj: Union[Origin, Snapshot, Revision, Directory, Content], dst_id: bytes, dst_type: ExtendedObjectType, ) -> None: swhid = ExtendedSWHID(object_type=dst_type, object_id=dst_id) edges.add((src_obj.swhid(), swhid)) def get_graph_data( data: Dict[str, List[dict]] ) -> Tuple[ List[Union[CoreSWHID, ExtendedSWHID, str]], List[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ], ]: nodes: Set[Union[CoreSWHID, ExtendedSWHID, str]] = set() edges: Set[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ] = set() model = data_to_model(data) for origin in model["origin"]: assert isinstance(origin, Origin) nodes.add(origin.swhid()) for status in model["origin_visit_status"]: assert isinstance(status, OriginVisitStatus) if status.origin == origin.url and status.snapshot is not None: add_link(edges, origin, status.snapshot, ExtendedObjectType.SNAPSHOT) for snapshot in model["snapshot"]: assert isinstance(snapshot, Snapshot) nodes.add(snapshot.swhid()) for branch in snapshot.branches.values(): assert isinstance(branch, SnapshotBranch) if branch.target_type in [TargetType.RELEASE, TargetType.REVISION]: target_type = ( ExtendedObjectType.RELEASE if branch.target_type == TargetType.RELEASE else ExtendedObjectType.REVISION ) add_link(edges, snapshot, branch.target, target_type) for revision in model["revision"]: assert isinstance(revision, Revision) nodes.add(revision.swhid()) # root directory add_link(edges, revision, revision.directory, ExtendedObjectType.DIRECTORY) # parent for parent in revision.parents: add_link(edges, revision, parent, ExtendedObjectType.REVISION) for directory in model["directory"]: assert isinstance(directory, Directory) nodes.add(directory.swhid()) for entry in directory.entries: assert isinstance(entry, DirectoryEntry) if entry.type == "file": target_type = ExtendedObjectType.CONTENT elif entry.type == "dir": target_type = ExtendedObjectType.DIRECTORY elif entry.type == "rev": target_type = ExtendedObjectType.REVISION else: assert False, "unknown directory entry type" add_link(edges, directory, entry.target, target_type) for content in model["content"]: assert isinstance(content, Content) nodes.add(content.swhid()) return list(nodes), list(edges) @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) def test_archive_interface(repo: str, archive: ArchiveInterface) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) # test against ArchiveStorage archive_api = ArchiveStorage(archive.storage) check_directory_ls(archive, archive_api, data) - check_revision_get_parents(archive, archive_api, data) + check_revision_get_some_outbound_edges(archive, archive_api, data) check_snapshot_get_heads(archive, archive_api, data) # test against ArchivePostgreSQL assert isinstance(archive.storage, Storage) dsn = archive.storage.get_db().conn.dsn with BaseDb.connect(dsn).conn as conn: BaseDb.adapt_conn(conn) archive_direct = ArchivePostgreSQL(conn) check_directory_ls(archive, archive_direct, data) - check_revision_get_parents(archive, archive_direct, data) + check_revision_get_some_outbound_edges(archive, archive_direct, data) check_snapshot_get_heads(archive, archive_direct, data) # test against ArchiveGraph nodes, edges = get_graph_data(data) graph = NaiveClient(nodes=nodes, edges=edges) archive_graph = ArchiveGraph(graph, archive.storage) with pytest.raises(NotImplementedError): check_directory_ls(archive, archive_graph, data) - check_revision_get_parents(archive, archive_graph, data) + check_revision_get_some_outbound_edges(archive, archive_graph, data) check_snapshot_get_heads(archive, archive_graph, data) # test against ArchiveMultiplexer archive_multiplexed = ArchiveMultiplexed( [("noop", ArchiveNoop()), ("graph", archive_graph), ("api", archive_api)] ) check_directory_ls(archive, archive_multiplexed, data) - check_revision_get_parents(archive, archive_multiplexed, data) + check_revision_get_some_outbound_edges(archive, archive_multiplexed, data) check_snapshot_get_heads(archive, archive_multiplexed, data) def test_noop_multiplexer(): archive = ArchiveMultiplexed([("noop", ArchiveNoop())]) assert not archive.directory_ls(Sha1Git(b"abcd")) - assert not archive.revision_get_parents(Sha1Git(b"abcd")) + assert not archive.revision_get_some_outbound_edges(Sha1Git(b"abcd")) assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))